Whamcloud - gitweb
LU-2444 build: fix 'error handling' issues
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         if (!dt_try_as_dir(env, dir))
71                 GOTO(out, rc = -ENOTDIR);
72
73         LASSERT(dir);
74         LASSERT(dir->do_index_ops);
75
76         iops = &dir->do_index_ops->dio_it;
77         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
78         if (IS_ERR(it))
79                 RETURN(PTR_ERR(it));
80
81         rc = iops->load(env, it, 0);
82         if (rc <= 0)
83                 GOTO(fini, rc = 0);
84
85         /* main cycle */
86         do {
87                 key = (void *)iops->key(env, it);
88                 if (IS_ERR(key)) {
89                         CERROR("%s: key failed when listing %s: rc = %d\n",
90                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
91                                (int) PTR_ERR(key));
92                         goto next;
93                 }
94                 key_sz = iops->key_size(env, it);
95                 LASSERT(key_sz > 0);
96
97                 /* filter out "." and ".." entries */
98                 if (key[0] == '.') {
99                         if (key_sz == 1)
100                                 goto next;
101                         if (key_sz == 2 && key[1] == '.')
102                                 goto next;
103                 }
104
105                 de = mgs_direntry_alloc(key_sz + 1);
106                 if (de == NULL) {
107                         rc = -ENOMEM;
108                         break;
109                 }
110
111                 memcpy(de->name, key, key_sz);
112                 de->name[key_sz] = 0;
113
114                 cfs_list_add(&de->list, list);
115
116 next:
117                 rc = iops->next(env, it);
118         } while (rc == 0);
119         rc = 0;
120
121         iops->put(env, it);
122
123 fini:
124         iops->fini(env, it);
125 out:
126         if (rc)
127                 CERROR("%s: key failed when listing %s: rc = %d\n",
128                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
129         RETURN(rc);
130 }
131
132 /******************** DB functions *********************/
133
134 static inline int name_create(char **newname, char *prefix, char *suffix)
135 {
136         LASSERT(newname);
137         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
138         if (!*newname)
139                 return -ENOMEM;
140         sprintf(*newname, "%s%s", prefix, suffix);
141         return 0;
142 }
143
144 static inline void name_destroy(char **name)
145 {
146         if (*name)
147                 OBD_FREE(*name, strlen(*name) + 1);
148         *name = NULL;
149 }
150
151 struct mgs_fsdb_handler_data
152 {
153         struct fs_db   *fsdb;
154         __u32           ver;
155 };
156
157 /* from the (client) config log, figure out:
158         1. which ost's/mdt's are configured (by index)
159         2. what the last config step is
160         3. COMPAT_18 osc name
161 */
162 /* It might be better to have a separate db file, instead of parsing the info
163    out of the client log.  This is slow and potentially error-prone. */
164 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
165                             struct llog_rec_hdr *rec, void *data)
166 {
167         struct mgs_fsdb_handler_data *d = data;
168         struct fs_db *fsdb = d->fsdb;
169         int cfg_len = rec->lrh_len;
170         char *cfg_buf = (char*) (rec + 1);
171         struct lustre_cfg *lcfg;
172         __u32 index;
173         int rc = 0;
174         ENTRY;
175
176         if (rec->lrh_type != OBD_CFG_REC) {
177                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
178                 RETURN(-EINVAL);
179         }
180
181         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
182         if (rc) {
183                 CERROR("Insane cfg\n");
184                 RETURN(rc);
185         }
186
187         lcfg = (struct lustre_cfg *)cfg_buf;
188
189         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
190                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
191
192         /* Figure out ost indicies */
193         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
194         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
195             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
196                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
197                                        NULL, 10);
198                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
199                        lustre_cfg_string(lcfg, 1), index,
200                        lustre_cfg_string(lcfg, 2));
201                 set_bit(index, fsdb->fsdb_ost_index_map);
202         }
203
204         /* Figure out mdt indicies */
205         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
206         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
207             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
208                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
209                                        &index, NULL);
210                 if (rc != LDD_F_SV_TYPE_MDT) {
211                         CWARN("Unparsable MDC name %s, assuming index 0\n",
212                               lustre_cfg_string(lcfg, 0));
213                         index = 0;
214                 }
215                 rc = 0;
216                 CDEBUG(D_MGS, "MDT index is %u\n", index);
217                 set_bit(index, fsdb->fsdb_mdt_index_map);
218                 fsdb->fsdb_mdt_count ++;
219         }
220
221         /**
222          * figure out the old config. fsdb_gen = 0 means old log
223          * It is obsoleted and not supported anymore
224          */
225         if (fsdb->fsdb_gen == 0) {
226                 CERROR("Old config format is not supported\n");
227                 RETURN(-EINVAL);
228         }
229
230         /*
231          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
232          */
233         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
234             lcfg->lcfg_command == LCFG_ATTACH &&
235             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
236                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
237                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
238                         CWARN("MDT using 1.8 OSC name scheme\n");
239                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
240                 }
241         }
242
243         if (lcfg->lcfg_command == LCFG_MARKER) {
244                 struct cfg_marker *marker;
245                 marker = lustre_cfg_buf(lcfg, 1);
246
247                 d->ver = marker->cm_vers;
248
249                 /* Keep track of the latest marker step */
250                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
251         }
252
253         RETURN(rc);
254 }
255
256 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
257 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
258                                   struct mgs_device *mgs,
259                                   struct fs_db *fsdb)
260 {
261         char                            *logname;
262         struct llog_handle              *loghandle;
263         struct llog_ctxt                *ctxt;
264         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
265         int rc;
266
267         ENTRY;
268
269         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
270         LASSERT(ctxt != NULL);
271         rc = name_create(&logname, fsdb->fsdb_name, "-client");
272         if (rc)
273                 GOTO(out_put, rc);
274         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
275         if (rc)
276                 GOTO(out_pop, rc);
277
278         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
279         if (rc)
280                 GOTO(out_close, rc);
281
282         if (llog_get_size(loghandle) <= 1)
283                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
284
285         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
286         CDEBUG(D_INFO, "get_db = %d\n", rc);
287 out_close:
288         llog_close(env, loghandle);
289 out_pop:
290         name_destroy(&logname);
291 out_put:
292         llog_ctxt_put(ctxt);
293
294         RETURN(rc);
295 }
296
297 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
298 {
299         struct mgs_tgt_srpc_conf *tgtconf;
300
301         /* free target-specific rules */
302         while (fsdb->fsdb_srpc_tgt) {
303                 tgtconf = fsdb->fsdb_srpc_tgt;
304                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
305
306                 LASSERT(tgtconf->mtsc_tgt);
307
308                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
309                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
310                 OBD_FREE_PTR(tgtconf);
311         }
312
313         /* free general rules */
314         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
315 }
316
317 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
318 {
319         struct fs_db *fsdb;
320         cfs_list_t *tmp;
321
322         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
323                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
324                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
325                         return fsdb;
326         }
327         return NULL;
328 }
329
330 /* caller must hold the mgs->mgs_fs_db_lock */
331 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
332                                   struct mgs_device *mgs, char *fsname)
333 {
334         struct fs_db *fsdb;
335         int rc;
336         ENTRY;
337
338         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
339                 CERROR("fsname %s is too long\n", fsname);
340                 RETURN(NULL);
341         }
342
343         OBD_ALLOC_PTR(fsdb);
344         if (!fsdb)
345                 RETURN(NULL);
346
347         strcpy(fsdb->fsdb_name, fsname);
348         mutex_init(&fsdb->fsdb_mutex);
349         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
350         fsdb->fsdb_gen = 1;
351
352         if (strcmp(fsname, MGSSELF_NAME) == 0) {
353                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
354         } else {
355                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
356                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
357                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
358                         CERROR("No memory for index maps\n");
359                         GOTO(err, rc = -ENOMEM);
360                 }
361
362                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
363                 if (rc)
364                         GOTO(err, rc);
365                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
366                 if (rc)
367                         GOTO(err, rc);
368
369                 /* initialise data for NID table */
370                 mgs_ir_init_fs(env, mgs, fsdb);
371
372                 lproc_mgs_add_live(mgs, fsdb);
373         }
374
375         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
376
377         RETURN(fsdb);
378 err:
379         if (fsdb->fsdb_ost_index_map)
380                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
381         if (fsdb->fsdb_mdt_index_map)
382                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
383         name_destroy(&fsdb->fsdb_clilov);
384         name_destroy(&fsdb->fsdb_clilmv);
385         OBD_FREE_PTR(fsdb);
386         RETURN(NULL);
387 }
388
389 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
390 {
391         /* wait for anyone with the sem */
392         mutex_lock(&fsdb->fsdb_mutex);
393         lproc_mgs_del_live(mgs, fsdb);
394         cfs_list_del(&fsdb->fsdb_list);
395
396         /* deinitialize fsr */
397         mgs_ir_fini_fs(mgs, fsdb);
398
399         if (fsdb->fsdb_ost_index_map)
400                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
401         if (fsdb->fsdb_mdt_index_map)
402                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
403         name_destroy(&fsdb->fsdb_clilov);
404         name_destroy(&fsdb->fsdb_clilmv);
405         mgs_free_fsdb_srpc(fsdb);
406         mutex_unlock(&fsdb->fsdb_mutex);
407         OBD_FREE_PTR(fsdb);
408 }
409
410 int mgs_init_fsdb_list(struct mgs_device *mgs)
411 {
412         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
413         return 0;
414 }
415
416 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
417 {
418         struct fs_db *fsdb;
419         cfs_list_t *tmp, *tmp2;
420         mutex_lock(&mgs->mgs_mutex);
421         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
422                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
423                 mgs_free_fsdb(mgs, fsdb);
424         }
425         mutex_unlock(&mgs->mgs_mutex);
426         return 0;
427 }
428
429 int mgs_find_or_make_fsdb(const struct lu_env *env,
430                           struct mgs_device *mgs, char *name,
431                           struct fs_db **dbh)
432 {
433         struct fs_db *fsdb;
434         int rc = 0;
435
436         ENTRY;
437         mutex_lock(&mgs->mgs_mutex);
438         fsdb = mgs_find_fsdb(mgs, name);
439         if (fsdb) {
440                 mutex_unlock(&mgs->mgs_mutex);
441                 *dbh = fsdb;
442                 RETURN(0);
443         }
444
445         CDEBUG(D_MGS, "Creating new db\n");
446         fsdb = mgs_new_fsdb(env, mgs, name);
447         /* lock fsdb_mutex until the db is loaded from llogs */
448         if (fsdb)
449                 mutex_lock(&fsdb->fsdb_mutex);
450         mutex_unlock(&mgs->mgs_mutex);
451         if (!fsdb)
452                 RETURN(-ENOMEM);
453
454         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
455                 /* populate the db from the client llog */
456                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
457                 if (rc) {
458                         CERROR("Can't get db from client log %d\n", rc);
459                         GOTO(out_free, rc);
460                 }
461         }
462
463         /* populate srpc rules from params llog */
464         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
465         if (rc) {
466                 CERROR("Can't get db from params log %d\n", rc);
467                 GOTO(out_free, rc);
468         }
469
470         mutex_unlock(&fsdb->fsdb_mutex);
471         *dbh = fsdb;
472
473         RETURN(0);
474
475 out_free:
476         mutex_unlock(&fsdb->fsdb_mutex);
477         mgs_free_fsdb(mgs, fsdb);
478         return rc;
479 }
480
481 /* 1 = index in use
482    0 = index unused
483    -1= empty client log */
484 int mgs_check_index(const struct lu_env *env,
485                     struct mgs_device *mgs,
486                     struct mgs_target_info *mti)
487 {
488         struct fs_db *fsdb;
489         void *imap;
490         int rc = 0;
491         ENTRY;
492
493         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
494
495         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
496         if (rc) {
497                 CERROR("Can't get db for %s\n", mti->mti_fsname);
498                 RETURN(rc);
499         }
500
501         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
502                 RETURN(-1);
503
504         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
505                 imap = fsdb->fsdb_ost_index_map;
506         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
507                 imap = fsdb->fsdb_mdt_index_map;
508         else
509                 RETURN(-EINVAL);
510
511         if (test_bit(mti->mti_stripe_index, imap))
512                 RETURN(1);
513         RETURN(0);
514 }
515
516 static __inline__ int next_index(void *index_map, int map_len)
517 {
518         int i;
519         for (i = 0; i < map_len * 8; i++)
520                 if (!test_bit(i, index_map)) {
521                          return i;
522                  }
523         CERROR("max index %d exceeded.\n", i);
524         return -1;
525 }
526
527 /* Return codes:
528         0  newly marked as in use
529         <0 err
530         +EALREADY for update of an old index */
531 static int mgs_set_index(const struct lu_env *env,
532                          struct mgs_device *mgs,
533                          struct mgs_target_info *mti)
534 {
535         struct fs_db *fsdb;
536         void *imap;
537         int rc = 0;
538         ENTRY;
539
540         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
541         if (rc) {
542                 CERROR("Can't get db for %s\n", mti->mti_fsname);
543                 RETURN(rc);
544         }
545
546         mutex_lock(&fsdb->fsdb_mutex);
547         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
548                 imap = fsdb->fsdb_ost_index_map;
549         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
550                 imap = fsdb->fsdb_mdt_index_map;
551         } else {
552                 GOTO(out_up, rc = -EINVAL);
553         }
554
555         if (mti->mti_flags & LDD_F_NEED_INDEX) {
556                 rc = next_index(imap, INDEX_MAP_SIZE);
557                 if (rc == -1)
558                         GOTO(out_up, rc = -ERANGE);
559                 mti->mti_stripe_index = rc;
560                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
561                         fsdb->fsdb_mdt_count ++;
562         }
563
564         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
565                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
566                                    "but the max index is %d.\n",
567                                    mti->mti_svname, mti->mti_stripe_index,
568                                    INDEX_MAP_SIZE * 8);
569                 GOTO(out_up, rc = -ERANGE);
570         }
571
572         if (test_bit(mti->mti_stripe_index, imap)) {
573                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
574                     !(mti->mti_flags & LDD_F_WRITECONF)) {
575                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
576                                            "%d, but that index is already in "
577                                            "use. Use --writeconf to force\n",
578                                            mti->mti_svname,
579                                            mti->mti_stripe_index);
580                         GOTO(out_up, rc = -EADDRINUSE);
581                 } else {
582                         CDEBUG(D_MGS, "Server %s updating index %d\n",
583                                mti->mti_svname, mti->mti_stripe_index);
584                         GOTO(out_up, rc = EALREADY);
585                 }
586         }
587
588         set_bit(mti->mti_stripe_index, imap);
589         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
590         mutex_unlock(&fsdb->fsdb_mutex);
591         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
592                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
593
594         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
595                mti->mti_stripe_index);
596
597         RETURN(0);
598 out_up:
599         mutex_unlock(&fsdb->fsdb_mutex);
600         return rc;
601 }
602
603 struct mgs_modify_lookup {
604         struct cfg_marker mml_marker;
605         int               mml_modified;
606 };
607
608 static int mgs_modify_handler(const struct lu_env *env,
609                               struct llog_handle *llh,
610                               struct llog_rec_hdr *rec, void *data)
611 {
612         struct mgs_modify_lookup *mml = data;
613         struct cfg_marker *marker;
614         struct lustre_cfg *lcfg = REC_DATA(rec);
615         int cfg_len = REC_DATA_LEN(rec);
616         int rc;
617         ENTRY;
618
619         if (rec->lrh_type != OBD_CFG_REC) {
620                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
621                 RETURN(-EINVAL);
622         }
623
624         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
625         if (rc) {
626                 CERROR("Insane cfg\n");
627                 RETURN(rc);
628         }
629
630         /* We only care about markers */
631         if (lcfg->lcfg_command != LCFG_MARKER)
632                 RETURN(0);
633
634         marker = lustre_cfg_buf(lcfg, 1);
635         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
636             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
637             !(marker->cm_flags & CM_SKIP)) {
638                 /* Found a non-skipped marker match */
639                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
640                        rec->lrh_index, marker->cm_step,
641                        marker->cm_flags, mml->mml_marker.cm_flags,
642                        marker->cm_tgtname, marker->cm_comment);
643                 /* Overwrite the old marker llog entry */
644                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
645                 marker->cm_flags |= mml->mml_marker.cm_flags;
646                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
647                 /* Header and tail are added back to lrh_len in
648                    llog_lvfs_write_rec */
649                 rec->lrh_len = cfg_len;
650                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
651                                 rec->lrh_index);
652                 if (!rc)
653                          mml->mml_modified++;
654         }
655
656         RETURN(rc);
657 }
658
659 /**
660  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
661  * Return code:
662  * 0 - modified successfully,
663  * 1 - no modification was done
664  * negative - error
665  */
666 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
667                       struct fs_db *fsdb, struct mgs_target_info *mti,
668                       char *logname, char *devname, char *comment, int flags)
669 {
670         struct llog_handle *loghandle;
671         struct llog_ctxt *ctxt;
672         struct mgs_modify_lookup *mml;
673         int rc;
674
675         ENTRY;
676
677         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
678         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
679                flags);
680
681         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
682         LASSERT(ctxt != NULL);
683         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
684         if (rc < 0) {
685                 if (rc == -ENOENT)
686                         rc = 0;
687                 GOTO(out_pop, rc);
688         }
689
690         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
691         if (rc)
692                 GOTO(out_close, rc);
693
694         if (llog_get_size(loghandle) <= 1)
695                 GOTO(out_close, rc = 0);
696
697         OBD_ALLOC_PTR(mml);
698         if (!mml)
699                 GOTO(out_close, rc = -ENOMEM);
700         strcpy(mml->mml_marker.cm_comment, comment);
701         strcpy(mml->mml_marker.cm_tgtname, devname);
702         /* Modify mostly means cancel */
703         mml->mml_marker.cm_flags = flags;
704         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
705         mml->mml_modified = 0;
706         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
707                           NULL);
708         if (!rc && !mml->mml_modified)
709                 rc = 1;
710         OBD_FREE_PTR(mml);
711
712 out_close:
713         llog_close(env, loghandle);
714 out_pop:
715         if (rc < 0)
716                 CERROR("%s: modify %s/%s failed: rc = %d\n",
717                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
718         llog_ctxt_put(ctxt);
719         RETURN(rc);
720 }
721
722 /** This structure is passed to mgs_replace_handler */
723 struct mgs_replace_uuid_lookup {
724         /* Nids are replaced for this target device */
725         struct mgs_target_info target;
726         /* Temporary modified llog */
727         struct llog_handle *temp_llh;
728         /* Flag is set if in target block*/
729         int in_target_device;
730         /* Nids already added. Just skip (multiple nids) */
731         int device_nids_added;
732         /* Flag is set if this block should not be copied */
733         int skip_it;
734 };
735
736 /**
737  * Check: a) if block should be skipped
738  * b) is it target block
739  *
740  * \param[in] lcfg
741  * \param[in] mrul
742  *
743  * \retval 0 should not to be skipped
744  * \retval 1 should to be skipped
745  */
746 static int check_markers(struct lustre_cfg *lcfg,
747                          struct mgs_replace_uuid_lookup *mrul)
748 {
749          struct cfg_marker *marker;
750
751         /* Track markers. Find given device */
752         if (lcfg->lcfg_command == LCFG_MARKER) {
753                 marker = lustre_cfg_buf(lcfg, 1);
754                 /* Clean llog from records marked as CM_EXCLUDE.
755                    CM_SKIP records are used for "active" command
756                    and can be restored if needed */
757                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
758                     (CM_EXCLUDE | CM_START)) {
759                         mrul->skip_it = 1;
760                         return 1;
761                 }
762
763                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
764                     (CM_EXCLUDE | CM_END)) {
765                         mrul->skip_it = 0;
766                         return 1;
767                 }
768
769                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
770                         LASSERT(!(marker->cm_flags & CM_START) ||
771                                 !(marker->cm_flags & CM_END));
772                         if (marker->cm_flags & CM_START) {
773                                 mrul->in_target_device = 1;
774                                 mrul->device_nids_added = 0;
775                         } else if (marker->cm_flags & CM_END)
776                                 mrul->in_target_device = 0;
777                 }
778         }
779
780         return 0;
781 }
782
783 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
784                        struct lustre_cfg *lcfg)
785 {
786         struct llog_rec_hdr      rec;
787         int                      buflen, rc;
788
789         if (!lcfg || !llh)
790                 return -ENOMEM;
791
792         LASSERT(llh->lgh_ctxt);
793
794         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
795                                 lcfg->lcfg_buflens);
796         rec.lrh_len = llog_data_len(buflen);
797         rec.lrh_type = OBD_CFG_REC;
798
799         /* idx = -1 means append */
800         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
801         if (rc)
802                 CERROR("failed %d\n", rc);
803         return rc;
804 }
805
806 static int record_base(const struct lu_env *env, struct llog_handle *llh,
807                      char *cfgname, lnet_nid_t nid, int cmd,
808                      char *s1, char *s2, char *s3, char *s4)
809 {
810         struct mgs_thread_info *mgi = mgs_env_info(env);
811         struct lustre_cfg     *lcfg;
812         int rc;
813
814         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
815                cmd, s1, s2, s3, s4);
816
817         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
818         if (s1)
819                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
820         if (s2)
821                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
822         if (s3)
823                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
824         if (s4)
825                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
826
827         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
828         if (!lcfg)
829                 return -ENOMEM;
830         lcfg->lcfg_nid = nid;
831
832         rc = record_lcfg(env, llh, lcfg);
833
834         lustre_cfg_free(lcfg);
835
836         if (rc) {
837                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
838                        cmd, s1, s2, s3, s4);
839         }
840         return rc;
841 }
842
843 static inline int record_add_uuid(const struct lu_env *env,
844                                   struct llog_handle *llh,
845                                   uint64_t nid, char *uuid)
846 {
847         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
848 }
849
850 static inline int record_add_conn(const struct lu_env *env,
851                                   struct llog_handle *llh,
852                                   char *devname, char *uuid)
853 {
854         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
855 }
856
857 static inline int record_attach(const struct lu_env *env,
858                                 struct llog_handle *llh, char *devname,
859                                 char *type, char *uuid)
860 {
861         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
862 }
863
864 static inline int record_setup(const struct lu_env *env,
865                                struct llog_handle *llh, char *devname,
866                                char *s1, char *s2, char *s3, char *s4)
867 {
868         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
869 }
870
871 /**
872  * \retval <0 record processing error
873  * \retval n record is processed. No need copy original one.
874  * \retval 0 record is not processed.
875  */
876 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
877                            struct mgs_replace_uuid_lookup *mrul)
878 {
879         int nids_added = 0;
880         lnet_nid_t nid;
881         char *ptr;
882         int rc;
883
884         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
885                 /* LCFG_ADD_UUID command found. Let's skip original command
886                    and add passed nids */
887                 ptr = mrul->target.mti_params;
888                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
889                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
890                                "device %s\n", libcfs_nid2str(nid),
891                                 mrul->target.mti_params,
892                                 mrul->target.mti_svname);
893                         rc = record_add_uuid(env,
894                                              mrul->temp_llh, nid,
895                                              mrul->target.mti_params);
896                         if (!rc)
897                                 nids_added++;
898                 }
899
900                 if (nids_added == 0) {
901                         CERROR("No new nids were added, nid %s with uuid %s, "
902                                "device %s\n", libcfs_nid2str(nid),
903                                mrul->target.mti_params,
904                                mrul->target.mti_svname);
905                         RETURN(-ENXIO);
906                 } else {
907                         mrul->device_nids_added = 1;
908                 }
909
910                 return nids_added;
911         }
912
913         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
914                 /* LCFG_SETUP command found. UUID should be changed */
915                 rc = record_setup(env,
916                                   mrul->temp_llh,
917                                   /* devname the same */
918                                   lustre_cfg_string(lcfg, 0),
919                                   /* s1 is not changed */
920                                   lustre_cfg_string(lcfg, 1),
921                                   /* new uuid should be
922                                   the full nidlist */
923                                   mrul->target.mti_params,
924                                   /* s3 is not changed */
925                                   lustre_cfg_string(lcfg, 3),
926                                   /* s4 is not changed */
927                                   lustre_cfg_string(lcfg, 4));
928                 return rc ? rc : 1;
929         }
930
931         /* Another commands in target device block */
932         return 0;
933 }
934
935 /**
936  * Handler that called for every record in llog.
937  * Records are processed in order they placed in llog.
938  *
939  * \param[in] llh       log to be processed
940  * \param[in] rec       current record
941  * \param[in] data      mgs_replace_uuid_lookup structure
942  *
943  * \retval 0    success
944  */
945 static int mgs_replace_handler(const struct lu_env *env,
946                                struct llog_handle *llh,
947                                struct llog_rec_hdr *rec,
948                                void *data)
949 {
950         struct llog_rec_hdr local_rec = *rec;
951         struct mgs_replace_uuid_lookup *mrul;
952         struct lustre_cfg *lcfg = REC_DATA(rec);
953         int cfg_len = REC_DATA_LEN(rec);
954         int rc;
955         ENTRY;
956
957         mrul = (struct mgs_replace_uuid_lookup *)data;
958
959         if (rec->lrh_type != OBD_CFG_REC) {
960                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
961                        rec->lrh_type, lcfg->lcfg_command,
962                        lustre_cfg_string(lcfg, 0),
963                        lustre_cfg_string(lcfg, 1));
964                 RETURN(-EINVAL);
965         }
966
967         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
968         if (rc) {
969                 /* Do not copy any invalidated records */
970                 GOTO(skip_out, rc = 0);
971         }
972
973         rc = check_markers(lcfg, mrul);
974         if (rc || mrul->skip_it)
975                 GOTO(skip_out, rc = 0);
976
977         /* Write to new log all commands outside target device block */
978         if (!mrul->in_target_device)
979                 GOTO(copy_out, rc = 0);
980
981         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
982            (failover nids) for this target, assuming that if then
983            primary is changing then so is the failover */
984         if (mrul->device_nids_added &&
985             (lcfg->lcfg_command == LCFG_ADD_UUID ||
986              lcfg->lcfg_command == LCFG_ADD_CONN))
987                 GOTO(skip_out, rc = 0);
988
989         rc = process_command(env, lcfg, mrul);
990         if (rc < 0)
991                 RETURN(rc);
992
993         if (rc)
994                 RETURN(0);
995 copy_out:
996         /* Record is placed in temporary llog as is */
997         local_rec.lrh_len -= sizeof(*rec) + sizeof(struct llog_rec_tail);
998         rc = llog_write(env, mrul->temp_llh, &local_rec, NULL, 0,
999                         (void *)lcfg, -1);
1000
1001         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1002                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1003                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1004         RETURN(rc);
1005
1006 skip_out:
1007         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1008                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1009                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1010         RETURN(rc);
1011 }
1012
1013 static int mgs_backup_llog(const struct lu_env *env,
1014                            struct obd_device *mgs,
1015                            char *fsname, char *backup)
1016 {
1017         struct obd_uuid *uuid;
1018         struct llog_handle *orig_llh, *bak_llh;
1019         struct llog_ctxt *lctxt;
1020         int rc, rc2;
1021         ENTRY;
1022
1023         lctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1024         if (!lctxt) {
1025                 CERROR("%s: missing llog context\n", mgs->obd_name);
1026                 GOTO(out, rc = -EINVAL);
1027         }
1028
1029         /* Make sure there's no old backup log */
1030         rc = llog_erase(env, lctxt, NULL, backup);
1031         if (rc < 0 && rc != -ENOENT)
1032                 GOTO(out_put, rc);
1033
1034         /* open backup log */
1035         rc = llog_open_create(env, lctxt, &bak_llh, NULL, backup);
1036         if (rc) {
1037                 CERROR("%s: backup logfile open %s: rc = %d\n",
1038                        mgs->obd_name, backup, rc);
1039                 GOTO(out_put, rc);
1040         }
1041
1042         /* set the log header uuid */
1043         OBD_ALLOC_PTR(uuid);
1044         if (uuid == NULL)
1045                 GOTO(out_put, rc = -ENOMEM);
1046         obd_str2uuid(uuid, backup);
1047         rc = llog_init_handle(env, bak_llh, LLOG_F_IS_PLAIN, uuid);
1048         OBD_FREE_PTR(uuid);
1049         if (rc)
1050                 GOTO(out_close1, rc);
1051
1052         /* open original log */
1053         rc = llog_open(env, lctxt, &orig_llh, NULL, fsname,
1054                        LLOG_OPEN_EXISTS);
1055         if (rc < 0) {
1056                 if (rc == -ENOENT)
1057                         rc = 0;
1058                 GOTO(out_close1, rc);
1059         }
1060
1061         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, NULL);
1062         if (rc)
1063                 GOTO(out_close2, rc);
1064
1065         /* Copy remote log */
1066         rc = llog_process(env, orig_llh, llog_copy_handler,
1067                           (void *)bak_llh, NULL);
1068
1069 out_close2:
1070         rc2 = llog_close(env, orig_llh);
1071         if (!rc)
1072                 rc = rc2;
1073 out_close1:
1074         rc2 = llog_close(env, bak_llh);
1075         if (!rc)
1076                 rc = rc2;
1077 out_put:
1078         if (lctxt)
1079                 llog_ctxt_put(lctxt);
1080 out:
1081         if (rc)
1082                 CERROR("%s: Failed to backup log %s: rc = %d\n",
1083                        mgs->obd_name, fsname, rc);
1084         RETURN(rc);
1085 }
1086
1087 static int mgs_log_is_empty(const struct lu_env *env, struct mgs_device *mgs,
1088                             char *name);
1089
1090 static int mgs_replace_nids_log(const struct lu_env *env,
1091                                 struct obd_device *mgs, struct fs_db *fsdb,
1092                                 char *logname, char *devname, char *nids)
1093 {
1094         struct llog_handle *orig_llh, *backup_llh;
1095         struct llog_ctxt *ctxt;
1096         struct mgs_replace_uuid_lookup *mrul;
1097         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1098         char *backup;
1099         int rc, rc2;
1100         ENTRY;
1101
1102         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1103
1104         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1105         LASSERT(ctxt != NULL);
1106
1107         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1108                 /* Log is empty. Nothing to replace */
1109                 GOTO(out_put, rc = 0);
1110         }
1111
1112         OBD_ALLOC(backup, strlen(logname) + 5);
1113         if (backup == NULL)
1114                 GOTO(out_put, rc = -ENOMEM);
1115
1116         sprintf(backup, "%s.bak", logname);
1117
1118         rc = mgs_backup_llog(env, mgs, logname, backup);
1119         if (rc < 0) {
1120                 CERROR("%s: can't make backup for %s: rc = %d\n",
1121                        mgs->obd_name, logname, rc);
1122                 GOTO(out_free,rc);
1123         }
1124
1125         /* Now erase original log file. Connections are not allowed.
1126            Backup is already saved */
1127         rc = llog_erase(env, ctxt, NULL, logname);
1128         if (rc < 0 && rc != -ENOENT)
1129                 GOTO(out_free, rc);
1130
1131         /* open local log */
1132         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1133         if (rc)
1134                 GOTO(out_restore, rc);
1135
1136         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, NULL);
1137         if (rc)
1138                 GOTO(out_closel, rc);
1139
1140         /* open backup llog */
1141         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1142                        LLOG_OPEN_EXISTS);
1143         if (rc)
1144                 GOTO(out_closel, rc);
1145
1146         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1147         if (rc)
1148                 GOTO(out_close, rc);
1149
1150         if (llog_get_size(backup_llh) <= 1)
1151                 GOTO(out_close, rc = 0);
1152
1153         OBD_ALLOC_PTR(mrul);
1154         if (!mrul)
1155                 GOTO(out_close, rc = -ENOMEM);
1156         /* devname is only needed information to replace UUID records */
1157         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1158         /* parse nids later */
1159         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1160         /* Copy records to this temporary llog */
1161         mrul->temp_llh = orig_llh;
1162
1163         rc = llog_process(env, backup_llh, mgs_replace_handler,
1164                           (void *)mrul, NULL);
1165         OBD_FREE_PTR(mrul);
1166 out_close:
1167         rc2 = llog_close(NULL, backup_llh);
1168         if (!rc)
1169                 rc = rc2;
1170 out_closel:
1171         rc2 = llog_close(NULL, orig_llh);
1172         if (!rc)
1173                 rc = rc2;
1174
1175 out_restore:
1176         if (rc) {
1177                 CERROR("%s: llog should be restored: rc = %d\n",
1178                        mgs->obd_name, rc);
1179                 rc2 = mgs_backup_llog(env, mgs, backup, logname);
1180                 if (rc2 < 0)
1181                         CERROR("%s: can't restore backup %s: rc = %d\n",
1182                                mgs->obd_name, logname, rc2);
1183         }
1184
1185 out_free:
1186         OBD_FREE(backup, strlen(backup) + 5);
1187
1188 out_put:
1189         llog_ctxt_put(ctxt);
1190
1191         if (rc)
1192                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1193                        mgs->obd_name, logname, rc);
1194
1195         RETURN(rc);
1196 }
1197
1198 /**
1199  * Parse device name and get file system name and/or device index
1200  *
1201  * \param[in]   devname device name (ex. lustre-MDT0000)
1202  * \param[out]  fsname  file system name(optional)
1203  * \param[out]  index   device index(optional)
1204  *
1205  * \retval 0    success
1206  */
1207 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1208 {
1209         char *ptr;
1210         ENTRY;
1211
1212         /* Extract fsname */
1213         ptr = strrchr(devname, '-');
1214
1215         if (fsname) {
1216                 if (!ptr) {
1217                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1218                                devname);
1219                         RETURN(-EINVAL);
1220                 }
1221                 memset(fsname, 0, MTI_NAME_MAXLEN);
1222                 strncpy(fsname, devname, ptr - devname);
1223                 fsname[MTI_NAME_MAXLEN - 1] = 0;
1224         }
1225
1226         if (index) {
1227                 if (server_name2index(ptr, index, NULL) < 0) {
1228                         CDEBUG(D_MGS, "Device name with wrong index\n");
1229                         RETURN(-EINVAL);
1230                 }
1231         }
1232
1233         RETURN(0);
1234 }
1235
1236 static int only_mgs_is_running(struct obd_device *mgs_obd)
1237 {
1238         /* TDB: Is global variable with devices count exists? */
1239         int num_devices = get_devices_count();
1240         /* osd, MGS and MGC + self_export
1241            (wc -l /proc/fs/lustre/devices <= 2) && (num_exports <= 2) */
1242         return (num_devices <= 3) && (mgs_obd->obd_num_exports <= 2);
1243 }
1244
1245 static int name_create_mdt(char **logname, char *fsname, int i)
1246 {
1247         char mdt_index[9];
1248
1249         sprintf(mdt_index, "-MDT%04x", i);
1250         return name_create(logname, fsname, mdt_index);
1251 }
1252
1253 /**
1254  * Replace nids for \a device to \a nids values
1255  *
1256  * \param obd           MGS obd device
1257  * \param devname       nids need to be replaced for this device
1258  * (ex. lustre-OST0000)
1259  * \param nids          nids list (ex. nid1,nid2,nid3)
1260  *
1261  * \retval 0    success
1262  */
1263 int mgs_replace_nids(const struct lu_env *env,
1264                      struct mgs_device *mgs,
1265                      char *devname, char *nids)
1266 {
1267         /* Assume fsname is part of device name */
1268         char fsname[MTI_NAME_MAXLEN];
1269         int rc;
1270         __u32 index;
1271         char *logname;
1272         struct fs_db *fsdb;
1273         unsigned int i;
1274         int conn_state;
1275         struct obd_device *mgs_obd = mgs->mgs_obd;
1276         ENTRY;
1277
1278         /* We can only change NIDs if no other nodes are connected */
1279         spin_lock(&mgs_obd->obd_dev_lock);
1280         conn_state = mgs_obd->obd_no_conn;
1281         mgs_obd->obd_no_conn = 1;
1282         spin_unlock(&mgs_obd->obd_dev_lock);
1283
1284         /* We can not change nids if not only MGS is started */
1285         if (!only_mgs_is_running(mgs_obd)) {
1286                 CERROR("Only MGS is allowed to be started\n");
1287                 GOTO(out, rc = -EINPROGRESS);
1288         }
1289
1290         /* Get fsname and index*/
1291         rc = mgs_parse_devname(devname, fsname, &index);
1292         if (rc)
1293                 GOTO(out, rc);
1294
1295         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1296         if (rc) {
1297                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1298                 GOTO(out, rc);
1299         }
1300
1301         /* Process client llogs */
1302         name_create(&logname, fsname, "-client");
1303         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1304         name_destroy(&logname);
1305         if (rc) {
1306                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1307                        fsname, devname, rc);
1308                 GOTO(out, rc);
1309         }
1310
1311         /* Process MDT llogs */
1312         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1313                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1314                         continue;
1315                 name_create_mdt(&logname, fsname, i);
1316                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1317                 name_destroy(&logname);
1318                 if (rc)
1319                         GOTO(out, rc);
1320         }
1321
1322 out:
1323         spin_lock(&mgs_obd->obd_dev_lock);
1324         mgs_obd->obd_no_conn = conn_state;
1325         spin_unlock(&mgs_obd->obd_dev_lock);
1326
1327         RETURN(rc);
1328 }
1329
1330 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1331                             char *devname, struct lov_desc *desc)
1332 {
1333         struct mgs_thread_info *mgi = mgs_env_info(env);
1334         struct lustre_cfg *lcfg;
1335         int rc;
1336
1337         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1338         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1339         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1340         if (!lcfg)
1341                 return -ENOMEM;
1342         rc = record_lcfg(env, llh, lcfg);
1343
1344         lustre_cfg_free(lcfg);
1345         return rc;
1346 }
1347
1348 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1349                             char *devname, struct lmv_desc *desc)
1350 {
1351         struct mgs_thread_info *mgi = mgs_env_info(env);
1352         struct lustre_cfg *lcfg;
1353         int rc;
1354
1355         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1356         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1357         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1358
1359         rc = record_lcfg(env, llh, lcfg);
1360
1361         lustre_cfg_free(lcfg);
1362         return rc;
1363 }
1364
1365 static inline int record_mdc_add(const struct lu_env *env,
1366                                  struct llog_handle *llh,
1367                                  char *logname, char *mdcuuid,
1368                                  char *mdtuuid, char *index,
1369                                  char *gen)
1370 {
1371         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1372                            mdtuuid,index,gen,mdcuuid);
1373 }
1374
1375 static inline int record_lov_add(const struct lu_env *env,
1376                                  struct llog_handle *llh,
1377                                  char *lov_name, char *ost_uuid,
1378                                  char *index, char *gen)
1379 {
1380         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1381                            ost_uuid, index, gen, 0);
1382 }
1383
1384 static inline int record_mount_opt(const struct lu_env *env,
1385                                    struct llog_handle *llh,
1386                                    char *profile, char *lov_name,
1387                                    char *mdc_name)
1388 {
1389         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1390                            profile,lov_name,mdc_name,0);
1391 }
1392
1393 static int record_marker(const struct lu_env *env,
1394                          struct llog_handle *llh,
1395                          struct fs_db *fsdb, __u32 flags,
1396                          char *tgtname, char *comment)
1397 {
1398         struct mgs_thread_info *mgi = mgs_env_info(env);
1399         struct lustre_cfg *lcfg;
1400         int rc;
1401
1402         if (flags & CM_START)
1403                 fsdb->fsdb_gen++;
1404         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1405         mgi->mgi_marker.cm_flags = flags;
1406         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1407         strncpy(mgi->mgi_marker.cm_tgtname, tgtname,
1408                 sizeof(mgi->mgi_marker.cm_tgtname));
1409         strncpy(mgi->mgi_marker.cm_comment, comment,
1410                 sizeof(mgi->mgi_marker.cm_comment));
1411         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1412         mgi->mgi_marker.cm_canceltime = 0;
1413         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1414         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1415                             sizeof(mgi->mgi_marker));
1416         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
1417         if (!lcfg)
1418                 return -ENOMEM;
1419         rc = record_lcfg(env, llh, lcfg);
1420
1421         lustre_cfg_free(lcfg);
1422         return rc;
1423 }
1424
1425 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1426                             struct llog_handle **llh, char *name)
1427 {
1428         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1429         struct llog_ctxt        *ctxt;
1430         int                      rc = 0;
1431
1432         if (*llh)
1433                 GOTO(out, rc = -EBUSY);
1434
1435         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1436         if (!ctxt)
1437                 GOTO(out, rc = -ENODEV);
1438         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1439
1440         rc = llog_open_create(env, ctxt, llh, NULL, name);
1441         if (rc)
1442                 GOTO(out_ctxt, rc);
1443         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1444         if (rc)
1445                 llog_close(env, *llh);
1446 out_ctxt:
1447         llog_ctxt_put(ctxt);
1448 out:
1449         if (rc) {
1450                 CERROR("%s: can't start log %s: rc = %d\n",
1451                        mgs->mgs_obd->obd_name, name, rc);
1452                 *llh = NULL;
1453         }
1454         RETURN(rc);
1455 }
1456
1457 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1458 {
1459         int rc;
1460
1461         rc = llog_close(env, *llh);
1462         *llh = NULL;
1463
1464         return rc;
1465 }
1466
1467 static int mgs_log_is_empty(const struct lu_env *env,
1468                             struct mgs_device *mgs, char *name)
1469 {
1470         struct llog_handle *llh;
1471         struct llog_ctxt *ctxt;
1472         int rc = 0;
1473
1474         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1475         LASSERT(ctxt != NULL);
1476         rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
1477         if (rc < 0) {
1478                 if (rc == -ENOENT)
1479                         rc = 0;
1480                 GOTO(out_ctxt, rc);
1481         }
1482
1483         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
1484         if (rc)
1485                 GOTO(out_close, rc);
1486         rc = llog_get_size(llh);
1487
1488 out_close:
1489         llog_close(env, llh);
1490 out_ctxt:
1491         llog_ctxt_put(ctxt);
1492         /* header is record 1 */
1493         return (rc <= 1);
1494 }
1495
1496 /******************** config "macros" *********************/
1497
1498 /* write an lcfg directly into a log (with markers) */
1499 static int mgs_write_log_direct(const struct lu_env *env,
1500                                 struct mgs_device *mgs, struct fs_db *fsdb,
1501                                 char *logname, struct lustre_cfg *lcfg,
1502                                 char *devname, char *comment)
1503 {
1504         struct llog_handle *llh = NULL;
1505         int rc;
1506         ENTRY;
1507
1508         if (!lcfg)
1509                 RETURN(-ENOMEM);
1510
1511         rc = record_start_log(env, mgs, &llh, logname);
1512         if (rc)
1513                 RETURN(rc);
1514
1515         /* FIXME These should be a single journal transaction */
1516         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1517         if (rc)
1518                 GOTO(out_end, rc);
1519         rc = record_lcfg(env, llh, lcfg);
1520         if (rc)
1521                 GOTO(out_end, rc);
1522         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1523         if (rc)
1524                 GOTO(out_end, rc);
1525 out_end:
1526         record_end_log(env, &llh);
1527         RETURN(rc);
1528 }
1529
1530 /* write the lcfg in all logs for the given fs */
1531 int mgs_write_log_direct_all(const struct lu_env *env,
1532                              struct mgs_device *mgs,
1533                              struct fs_db *fsdb,
1534                              struct mgs_target_info *mti,
1535                              struct lustre_cfg *lcfg,
1536                              char *devname, char *comment,
1537                              int server_only)
1538 {
1539         cfs_list_t list;
1540         struct mgs_direntry *dirent, *n;
1541         char *fsname = mti->mti_fsname;
1542         char *logname;
1543         int rc = 0, len = strlen(fsname);
1544         ENTRY;
1545
1546         /* We need to set params for any future logs
1547            as well. FIXME Append this file to every new log.
1548            Actually, we should store as params (text), not llogs.  Or
1549            in a database. */
1550         rc = name_create(&logname, fsname, "-params");
1551         if (rc)
1552                 RETURN(rc);
1553         if (mgs_log_is_empty(env, mgs, logname)) {
1554                 struct llog_handle *llh = NULL;
1555                 rc = record_start_log(env, mgs, &llh, logname);
1556                 record_end_log(env, &llh);
1557         }
1558         name_destroy(&logname);
1559         if (rc)
1560                 RETURN(rc);
1561
1562         /* Find all the logs in the CONFIGS directory */
1563         rc = class_dentry_readdir(env, mgs, &list);
1564         if (rc)
1565                 RETURN(rc);
1566
1567         /* Could use fsdb index maps instead of directory listing */
1568         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1569                 cfs_list_del(&dirent->list);
1570                 /* don't write to sptlrpc rule log */
1571                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1572                         goto next;
1573
1574                 /* caller wants write server logs only */
1575                 if (server_only && strstr(dirent->name, "-client") != NULL)
1576                         goto next;
1577
1578                 if (strncmp(fsname, dirent->name, len) == 0) {
1579                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1580                         /* Erase any old settings of this same parameter */
1581                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1582                                         devname, comment, CM_SKIP);
1583                         if (rc < 0)
1584                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1585                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1586                         /* Write the new one */
1587                         if (lcfg) {
1588                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1589                                                           dirent->name,
1590                                                           lcfg, devname,
1591                                                           comment);
1592                                 if (rc)
1593                                         CERROR("%s: writing log %s: rc = %d\n",
1594                                                mgs->mgs_obd->obd_name,
1595                                                dirent->name, rc);
1596                         }
1597                 }
1598 next:
1599                 mgs_direntry_free(dirent);
1600         }
1601
1602         RETURN(rc);
1603 }
1604
1605 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1606                                     struct mgs_device *mgs,
1607                                     struct fs_db *fsdb,
1608                                     struct mgs_target_info *mti,
1609                                     int index, char *logname);
1610 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1611                                     struct mgs_device *mgs,
1612                                     struct fs_db *fsdb,
1613                                     struct mgs_target_info *mti,
1614                                     char *logname, char *suffix, char *lovname,
1615                                     enum lustre_sec_part sec_part, int flags);
1616 static int name_create_mdt_and_lov(char **logname, char **lovname,
1617                                    struct fs_db *fsdb, int i);
1618
1619 static int add_param(char *params, char *key, char *val)
1620 {
1621         char *start = params + strlen(params);
1622         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1623         int keylen = 0;
1624
1625         if (key != NULL)
1626                 keylen = strlen(key);
1627         if (start + 1 + keylen + strlen(val) >= end) {
1628                 CERROR("params are too long: %s %s%s\n",
1629                        params, key != NULL ? key : "", val);
1630                 return -EINVAL;
1631         }
1632
1633         sprintf(start, " %s%s", key != NULL ? key : "", val);
1634         return 0;
1635 }
1636
1637 static int mgs_steal_llog_handler(const struct lu_env *env,
1638                                   struct llog_handle *llh,
1639                                   struct llog_rec_hdr *rec, void *data)
1640 {
1641         struct mgs_device *mgs;
1642         struct obd_device *obd;
1643         struct mgs_target_info *mti, *tmti;
1644         struct fs_db *fsdb;
1645         int cfg_len = rec->lrh_len;
1646         char *cfg_buf = (char*) (rec + 1);
1647         struct lustre_cfg *lcfg;
1648         int rc = 0;
1649         struct llog_handle *mdt_llh = NULL;
1650         static int got_an_osc_or_mdc = 0;
1651         /* 0: not found any osc/mdc;
1652            1: found osc;
1653            2: found mdc;
1654         */
1655         static int last_step = -1;
1656
1657         ENTRY;
1658
1659         mti = ((struct temp_comp*)data)->comp_mti;
1660         tmti = ((struct temp_comp*)data)->comp_tmti;
1661         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1662         obd = ((struct temp_comp *)data)->comp_obd;
1663         mgs = lu2mgs_dev(obd->obd_lu_dev);
1664         LASSERT(mgs);
1665
1666         if (rec->lrh_type != OBD_CFG_REC) {
1667                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1668                 RETURN(-EINVAL);
1669         }
1670
1671         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1672         if (rc) {
1673                 CERROR("Insane cfg\n");
1674                 RETURN(rc);
1675         }
1676
1677         lcfg = (struct lustre_cfg *)cfg_buf;
1678
1679         if (lcfg->lcfg_command == LCFG_MARKER) {
1680                 struct cfg_marker *marker;
1681                 marker = lustre_cfg_buf(lcfg, 1);
1682                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1683                     (marker->cm_flags & CM_START) &&
1684                      !(marker->cm_flags & CM_SKIP)) {
1685                         got_an_osc_or_mdc = 1;
1686                         strncpy(tmti->mti_svname, marker->cm_tgtname,
1687                                 sizeof(tmti->mti_svname));
1688                         rc = record_start_log(env, mgs, &mdt_llh,
1689                                               mti->mti_svname);
1690                         if (rc)
1691                                 RETURN(rc);
1692                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1693                                            mti->mti_svname,"add osc(copied)");
1694                         record_end_log(env, &mdt_llh);
1695                         last_step = marker->cm_step;
1696                         RETURN(rc);
1697                 }
1698                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1699                     (marker->cm_flags & CM_END) &&
1700                      !(marker->cm_flags & CM_SKIP)) {
1701                         LASSERT(last_step == marker->cm_step);
1702                         last_step = -1;
1703                         got_an_osc_or_mdc = 0;
1704                         rc = record_start_log(env, mgs, &mdt_llh,
1705                                               mti->mti_svname);
1706                         if (rc)
1707                                 RETURN(rc);
1708                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1709                                            mti->mti_svname,"add osc(copied)");
1710                         record_end_log(env, &mdt_llh);
1711                         RETURN(rc);
1712                 }
1713                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1714                     (marker->cm_flags & CM_START) &&
1715                      !(marker->cm_flags & CM_SKIP)) {
1716                         got_an_osc_or_mdc = 2;
1717                         last_step = marker->cm_step;
1718                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1719                                strlen(marker->cm_tgtname));
1720
1721                         RETURN(rc);
1722                 }
1723                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1724                     (marker->cm_flags & CM_END) &&
1725                      !(marker->cm_flags & CM_SKIP)) {
1726                         LASSERT(last_step == marker->cm_step);
1727                         last_step = -1;
1728                         got_an_osc_or_mdc = 0;
1729                         RETURN(rc);
1730                 }
1731         }
1732
1733         if (got_an_osc_or_mdc == 0 || last_step < 0)
1734                 RETURN(rc);
1735
1736         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1737                 uint64_t nodenid = lcfg->lcfg_nid;
1738
1739                 if (strlen(tmti->mti_uuid) == 0) {
1740                         /* target uuid not set, this config record is before
1741                          * LCFG_SETUP, this nid is one of target node nid.
1742                          */
1743                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1744                         tmti->mti_nid_count++;
1745                 } else {
1746                         /* failover node nid */
1747                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1748                                        libcfs_nid2str(nodenid));
1749                 }
1750
1751                 RETURN(rc);
1752         }
1753
1754         if (lcfg->lcfg_command == LCFG_SETUP) {
1755                 char *target;
1756
1757                 target = lustre_cfg_string(lcfg, 1);
1758                 memcpy(tmti->mti_uuid, target, strlen(target));
1759                 RETURN(rc);
1760         }
1761
1762         /* ignore client side sptlrpc_conf_log */
1763         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1764                 RETURN(rc);
1765
1766         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1767                 int index;
1768
1769                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1770                         RETURN (-EINVAL);
1771
1772                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1773                        strlen(mti->mti_fsname));
1774                 tmti->mti_stripe_index = index;
1775
1776                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1777                                               mti->mti_stripe_index,
1778                                               mti->mti_svname);
1779                 memset(tmti, 0, sizeof(*tmti));
1780                 RETURN(rc);
1781         }
1782
1783         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1784                 int index;
1785                 char mdt_index[9];
1786                 char *logname, *lovname;
1787
1788                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1789                                              mti->mti_stripe_index);
1790                 if (rc)
1791                         RETURN(rc);
1792                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1793
1794                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1795                         name_destroy(&logname);
1796                         name_destroy(&lovname);
1797                         RETURN(-EINVAL);
1798                 }
1799
1800                 tmti->mti_stripe_index = index;
1801                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1802                                          mdt_index, lovname,
1803                                          LUSTRE_SP_MDT, 0);
1804                 name_destroy(&logname);
1805                 name_destroy(&lovname);
1806                 RETURN(rc);
1807         }
1808         RETURN(rc);
1809 }
1810
1811 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1812 /* stealed from mgs_get_fsdb_from_llog*/
1813 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1814                                               struct mgs_device *mgs,
1815                                               char *client_name,
1816                                               struct temp_comp* comp)
1817 {
1818         struct llog_handle *loghandle;
1819         struct mgs_target_info *tmti;
1820         struct llog_ctxt *ctxt;
1821         int rc;
1822
1823         ENTRY;
1824
1825         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1826         LASSERT(ctxt != NULL);
1827
1828         OBD_ALLOC_PTR(tmti);
1829         if (tmti == NULL)
1830                 GOTO(out_ctxt, rc = -ENOMEM);
1831
1832         comp->comp_tmti = tmti;
1833         comp->comp_obd = mgs->mgs_obd;
1834
1835         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1836                        LLOG_OPEN_EXISTS);
1837         if (rc < 0) {
1838                 if (rc == -ENOENT)
1839                         rc = 0;
1840                 GOTO(out_pop, rc);
1841         }
1842
1843         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1844         if (rc)
1845                 GOTO(out_close, rc);
1846
1847         rc = llog_process_or_fork(env, loghandle, mgs_steal_llog_handler,
1848                                   (void *)comp, NULL, false);
1849         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1850 out_close:
1851         llog_close(env, loghandle);
1852 out_pop:
1853         OBD_FREE_PTR(tmti);
1854 out_ctxt:
1855         llog_ctxt_put(ctxt);
1856         RETURN(rc);
1857 }
1858
1859 /* lmv is the second thing for client logs */
1860 /* copied from mgs_write_log_lov. Please refer to that.  */
1861 static int mgs_write_log_lmv(const struct lu_env *env,
1862                              struct mgs_device *mgs,
1863                              struct fs_db *fsdb,
1864                              struct mgs_target_info *mti,
1865                              char *logname, char *lmvname)
1866 {
1867         struct llog_handle *llh = NULL;
1868         struct lmv_desc *lmvdesc;
1869         char *uuid;
1870         int rc = 0;
1871         ENTRY;
1872
1873         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1874
1875         OBD_ALLOC_PTR(lmvdesc);
1876         if (lmvdesc == NULL)
1877                 RETURN(-ENOMEM);
1878         lmvdesc->ld_active_tgt_count = 0;
1879         lmvdesc->ld_tgt_count = 0;
1880         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1881         uuid = (char *)lmvdesc->ld_uuid.uuid;
1882
1883         rc = record_start_log(env, mgs, &llh, logname);
1884         if (rc)
1885                 GOTO(out_free, rc);
1886         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1887         if (rc)
1888                 GOTO(out_end, rc);
1889         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1890         if (rc)
1891                 GOTO(out_end, rc);
1892         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1893         if (rc)
1894                 GOTO(out_end, rc);
1895         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1896         if (rc)
1897                 GOTO(out_end, rc);
1898 out_end:
1899         record_end_log(env, &llh);
1900 out_free:
1901         OBD_FREE_PTR(lmvdesc);
1902         RETURN(rc);
1903 }
1904
1905 /* lov is the first thing in the mdt and client logs */
1906 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1907                              struct fs_db *fsdb, struct mgs_target_info *mti,
1908                              char *logname, char *lovname)
1909 {
1910         struct llog_handle *llh = NULL;
1911         struct lov_desc *lovdesc;
1912         char *uuid;
1913         int rc = 0;
1914         ENTRY;
1915
1916         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1917
1918         /*
1919         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1920         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1921               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1922         */
1923
1924         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1925         OBD_ALLOC_PTR(lovdesc);
1926         if (lovdesc == NULL)
1927                 RETURN(-ENOMEM);
1928         lovdesc->ld_magic = LOV_DESC_MAGIC;
1929         lovdesc->ld_tgt_count = 0;
1930         /* Defaults.  Can be changed later by lcfg config_param */
1931         lovdesc->ld_default_stripe_count = 1;
1932         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1933         lovdesc->ld_default_stripe_size = 1024 * 1024;
1934         lovdesc->ld_default_stripe_offset = -1;
1935         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1936         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1937         /* can these be the same? */
1938         uuid = (char *)lovdesc->ld_uuid.uuid;
1939
1940         /* This should always be the first entry in a log.
1941         rc = mgs_clear_log(obd, logname); */
1942         rc = record_start_log(env, mgs, &llh, logname);
1943         if (rc)
1944                 GOTO(out_free, rc);
1945         /* FIXME these should be a single journal transaction */
1946         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1947         if (rc)
1948                 GOTO(out_end, rc);
1949         rc = record_attach(env, llh, lovname, "lov", uuid);
1950         if (rc)
1951                 GOTO(out_end, rc);
1952         rc = record_lov_setup(env, llh, lovname, lovdesc);
1953         if (rc)
1954                 GOTO(out_end, rc);
1955         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1956         if (rc)
1957                 GOTO(out_end, rc);
1958         EXIT;
1959 out_end:
1960         record_end_log(env, &llh);
1961 out_free:
1962         OBD_FREE_PTR(lovdesc);
1963         return rc;
1964 }
1965
1966 /* add failnids to open log */
1967 static int mgs_write_log_failnids(const struct lu_env *env,
1968                                   struct mgs_target_info *mti,
1969                                   struct llog_handle *llh,
1970                                   char *cliname)
1971 {
1972         char *failnodeuuid = NULL;
1973         char *ptr = mti->mti_params;
1974         lnet_nid_t nid;
1975         int rc = 0;
1976
1977         /*
1978         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1979         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1980         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1981         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1982         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1983         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1984         */
1985
1986         /* Pull failnid info out of params string */
1987         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1988                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1989                         if (failnodeuuid == NULL) {
1990                                 /* We don't know the failover node name,
1991                                    so just use the first nid as the uuid */
1992                                 rc = name_create(&failnodeuuid,
1993                                                  libcfs_nid2str(nid), "");
1994                                 if (rc)
1995                                         return rc;
1996                         }
1997                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1998                                "client %s\n", libcfs_nid2str(nid),
1999                                failnodeuuid, cliname);
2000                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
2001                 }
2002                 if (failnodeuuid)
2003                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
2004         }
2005
2006         name_destroy(&failnodeuuid);
2007         return rc;
2008 }
2009
2010 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
2011                                     struct mgs_device *mgs,
2012                                     struct fs_db *fsdb,
2013                                     struct mgs_target_info *mti,
2014                                     char *logname, char *lmvname)
2015 {
2016         struct llog_handle *llh = NULL;
2017         char *mdcname = NULL;
2018         char *nodeuuid = NULL;
2019         char *mdcuuid = NULL;
2020         char *lmvuuid = NULL;
2021         char index[6];
2022         int i, rc;
2023         ENTRY;
2024
2025         if (mgs_log_is_empty(env, mgs, logname)) {
2026                 CERROR("log is empty! Logical error\n");
2027                 RETURN(-EINVAL);
2028         }
2029
2030         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
2031                mti->mti_svname, logname, lmvname);
2032
2033         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2034         if (rc)
2035                 RETURN(rc);
2036         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
2037         if (rc)
2038                 GOTO(out_free, rc);
2039         rc = name_create(&mdcuuid, mdcname, "_UUID");
2040         if (rc)
2041                 GOTO(out_free, rc);
2042         rc = name_create(&lmvuuid, lmvname, "_UUID");
2043         if (rc)
2044                 GOTO(out_free, rc);
2045
2046         rc = record_start_log(env, mgs, &llh, logname);
2047         if (rc)
2048                 GOTO(out_free, rc);
2049         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2050                            "add mdc");
2051         if (rc)
2052                 GOTO(out_end, rc);
2053         for (i = 0; i < mti->mti_nid_count; i++) {
2054                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2055                        libcfs_nid2str(mti->mti_nids[i]));
2056
2057                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2058                 if (rc)
2059                         GOTO(out_end, rc);
2060         }
2061
2062         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2063         if (rc)
2064                 GOTO(out_end, rc);
2065         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
2066         if (rc)
2067                 GOTO(out_end, rc);
2068         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2069         if (rc)
2070                 GOTO(out_end, rc);
2071         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2072         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2073                             index, "1");
2074         if (rc)
2075                 GOTO(out_end, rc);
2076         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2077                            "add mdc");
2078         if (rc)
2079                 GOTO(out_end, rc);
2080 out_end:
2081         record_end_log(env, &llh);
2082 out_free:
2083         name_destroy(&lmvuuid);
2084         name_destroy(&mdcuuid);
2085         name_destroy(&mdcname);
2086         name_destroy(&nodeuuid);
2087         RETURN(rc);
2088 }
2089
2090 static inline int name_create_lov(char **lovname, char *mdtname,
2091                                   struct fs_db *fsdb, int index)
2092 {
2093         /* COMPAT_180 */
2094         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2095                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2096         else
2097                 return name_create(lovname, mdtname, "-mdtlov");
2098 }
2099
2100 static int name_create_mdt_and_lov(char **logname, char **lovname,
2101                                    struct fs_db *fsdb, int i)
2102 {
2103         int rc;
2104
2105         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2106         if (rc)
2107                 return rc;
2108         /* COMPAT_180 */
2109         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2110                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2111         else
2112                 rc = name_create(lovname, *logname, "-mdtlov");
2113         if (rc) {
2114                 name_destroy(logname);
2115                 *logname = NULL;
2116         }
2117         return rc;
2118 }
2119
2120 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2121                                       struct fs_db *fsdb, int i)
2122 {
2123         char suffix[16];
2124
2125         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2126                 sprintf(suffix, "-osc");
2127         else
2128                 sprintf(suffix, "-osc-MDT%04x", i);
2129         return name_create(oscname, ostname, suffix);
2130 }
2131
2132 /* add new mdc to already existent MDS */
2133 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2134                                     struct mgs_device *mgs,
2135                                     struct fs_db *fsdb,
2136                                     struct mgs_target_info *mti,
2137                                     int mdt_index, char *logname)
2138 {
2139         struct llog_handle      *llh = NULL;
2140         char    *nodeuuid = NULL;
2141         char    *ospname = NULL;
2142         char    *lovuuid = NULL;
2143         char    *mdtuuid = NULL;
2144         char    *svname = NULL;
2145         char    *mdtname = NULL;
2146         char    *lovname = NULL;
2147         char    index_str[16];
2148         int     i, rc;
2149
2150         ENTRY;
2151         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2152                 CERROR("log is empty! Logical error\n");
2153                 RETURN (-EINVAL);
2154         }
2155
2156         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2157                logname);
2158
2159         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2160         if (rc)
2161                 RETURN(rc);
2162
2163         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2164         if (rc)
2165                 GOTO(out_destory, rc);
2166
2167         rc = name_create(&svname, mdtname, "-osp");
2168         if (rc)
2169                 GOTO(out_destory, rc);
2170
2171         sprintf(index_str, "-MDT%04x", mdt_index);
2172         rc = name_create(&ospname, svname, index_str);
2173         if (rc)
2174                 GOTO(out_destory, rc);
2175
2176         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2177         if (rc)
2178                 GOTO(out_destory, rc);
2179
2180         rc = name_create(&lovuuid, lovname, "_UUID");
2181         if (rc)
2182                 GOTO(out_destory, rc);
2183
2184         rc = name_create(&mdtuuid, mdtname, "_UUID");
2185         if (rc)
2186                 GOTO(out_destory, rc);
2187
2188         rc = record_start_log(env, mgs, &llh, logname);
2189         if (rc)
2190                 GOTO(out_destory, rc);
2191
2192         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2193                            "add osp");
2194         if (rc)
2195                 GOTO(out_destory, rc);
2196
2197         for (i = 0; i < mti->mti_nid_count; i++) {
2198                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2199                        libcfs_nid2str(mti->mti_nids[i]));
2200                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2201                 if (rc)
2202                         GOTO(out_end, rc);
2203         }
2204
2205         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2206         if (rc)
2207                 GOTO(out_end, rc);
2208
2209         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2210                           NULL, NULL);
2211         if (rc)
2212                 GOTO(out_end, rc);
2213
2214         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2215         if (rc)
2216                 GOTO(out_end, rc);
2217
2218         /* Add mdc(osp) to lod */
2219         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2220                  mti->mti_stripe_index);
2221         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2222                          index_str, "1", NULL);
2223         if (rc)
2224                 GOTO(out_end, rc);
2225
2226         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2227         if (rc)
2228                 GOTO(out_end, rc);
2229
2230 out_end:
2231         record_end_log(env, &llh);
2232
2233 out_destory:
2234         name_destroy(&mdtuuid);
2235         name_destroy(&lovuuid);
2236         name_destroy(&lovname);
2237         name_destroy(&ospname);
2238         name_destroy(&svname);
2239         name_destroy(&nodeuuid);
2240         name_destroy(&mdtname);
2241         RETURN(rc);
2242 }
2243
2244 static int mgs_write_log_mdt0(const struct lu_env *env,
2245                               struct mgs_device *mgs,
2246                               struct fs_db *fsdb,
2247                               struct mgs_target_info *mti)
2248 {
2249         char *log = mti->mti_svname;
2250         struct llog_handle *llh = NULL;
2251         char *uuid, *lovname;
2252         char mdt_index[6];
2253         char *ptr = mti->mti_params;
2254         int rc = 0, failout = 0;
2255         ENTRY;
2256
2257         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2258         if (uuid == NULL)
2259                 RETURN(-ENOMEM);
2260
2261         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2262                 failout = (strncmp(ptr, "failout", 7) == 0);
2263
2264         rc = name_create(&lovname, log, "-mdtlov");
2265         if (rc)
2266                 GOTO(out_free, rc);
2267         if (mgs_log_is_empty(env, mgs, log)) {
2268                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2269                 if (rc)
2270                         GOTO(out_lod, rc);
2271         }
2272
2273         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2274
2275         rc = record_start_log(env, mgs, &llh, log);
2276         if (rc)
2277                 GOTO(out_lod, rc);
2278
2279         /* add MDT itself */
2280
2281         /* FIXME this whole fn should be a single journal transaction */
2282         sprintf(uuid, "%s_UUID", log);
2283         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2284         if (rc)
2285                 GOTO(out_lod, rc);
2286         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2287         if (rc)
2288                 GOTO(out_end, rc);
2289         rc = record_mount_opt(env, llh, log, lovname, NULL);
2290         if (rc)
2291                 GOTO(out_end, rc);
2292         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2293                         failout ? "n" : "f");
2294         if (rc)
2295                 GOTO(out_end, rc);
2296         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2297         if (rc)
2298                 GOTO(out_end, rc);
2299 out_end:
2300         record_end_log(env, &llh);
2301 out_lod:
2302         name_destroy(&lovname);
2303 out_free:
2304         OBD_FREE(uuid, sizeof(struct obd_uuid));
2305         RETURN(rc);
2306 }
2307
2308 /* envelope method for all layers log */
2309 static int mgs_write_log_mdt(const struct lu_env *env,
2310                              struct mgs_device *mgs,
2311                              struct fs_db *fsdb,
2312                              struct mgs_target_info *mti)
2313 {
2314         struct mgs_thread_info *mgi = mgs_env_info(env);
2315         struct llog_handle *llh = NULL;
2316         char *cliname;
2317         int rc, i = 0;
2318         ENTRY;
2319
2320         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2321
2322         if (mti->mti_uuid[0] == '\0') {
2323                 /* Make up our own uuid */
2324                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2325                          "%s_UUID", mti->mti_svname);
2326         }
2327
2328         /* add mdt */
2329         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2330         if (rc)
2331                 RETURN(rc);
2332         /* Append the mdt info to the client log */
2333         rc = name_create(&cliname, mti->mti_fsname, "-client");
2334         if (rc)
2335                 RETURN(rc);
2336
2337         if (mgs_log_is_empty(env, mgs, cliname)) {
2338                 /* Start client log */
2339                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2340                                        fsdb->fsdb_clilov);
2341                 if (rc)
2342                         GOTO(out_free, rc);
2343                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2344                                        fsdb->fsdb_clilmv);
2345                 if (rc)
2346                         GOTO(out_free, rc);
2347         }
2348
2349         /*
2350         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2351         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2352         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2353         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2354         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2355         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2356         */
2357
2358                 /* copy client info about lov/lmv */
2359                 mgi->mgi_comp.comp_mti = mti;
2360                 mgi->mgi_comp.comp_fsdb = fsdb;
2361
2362                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2363                                                         &mgi->mgi_comp);
2364                 if (rc)
2365                         GOTO(out_free, rc);
2366                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2367                                               fsdb->fsdb_clilmv);
2368                 if (rc)
2369                         GOTO(out_free, rc);
2370
2371                 /* add mountopts */
2372                 rc = record_start_log(env, mgs, &llh, cliname);
2373                 if (rc)
2374                         GOTO(out_free, rc);
2375
2376                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2377                                    "mount opts");
2378                 if (rc)
2379                         GOTO(out_end, rc);
2380                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2381                                       fsdb->fsdb_clilmv);
2382                 if (rc)
2383                         GOTO(out_end, rc);
2384                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2385                                    "mount opts");
2386
2387         if (rc)
2388                 GOTO(out_end, rc);
2389
2390         /* for_all_existing_mdt except current one */
2391         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2392                 if (i !=  mti->mti_stripe_index &&
2393                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2394                         char *logname;
2395
2396                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2397                         if (rc)
2398                                 GOTO(out_end, rc);
2399
2400                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2401                                                       i, logname);
2402                         name_destroy(&logname);
2403                         if (rc)
2404                                 GOTO(out_end, rc);
2405                 }
2406         }
2407 out_end:
2408         record_end_log(env, &llh);
2409 out_free:
2410         name_destroy(&cliname);
2411         RETURN(rc);
2412 }
2413
2414 /* Add the ost info to the client/mdt lov */
2415 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2416                                     struct mgs_device *mgs, struct fs_db *fsdb,
2417                                     struct mgs_target_info *mti,
2418                                     char *logname, char *suffix, char *lovname,
2419                                     enum lustre_sec_part sec_part, int flags)
2420 {
2421         struct llog_handle *llh = NULL;
2422         char *nodeuuid = NULL;
2423         char *oscname = NULL;
2424         char *oscuuid = NULL;
2425         char *lovuuid = NULL;
2426         char *svname = NULL;
2427         char index[6];
2428         int i, rc;
2429
2430         ENTRY;
2431         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2432                mti->mti_svname, logname);
2433
2434         if (mgs_log_is_empty(env, mgs, logname)) {
2435                 CERROR("log is empty! Logical error\n");
2436                 RETURN (-EINVAL);
2437         }
2438
2439         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2440         if (rc)
2441                 RETURN(rc);
2442         rc = name_create(&svname, mti->mti_svname, "-osc");
2443         if (rc)
2444                 GOTO(out_free, rc);
2445
2446         /* for the system upgraded from old 1.8, keep using the old osc naming
2447          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2448         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2449                 rc = name_create(&oscname, svname, "");
2450         else
2451                 rc = name_create(&oscname, svname, suffix);
2452         if (rc)
2453                 GOTO(out_free, rc);
2454
2455         rc = name_create(&oscuuid, oscname, "_UUID");
2456         if (rc)
2457                 GOTO(out_free, rc);
2458         rc = name_create(&lovuuid, lovname, "_UUID");
2459         if (rc)
2460                 GOTO(out_free, rc);
2461
2462
2463         /*
2464         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2465         multihomed (#4)
2466         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2467         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2468         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2469         failover (#6,7)
2470         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2471         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2472         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2473         */
2474
2475         rc = record_start_log(env, mgs, &llh, logname);
2476         if (rc)
2477                 GOTO(out_free, rc);
2478
2479         /* FIXME these should be a single journal transaction */
2480         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2481                            "add osc");
2482         if (rc)
2483                 GOTO(out_end, rc);
2484
2485         /* NB: don't change record order, because upon MDT steal OSC config
2486          * from client, it treats all nids before LCFG_SETUP as target nids
2487          * (multiple interfaces), while nids after as failover node nids.
2488          * See mgs_steal_llog_handler() LCFG_ADD_UUID.
2489          */
2490         for (i = 0; i < mti->mti_nid_count; i++) {
2491                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2492                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2493                 if (rc)
2494                         GOTO(out_end, rc);
2495         }
2496         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2497         if (rc)
2498                 GOTO(out_end, rc);
2499         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2500         if (rc)
2501                 GOTO(out_end, rc);
2502         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2503         if (rc)
2504                 GOTO(out_end, rc);
2505
2506         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2507
2508         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2509         if (rc)
2510                 GOTO(out_end, rc);
2511         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2512                            "add osc");
2513         if (rc)
2514                 GOTO(out_end, rc);
2515 out_end:
2516         record_end_log(env, &llh);
2517 out_free:
2518         name_destroy(&lovuuid);
2519         name_destroy(&oscuuid);
2520         name_destroy(&oscname);
2521         name_destroy(&svname);
2522         name_destroy(&nodeuuid);
2523         RETURN(rc);
2524 }
2525
2526 static int mgs_write_log_ost(const struct lu_env *env,
2527                              struct mgs_device *mgs, struct fs_db *fsdb,
2528                              struct mgs_target_info *mti)
2529 {
2530         struct llog_handle *llh = NULL;
2531         char *logname, *lovname;
2532         char *ptr = mti->mti_params;
2533         int rc, flags = 0, failout = 0, i;
2534         ENTRY;
2535
2536         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2537
2538         /* The ost startup log */
2539
2540         /* If the ost log already exists, that means that someone reformatted
2541            the ost and it called target_add again. */
2542         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2543                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2544                                    "exists, yet the server claims it never "
2545                                    "registered. It may have been reformatted, "
2546                                    "or the index changed. writeconf the MDT to "
2547                                    "regenerate all logs.\n", mti->mti_svname);
2548                 RETURN(-EALREADY);
2549         }
2550
2551         /*
2552         attach obdfilter ost1 ost1_UUID
2553         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2554         */
2555         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2556                 failout = (strncmp(ptr, "failout", 7) == 0);
2557         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2558         if (rc)
2559                 RETURN(rc);
2560         /* FIXME these should be a single journal transaction */
2561         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2562         if (rc)
2563                 GOTO(out_end, rc);
2564         if (*mti->mti_uuid == '\0')
2565                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2566                          "%s_UUID", mti->mti_svname);
2567         rc = record_attach(env, llh, mti->mti_svname,
2568                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2569         if (rc)
2570                 GOTO(out_end, rc);
2571         rc = record_setup(env, llh, mti->mti_svname,
2572                           "dev"/*ignored*/, "type"/*ignored*/,
2573                           failout ? "n" : "f", 0/*options*/);
2574         if (rc)
2575                 GOTO(out_end, rc);
2576         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2577         if (rc)
2578                 GOTO(out_end, rc);
2579 out_end:
2580         record_end_log(env, &llh);
2581         if (rc)
2582                 RETURN(rc);
2583         /* We also have to update the other logs where this osc is part of
2584            the lov */
2585
2586         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2587                 /* If we're upgrading, the old mdt log already has our
2588                    entry. Let's do a fake one for fun. */
2589                 /* Note that we can't add any new failnids, since we don't
2590                    know the old osc names. */
2591                 flags = CM_SKIP | CM_UPGRADE146;
2592
2593         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2594                 /* If the update flag isn't set, don't update client/mdt
2595                    logs. */
2596                 flags |= CM_SKIP;
2597                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2598                               "the MDT first to regenerate it.\n",
2599                               mti->mti_svname);
2600         }
2601
2602         /* Add ost to all MDT lov defs */
2603         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2604                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2605                         char mdt_index[9];
2606
2607                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2608                                                      i);
2609                         if (rc)
2610                                 RETURN(rc);
2611                         sprintf(mdt_index, "-MDT%04x", i);
2612                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2613                                                       logname, mdt_index,
2614                                                       lovname, LUSTRE_SP_MDT,
2615                                                       flags);
2616                         name_destroy(&logname);
2617                         name_destroy(&lovname);
2618                         if (rc)
2619                                 RETURN(rc);
2620                 }
2621         }
2622
2623         /* Append ost info to the client log */
2624         rc = name_create(&logname, mti->mti_fsname, "-client");
2625         if (rc)
2626                 RETURN(rc);
2627         if (mgs_log_is_empty(env, mgs, logname)) {
2628                 /* Start client log */
2629                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2630                                        fsdb->fsdb_clilov);
2631                 if (rc)
2632                         GOTO(out_free, rc);
2633                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2634                                        fsdb->fsdb_clilmv);
2635                 if (rc)
2636                         GOTO(out_free, rc);
2637         }
2638         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2639                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2640 out_free:
2641         name_destroy(&logname);
2642         RETURN(rc);
2643 }
2644
2645 static __inline__ int mgs_param_empty(char *ptr)
2646 {
2647         char *tmp;
2648
2649         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2650                 return 1;
2651         return 0;
2652 }
2653
2654 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2655                                           struct mgs_device *mgs,
2656                                           struct fs_db *fsdb,
2657                                           struct mgs_target_info *mti,
2658                                           char *logname, char *cliname)
2659 {
2660         int rc;
2661         struct llog_handle *llh = NULL;
2662
2663         if (mgs_param_empty(mti->mti_params)) {
2664                 /* Remove _all_ failnids */
2665                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2666                                 mti->mti_svname, "add failnid", CM_SKIP);
2667                 return rc < 0 ? rc : 0;
2668         }
2669
2670         /* Otherwise failover nids are additive */
2671         rc = record_start_log(env, mgs, &llh, logname);
2672         if (rc)
2673                 return rc;
2674                 /* FIXME this should be a single journal transaction */
2675         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2676                            "add failnid");
2677         if (rc)
2678                 goto out_end;
2679         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2680         if (rc)
2681                 goto out_end;
2682         rc = record_marker(env, llh, fsdb, CM_END,
2683                            mti->mti_svname, "add failnid");
2684 out_end:
2685         record_end_log(env, &llh);
2686         return rc;
2687 }
2688
2689
2690 /* Add additional failnids to an existing log.
2691    The mdc/osc must have been added to logs first */
2692 /* tcp nids must be in dotted-quad ascii -
2693    we can't resolve hostnames from the kernel. */
2694 static int mgs_write_log_add_failnid(const struct lu_env *env,
2695                                      struct mgs_device *mgs,
2696                                      struct fs_db *fsdb,
2697                                      struct mgs_target_info *mti)
2698 {
2699         char *logname, *cliname;
2700         int rc;
2701         ENTRY;
2702
2703         /* FIXME we currently can't erase the failnids
2704          * given when a target first registers, since they aren't part of
2705          * an "add uuid" stanza */
2706
2707         /* Verify that we know about this target */
2708         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2709                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2710                                    "yet. It must be started before failnids "
2711                                    "can be added.\n", mti->mti_svname);
2712                 RETURN(-ENOENT);
2713         }
2714
2715         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2716         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2717                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2718         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2719                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2720         } else {
2721                 RETURN(-EINVAL);
2722         }
2723         if (rc)
2724                 RETURN(rc);
2725         /* Add failover nids to the client log */
2726         rc = name_create(&logname, mti->mti_fsname, "-client");
2727         if (rc) {
2728                 name_destroy(&cliname);
2729                 RETURN(rc);
2730         }
2731         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2732         name_destroy(&logname);
2733         name_destroy(&cliname);
2734         if (rc)
2735                 RETURN(rc);
2736
2737         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2738                 /* Add OST failover nids to the MDT logs as well */
2739                 int i;
2740
2741                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2742                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2743                                 continue;
2744                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2745                         if (rc)
2746                                 RETURN(rc);
2747                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2748                                                  fsdb, i);
2749                         if (rc) {
2750                                 name_destroy(&logname);
2751                                 RETURN(rc);
2752                         }
2753                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2754                                                             mti, logname,
2755                                                             cliname);
2756                         name_destroy(&cliname);
2757                         name_destroy(&logname);
2758                         if (rc)
2759                                 RETURN(rc);
2760                 }
2761         }
2762
2763         RETURN(rc);
2764 }
2765
2766 static int mgs_wlp_lcfg(const struct lu_env *env,
2767                         struct mgs_device *mgs, struct fs_db *fsdb,
2768                         struct mgs_target_info *mti,
2769                         char *logname, struct lustre_cfg_bufs *bufs,
2770                         char *tgtname, char *ptr)
2771 {
2772         char comment[MTI_NAME_MAXLEN];
2773         char *tmp;
2774         struct lustre_cfg *lcfg;
2775         int rc, del;
2776
2777         /* Erase any old settings of this same parameter */
2778         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2779         comment[MTI_NAME_MAXLEN - 1] = 0;
2780         /* But don't try to match the value. */
2781         if ((tmp = strchr(comment, '=')))
2782             *tmp = 0;
2783         /* FIXME we should skip settings that are the same as old values */
2784         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2785         if (rc < 0)
2786                 return rc;
2787         del = mgs_param_empty(ptr);
2788
2789         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2790                       "Sett" : "Modify", tgtname, comment, logname);
2791         if (del)
2792                 return rc;
2793
2794         lustre_cfg_bufs_reset(bufs, tgtname);
2795         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2796         lcfg = lustre_cfg_new(LCFG_PARAM, bufs);
2797         if (!lcfg)
2798                 return -ENOMEM;
2799         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2800         lustre_cfg_free(lcfg);
2801         return rc;
2802 }
2803
2804 /* write global variable settings into log */
2805 static int mgs_write_log_sys(const struct lu_env *env,
2806                              struct mgs_device *mgs, struct fs_db *fsdb,
2807                              struct mgs_target_info *mti, char *sys, char *ptr)
2808 {
2809         struct mgs_thread_info *mgi = mgs_env_info(env);
2810         struct lustre_cfg *lcfg;
2811         char *tmp, sep;
2812         int rc, cmd, convert = 1;
2813
2814         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2815                 cmd = LCFG_SET_TIMEOUT;
2816         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2817                 cmd = LCFG_SET_LDLM_TIMEOUT;
2818         /* Check for known params here so we can return error to lctl */
2819         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2820                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2821                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2822                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2823                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2824                 cmd = LCFG_PARAM;
2825         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2826                 convert = 0; /* Don't convert string value to integer */
2827                 cmd = LCFG_PARAM;
2828         } else {
2829                 return -EINVAL;
2830         }
2831
2832         if (mgs_param_empty(ptr))
2833                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2834         else
2835                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2836
2837         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2838         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2839         if (!convert && *tmp != '\0')
2840                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2841         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2842         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2843         /* truncate the comment to the parameter name */
2844         ptr = tmp - 1;
2845         sep = *ptr;
2846         *ptr = '\0';
2847         /* modify all servers and clients */
2848         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2849                                       *tmp == '\0' ? NULL : lcfg,
2850                                       mti->mti_fsname, sys, 0);
2851         if (rc == 0 && *tmp != '\0') {
2852                 switch (cmd) {
2853                 case LCFG_SET_TIMEOUT:
2854                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2855                                 class_process_config(lcfg);
2856                         break;
2857                 case LCFG_SET_LDLM_TIMEOUT:
2858                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2859                                 class_process_config(lcfg);
2860                         break;
2861                 default:
2862                         break;
2863                 }
2864         }
2865         *ptr = sep;
2866         lustre_cfg_free(lcfg);
2867         return rc;
2868 }
2869
2870 /* write quota settings into log */
2871 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2872                                struct fs_db *fsdb, struct mgs_target_info *mti,
2873                                char *quota, char *ptr)
2874 {
2875         struct mgs_thread_info  *mgi = mgs_env_info(env);
2876         struct lustre_cfg       *lcfg;
2877         char                    *tmp;
2878         char                     sep;
2879         int                      rc, cmd = LCFG_PARAM;
2880
2881         /* support only 'meta' and 'data' pools so far */
2882         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2883             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2884                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2885                        "& quota.ost are)\n", ptr);
2886                 return -EINVAL;
2887         }
2888
2889         if (*tmp == '\0') {
2890                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2891         } else {
2892                 CDEBUG(D_MGS, "global '%s'\n", quota);
2893
2894                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2895                     strcmp(tmp, "none") != 0) {
2896                         CERROR("enable option(%s) isn't supported\n", tmp);
2897                         return -EINVAL;
2898                 }
2899         }
2900
2901         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2902         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2903         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2904         /* truncate the comment to the parameter name */
2905         ptr = tmp - 1;
2906         sep = *ptr;
2907         *ptr = '\0';
2908
2909         /* XXX we duplicated quota enable information in all server
2910          *     config logs, it should be moved to a separate config
2911          *     log once we cleanup the config log for global param. */
2912         /* modify all servers */
2913         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2914                                       *tmp == '\0' ? NULL : lcfg,
2915                                       mti->mti_fsname, quota, 1);
2916         *ptr = sep;
2917         lustre_cfg_free(lcfg);
2918         return rc < 0 ? rc : 0;
2919 }
2920
2921 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2922                                    struct mgs_device *mgs,
2923                                    struct fs_db *fsdb,
2924                                    struct mgs_target_info *mti,
2925                                    char *param)
2926 {
2927         struct mgs_thread_info *mgi = mgs_env_info(env);
2928         struct llog_handle     *llh = NULL;
2929         char                   *logname;
2930         char                   *comment, *ptr;
2931         struct lustre_cfg      *lcfg;
2932         int                     rc, len;
2933         ENTRY;
2934
2935         /* get comment */
2936         ptr = strchr(param, '=');
2937         LASSERT(ptr);
2938         len = ptr - param;
2939
2940         OBD_ALLOC(comment, len + 1);
2941         if (comment == NULL)
2942                 RETURN(-ENOMEM);
2943         strncpy(comment, param, len);
2944         comment[len] = '\0';
2945
2946         /* prepare lcfg */
2947         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2948         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2949         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2950         if (lcfg == NULL)
2951                 GOTO(out_comment, rc = -ENOMEM);
2952
2953         /* construct log name */
2954         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2955         if (rc)
2956                 GOTO(out_lcfg, rc);
2957
2958         if (mgs_log_is_empty(env, mgs, logname)) {
2959                 rc = record_start_log(env, mgs, &llh, logname);
2960                 if (rc)
2961                         GOTO(out, rc);
2962                 record_end_log(env, &llh);
2963         }
2964
2965         /* obsolete old one */
2966         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2967                         comment, CM_SKIP);
2968         if (rc < 0)
2969                 GOTO(out, rc);
2970         /* write the new one */
2971         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2972                                   mti->mti_svname, comment);
2973         if (rc)
2974                 CERROR("err %d writing log %s\n", rc, logname);
2975 out:
2976         name_destroy(&logname);
2977 out_lcfg:
2978         lustre_cfg_free(lcfg);
2979 out_comment:
2980         OBD_FREE(comment, len + 1);
2981         RETURN(rc);
2982 }
2983
2984 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2985                                         char *param)
2986 {
2987         char    *ptr;
2988
2989         /* disable the adjustable udesc parameter for now, i.e. use default
2990          * setting that client always ship udesc to MDT if possible. to enable
2991          * it simply remove the following line */
2992         goto error_out;
2993
2994         ptr = strchr(param, '=');
2995         if (ptr == NULL)
2996                 goto error_out;
2997         *ptr++ = '\0';
2998
2999         if (strcmp(param, PARAM_SRPC_UDESC))
3000                 goto error_out;
3001
3002         if (strcmp(ptr, "yes") == 0) {
3003                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3004                 CWARN("Enable user descriptor shipping from client to MDT\n");
3005         } else if (strcmp(ptr, "no") == 0) {
3006                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3007                 CWARN("Disable user descriptor shipping from client to MDT\n");
3008         } else {
3009                 *(ptr - 1) = '=';
3010                 goto error_out;
3011         }
3012         return 0;
3013
3014 error_out:
3015         CERROR("Invalid param: %s\n", param);
3016         return -EINVAL;
3017 }
3018
3019 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3020                                   const char *svname,
3021                                   char *param)
3022 {
3023         struct sptlrpc_rule      rule;
3024         struct sptlrpc_rule_set *rset;
3025         int                      rc;
3026         ENTRY;
3027
3028         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3029                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3030                 RETURN(-EINVAL);
3031         }
3032
3033         if (strncmp(param, PARAM_SRPC_UDESC,
3034                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3035                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3036         }
3037
3038         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3039                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3040                 RETURN(-EINVAL);
3041         }
3042
3043         param += sizeof(PARAM_SRPC_FLVR) - 1;
3044
3045         rc = sptlrpc_parse_rule(param, &rule);
3046         if (rc)
3047                 RETURN(rc);
3048
3049         /* mgs rules implies must be mgc->mgs */
3050         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3051                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3052                      rule.sr_from != LUSTRE_SP_ANY) ||
3053                     (rule.sr_to != LUSTRE_SP_MGS &&
3054                      rule.sr_to != LUSTRE_SP_ANY))
3055                         RETURN(-EINVAL);
3056         }
3057
3058         /* preapre room for this coming rule. svcname format should be:
3059          * - fsname: general rule
3060          * - fsname-tgtname: target-specific rule
3061          */
3062         if (strchr(svname, '-')) {
3063                 struct mgs_tgt_srpc_conf *tgtconf;
3064                 int                       found = 0;
3065
3066                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3067                      tgtconf = tgtconf->mtsc_next) {
3068                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3069                                 found = 1;
3070                                 break;
3071                         }
3072                 }
3073
3074                 if (!found) {
3075                         int name_len;
3076
3077                         OBD_ALLOC_PTR(tgtconf);
3078                         if (tgtconf == NULL)
3079                                 RETURN(-ENOMEM);
3080
3081                         name_len = strlen(svname);
3082
3083                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3084                         if (tgtconf->mtsc_tgt == NULL) {
3085                                 OBD_FREE_PTR(tgtconf);
3086                                 RETURN(-ENOMEM);
3087                         }
3088                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3089
3090                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3091                         fsdb->fsdb_srpc_tgt = tgtconf;
3092                 }
3093
3094                 rset = &tgtconf->mtsc_rset;
3095         } else {
3096                 rset = &fsdb->fsdb_srpc_gen;
3097         }
3098
3099         rc = sptlrpc_rule_set_merge(rset, &rule);
3100
3101         RETURN(rc);
3102 }
3103
3104 static int mgs_srpc_set_param(const struct lu_env *env,
3105                               struct mgs_device *mgs,
3106                               struct fs_db *fsdb,
3107                               struct mgs_target_info *mti,
3108                               char *param)
3109 {
3110         char                   *copy;
3111         int                     rc, copy_size;
3112         ENTRY;
3113
3114 #ifndef HAVE_GSS
3115         RETURN(-EINVAL);
3116 #endif
3117         /* keep a copy of original param, which could be destroied
3118          * during parsing */
3119         copy_size = strlen(param) + 1;
3120         OBD_ALLOC(copy, copy_size);
3121         if (copy == NULL)
3122                 return -ENOMEM;
3123         memcpy(copy, param, copy_size);
3124
3125         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3126         if (rc)
3127                 goto out_free;
3128
3129         /* previous steps guaranteed the syntax is correct */
3130         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3131         if (rc)
3132                 goto out_free;
3133
3134         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3135                 /*
3136                  * for mgs rules, make them effective immediately.
3137                  */
3138                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3139                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3140                                                  &fsdb->fsdb_srpc_gen);
3141         }
3142
3143 out_free:
3144         OBD_FREE(copy, copy_size);
3145         RETURN(rc);
3146 }
3147
3148 struct mgs_srpc_read_data {
3149         struct fs_db   *msrd_fsdb;
3150         int             msrd_skip;
3151 };
3152
3153 static int mgs_srpc_read_handler(const struct lu_env *env,
3154                                  struct llog_handle *llh,
3155                                  struct llog_rec_hdr *rec, void *data)
3156 {
3157         struct mgs_srpc_read_data *msrd = data;
3158         struct cfg_marker         *marker;
3159         struct lustre_cfg         *lcfg = REC_DATA(rec);
3160         char                      *svname, *param;
3161         int                        cfg_len, rc;
3162         ENTRY;
3163
3164         if (rec->lrh_type != OBD_CFG_REC) {
3165                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3166                 RETURN(-EINVAL);
3167         }
3168
3169         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
3170                   sizeof(struct llog_rec_tail);
3171
3172         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3173         if (rc) {
3174                 CERROR("Insane cfg\n");
3175                 RETURN(rc);
3176         }
3177
3178         if (lcfg->lcfg_command == LCFG_MARKER) {
3179                 marker = lustre_cfg_buf(lcfg, 1);
3180
3181                 if (marker->cm_flags & CM_START &&
3182                     marker->cm_flags & CM_SKIP)
3183                         msrd->msrd_skip = 1;
3184                 if (marker->cm_flags & CM_END)
3185                         msrd->msrd_skip = 0;
3186
3187                 RETURN(0);
3188         }
3189
3190         if (msrd->msrd_skip)
3191                 RETURN(0);
3192
3193         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3194                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3195                 RETURN(0);
3196         }
3197
3198         svname = lustre_cfg_string(lcfg, 0);
3199         if (svname == NULL) {
3200                 CERROR("svname is empty\n");
3201                 RETURN(0);
3202         }
3203
3204         param = lustre_cfg_string(lcfg, 1);
3205         if (param == NULL) {
3206                 CERROR("param is empty\n");
3207                 RETURN(0);
3208         }
3209
3210         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3211         if (rc)
3212                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3213
3214         RETURN(0);
3215 }
3216
3217 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3218                                 struct mgs_device *mgs,
3219                                 struct fs_db *fsdb)
3220 {
3221         struct llog_handle        *llh = NULL;
3222         struct llog_ctxt          *ctxt;
3223         char                      *logname;
3224         struct mgs_srpc_read_data  msrd;
3225         int                        rc;
3226         ENTRY;
3227
3228         /* construct log name */
3229         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3230         if (rc)
3231                 RETURN(rc);
3232
3233         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3234         LASSERT(ctxt != NULL);
3235
3236         if (mgs_log_is_empty(env, mgs, logname))
3237                 GOTO(out, rc = 0);
3238
3239         rc = llog_open(env, ctxt, &llh, NULL, logname,
3240                        LLOG_OPEN_EXISTS);
3241         if (rc < 0) {
3242                 if (rc == -ENOENT)
3243                         rc = 0;
3244                 GOTO(out, rc);
3245         }
3246
3247         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3248         if (rc)
3249                 GOTO(out_close, rc);
3250
3251         if (llog_get_size(llh) <= 1)
3252                 GOTO(out_close, rc = 0);
3253
3254         msrd.msrd_fsdb = fsdb;
3255         msrd.msrd_skip = 0;
3256
3257         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3258                           NULL);
3259
3260 out_close:
3261         llog_close(env, llh);
3262 out:
3263         llog_ctxt_put(ctxt);
3264         name_destroy(&logname);
3265