Whamcloud - gitweb
LU-4629 libcfs: fix buffer overflow of string buffer
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_ioctl.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 /* Find all logs in CONFIG directory and link then into list */
59 int class_dentry_readdir(const struct lu_env *env,
60                          struct mgs_device *mgs, struct list_head *log_list)
61 {
62         struct dt_object    *dir = mgs->mgs_configs_dir;
63         const struct dt_it_ops *iops;
64         struct dt_it        *it;
65         struct mgs_direntry *de;
66         char                *key;
67         int                  rc, key_sz;
68
69         INIT_LIST_HEAD(log_list);
70
71         LASSERT(dir);
72         LASSERT(dir->do_index_ops);
73
74         iops = &dir->do_index_ops->dio_it;
75         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
76         if (IS_ERR(it))
77                 RETURN(PTR_ERR(it));
78
79         rc = iops->load(env, it, 0);
80         if (rc <= 0)
81                 GOTO(fini, rc = 0);
82
83         /* main cycle */
84         do {
85                 key = (void *)iops->key(env, it);
86                 if (IS_ERR(key)) {
87                         CERROR("%s: key failed when listing %s: rc = %d\n",
88                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
89                                (int) PTR_ERR(key));
90                         goto next;
91                 }
92                 key_sz = iops->key_size(env, it);
93                 LASSERT(key_sz > 0);
94
95                 /* filter out "." and ".." entries */
96                 if (key[0] == '.') {
97                         if (key_sz == 1)
98                                 goto next;
99                         if (key_sz == 2 && key[1] == '.')
100                                 goto next;
101                 }
102
103                 de = mgs_direntry_alloc(key_sz + 1);
104                 if (de == NULL) {
105                         rc = -ENOMEM;
106                         break;
107                 }
108
109                 memcpy(de->mde_name, key, key_sz);
110                 de->mde_name[key_sz] = 0;
111
112                 list_add(&de->mde_list, log_list);
113
114 next:
115                 rc = iops->next(env, it);
116         } while (rc == 0);
117         rc = 0;
118
119         iops->put(env, it);
120
121 fini:
122         iops->fini(env, it);
123         if (rc)
124                 CERROR("%s: key failed when listing %s: rc = %d\n",
125                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
126         RETURN(rc);
127 }
128
129 /******************** DB functions *********************/
130
131 static inline int name_create(char **newname, char *prefix, char *suffix)
132 {
133         LASSERT(newname);
134         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
135         if (!*newname)
136                 return -ENOMEM;
137         sprintf(*newname, "%s%s", prefix, suffix);
138         return 0;
139 }
140
141 static inline void name_destroy(char **name)
142 {
143         if (*name)
144                 OBD_FREE(*name, strlen(*name) + 1);
145         *name = NULL;
146 }
147
148 struct mgs_fsdb_handler_data
149 {
150         struct fs_db   *fsdb;
151         __u32           ver;
152 };
153
154 /* from the (client) config log, figure out:
155         1. which ost's/mdt's are configured (by index)
156         2. what the last config step is
157         3. COMPAT_18 osc name
158 */
159 /* It might be better to have a separate db file, instead of parsing the info
160    out of the client log.  This is slow and potentially error-prone. */
161 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
162                             struct llog_rec_hdr *rec, void *data)
163 {
164         struct mgs_fsdb_handler_data *d = data;
165         struct fs_db *fsdb = d->fsdb;
166         int cfg_len = rec->lrh_len;
167         char *cfg_buf = (char*) (rec + 1);
168         struct lustre_cfg *lcfg;
169         __u32 index;
170         int rc = 0;
171         ENTRY;
172
173         if (rec->lrh_type != OBD_CFG_REC) {
174                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
175                 RETURN(-EINVAL);
176         }
177
178         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
179         if (rc) {
180                 CERROR("Insane cfg\n");
181                 RETURN(rc);
182         }
183
184         lcfg = (struct lustre_cfg *)cfg_buf;
185
186         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
187                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
188
189         /* Figure out ost indicies */
190         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
191         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
192             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
193                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
194                                        NULL, 10);
195                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
196                        lustre_cfg_string(lcfg, 1), index,
197                        lustre_cfg_string(lcfg, 2));
198                 set_bit(index, fsdb->fsdb_ost_index_map);
199         }
200
201         /* Figure out mdt indicies */
202         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
203         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
204             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
205                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
206                                        &index, NULL);
207                 if (rc != LDD_F_SV_TYPE_MDT) {
208                         CWARN("Unparsable MDC name %s, assuming index 0\n",
209                               lustre_cfg_string(lcfg, 0));
210                         index = 0;
211                 }
212                 rc = 0;
213                 CDEBUG(D_MGS, "MDT index is %u\n", index);
214                 set_bit(index, fsdb->fsdb_mdt_index_map);
215                 fsdb->fsdb_mdt_count ++;
216         }
217
218         /**
219          * figure out the old config. fsdb_gen = 0 means old log
220          * It is obsoleted and not supported anymore
221          */
222         if (fsdb->fsdb_gen == 0) {
223                 CERROR("Old config format is not supported\n");
224                 RETURN(-EINVAL);
225         }
226
227         /*
228          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
229          */
230         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
231             lcfg->lcfg_command == LCFG_ATTACH &&
232             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
233                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
234                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
235                         CWARN("MDT using 1.8 OSC name scheme\n");
236                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
237                 }
238         }
239
240         if (lcfg->lcfg_command == LCFG_MARKER) {
241                 struct cfg_marker *marker;
242                 marker = lustre_cfg_buf(lcfg, 1);
243
244                 d->ver = marker->cm_vers;
245
246                 /* Keep track of the latest marker step */
247                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
248         }
249
250         RETURN(rc);
251 }
252
253 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
254 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
255                                   struct mgs_device *mgs,
256                                   struct fs_db *fsdb)
257 {
258         char                            *logname;
259         struct llog_handle              *loghandle;
260         struct llog_ctxt                *ctxt;
261         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
262         int rc;
263
264         ENTRY;
265
266         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
267         LASSERT(ctxt != NULL);
268         rc = name_create(&logname, fsdb->fsdb_name, "-client");
269         if (rc)
270                 GOTO(out_put, rc);
271         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
272         if (rc)
273                 GOTO(out_pop, rc);
274
275         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
276         if (rc)
277                 GOTO(out_close, rc);
278
279         if (llog_get_size(loghandle) <= 1)
280                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
281
282         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
283         CDEBUG(D_INFO, "get_db = %d\n", rc);
284 out_close:
285         llog_close(env, loghandle);
286 out_pop:
287         name_destroy(&logname);
288 out_put:
289         llog_ctxt_put(ctxt);
290
291         RETURN(rc);
292 }
293
294 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
295 {
296         struct mgs_tgt_srpc_conf *tgtconf;
297
298         /* free target-specific rules */
299         while (fsdb->fsdb_srpc_tgt) {
300                 tgtconf = fsdb->fsdb_srpc_tgt;
301                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
302
303                 LASSERT(tgtconf->mtsc_tgt);
304
305                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
306                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
307                 OBD_FREE_PTR(tgtconf);
308         }
309
310         /* free general rules */
311         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
312 }
313
314 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
315 {
316         struct fs_db *fsdb;
317         struct list_head *tmp;
318
319         list_for_each(tmp, &mgs->mgs_fs_db_list) {
320                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
321                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
322                         return fsdb;
323         }
324         return NULL;
325 }
326
327 /* caller must hold the mgs->mgs_fs_db_lock */
328 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
329                                   struct mgs_device *mgs, char *fsname)
330 {
331         struct fs_db *fsdb;
332         int rc;
333         ENTRY;
334
335         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
336                 CERROR("fsname %s is too long\n", fsname);
337                 RETURN(NULL);
338         }
339
340         OBD_ALLOC_PTR(fsdb);
341         if (!fsdb)
342                 RETURN(NULL);
343
344         strcpy(fsdb->fsdb_name, fsname);
345         mutex_init(&fsdb->fsdb_mutex);
346         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
347         fsdb->fsdb_gen = 1;
348
349         if (strcmp(fsname, MGSSELF_NAME) == 0) {
350                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
351         } else {
352                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
353                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
354                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
355                         CERROR("No memory for index maps\n");
356                         GOTO(err, rc = -ENOMEM);
357                 }
358
359                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
360                 if (rc)
361                         GOTO(err, rc);
362                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
363                 if (rc)
364                         GOTO(err, rc);
365
366                 /* initialise data for NID table */
367                 mgs_ir_init_fs(env, mgs, fsdb);
368
369                 lproc_mgs_add_live(mgs, fsdb);
370         }
371
372         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
373
374         RETURN(fsdb);
375 err:
376         if (fsdb->fsdb_ost_index_map)
377                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
378         if (fsdb->fsdb_mdt_index_map)
379                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
380         name_destroy(&fsdb->fsdb_clilov);
381         name_destroy(&fsdb->fsdb_clilmv);
382         OBD_FREE_PTR(fsdb);
383         RETURN(NULL);
384 }
385
386 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
387 {
388         /* wait for anyone with the sem */
389         mutex_lock(&fsdb->fsdb_mutex);
390         lproc_mgs_del_live(mgs, fsdb);
391         list_del(&fsdb->fsdb_list);
392
393         /* deinitialize fsr */
394         mgs_ir_fini_fs(mgs, fsdb);
395
396         if (fsdb->fsdb_ost_index_map)
397                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
398         if (fsdb->fsdb_mdt_index_map)
399                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
400         name_destroy(&fsdb->fsdb_clilov);
401         name_destroy(&fsdb->fsdb_clilmv);
402         mgs_free_fsdb_srpc(fsdb);
403         mutex_unlock(&fsdb->fsdb_mutex);
404         OBD_FREE_PTR(fsdb);
405 }
406
407 int mgs_init_fsdb_list(struct mgs_device *mgs)
408 {
409         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
410         return 0;
411 }
412
413 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
414 {
415         struct fs_db *fsdb;
416         struct list_head *tmp, *tmp2;
417
418         mutex_lock(&mgs->mgs_mutex);
419         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
420                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
421                 mgs_free_fsdb(mgs, fsdb);
422         }
423         mutex_unlock(&mgs->mgs_mutex);
424         return 0;
425 }
426
427 int mgs_find_or_make_fsdb(const struct lu_env *env,
428                           struct mgs_device *mgs, char *name,
429                           struct fs_db **dbh)
430 {
431         struct fs_db *fsdb;
432         int rc = 0;
433
434         ENTRY;
435         mutex_lock(&mgs->mgs_mutex);
436         fsdb = mgs_find_fsdb(mgs, name);
437         if (fsdb) {
438                 mutex_unlock(&mgs->mgs_mutex);
439                 *dbh = fsdb;
440                 RETURN(0);
441         }
442
443         CDEBUG(D_MGS, "Creating new db\n");
444         fsdb = mgs_new_fsdb(env, mgs, name);
445         /* lock fsdb_mutex until the db is loaded from llogs */
446         if (fsdb)
447                 mutex_lock(&fsdb->fsdb_mutex);
448         mutex_unlock(&mgs->mgs_mutex);
449         if (!fsdb)
450                 RETURN(-ENOMEM);
451
452         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
453                 /* populate the db from the client llog */
454                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
455                 if (rc) {
456                         CERROR("Can't get db from client log %d\n", rc);
457                         GOTO(out_free, rc);
458                 }
459         }
460
461         /* populate srpc rules from params llog */
462         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
463         if (rc) {
464                 CERROR("Can't get db from params log %d\n", rc);
465                 GOTO(out_free, rc);
466         }
467
468         mutex_unlock(&fsdb->fsdb_mutex);
469         *dbh = fsdb;
470
471         RETURN(0);
472
473 out_free:
474         mutex_unlock(&fsdb->fsdb_mutex);
475         mgs_free_fsdb(mgs, fsdb);
476         return rc;
477 }
478
479 /* 1 = index in use
480    0 = index unused
481    -1= empty client log */
482 int mgs_check_index(const struct lu_env *env,
483                     struct mgs_device *mgs,
484                     struct mgs_target_info *mti)
485 {
486         struct fs_db *fsdb;
487         void *imap;
488         int rc = 0;
489         ENTRY;
490
491         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
492
493         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
494         if (rc) {
495                 CERROR("Can't get db for %s\n", mti->mti_fsname);
496                 RETURN(rc);
497         }
498
499         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
500                 RETURN(-1);
501
502         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
503                 imap = fsdb->fsdb_ost_index_map;
504         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
505                 imap = fsdb->fsdb_mdt_index_map;
506         else
507                 RETURN(-EINVAL);
508
509         if (test_bit(mti->mti_stripe_index, imap))
510                 RETURN(1);
511         RETURN(0);
512 }
513
514 static __inline__ int next_index(void *index_map, int map_len)
515 {
516         int i;
517         for (i = 0; i < map_len * 8; i++)
518                 if (!test_bit(i, index_map)) {
519                          return i;
520                  }
521         CERROR("max index %d exceeded.\n", i);
522         return -1;
523 }
524
525 /* Return codes:
526         0  newly marked as in use
527         <0 err
528         +EALREADY for update of an old index */
529 static int mgs_set_index(const struct lu_env *env,
530                          struct mgs_device *mgs,
531                          struct mgs_target_info *mti)
532 {
533         struct fs_db *fsdb;
534         void *imap;
535         int rc = 0;
536         ENTRY;
537
538         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
539         if (rc) {
540                 CERROR("Can't get db for %s\n", mti->mti_fsname);
541                 RETURN(rc);
542         }
543
544         mutex_lock(&fsdb->fsdb_mutex);
545         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
546                 imap = fsdb->fsdb_ost_index_map;
547         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
548                 imap = fsdb->fsdb_mdt_index_map;
549         } else {
550                 GOTO(out_up, rc = -EINVAL);
551         }
552
553         if (mti->mti_flags & LDD_F_NEED_INDEX) {
554                 rc = next_index(imap, INDEX_MAP_SIZE);
555                 if (rc == -1)
556                         GOTO(out_up, rc = -ERANGE);
557                 mti->mti_stripe_index = rc;
558                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
559                         fsdb->fsdb_mdt_count ++;
560         }
561
562         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
563                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
564                                    "but the max index is %d.\n",
565                                    mti->mti_svname, mti->mti_stripe_index,
566                                    INDEX_MAP_SIZE * 8);
567                 GOTO(out_up, rc = -ERANGE);
568         }
569
570         if (test_bit(mti->mti_stripe_index, imap)) {
571                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
572                     !(mti->mti_flags & LDD_F_WRITECONF)) {
573                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
574                                            "%d, but that index is already in "
575                                            "use. Use --writeconf to force\n",
576                                            mti->mti_svname,
577                                            mti->mti_stripe_index);
578                         GOTO(out_up, rc = -EADDRINUSE);
579                 } else {
580                         CDEBUG(D_MGS, "Server %s updating index %d\n",
581                                mti->mti_svname, mti->mti_stripe_index);
582                         GOTO(out_up, rc = EALREADY);
583                 }
584         }
585
586         set_bit(mti->mti_stripe_index, imap);
587         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
588         mutex_unlock(&fsdb->fsdb_mutex);
589         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
590                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
591
592         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
593                mti->mti_stripe_index);
594
595         RETURN(0);
596 out_up:
597         mutex_unlock(&fsdb->fsdb_mutex);
598         return rc;
599 }
600
601 struct mgs_modify_lookup {
602         struct cfg_marker mml_marker;
603         int               mml_modified;
604 };
605
606 static int mgs_modify_handler(const struct lu_env *env,
607                               struct llog_handle *llh,
608                               struct llog_rec_hdr *rec, void *data)
609 {
610         struct mgs_modify_lookup *mml = data;
611         struct cfg_marker *marker;
612         struct lustre_cfg *lcfg = REC_DATA(rec);
613         int cfg_len = REC_DATA_LEN(rec);
614         int rc;
615         ENTRY;
616
617         if (rec->lrh_type != OBD_CFG_REC) {
618                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
619                 RETURN(-EINVAL);
620         }
621
622         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
623         if (rc) {
624                 CERROR("Insane cfg\n");
625                 RETURN(rc);
626         }
627
628         /* We only care about markers */
629         if (lcfg->lcfg_command != LCFG_MARKER)
630                 RETURN(0);
631
632         marker = lustre_cfg_buf(lcfg, 1);
633         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
634             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
635             !(marker->cm_flags & CM_SKIP)) {
636                 /* Found a non-skipped marker match */
637                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
638                        rec->lrh_index, marker->cm_step,
639                        marker->cm_flags, mml->mml_marker.cm_flags,
640                        marker->cm_tgtname, marker->cm_comment);
641                 /* Overwrite the old marker llog entry */
642                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
643                 marker->cm_flags |= mml->mml_marker.cm_flags;
644                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
645                 rc = llog_write(env, llh, rec, rec->lrh_index);
646                 if (!rc)
647                          mml->mml_modified++;
648         }
649
650         RETURN(rc);
651 }
652
653 /**
654  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
655  * Return code:
656  * 0 - modified successfully,
657  * 1 - no modification was done
658  * negative - error
659  */
660 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
661                       struct fs_db *fsdb, struct mgs_target_info *mti,
662                       char *logname, char *devname, char *comment, int flags)
663 {
664         struct llog_handle *loghandle;
665         struct llog_ctxt *ctxt;
666         struct mgs_modify_lookup *mml;
667         int rc;
668
669         ENTRY;
670
671         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
672         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
673                flags);
674
675         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
676         LASSERT(ctxt != NULL);
677         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
678         if (rc < 0) {
679                 if (rc == -ENOENT)
680                         rc = 0;
681                 GOTO(out_pop, rc);
682         }
683
684         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
685         if (rc)
686                 GOTO(out_close, rc);
687
688         if (llog_get_size(loghandle) <= 1)
689                 GOTO(out_close, rc = 0);
690
691         OBD_ALLOC_PTR(mml);
692         if (!mml)
693                 GOTO(out_close, rc = -ENOMEM);
694         if (strlcpy(mml->mml_marker.cm_comment, comment,
695                     sizeof(mml->mml_marker.cm_comment)) >=
696             sizeof(mml->mml_marker.cm_comment))
697                 GOTO(out_free, rc = -E2BIG);
698         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
699                     sizeof(mml->mml_marker.cm_tgtname)) >=
700             sizeof(mml->mml_marker.cm_tgtname))
701                 GOTO(out_free, rc = -E2BIG);
702         /* Modify mostly means cancel */
703         mml->mml_marker.cm_flags = flags;
704         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
705         mml->mml_modified = 0;
706         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
707                           NULL);
708         if (!rc && !mml->mml_modified)
709                 rc = 1;
710
711 out_free:
712         OBD_FREE_PTR(mml);
713
714 out_close:
715         llog_close(env, loghandle);
716 out_pop:
717         if (rc < 0)
718                 CERROR("%s: modify %s/%s failed: rc = %d\n",
719                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
720         llog_ctxt_put(ctxt);
721         RETURN(rc);
722 }
723
724 /** This structure is passed to mgs_replace_handler */
725 struct mgs_replace_uuid_lookup {
726         /* Nids are replaced for this target device */
727         struct mgs_target_info target;
728         /* Temporary modified llog */
729         struct llog_handle *temp_llh;
730         /* Flag is set if in target block*/
731         int in_target_device;
732         /* Nids already added. Just skip (multiple nids) */
733         int device_nids_added;
734         /* Flag is set if this block should not be copied */
735         int skip_it;
736 };
737
738 /**
739  * Check: a) if block should be skipped
740  * b) is it target block
741  *
742  * \param[in] lcfg
743  * \param[in] mrul
744  *
745  * \retval 0 should not to be skipped
746  * \retval 1 should to be skipped
747  */
748 static int check_markers(struct lustre_cfg *lcfg,
749                          struct mgs_replace_uuid_lookup *mrul)
750 {
751          struct cfg_marker *marker;
752
753         /* Track markers. Find given device */
754         if (lcfg->lcfg_command == LCFG_MARKER) {
755                 marker = lustre_cfg_buf(lcfg, 1);
756                 /* Clean llog from records marked as CM_EXCLUDE.
757                    CM_SKIP records are used for "active" command
758                    and can be restored if needed */
759                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
760                     (CM_EXCLUDE | CM_START)) {
761                         mrul->skip_it = 1;
762                         return 1;
763                 }
764
765                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
766                     (CM_EXCLUDE | CM_END)) {
767                         mrul->skip_it = 0;
768                         return 1;
769                 }
770
771                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
772                         LASSERT(!(marker->cm_flags & CM_START) ||
773                                 !(marker->cm_flags & CM_END));
774                         if (marker->cm_flags & CM_START) {
775                                 mrul->in_target_device = 1;
776                                 mrul->device_nids_added = 0;
777                         } else if (marker->cm_flags & CM_END)
778                                 mrul->in_target_device = 0;
779                 }
780         }
781
782         return 0;
783 }
784
785 static int record_base(const struct lu_env *env, struct llog_handle *llh,
786                      char *cfgname, lnet_nid_t nid, int cmd,
787                      char *s1, char *s2, char *s3, char *s4)
788 {
789         struct mgs_thread_info  *mgi = mgs_env_info(env);
790         struct llog_cfg_rec     *lcr;
791         int rc;
792
793         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
794                cmd, s1, s2, s3, s4);
795
796         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
797         if (s1)
798                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
799         if (s2)
800                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
801         if (s3)
802                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
803         if (s4)
804                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
805
806         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
807         if (lcr == NULL)
808                 return -ENOMEM;
809
810         lcr->lcr_cfg.lcfg_nid = nid;
811         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
812
813         lustre_cfg_rec_free(lcr);
814
815         if (rc < 0)
816                 CDEBUG(D_MGS,
817                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
818                        cfgname, cmd, s1, s2, s3, s4, rc);
819         return rc;
820 }
821
822 static inline int record_add_uuid(const struct lu_env *env,
823                                   struct llog_handle *llh,
824                                   uint64_t nid, char *uuid)
825 {
826         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
827 }
828
829 static inline int record_add_conn(const struct lu_env *env,
830                                   struct llog_handle *llh,
831                                   char *devname, char *uuid)
832 {
833         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
834 }
835
836 static inline int record_attach(const struct lu_env *env,
837                                 struct llog_handle *llh, char *devname,
838                                 char *type, char *uuid)
839 {
840         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
841 }
842
843 static inline int record_setup(const struct lu_env *env,
844                                struct llog_handle *llh, char *devname,
845                                char *s1, char *s2, char *s3, char *s4)
846 {
847         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
848 }
849
850 /**
851  * \retval <0 record processing error
852  * \retval n record is processed. No need copy original one.
853  * \retval 0 record is not processed.
854  */
855 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
856                            struct mgs_replace_uuid_lookup *mrul)
857 {
858         int nids_added = 0;
859         lnet_nid_t nid;
860         char *ptr;
861         int rc;
862
863         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
864                 /* LCFG_ADD_UUID command found. Let's skip original command
865                    and add passed nids */
866                 ptr = mrul->target.mti_params;
867                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
868                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
869                                "device %s\n", libcfs_nid2str(nid),
870                                 mrul->target.mti_params,
871                                 mrul->target.mti_svname);
872                         rc = record_add_uuid(env,
873                                              mrul->temp_llh, nid,
874                                              mrul->target.mti_params);
875                         if (!rc)
876                                 nids_added++;
877                 }
878
879                 if (nids_added == 0) {
880                         CERROR("No new nids were added, nid %s with uuid %s, "
881                                "device %s\n", libcfs_nid2str(nid),
882                                mrul->target.mti_params,
883                                mrul->target.mti_svname);
884                         RETURN(-ENXIO);
885                 } else {
886                         mrul->device_nids_added = 1;
887                 }
888
889                 return nids_added;
890         }
891
892         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
893                 /* LCFG_SETUP command found. UUID should be changed */
894                 rc = record_setup(env,
895                                   mrul->temp_llh,
896                                   /* devname the same */
897                                   lustre_cfg_string(lcfg, 0),
898                                   /* s1 is not changed */
899                                   lustre_cfg_string(lcfg, 1),
900                                   /* new uuid should be
901                                   the full nidlist */
902                                   mrul->target.mti_params,
903                                   /* s3 is not changed */
904                                   lustre_cfg_string(lcfg, 3),
905                                   /* s4 is not changed */
906                                   lustre_cfg_string(lcfg, 4));
907                 return rc ? rc : 1;
908         }
909
910         /* Another commands in target device block */
911         return 0;
912 }
913
914 /**
915  * Handler that called for every record in llog.
916  * Records are processed in order they placed in llog.
917  *
918  * \param[in] llh       log to be processed
919  * \param[in] rec       current record
920  * \param[in] data      mgs_replace_uuid_lookup structure
921  *
922  * \retval 0    success
923  */
924 static int mgs_replace_handler(const struct lu_env *env,
925                                struct llog_handle *llh,
926                                struct llog_rec_hdr *rec,
927                                void *data)
928 {
929         struct mgs_replace_uuid_lookup *mrul;
930         struct lustre_cfg *lcfg = REC_DATA(rec);
931         int cfg_len = REC_DATA_LEN(rec);
932         int rc;
933         ENTRY;
934
935         mrul = (struct mgs_replace_uuid_lookup *)data;
936
937         if (rec->lrh_type != OBD_CFG_REC) {
938                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
939                        rec->lrh_type, lcfg->lcfg_command,
940                        lustre_cfg_string(lcfg, 0),
941                        lustre_cfg_string(lcfg, 1));
942                 RETURN(-EINVAL);
943         }
944
945         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
946         if (rc) {
947                 /* Do not copy any invalidated records */
948                 GOTO(skip_out, rc = 0);
949         }
950
951         rc = check_markers(lcfg, mrul);
952         if (rc || mrul->skip_it)
953                 GOTO(skip_out, rc = 0);
954
955         /* Write to new log all commands outside target device block */
956         if (!mrul->in_target_device)
957                 GOTO(copy_out, rc = 0);
958
959         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
960            (failover nids) for this target, assuming that if then
961            primary is changing then so is the failover */
962         if (mrul->device_nids_added &&
963             (lcfg->lcfg_command == LCFG_ADD_UUID ||
964              lcfg->lcfg_command == LCFG_ADD_CONN))
965                 GOTO(skip_out, rc = 0);
966
967         rc = process_command(env, lcfg, mrul);
968         if (rc < 0)
969                 RETURN(rc);
970
971         if (rc)
972                 RETURN(0);
973 copy_out:
974         /* Record is placed in temporary llog as is */
975         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
976
977         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
978                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
979                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
980         RETURN(rc);
981
982 skip_out:
983         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
984                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
985                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
986         RETURN(rc);
987 }
988
989 static int mgs_log_is_empty(const struct lu_env *env,
990                             struct mgs_device *mgs, char *name)
991 {
992         struct llog_ctxt        *ctxt;
993         int                      rc;
994
995         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
996         LASSERT(ctxt != NULL);
997
998         rc = llog_is_empty(env, ctxt, name);
999         llog_ctxt_put(ctxt);
1000         return rc;
1001 }
1002
1003 static int mgs_replace_nids_log(const struct lu_env *env,
1004                                 struct obd_device *mgs, struct fs_db *fsdb,
1005                                 char *logname, char *devname, char *nids)
1006 {
1007         struct llog_handle *orig_llh, *backup_llh;
1008         struct llog_ctxt *ctxt;
1009         struct mgs_replace_uuid_lookup *mrul;
1010         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1011         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1012         char *backup;
1013         int rc, rc2;
1014         ENTRY;
1015
1016         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1017
1018         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1019         LASSERT(ctxt != NULL);
1020
1021         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1022                 /* Log is empty. Nothing to replace */
1023                 GOTO(out_put, rc = 0);
1024         }
1025
1026         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1027         if (backup == NULL)
1028                 GOTO(out_put, rc = -ENOMEM);
1029
1030         sprintf(backup, "%s.bak", logname);
1031
1032         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1033         if (rc == 0) {
1034                 /* Now erase original log file. Connections are not allowed.
1035                    Backup is already saved */
1036                 rc = llog_erase(env, ctxt, NULL, logname);
1037                 if (rc < 0)
1038                         GOTO(out_free, rc);
1039         } else if (rc != -ENOENT) {
1040                 CERROR("%s: can't make backup for %s: rc = %d\n",
1041                        mgs->obd_name, logname, rc);
1042                 GOTO(out_free,rc);
1043         }
1044
1045         /* open local log */
1046         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1047         if (rc)
1048                 GOTO(out_restore, rc);
1049
1050         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1051         if (rc)
1052                 GOTO(out_closel, rc);
1053
1054         /* open backup llog */
1055         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1056                        LLOG_OPEN_EXISTS);
1057         if (rc)
1058                 GOTO(out_closel, rc);
1059
1060         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1061         if (rc)
1062                 GOTO(out_close, rc);
1063
1064         if (llog_get_size(backup_llh) <= 1)
1065                 GOTO(out_close, rc = 0);
1066
1067         OBD_ALLOC_PTR(mrul);
1068         if (!mrul)
1069                 GOTO(out_close, rc = -ENOMEM);
1070         /* devname is only needed information to replace UUID records */
1071         strlcpy(mrul->target.mti_svname, devname,
1072                 sizeof(mrul->target.mti_svname));
1073         /* parse nids later */
1074         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1075         /* Copy records to this temporary llog */
1076         mrul->temp_llh = orig_llh;
1077
1078         rc = llog_process(env, backup_llh, mgs_replace_handler,
1079                           (void *)mrul, NULL);
1080         OBD_FREE_PTR(mrul);
1081 out_close:
1082         rc2 = llog_close(NULL, backup_llh);
1083         if (!rc)
1084                 rc = rc2;
1085 out_closel:
1086         rc2 = llog_close(NULL, orig_llh);
1087         if (!rc)
1088                 rc = rc2;
1089
1090 out_restore:
1091         if (rc) {
1092                 CERROR("%s: llog should be restored: rc = %d\n",
1093                        mgs->obd_name, rc);
1094                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1095                                   logname);
1096                 if (rc2 < 0)
1097                         CERROR("%s: can't restore backup %s: rc = %d\n",
1098                                mgs->obd_name, logname, rc2);
1099         }
1100
1101 out_free:
1102         OBD_FREE(backup, strlen(backup) + 1);
1103
1104 out_put:
1105         llog_ctxt_put(ctxt);
1106
1107         if (rc)
1108                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1109                        mgs->obd_name, logname, rc);
1110
1111         RETURN(rc);
1112 }
1113
1114 /**
1115  * Parse device name and get file system name and/or device index
1116  *
1117  * \param[in]   devname device name (ex. lustre-MDT0000)
1118  * \param[out]  fsname  file system name(optional)
1119  * \param[out]  index   device index(optional)
1120  *
1121  * \retval 0    success
1122  */
1123 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1124 {
1125         int rc;
1126         ENTRY;
1127
1128         /* Extract fsname */
1129         if (fsname) {
1130                 rc = server_name2fsname(devname, fsname, NULL);
1131                 if (rc < 0) {
1132                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1133                                devname);
1134                         RETURN(-EINVAL);
1135                 }
1136         }
1137
1138         if (index) {
1139                 rc = server_name2index(devname, index, NULL);
1140                 if (rc < 0) {
1141                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1142                                devname);
1143                         RETURN(-EINVAL);
1144                 }
1145         }
1146
1147         RETURN(0);
1148 }
1149
1150 /* This is only called during replace_nids */
1151 static int only_mgs_is_running(struct obd_device *mgs_obd)
1152 {
1153         /* TDB: Is global variable with devices count exists? */
1154         int num_devices = get_devices_count();
1155         int num_exports = 0;
1156         struct obd_export *exp;
1157
1158         spin_lock(&mgs_obd->obd_dev_lock);
1159         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1160                 /* skip self export */
1161                 if (exp == mgs_obd->obd_self_export)
1162                         continue;
1163                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1164                         continue;
1165
1166                 ++num_exports;
1167
1168                 CERROR("%s: node %s still connected during replace_nids "
1169                        "connect_flags:%llx\n",
1170                        mgs_obd->obd_name,
1171                        libcfs_nid2str(exp->exp_nid_stats->nid),
1172                        exp_connect_flags(exp));
1173
1174         }
1175         spin_unlock(&mgs_obd->obd_dev_lock);
1176
1177         /* osd, MGS and MGC + self_export
1178            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1179         return (num_devices <= 3) && (num_exports == 0);
1180 }
1181
1182 static int name_create_mdt(char **logname, char *fsname, int i)
1183 {
1184         char mdt_index[9];
1185
1186         sprintf(mdt_index, "-MDT%04x", i);
1187         return name_create(logname, fsname, mdt_index);
1188 }
1189
1190 /**
1191  * Replace nids for \a device to \a nids values
1192  *
1193  * \param obd           MGS obd device
1194  * \param devname       nids need to be replaced for this device
1195  * (ex. lustre-OST0000)
1196  * \param nids          nids list (ex. nid1,nid2,nid3)
1197  *
1198  * \retval 0    success
1199  */
1200 int mgs_replace_nids(const struct lu_env *env,
1201                      struct mgs_device *mgs,
1202                      char *devname, char *nids)
1203 {
1204         /* Assume fsname is part of device name */
1205         char fsname[MTI_NAME_MAXLEN];
1206         int rc;
1207         __u32 index;
1208         char *logname;
1209         struct fs_db *fsdb;
1210         unsigned int i;
1211         int conn_state;
1212         struct obd_device *mgs_obd = mgs->mgs_obd;
1213         ENTRY;
1214
1215         /* We can only change NIDs if no other nodes are connected */
1216         spin_lock(&mgs_obd->obd_dev_lock);
1217         conn_state = mgs_obd->obd_no_conn;
1218         mgs_obd->obd_no_conn = 1;
1219         spin_unlock(&mgs_obd->obd_dev_lock);
1220
1221         /* We can not change nids if not only MGS is started */
1222         if (!only_mgs_is_running(mgs_obd)) {
1223                 CERROR("Only MGS is allowed to be started\n");
1224                 GOTO(out, rc = -EINPROGRESS);
1225         }
1226
1227         /* Get fsname and index*/
1228         rc = mgs_parse_devname(devname, fsname, &index);
1229         if (rc)
1230                 GOTO(out, rc);
1231
1232         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1233         if (rc) {
1234                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1235                 GOTO(out, rc);
1236         }
1237
1238         /* Process client llogs */
1239         name_create(&logname, fsname, "-client");
1240         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1241         name_destroy(&logname);
1242         if (rc) {
1243                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1244                        fsname, devname, rc);
1245                 GOTO(out, rc);
1246         }
1247
1248         /* Process MDT llogs */
1249         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1250                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1251                         continue;
1252                 name_create_mdt(&logname, fsname, i);
1253                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1254                 name_destroy(&logname);
1255                 if (rc)
1256                         GOTO(out, rc);
1257         }
1258
1259 out:
1260         spin_lock(&mgs_obd->obd_dev_lock);
1261         mgs_obd->obd_no_conn = conn_state;
1262         spin_unlock(&mgs_obd->obd_dev_lock);
1263
1264         RETURN(rc);
1265 }
1266
1267 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1268                             char *devname, struct lov_desc *desc)
1269 {
1270         struct mgs_thread_info  *mgi = mgs_env_info(env);
1271         struct llog_cfg_rec     *lcr;
1272         int rc;
1273
1274         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1275         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1276         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1277         if (lcr == NULL)
1278                 return -ENOMEM;
1279
1280         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1281         lustre_cfg_rec_free(lcr);
1282         return rc;
1283 }
1284
1285 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1286                             char *devname, struct lmv_desc *desc)
1287 {
1288         struct mgs_thread_info  *mgi = mgs_env_info(env);
1289         struct llog_cfg_rec     *lcr;
1290         int rc;
1291
1292         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1293         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1294         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1295         if (lcr == NULL)
1296                 return -ENOMEM;
1297
1298         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1299         lustre_cfg_rec_free(lcr);
1300         return rc;
1301 }
1302
1303 static inline int record_mdc_add(const struct lu_env *env,
1304                                  struct llog_handle *llh,
1305                                  char *logname, char *mdcuuid,
1306                                  char *mdtuuid, char *index,
1307                                  char *gen)
1308 {
1309         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1310                            mdtuuid,index,gen,mdcuuid);
1311 }
1312
1313 static inline int record_lov_add(const struct lu_env *env,
1314                                  struct llog_handle *llh,
1315                                  char *lov_name, char *ost_uuid,
1316                                  char *index, char *gen)
1317 {
1318         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1319                            ost_uuid, index, gen, 0);
1320 }
1321
1322 static inline int record_mount_opt(const struct lu_env *env,
1323                                    struct llog_handle *llh,
1324                                    char *profile, char *lov_name,
1325                                    char *mdc_name)
1326 {
1327         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1328                            profile,lov_name,mdc_name,0);
1329 }
1330
1331 static int record_marker(const struct lu_env *env,
1332                          struct llog_handle *llh,
1333                          struct fs_db *fsdb, __u32 flags,
1334                          char *tgtname, char *comment)
1335 {
1336         struct mgs_thread_info *mgi = mgs_env_info(env);
1337         struct llog_cfg_rec *lcr;
1338         int rc;
1339         int cplen = 0;
1340
1341         if (flags & CM_START)
1342                 fsdb->fsdb_gen++;
1343         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1344         mgi->mgi_marker.cm_flags = flags;
1345         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1346         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1347                         sizeof(mgi->mgi_marker.cm_tgtname));
1348         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1349                 return -E2BIG;
1350         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1351                         sizeof(mgi->mgi_marker.cm_comment));
1352         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1353                 return -E2BIG;
1354         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1355         mgi->mgi_marker.cm_canceltime = 0;
1356         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1357         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1358                             sizeof(mgi->mgi_marker));
1359         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1360         if (lcr == NULL)
1361                 return -ENOMEM;
1362
1363         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1364         lustre_cfg_rec_free(lcr);
1365         return rc;
1366 }
1367
1368 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1369                             struct llog_handle **llh, char *name)
1370 {
1371         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1372         struct llog_ctxt        *ctxt;
1373         int                      rc = 0;
1374         ENTRY;
1375
1376         if (*llh)
1377                 GOTO(out, rc = -EBUSY);
1378
1379         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1380         if (!ctxt)
1381                 GOTO(out, rc = -ENODEV);
1382         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1383
1384         rc = llog_open_create(env, ctxt, llh, NULL, name);
1385         if (rc)
1386                 GOTO(out_ctxt, rc);
1387         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1388         if (rc)
1389                 llog_close(env, *llh);
1390 out_ctxt:
1391         llog_ctxt_put(ctxt);
1392 out:
1393         if (rc) {
1394                 CERROR("%s: can't start log %s: rc = %d\n",
1395                        mgs->mgs_obd->obd_name, name, rc);
1396                 *llh = NULL;
1397         }
1398         RETURN(rc);
1399 }
1400
1401 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1402 {
1403         int rc;
1404
1405         rc = llog_close(env, *llh);
1406         *llh = NULL;
1407
1408         return rc;
1409 }
1410
1411 /******************** config "macros" *********************/
1412
1413 /* write an lcfg directly into a log (with markers) */
1414 static int mgs_write_log_direct(const struct lu_env *env,
1415                                 struct mgs_device *mgs, struct fs_db *fsdb,
1416                                 char *logname, struct llog_cfg_rec *lcr,
1417                                 char *devname, char *comment)
1418 {
1419         struct llog_handle *llh = NULL;
1420         int rc;
1421
1422         ENTRY;
1423
1424         rc = record_start_log(env, mgs, &llh, logname);
1425         if (rc)
1426                 RETURN(rc);
1427
1428         /* FIXME These should be a single journal transaction */
1429         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1430         if (rc)
1431                 GOTO(out_end, rc);
1432         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1433         if (rc)
1434                 GOTO(out_end, rc);
1435         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1436         if (rc)
1437                 GOTO(out_end, rc);
1438 out_end:
1439         record_end_log(env, &llh);
1440         RETURN(rc);
1441 }
1442
1443 /* write the lcfg in all logs for the given fs */
1444 int mgs_write_log_direct_all(const struct lu_env *env, struct mgs_device *mgs,
1445                              struct fs_db *fsdb, struct mgs_target_info *mti,
1446                              struct llog_cfg_rec *lcr, char *devname,
1447                              char *comment, int server_only)
1448 {
1449         struct list_head         log_list;
1450         struct mgs_direntry     *dirent, *n;
1451         char                    *fsname = mti->mti_fsname;
1452         int                      rc = 0, len = strlen(fsname);
1453
1454         ENTRY;
1455         /* Find all the logs in the CONFIGS directory */
1456         rc = class_dentry_readdir(env, mgs, &log_list);
1457         if (rc)
1458                 RETURN(rc);
1459
1460         /* Could use fsdb index maps instead of directory listing */
1461         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1462                 list_del_init(&dirent->mde_list);
1463                 /* don't write to sptlrpc rule log */
1464                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1465                         goto next;
1466
1467                 /* caller wants write server logs only */
1468                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1469                         goto next;
1470
1471                 if (strncmp(fsname, dirent->mde_name, len) != 0)
1472                         goto next;
1473
1474                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1475                 /* Erase any old settings of this same parameter */
1476                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1477                                 devname, comment, CM_SKIP);
1478                 if (rc < 0)
1479                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1480                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1481                 if (lcr == NULL)
1482                         goto next;
1483                 /* Write the new one */
1484                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1485                                           lcr, devname, comment);
1486                 if (rc != 0)
1487                         CERROR("%s: writing log %s: rc = %d\n",
1488                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1489 next:
1490                 mgs_direntry_free(dirent);
1491         }
1492
1493         RETURN(rc);
1494 }
1495
1496 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1497                                     struct mgs_device *mgs,
1498                                     struct fs_db *fsdb,
1499                                     struct mgs_target_info *mti,
1500                                     int index, char *logname);
1501 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1502                                     struct mgs_device *mgs,
1503                                     struct fs_db *fsdb,
1504                                     struct mgs_target_info *mti,
1505                                     char *logname, char *suffix, char *lovname,
1506                                     enum lustre_sec_part sec_part, int flags);
1507 static int name_create_mdt_and_lov(char **logname, char **lovname,
1508                                    struct fs_db *fsdb, int i);
1509
1510 static int add_param(char *params, char *key, char *val)
1511 {
1512         char *start = params + strlen(params);
1513         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1514         int keylen = 0;
1515
1516         if (key != NULL)
1517                 keylen = strlen(key);
1518         if (start + 1 + keylen + strlen(val) >= end) {
1519                 CERROR("params are too long: %s %s%s\n",
1520                        params, key != NULL ? key : "", val);
1521                 return -EINVAL;
1522         }
1523
1524         sprintf(start, " %s%s", key != NULL ? key : "", val);
1525         return 0;
1526 }
1527
1528 /**
1529  * Walk through client config log record and convert the related records
1530  * into the target.
1531  **/
1532 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1533                                          struct llog_handle *llh,
1534                                          struct llog_rec_hdr *rec, void *data)
1535 {
1536         struct mgs_device *mgs;
1537         struct obd_device *obd;
1538         struct mgs_target_info *mti, *tmti;
1539         struct fs_db *fsdb;
1540         int cfg_len = rec->lrh_len;
1541         char *cfg_buf = (char*) (rec + 1);
1542         struct lustre_cfg *lcfg;
1543         int rc = 0;
1544         struct llog_handle *mdt_llh = NULL;
1545         static int got_an_osc_or_mdc = 0;
1546         /* 0: not found any osc/mdc;
1547            1: found osc;
1548            2: found mdc;
1549         */
1550         static int last_step = -1;
1551         int cplen = 0;
1552
1553         ENTRY;
1554
1555         mti = ((struct temp_comp*)data)->comp_mti;
1556         tmti = ((struct temp_comp*)data)->comp_tmti;
1557         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1558         obd = ((struct temp_comp *)data)->comp_obd;
1559         mgs = lu2mgs_dev(obd->obd_lu_dev);
1560         LASSERT(mgs);
1561
1562         if (rec->lrh_type != OBD_CFG_REC) {
1563                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1564                 RETURN(-EINVAL);
1565         }
1566
1567         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1568         if (rc) {
1569                 CERROR("Insane cfg\n");
1570                 RETURN(rc);
1571         }
1572
1573         lcfg = (struct lustre_cfg *)cfg_buf;
1574
1575         if (lcfg->lcfg_command == LCFG_MARKER) {
1576                 struct cfg_marker *marker;
1577                 marker = lustre_cfg_buf(lcfg, 1);
1578                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1579                     (marker->cm_flags & CM_START) &&
1580                      !(marker->cm_flags & CM_SKIP)) {
1581                         got_an_osc_or_mdc = 1;
1582                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1583                                         sizeof(tmti->mti_svname));
1584                         if (cplen >= sizeof(tmti->mti_svname))
1585                                 RETURN(-E2BIG);
1586                         rc = record_start_log(env, mgs, &mdt_llh,
1587                                               mti->mti_svname);
1588                         if (rc)
1589                                 RETURN(rc);
1590                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1591                                            mti->mti_svname, "add osc(copied)");
1592                         record_end_log(env, &mdt_llh);
1593                         last_step = marker->cm_step;
1594                         RETURN(rc);
1595                 }
1596                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1597                     (marker->cm_flags & CM_END) &&
1598                      !(marker->cm_flags & CM_SKIP)) {
1599                         LASSERT(last_step == marker->cm_step);
1600                         last_step = -1;
1601                         got_an_osc_or_mdc = 0;
1602                         memset(tmti, 0, sizeof(*tmti));
1603                         rc = record_start_log(env, mgs, &mdt_llh,
1604                                               mti->mti_svname);
1605                         if (rc)
1606                                 RETURN(rc);
1607                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1608                                            mti->mti_svname, "add osc(copied)");
1609                         record_end_log(env, &mdt_llh);
1610                         RETURN(rc);
1611                 }
1612                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1613                     (marker->cm_flags & CM_START) &&
1614                      !(marker->cm_flags & CM_SKIP)) {
1615                         got_an_osc_or_mdc = 2;
1616                         last_step = marker->cm_step;
1617                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1618                                strlen(marker->cm_tgtname));
1619
1620                         RETURN(rc);
1621                 }
1622                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1623                     (marker->cm_flags & CM_END) &&
1624                      !(marker->cm_flags & CM_SKIP)) {
1625                         LASSERT(last_step == marker->cm_step);
1626                         last_step = -1;
1627                         got_an_osc_or_mdc = 0;
1628                         memset(tmti, 0, sizeof(*tmti));
1629                         RETURN(rc);
1630                 }
1631         }
1632
1633         if (got_an_osc_or_mdc == 0 || last_step < 0)
1634                 RETURN(rc);
1635
1636         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1637                 uint64_t nodenid = lcfg->lcfg_nid;
1638
1639                 if (strlen(tmti->mti_uuid) == 0) {
1640                         /* target uuid not set, this config record is before
1641                          * LCFG_SETUP, this nid is one of target node nid.
1642                          */
1643                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1644                         tmti->mti_nid_count++;
1645                 } else {
1646                         /* failover node nid */
1647                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1648                                        libcfs_nid2str(nodenid));
1649                 }
1650
1651                 RETURN(rc);
1652         }
1653
1654         if (lcfg->lcfg_command == LCFG_SETUP) {
1655                 char *target;
1656
1657                 target = lustre_cfg_string(lcfg, 1);
1658                 memcpy(tmti->mti_uuid, target, strlen(target));
1659                 RETURN(rc);
1660         }
1661
1662         /* ignore client side sptlrpc_conf_log */
1663         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1664                 RETURN(rc);
1665
1666         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1667                 int index;
1668
1669                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1670                         RETURN (-EINVAL);
1671
1672                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1673                        strlen(mti->mti_fsname));
1674                 tmti->mti_stripe_index = index;
1675
1676                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1677                                               mti->mti_stripe_index,
1678                                               mti->mti_svname);
1679                 memset(tmti, 0, sizeof(*tmti));
1680                 RETURN(rc);
1681         }
1682
1683         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1684                 int index;
1685                 char mdt_index[9];
1686                 char *logname, *lovname;
1687
1688                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1689                                              mti->mti_stripe_index);
1690                 if (rc)
1691                         RETURN(rc);
1692                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1693
1694                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1695                         name_destroy(&logname);
1696                         name_destroy(&lovname);
1697                         RETURN(-EINVAL);
1698                 }
1699
1700                 tmti->mti_stripe_index = index;
1701                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1702                                          mdt_index, lovname,
1703                                          LUSTRE_SP_MDT, 0);
1704                 name_destroy(&logname);
1705                 name_destroy(&lovname);
1706                 RETURN(rc);
1707         }
1708         RETURN(rc);
1709 }
1710
1711 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1712 /* stealed from mgs_get_fsdb_from_llog*/
1713 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1714                                               struct mgs_device *mgs,
1715                                               char *client_name,
1716                                               struct temp_comp* comp)
1717 {
1718         struct llog_handle *loghandle;
1719         struct mgs_target_info *tmti;
1720         struct llog_ctxt *ctxt;
1721         int rc;
1722
1723         ENTRY;
1724
1725         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1726         LASSERT(ctxt != NULL);
1727
1728         OBD_ALLOC_PTR(tmti);
1729         if (tmti == NULL)
1730                 GOTO(out_ctxt, rc = -ENOMEM);
1731
1732         comp->comp_tmti = tmti;
1733         comp->comp_obd = mgs->mgs_obd;
1734
1735         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1736                        LLOG_OPEN_EXISTS);
1737         if (rc < 0) {
1738                 if (rc == -ENOENT)
1739                         rc = 0;
1740                 GOTO(out_pop, rc);
1741         }
1742
1743         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1744         if (rc)
1745                 GOTO(out_close, rc);
1746
1747         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1748                                   (void *)comp, NULL, false);
1749         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1750 out_close:
1751         llog_close(env, loghandle);
1752 out_pop:
1753         OBD_FREE_PTR(tmti);
1754 out_ctxt:
1755         llog_ctxt_put(ctxt);
1756         RETURN(rc);
1757 }
1758
1759 /* lmv is the second thing for client logs */
1760 /* copied from mgs_write_log_lov. Please refer to that.  */
1761 static int mgs_write_log_lmv(const struct lu_env *env,
1762                              struct mgs_device *mgs,
1763                              struct fs_db *fsdb,
1764                              struct mgs_target_info *mti,
1765                              char *logname, char *lmvname)
1766 {
1767         struct llog_handle *llh = NULL;
1768         struct lmv_desc *lmvdesc;
1769         char *uuid;
1770         int rc = 0;
1771         ENTRY;
1772
1773         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1774
1775         OBD_ALLOC_PTR(lmvdesc);
1776         if (lmvdesc == NULL)
1777                 RETURN(-ENOMEM);
1778         lmvdesc->ld_active_tgt_count = 0;
1779         lmvdesc->ld_tgt_count = 0;
1780         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1781         uuid = (char *)lmvdesc->ld_uuid.uuid;
1782
1783         rc = record_start_log(env, mgs, &llh, logname);
1784         if (rc)
1785                 GOTO(out_free, rc);
1786         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1787         if (rc)
1788                 GOTO(out_end, rc);
1789         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1790         if (rc)
1791                 GOTO(out_end, rc);
1792         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1793         if (rc)
1794                 GOTO(out_end, rc);
1795         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1796         if (rc)
1797                 GOTO(out_end, rc);
1798 out_end:
1799         record_end_log(env, &llh);
1800 out_free:
1801         OBD_FREE_PTR(lmvdesc);
1802         RETURN(rc);
1803 }
1804
1805 /* lov is the first thing in the mdt and client logs */
1806 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1807                              struct fs_db *fsdb, struct mgs_target_info *mti,
1808                              char *logname, char *lovname)
1809 {
1810         struct llog_handle *llh = NULL;
1811         struct lov_desc *lovdesc;
1812         char *uuid;
1813         int rc = 0;
1814         ENTRY;
1815
1816         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1817
1818         /*
1819         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1820         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1821               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1822         */
1823
1824         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1825         OBD_ALLOC_PTR(lovdesc);
1826         if (lovdesc == NULL)
1827                 RETURN(-ENOMEM);
1828         lovdesc->ld_magic = LOV_DESC_MAGIC;
1829         lovdesc->ld_tgt_count = 0;
1830         /* Defaults.  Can be changed later by lcfg config_param */
1831         lovdesc->ld_default_stripe_count = 1;
1832         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1833         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1834         lovdesc->ld_default_stripe_offset = -1;
1835         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1836         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1837         /* can these be the same? */
1838         uuid = (char *)lovdesc->ld_uuid.uuid;
1839
1840         /* This should always be the first entry in a log.
1841         rc = mgs_clear_log(obd, logname); */
1842         rc = record_start_log(env, mgs, &llh, logname);
1843         if (rc)
1844                 GOTO(out_free, rc);
1845         /* FIXME these should be a single journal transaction */
1846         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1847         if (rc)
1848                 GOTO(out_end, rc);
1849         rc = record_attach(env, llh, lovname, "lov", uuid);
1850         if (rc)
1851                 GOTO(out_end, rc);
1852         rc = record_lov_setup(env, llh, lovname, lovdesc);
1853         if (rc)
1854                 GOTO(out_end, rc);
1855         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1856         if (rc)
1857                 GOTO(out_end, rc);
1858         EXIT;
1859 out_end:
1860         record_end_log(env, &llh);
1861 out_free:
1862         OBD_FREE_PTR(lovdesc);
1863         return rc;
1864 }
1865
1866 /* add failnids to open log */
1867 static int mgs_write_log_failnids(const struct lu_env *env,
1868                                   struct mgs_target_info *mti,
1869                                   struct llog_handle *llh,
1870                                   char *cliname)
1871 {
1872         char *failnodeuuid = NULL;
1873         char *ptr = mti->mti_params;
1874         lnet_nid_t nid;
1875         int rc = 0;
1876
1877         /*
1878         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1879         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1880         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1881         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1882         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1883         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1884         */
1885
1886         /* Pull failnid info out of params string */
1887         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1888                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1889                         if (failnodeuuid == NULL) {
1890                                 /* We don't know the failover node name,
1891                                    so just use the first nid as the uuid */
1892                                 rc = name_create(&failnodeuuid,
1893                                                  libcfs_nid2str(nid), "");
1894                                 if (rc)
1895                                         return rc;
1896                         }
1897                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1898                                "client %s\n", libcfs_nid2str(nid),
1899                                failnodeuuid, cliname);
1900                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1901                 }
1902                 if (failnodeuuid) {
1903                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1904                         name_destroy(&failnodeuuid);
1905                         failnodeuuid = NULL;
1906                 }
1907         }
1908
1909         return rc;
1910 }
1911
1912 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1913                                     struct mgs_device *mgs,
1914                                     struct fs_db *fsdb,
1915                                     struct mgs_target_info *mti,
1916                                     char *logname, char *lmvname)
1917 {
1918         struct llog_handle *llh = NULL;
1919         char *mdcname = NULL;
1920         char *nodeuuid = NULL;
1921         char *mdcuuid = NULL;
1922         char *lmvuuid = NULL;
1923         char index[6];
1924         int i, rc;
1925         ENTRY;
1926
1927         if (mgs_log_is_empty(env, mgs, logname)) {
1928                 CERROR("log is empty! Logical error\n");
1929                 RETURN(-EINVAL);
1930         }
1931
1932         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1933                mti->mti_svname, logname, lmvname);
1934
1935         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1936         if (rc)
1937                 RETURN(rc);
1938         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1939         if (rc)
1940                 GOTO(out_free, rc);
1941         rc = name_create(&mdcuuid, mdcname, "_UUID");
1942         if (rc)
1943                 GOTO(out_free, rc);
1944         rc = name_create(&lmvuuid, lmvname, "_UUID");
1945         if (rc)
1946                 GOTO(out_free, rc);
1947
1948         rc = record_start_log(env, mgs, &llh, logname);
1949         if (rc)
1950                 GOTO(out_free, rc);
1951         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1952                            "add mdc");
1953         if (rc)
1954                 GOTO(out_end, rc);
1955         for (i = 0; i < mti->mti_nid_count; i++) {
1956                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1957                        libcfs_nid2str(mti->mti_nids[i]));
1958
1959                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1960                 if (rc)
1961                         GOTO(out_end, rc);
1962         }
1963
1964         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1965         if (rc)
1966                 GOTO(out_end, rc);
1967         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1968         if (rc)
1969                 GOTO(out_end, rc);
1970         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1971         if (rc)
1972                 GOTO(out_end, rc);
1973         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1974         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1975                             index, "1");
1976         if (rc)
1977                 GOTO(out_end, rc);
1978         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1979                            "add mdc");
1980         if (rc)
1981                 GOTO(out_end, rc);
1982 out_end:
1983         record_end_log(env, &llh);
1984 out_free:
1985         name_destroy(&lmvuuid);
1986         name_destroy(&mdcuuid);
1987         name_destroy(&mdcname);
1988         name_destroy(&nodeuuid);
1989         RETURN(rc);
1990 }
1991
1992 static inline int name_create_lov(char **lovname, char *mdtname,
1993                                   struct fs_db *fsdb, int index)
1994 {
1995         /* COMPAT_180 */
1996         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
1997                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
1998         else
1999                 return name_create(lovname, mdtname, "-mdtlov");
2000 }
2001
2002 static int name_create_mdt_and_lov(char **logname, char **lovname,
2003                                    struct fs_db *fsdb, int i)
2004 {
2005         int rc;
2006
2007         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2008         if (rc)
2009                 return rc;
2010         /* COMPAT_180 */
2011         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2012                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2013         else
2014                 rc = name_create(lovname, *logname, "-mdtlov");
2015         if (rc) {
2016                 name_destroy(logname);
2017                 *logname = NULL;
2018         }
2019         return rc;
2020 }
2021
2022 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2023                                       struct fs_db *fsdb, int i)
2024 {
2025         char suffix[16];
2026
2027         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2028                 sprintf(suffix, "-osc");
2029         else
2030                 sprintf(suffix, "-osc-MDT%04x", i);
2031         return name_create(oscname, ostname, suffix);
2032 }
2033
2034 /* add new mdc to already existent MDS */
2035 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2036                                     struct mgs_device *mgs,
2037                                     struct fs_db *fsdb,
2038                                     struct mgs_target_info *mti,
2039                                     int mdt_index, char *logname)
2040 {
2041         struct llog_handle      *llh = NULL;
2042         char    *nodeuuid = NULL;
2043         char    *ospname = NULL;
2044         char    *lovuuid = NULL;
2045         char    *mdtuuid = NULL;
2046         char    *svname = NULL;
2047         char    *mdtname = NULL;
2048         char    *lovname = NULL;
2049         char    index_str[16];
2050         int     i, rc;
2051
2052         ENTRY;
2053         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2054                 CERROR("log is empty! Logical error\n");
2055                 RETURN (-EINVAL);
2056         }
2057
2058         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2059                logname);
2060
2061         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2062         if (rc)
2063                 RETURN(rc);
2064
2065         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2066         if (rc)
2067                 GOTO(out_destory, rc);
2068
2069         rc = name_create(&svname, mdtname, "-osp");
2070         if (rc)
2071                 GOTO(out_destory, rc);
2072
2073         sprintf(index_str, "-MDT%04x", mdt_index);
2074         rc = name_create(&ospname, svname, index_str);
2075         if (rc)
2076                 GOTO(out_destory, rc);
2077
2078         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2079         if (rc)
2080                 GOTO(out_destory, rc);
2081
2082         rc = name_create(&lovuuid, lovname, "_UUID");
2083         if (rc)
2084                 GOTO(out_destory, rc);
2085
2086         rc = name_create(&mdtuuid, mdtname, "_UUID");
2087         if (rc)
2088                 GOTO(out_destory, rc);
2089
2090         rc = record_start_log(env, mgs, &llh, logname);
2091         if (rc)
2092                 GOTO(out_destory, rc);
2093
2094         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2095                            "add osp");
2096         if (rc)
2097                 GOTO(out_destory, rc);
2098
2099         for (i = 0; i < mti->mti_nid_count; i++) {
2100                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2101                        libcfs_nid2str(mti->mti_nids[i]));
2102                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2103                 if (rc)
2104                         GOTO(out_end, rc);
2105         }
2106
2107         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2108         if (rc)
2109                 GOTO(out_end, rc);
2110
2111         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2112                           NULL, NULL);
2113         if (rc)
2114                 GOTO(out_end, rc);
2115
2116         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2117         if (rc)
2118                 GOTO(out_end, rc);
2119
2120         /* Add mdc(osp) to lod */
2121         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2122                  mti->mti_stripe_index);
2123         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2124                          index_str, "1", NULL);
2125         if (rc)
2126                 GOTO(out_end, rc);
2127
2128         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2129         if (rc)
2130                 GOTO(out_end, rc);
2131
2132 out_end:
2133         record_end_log(env, &llh);
2134
2135 out_destory:
2136         name_destroy(&mdtuuid);
2137         name_destroy(&lovuuid);
2138         name_destroy(&lovname);
2139         name_destroy(&ospname);
2140         name_destroy(&svname);
2141         name_destroy(&nodeuuid);
2142         name_destroy(&mdtname);
2143         RETURN(rc);
2144 }
2145
2146 static int mgs_write_log_mdt0(const struct lu_env *env,
2147                               struct mgs_device *mgs,
2148                               struct fs_db *fsdb,
2149                               struct mgs_target_info *mti)
2150 {
2151         char *log = mti->mti_svname;
2152         struct llog_handle *llh = NULL;
2153         char *uuid, *lovname;
2154         char mdt_index[6];
2155         char *ptr = mti->mti_params;
2156         int rc = 0, failout = 0;
2157         ENTRY;
2158
2159         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2160         if (uuid == NULL)
2161                 RETURN(-ENOMEM);
2162
2163         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2164                 failout = (strncmp(ptr, "failout", 7) == 0);
2165
2166         rc = name_create(&lovname, log, "-mdtlov");
2167         if (rc)
2168                 GOTO(out_free, rc);
2169         if (mgs_log_is_empty(env, mgs, log)) {
2170                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2171                 if (rc)
2172                         GOTO(out_lod, rc);
2173         }
2174
2175         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2176
2177         rc = record_start_log(env, mgs, &llh, log);
2178         if (rc)
2179                 GOTO(out_lod, rc);
2180
2181         /* add MDT itself */
2182
2183         /* FIXME this whole fn should be a single journal transaction */
2184         sprintf(uuid, "%s_UUID", log);
2185         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2186         if (rc)
2187                 GOTO(out_lod, rc);
2188         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2189         if (rc)
2190                 GOTO(out_end, rc);
2191         rc = record_mount_opt(env, llh, log, lovname, NULL);
2192         if (rc)
2193                 GOTO(out_end, rc);
2194         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2195                         failout ? "n" : "f");
2196         if (rc)
2197                 GOTO(out_end, rc);
2198         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2199         if (rc)
2200                 GOTO(out_end, rc);
2201 out_end:
2202         record_end_log(env, &llh);
2203 out_lod:
2204         name_destroy(&lovname);
2205 out_free:
2206         OBD_FREE(uuid, sizeof(struct obd_uuid));
2207         RETURN(rc);
2208 }
2209
2210 /* envelope method for all layers log */
2211 static int mgs_write_log_mdt(const struct lu_env *env,
2212                              struct mgs_device *mgs,
2213                              struct fs_db *fsdb,
2214                              struct mgs_target_info *mti)
2215 {
2216         struct mgs_thread_info *mgi = mgs_env_info(env);
2217         struct llog_handle *llh = NULL;
2218         char *cliname;
2219         int rc, i = 0;
2220         ENTRY;
2221
2222         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2223
2224         if (mti->mti_uuid[0] == '\0') {
2225                 /* Make up our own uuid */
2226                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2227                          "%s_UUID", mti->mti_svname);
2228         }
2229
2230         /* add mdt */
2231         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2232         if (rc)
2233                 RETURN(rc);
2234         /* Append the mdt info to the client log */
2235         rc = name_create(&cliname, mti->mti_fsname, "-client");
2236         if (rc)
2237                 RETURN(rc);
2238
2239         if (mgs_log_is_empty(env, mgs, cliname)) {
2240                 /* Start client log */
2241                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2242                                        fsdb->fsdb_clilov);
2243                 if (rc)
2244                         GOTO(out_free, rc);
2245                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2246                                        fsdb->fsdb_clilmv);
2247                 if (rc)
2248                         GOTO(out_free, rc);
2249         }
2250
2251         /*
2252         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2253         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2254         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2255         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2256         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2257         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2258         */
2259
2260                 /* copy client info about lov/lmv */
2261                 mgi->mgi_comp.comp_mti = mti;
2262                 mgi->mgi_comp.comp_fsdb = fsdb;
2263
2264                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2265                                                         &mgi->mgi_comp);
2266                 if (rc)
2267                         GOTO(out_free, rc);
2268                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2269                                               fsdb->fsdb_clilmv);
2270                 if (rc)
2271                         GOTO(out_free, rc);
2272
2273                 /* add mountopts */
2274                 rc = record_start_log(env, mgs, &llh, cliname);
2275                 if (rc)
2276                         GOTO(out_free, rc);
2277
2278                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2279                                    "mount opts");
2280                 if (rc)
2281                         GOTO(out_end, rc);
2282                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2283                                       fsdb->fsdb_clilmv);
2284                 if (rc)
2285                         GOTO(out_end, rc);
2286                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2287                                    "mount opts");
2288
2289         if (rc)
2290                 GOTO(out_end, rc);
2291
2292         /* for_all_existing_mdt except current one */
2293         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2294                 if (i !=  mti->mti_stripe_index &&
2295                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2296                         char *logname;
2297
2298                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2299                         if (rc)
2300                                 GOTO(out_end, rc);
2301
2302                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2303                                                       i, logname);
2304                         name_destroy(&logname);
2305                         if (rc)
2306                                 GOTO(out_end, rc);
2307                 }
2308         }
2309 out_end:
2310         record_end_log(env, &llh);
2311 out_free:
2312         name_destroy(&cliname);
2313         RETURN(rc);
2314 }
2315
2316 /* Add the ost info to the client/mdt lov */
2317 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2318                                     struct mgs_device *mgs, struct fs_db *fsdb,
2319                                     struct mgs_target_info *mti,
2320                                     char *logname, char *suffix, char *lovname,
2321                                     enum lustre_sec_part sec_part, int flags)
2322 {
2323         struct llog_handle *llh = NULL;
2324         char *nodeuuid = NULL;
2325         char *oscname = NULL;
2326         char *oscuuid = NULL;
2327         char *lovuuid = NULL;
2328         char *svname = NULL;
2329         char index[6];
2330         int i, rc;
2331
2332         ENTRY;
2333         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2334                mti->mti_svname, logname);
2335
2336         if (mgs_log_is_empty(env, mgs, logname)) {
2337                 CERROR("log is empty! Logical error\n");
2338                 RETURN (-EINVAL);
2339         }
2340
2341         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2342         if (rc)
2343                 RETURN(rc);
2344         rc = name_create(&svname, mti->mti_svname, "-osc");
2345         if (rc)
2346                 GOTO(out_free, rc);
2347
2348         /* for the system upgraded from old 1.8, keep using the old osc naming
2349          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2350         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2351                 rc = name_create(&oscname, svname, "");
2352         else
2353                 rc = name_create(&oscname, svname, suffix);
2354         if (rc)
2355                 GOTO(out_free, rc);
2356
2357         rc = name_create(&oscuuid, oscname, "_UUID");
2358         if (rc)
2359                 GOTO(out_free, rc);
2360         rc = name_create(&lovuuid, lovname, "_UUID");
2361         if (rc)
2362                 GOTO(out_free, rc);
2363
2364
2365         /*
2366         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2367         multihomed (#4)
2368         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2369         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2370         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2371         failover (#6,7)
2372         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2373         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2374         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2375         */
2376
2377         rc = record_start_log(env, mgs, &llh, logname);
2378         if (rc)
2379                 GOTO(out_free, rc);
2380
2381         /* FIXME these should be a single journal transaction */
2382         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2383                            "add osc");
2384         if (rc)
2385                 GOTO(out_end, rc);
2386
2387         /* NB: don't change record order, because upon MDT steal OSC config
2388          * from client, it treats all nids before LCFG_SETUP as target nids
2389          * (multiple interfaces), while nids after as failover node nids.
2390          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2391          */
2392         for (i = 0; i < mti->mti_nid_count; i++) {
2393                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2394                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2395                 if (rc)
2396                         GOTO(out_end, rc);
2397         }
2398         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2399         if (rc)
2400                 GOTO(out_end, rc);
2401         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2402         if (rc)
2403                 GOTO(out_end, rc);
2404         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2405         if (rc)
2406                 GOTO(out_end, rc);
2407
2408         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2409
2410         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2411         if (rc)
2412                 GOTO(out_end, rc);
2413         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2414                            "add osc");
2415         if (rc)
2416                 GOTO(out_end, rc);
2417 out_end:
2418         record_end_log(env, &llh);
2419 out_free:
2420         name_destroy(&lovuuid);
2421         name_destroy(&oscuuid);
2422         name_destroy(&oscname);
2423         name_destroy(&svname);
2424         name_destroy(&nodeuuid);
2425         RETURN(rc);
2426 }
2427
2428 static int mgs_write_log_ost(const struct lu_env *env,
2429                              struct mgs_device *mgs, struct fs_db *fsdb,
2430                              struct mgs_target_info *mti)
2431 {
2432         struct llog_handle *llh = NULL;
2433         char *logname, *lovname;
2434         char *ptr = mti->mti_params;
2435         int rc, flags = 0, failout = 0, i;
2436         ENTRY;
2437
2438         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2439
2440         /* The ost startup log */
2441
2442         /* If the ost log already exists, that means that someone reformatted
2443            the ost and it called target_add again. */
2444         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2445                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2446                                    "exists, yet the server claims it never "
2447                                    "registered. It may have been reformatted, "
2448                                    "or the index changed. writeconf the MDT to "
2449                                    "regenerate all logs.\n", mti->mti_svname);
2450                 RETURN(-EALREADY);
2451         }
2452
2453         /*
2454         attach obdfilter ost1 ost1_UUID
2455         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2456         */
2457         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2458                 failout = (strncmp(ptr, "failout", 7) == 0);
2459         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2460         if (rc)
2461                 RETURN(rc);
2462         /* FIXME these should be a single journal transaction */
2463         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2464         if (rc)
2465                 GOTO(out_end, rc);
2466         if (*mti->mti_uuid == '\0')
2467                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2468                          "%s_UUID", mti->mti_svname);
2469         rc = record_attach(env, llh, mti->mti_svname,
2470                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2471         if (rc)
2472                 GOTO(out_end, rc);
2473         rc = record_setup(env, llh, mti->mti_svname,
2474                           "dev"/*ignored*/, "type"/*ignored*/,
2475                           failout ? "n" : "f", 0/*options*/);
2476         if (rc)
2477                 GOTO(out_end, rc);
2478         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2479         if (rc)
2480                 GOTO(out_end, rc);
2481 out_end:
2482         record_end_log(env, &llh);
2483         if (rc)
2484                 RETURN(rc);
2485         /* We also have to update the other logs where this osc is part of
2486            the lov */
2487
2488         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2489                 /* If we're upgrading, the old mdt log already has our
2490                    entry. Let's do a fake one for fun. */
2491                 /* Note that we can't add any new failnids, since we don't
2492                    know the old osc names. */
2493                 flags = CM_SKIP | CM_UPGRADE146;
2494
2495         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2496                 /* If the update flag isn't set, don't update client/mdt
2497                    logs. */
2498                 flags |= CM_SKIP;
2499                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2500                               "the MDT first to regenerate it.\n",
2501                               mti->mti_svname);
2502         }
2503
2504         /* Add ost to all MDT lov defs */
2505         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2506                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2507                         char mdt_index[9];
2508
2509                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2510                                                      i);
2511                         if (rc)
2512                                 RETURN(rc);
2513                         sprintf(mdt_index, "-MDT%04x", i);
2514                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2515                                                       logname, mdt_index,
2516                                                       lovname, LUSTRE_SP_MDT,
2517                                                       flags);
2518                         name_destroy(&logname);
2519                         name_destroy(&lovname);
2520                         if (rc)
2521                                 RETURN(rc);
2522                 }
2523         }
2524
2525         /* Append ost info to the client log */
2526         rc = name_create(&logname, mti->mti_fsname, "-client");
2527         if (rc)
2528                 RETURN(rc);
2529         if (mgs_log_is_empty(env, mgs, logname)) {
2530                 /* Start client log */
2531                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2532                                        fsdb->fsdb_clilov);
2533                 if (rc)
2534                         GOTO(out_free, rc);
2535                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2536                                        fsdb->fsdb_clilmv);
2537                 if (rc)
2538                         GOTO(out_free, rc);
2539         }
2540         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2541                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2542 out_free:
2543         name_destroy(&logname);
2544         RETURN(rc);
2545 }
2546
2547 static __inline__ int mgs_param_empty(char *ptr)
2548 {
2549         char *tmp;
2550
2551         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2552                 return 1;
2553         return 0;
2554 }
2555
2556 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2557                                           struct mgs_device *mgs,
2558                                           struct fs_db *fsdb,
2559                                           struct mgs_target_info *mti,
2560                                           char *logname, char *cliname)
2561 {
2562         int rc;
2563         struct llog_handle *llh = NULL;
2564
2565         if (mgs_param_empty(mti->mti_params)) {
2566                 /* Remove _all_ failnids */
2567                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2568                                 mti->mti_svname, "add failnid", CM_SKIP);
2569                 return rc < 0 ? rc : 0;
2570         }
2571
2572         /* Otherwise failover nids are additive */
2573         rc = record_start_log(env, mgs, &llh, logname);
2574         if (rc)
2575                 return rc;
2576                 /* FIXME this should be a single journal transaction */
2577         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2578                            "add failnid");
2579         if (rc)
2580                 goto out_end;
2581         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2582         if (rc)
2583                 goto out_end;
2584         rc = record_marker(env, llh, fsdb, CM_END,
2585                            mti->mti_svname, "add failnid");
2586 out_end:
2587         record_end_log(env, &llh);
2588         return rc;
2589 }
2590
2591
2592 /* Add additional failnids to an existing log.
2593    The mdc/osc must have been added to logs first */
2594 /* tcp nids must be in dotted-quad ascii -
2595    we can't resolve hostnames from the kernel. */
2596 static int mgs_write_log_add_failnid(const struct lu_env *env,
2597                                      struct mgs_device *mgs,
2598                                      struct fs_db *fsdb,
2599                                      struct mgs_target_info *mti)
2600 {
2601         char *logname, *cliname;
2602         int rc;
2603         ENTRY;
2604
2605         /* FIXME we currently can't erase the failnids
2606          * given when a target first registers, since they aren't part of
2607          * an "add uuid" stanza */
2608
2609         /* Verify that we know about this target */
2610         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2611                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2612                                    "yet. It must be started before failnids "
2613                                    "can be added.\n", mti->mti_svname);
2614                 RETURN(-ENOENT);
2615         }
2616
2617         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2618         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2619                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2620         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2621                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2622         } else {
2623                 RETURN(-EINVAL);
2624         }
2625         if (rc)
2626                 RETURN(rc);
2627         /* Add failover nids to the client log */
2628         rc = name_create(&logname, mti->mti_fsname, "-client");
2629         if (rc) {
2630                 name_destroy(&cliname);
2631                 RETURN(rc);
2632         }
2633         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2634         name_destroy(&logname);
2635         name_destroy(&cliname);
2636         if (rc)
2637                 RETURN(rc);
2638
2639         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2640                 /* Add OST failover nids to the MDT logs as well */
2641                 int i;
2642
2643                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2644                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2645                                 continue;
2646                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2647                         if (rc)
2648                                 RETURN(rc);
2649                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2650                                                  fsdb, i);
2651                         if (rc) {
2652                                 name_destroy(&logname);
2653                                 RETURN(rc);
2654                         }
2655                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2656                                                             mti, logname,
2657                                                             cliname);
2658                         name_destroy(&cliname);
2659                         name_destroy(&logname);
2660                         if (rc)
2661                                 RETURN(rc);
2662                 }
2663         }
2664
2665         RETURN(rc);
2666 }
2667
2668 static int mgs_wlp_lcfg(const struct lu_env *env,
2669                         struct mgs_device *mgs, struct fs_db *fsdb,
2670                         struct mgs_target_info *mti,
2671                         char *logname, struct lustre_cfg_bufs *bufs,
2672                         char *tgtname, char *ptr)
2673 {
2674         char comment[MTI_NAME_MAXLEN];
2675         char *tmp;
2676         struct llog_cfg_rec *lcr;
2677         int rc, del;
2678
2679         /* Erase any old settings of this same parameter */
2680         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2681         comment[MTI_NAME_MAXLEN - 1] = 0;
2682         /* But don't try to match the value. */
2683         tmp = strchr(comment, '=');
2684         if (tmp != NULL)
2685                 *tmp = 0;
2686         /* FIXME we should skip settings that are the same as old values */
2687         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2688         if (rc < 0)
2689                 return rc;
2690         del = mgs_param_empty(ptr);
2691
2692         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2693                       "Setting" : "Modifying", tgtname, comment, logname);
2694         if (del) {
2695                 /* mgs_modify() will return 1 if nothing had to be done */
2696                 if (rc == 1)
2697                         rc = 0;
2698                 return rc;
2699         }
2700
2701         lustre_cfg_bufs_reset(bufs, tgtname);
2702         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2703         if (mti->mti_flags & LDD_F_PARAM2)
2704                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2705
2706         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2707                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2708         if (lcr == NULL)
2709                 return -ENOMEM;
2710
2711         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2712                                   comment);
2713         lustre_cfg_rec_free(lcr);
2714         return rc;
2715 }
2716
2717 static int mgs_write_log_param2(const struct lu_env *env,
2718                                 struct mgs_device *mgs,
2719                                 struct fs_db *fsdb,
2720                                 struct mgs_target_info *mti, char *ptr)
2721 {
2722         struct lustre_cfg_bufs  bufs;
2723         int                     rc = 0;
2724         ENTRY;
2725
2726         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2727         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2728                           mti->mti_svname, ptr);
2729
2730         RETURN(rc);
2731 }
2732
2733 /* write global variable settings into log */
2734 static int mgs_write_log_sys(const struct lu_env *env,
2735                              struct mgs_device *mgs, struct fs_db *fsdb,
2736                              struct mgs_target_info *mti, char *sys, char *ptr)
2737 {
2738         struct mgs_thread_info  *mgi = mgs_env_info(env);
2739         struct lustre_cfg       *lcfg;
2740         struct llog_cfg_rec     *lcr;
2741         char *tmp, sep;
2742         int rc, cmd, convert = 1;
2743
2744         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2745                 cmd = LCFG_SET_TIMEOUT;
2746         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2747                 cmd = LCFG_SET_LDLM_TIMEOUT;
2748         /* Check for known params here so we can return error to lctl */
2749         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2750                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2751                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2752                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2753                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2754                 cmd = LCFG_PARAM;
2755         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2756                 convert = 0; /* Don't convert string value to integer */
2757                 cmd = LCFG_PARAM;
2758         } else {
2759                 return -EINVAL;
2760         }
2761
2762         if (mgs_param_empty(ptr))
2763                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2764         else
2765                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2766
2767         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2768         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2769         if (!convert && *tmp != '\0')
2770                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2771         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2772         if (lcr == NULL)
2773                 return -ENOMEM;
2774
2775         lcfg = &lcr->lcr_cfg;
2776         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2777         /* truncate the comment to the parameter name */
2778         ptr = tmp - 1;
2779         sep = *ptr;
2780         *ptr = '\0';
2781         /* modify all servers and clients */
2782         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2783                                       *tmp == '\0' ? NULL : lcr,
2784                                       mti->mti_fsname, sys, 0);
2785         if (rc == 0 && *tmp != '\0') {
2786                 switch (cmd) {
2787                 case LCFG_SET_TIMEOUT:
2788                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2789                                 class_process_config(lcfg);
2790                         break;
2791                 case LCFG_SET_LDLM_TIMEOUT:
2792                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2793                                 class_process_config(lcfg);
2794                         break;
2795                 default:
2796                         break;
2797                 }
2798         }
2799         *ptr = sep;
2800         lustre_cfg_rec_free(lcr);
2801         return rc;
2802 }
2803
2804 /* write quota settings into log */
2805 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2806                                struct fs_db *fsdb, struct mgs_target_info *mti,
2807                                char *quota, char *ptr)
2808 {
2809         struct mgs_thread_info  *mgi = mgs_env_info(env);
2810         struct llog_cfg_rec     *lcr;
2811         char                    *tmp;
2812         char                     sep;
2813         int                      rc, cmd = LCFG_PARAM;
2814
2815         /* support only 'meta' and 'data' pools so far */
2816         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2817             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2818                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2819                        "& quota.ost are)\n", ptr);
2820                 return -EINVAL;
2821         }
2822
2823         if (*tmp == '\0') {
2824                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2825         } else {
2826                 CDEBUG(D_MGS, "global '%s'\n", quota);
2827
2828                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2829                     strcmp(tmp, "none") != 0) {
2830                         CERROR("enable option(%s) isn't supported\n", tmp);
2831                         return -EINVAL;
2832                 }
2833         }
2834
2835         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2836         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2837         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2838         if (lcr == NULL)
2839                 return -ENOMEM;
2840
2841         /* truncate the comment to the parameter name */
2842         ptr = tmp - 1;
2843         sep = *ptr;
2844         *ptr = '\0';
2845
2846         /* XXX we duplicated quota enable information in all server
2847          *     config logs, it should be moved to a separate config
2848          *     log once we cleanup the config log for global param. */
2849         /* modify all servers */
2850         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2851                                       *tmp == '\0' ? NULL : lcr,
2852                                       mti->mti_fsname, quota, 1);
2853         *ptr = sep;
2854         lustre_cfg_rec_free(lcr);
2855         return rc < 0 ? rc : 0;
2856 }
2857
2858 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2859                                    struct mgs_device *mgs,
2860                                    struct fs_db *fsdb,
2861                                    struct mgs_target_info *mti,
2862                                    char *param)
2863 {
2864         struct mgs_thread_info  *mgi = mgs_env_info(env);
2865         struct llog_cfg_rec     *lcr;
2866         struct llog_handle      *llh = NULL;
2867         char                    *logname;
2868         char                    *comment, *ptr;
2869         int                      rc, len;
2870
2871         ENTRY;
2872
2873         /* get comment */
2874         ptr = strchr(param, '=');
2875         LASSERT(ptr != NULL);
2876         len = ptr - param;
2877
2878         OBD_ALLOC(comment, len + 1);
2879         if (comment == NULL)
2880                 RETURN(-ENOMEM);
2881         strncpy(comment, param, len);
2882         comment[len] = '\0';
2883
2884         /* prepare lcfg */
2885         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2886         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2887         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2888         if (lcr == NULL)
2889                 GOTO(out_comment, rc = -ENOMEM);
2890
2891         /* construct log name */
2892         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2893         if (rc < 0)
2894                 GOTO(out_lcfg, rc);
2895
2896         if (mgs_log_is_empty(env, mgs, logname)) {
2897                 rc = record_start_log(env, mgs, &llh, logname);
2898                 if (rc < 0)
2899                         GOTO(out, rc);
2900                 record_end_log(env, &llh);
2901         }
2902
2903         /* obsolete old one */
2904         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2905                         comment, CM_SKIP);
2906         if (rc < 0)
2907                 GOTO(out, rc);
2908         /* write the new one */
2909         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
2910                                   mti->mti_svname, comment);
2911         if (rc)
2912                 CERROR("%s: error writing log %s: rc = %d\n",
2913                        mgs->mgs_obd->obd_name, logname, rc);
2914 out:
2915         name_destroy(&logname);
2916 out_lcfg:
2917         lustre_cfg_rec_free(lcr);
2918 out_comment:
2919         OBD_FREE(comment, len + 1);
2920         RETURN(rc);
2921 }
2922
2923 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2924                                         char *param)
2925 {
2926         char    *ptr;
2927
2928         /* disable the adjustable udesc parameter for now, i.e. use default
2929          * setting that client always ship udesc to MDT if possible. to enable
2930          * it simply remove the following line */
2931         goto error_out;
2932
2933         ptr = strchr(param, '=');
2934         if (ptr == NULL)
2935                 goto error_out;
2936         *ptr++ = '\0';
2937
2938         if (strcmp(param, PARAM_SRPC_UDESC))
2939                 goto error_out;
2940
2941         if (strcmp(ptr, "yes") == 0) {
2942                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2943                 CWARN("Enable user descriptor shipping from client to MDT\n");
2944         } else if (strcmp(ptr, "no") == 0) {
2945                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2946                 CWARN("Disable user descriptor shipping from client to MDT\n");
2947         } else {
2948                 *(ptr - 1) = '=';
2949                 goto error_out;
2950         }
2951         return 0;
2952
2953 error_out:
2954         CERROR("Invalid param: %s\n", param);
2955         return -EINVAL;
2956 }
2957
2958 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2959                                   const char *svname,
2960                                   char *param)
2961 {
2962         struct sptlrpc_rule      rule;
2963         struct sptlrpc_rule_set *rset;
2964         int                      rc;
2965         ENTRY;
2966
2967         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2968                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2969                 RETURN(-EINVAL);
2970         }
2971
2972         if (strncmp(param, PARAM_SRPC_UDESC,
2973                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2974                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2975         }
2976
2977         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2978                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2979                 RETURN(-EINVAL);
2980         }
2981
2982         param += sizeof(PARAM_SRPC_FLVR) - 1;
2983
2984         rc = sptlrpc_parse_rule(param, &rule);
2985         if (rc)
2986                 RETURN(rc);
2987
2988         /* mgs rules implies must be mgc->mgs */
2989         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2990                 if ((rule.sr_from != LUSTRE_SP_MGC &&
2991                      rule.sr_from != LUSTRE_SP_ANY) ||
2992                     (rule.sr_to != LUSTRE_SP_MGS &&
2993                      rule.sr_to != LUSTRE_SP_ANY))
2994                         RETURN(-EINVAL);
2995         }
2996
2997         /* preapre room for this coming rule. svcname format should be:
2998          * - fsname: general rule
2999          * - fsname-tgtname: target-specific rule
3000          */
3001         if (strchr(svname, '-')) {
3002                 struct mgs_tgt_srpc_conf *tgtconf;
3003                 int                       found = 0;
3004
3005                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3006                      tgtconf = tgtconf->mtsc_next) {
3007                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3008                                 found = 1;
3009                                 break;
3010                         }
3011                 }
3012
3013                 if (!found) {
3014                         int name_len;
3015
3016                         OBD_ALLOC_PTR(tgtconf);
3017                         if (tgtconf == NULL)
3018                                 RETURN(-ENOMEM);
3019
3020                         name_len = strlen(svname);
3021
3022                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3023                         if (tgtconf->mtsc_tgt == NULL) {
3024                                 OBD_FREE_PTR(tgtconf);
3025                                 RETURN(-ENOMEM);
3026                         }
3027                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3028
3029                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3030                         fsdb->fsdb_srpc_tgt = tgtconf;
3031                 }
3032
3033                 rset = &tgtconf->mtsc_rset;
3034         } else {
3035                 rset = &fsdb->fsdb_srpc_gen;
3036         }
3037
3038         rc = sptlrpc_rule_set_merge(rset, &rule);
3039
3040         RETURN(rc);
3041 }
3042
3043 static int mgs_srpc_set_param(const struct lu_env *env,
3044                               struct mgs_device *mgs,
3045                               struct fs_db *fsdb,
3046                               struct mgs_target_info *mti,
3047                               char *param)
3048 {
3049         char                   *copy;
3050         int                     rc, copy_size;
3051         ENTRY;
3052
3053 #ifndef HAVE_GSS
3054         RETURN(-EINVAL);
3055 #endif
3056         /* keep a copy of original param, which could be destroied
3057          * during parsing */
3058         copy_size = strlen(param) + 1;
3059         OBD_ALLOC(copy, copy_size);
3060         if (copy == NULL)
3061                 return -ENOMEM;
3062         memcpy(copy, param, copy_size);
3063
3064         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3065         if (rc)
3066                 goto out_free;
3067
3068         /* previous steps guaranteed the syntax is correct */
3069         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3070         if (rc)
3071                 goto out_free;
3072
3073         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3074                 /*
3075                  * for mgs rules, make them effective immediately.
3076                  */
3077                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3078                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3079                                                  &fsdb->fsdb_srpc_gen);
3080         }
3081
3082 out_free:
3083         OBD_FREE(copy, copy_size);
3084         RETURN(rc);
3085 }
3086
3087 struct mgs_srpc_read_data {
3088         struct fs_db   *msrd_fsdb;
3089         int             msrd_skip;
3090 };
3091
3092 static int mgs_srpc_read_handler(const struct lu_env *env,
3093                                  struct llog_handle *llh,
3094                                  struct llog_rec_hdr *rec, void *data)
3095 {
3096         struct mgs_srpc_read_data *msrd = data;
3097         struct cfg_marker         *marker;
3098         struct lustre_cfg         *lcfg = REC_DATA(rec);
3099         char                      *svname, *param;
3100         int                        cfg_len, rc;
3101         ENTRY;
3102
3103         if (rec->lrh_type != OBD_CFG_REC) {
3104                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3105                 RETURN(-EINVAL);
3106         }
3107
3108         cfg_len = REC_DATA_LEN(rec);
3109
3110         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3111         if (rc) {
3112                 CERROR("Insane cfg\n");
3113                 RETURN(rc);
3114         }
3115
3116         if (lcfg->lcfg_command == LCFG_MARKER) {
3117                 marker = lustre_cfg_buf(lcfg, 1);
3118
3119                 if (marker->cm_flags & CM_START &&
3120                     marker->cm_flags & CM_SKIP)
3121                         msrd->msrd_skip = 1;
3122                 if (marker->cm_flags & CM_END)
3123                         msrd->msrd_skip = 0;
3124
3125                 RETURN(0);
3126         }
3127
3128         if (msrd->msrd_skip)
3129                 RETURN(0);
3130
3131         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3132                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3133                 RETURN(0);
3134         }
3135
3136         svname = lustre_cfg_string(lcfg, 0);
3137         if (svname == NULL) {
3138                 CERROR("svname is empty\n");
3139                 RETURN(0);
3140         }
3141
3142         param = lustre_cfg_string(lcfg, 1);
3143         if (param == NULL) {
3144                 CERROR("param is empty\n");
3145                 RETURN(0);
3146         }
3147
3148         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3149         if (rc)
3150                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3151
3152         RETURN(0);
3153 }
3154
3155 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3156                                 struct mgs_device *mgs,
3157                                 struct fs_db *fsdb)
3158 {
3159         struct llog_handle        *llh = NULL;
3160         struct llog_ctxt          *ctxt;
3161         char                      *logname;
3162         struct mgs_srpc_read_data  msrd;
3163         int                        rc;
3164         ENTRY;
3165
3166         /* construct log name */
3167         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3168         if (rc)
3169                 RETURN(rc);
3170
3171         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3172         LASSERT(ctxt != NULL);
3173
3174         if (mgs_log_is_empty(env, mgs, logname))
3175                 GOTO(out, rc = 0);
3176
3177         rc = llog_open(env, ctxt, &llh, NULL, logname,
3178                        LLOG_OPEN_EXISTS);
3179         if (rc < 0) {
3180                 if (rc == -ENOENT)
3181                         rc = 0;
3182                 GOTO(out, rc);
3183         }
3184
3185         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3186         if (rc)
3187                 GOTO(out_close, rc);
3188
3189         if (llog_get_size(llh) <= 1)
3190                 GOTO(out_close, rc = 0);
3191
3192         msrd.msrd_fsdb = fsdb;
3193         msrd.msrd_skip = 0;
3194
3195         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3196                           NULL);
3197
3198 out_close:
3199         llog_close(env, llh);
3200 out:
3201         llog_ctxt_put(ctxt);
3202         name_destroy(&logname);
3203
3204         if (rc)
3205                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3206         RETURN(rc);
3207 }
3208
3209 /* Permanent settings of all parameters by writing into the appropriate
3210  * configuration logs.
3211  * A parameter with null value ("<param>='\0'") means to erase it out of
3212  * the logs.
3213  */
3214 static int mgs_write_log_param(const struct lu_env *env,
3215                                struct mgs_device *mgs, struct fs_db *fsdb,
3216                                struct mgs_target_info *mti, char *ptr)
3217 {
3218         struct mgs_thread_info *mgi = mgs_env_info(env);
3219         char *logname;
3220         char *tmp;
3221         int rc = 0, rc2 = 0;
3222         ENTRY;
3223
3224         /* For various parameter settings, we have to figure out which logs
3225            care about them (e.g. both mdt and client for lov settings) */
3226         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3227
3228         /* The params are stored in MOUNT_DATA_FILE and modified via
3229            tunefs.lustre, or set using lctl conf_param */
3230
3231         /* Processed in lustre_start_mgc */
3232         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3233                 GOTO(end, rc);
3234
3235         /* Processed in ost/mdt */
3236         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3237                 GOTO(end, rc);
3238
3239         /* Processed in mgs_write_log_ost */
3240         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3241                 if (mti->mti_flags & LDD_F_PARAM) {
3242                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3243                                            "changed with tunefs.lustre"
3244                                            "and --writeconf\n", ptr);
3245                         rc = -EPERM;
3246                 }
3247                 GOTO(end, rc);
3248         }
3249
3250         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3251                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3252                 GOTO(end, rc);
3253         }
3254
3255         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3256                 /* Add a failover nidlist */
3257                 rc = 0;