Whamcloud - gitweb
LU-1430 mgs: "replace_nids" lctl command added
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         if (!dt_try_as_dir(env, dir))
71                 GOTO(out, rc = -ENOTDIR);
72
73         LASSERT(dir);
74         LASSERT(dir->do_index_ops);
75
76         iops = &dir->do_index_ops->dio_it;
77         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
78         if (IS_ERR(it))
79                 RETURN(PTR_ERR(it));
80
81         rc = iops->load(env, it, 0);
82         if (rc <= 0)
83                 GOTO(fini, rc = 0);
84
85         /* main cycle */
86         do {
87                 key = (void *)iops->key(env, it);
88                 if (IS_ERR(key)) {
89                         CERROR("%s: key failed when listing %s: rc = %d\n",
90                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
91                                (int) PTR_ERR(key));
92                         goto next;
93                 }
94                 key_sz = iops->key_size(env, it);
95                 LASSERT(key_sz > 0);
96
97                 /* filter out "." and ".." entries */
98                 if (key[0] == '.') {
99                         if (key_sz == 1)
100                                 goto next;
101                         if (key_sz == 2 && key[1] == '.')
102                                 goto next;
103                 }
104
105                 de = mgs_direntry_alloc(key_sz + 1);
106                 if (de == NULL) {
107                         rc = -ENOMEM;
108                         break;
109                 }
110
111                 memcpy(de->name, key, key_sz);
112                 de->name[key_sz] = 0;
113
114                 cfs_list_add(&de->list, list);
115
116 next:
117                 rc = iops->next(env, it);
118         } while (rc == 0);
119         rc = 0;
120
121         iops->put(env, it);
122
123 fini:
124         iops->fini(env, it);
125 out:
126         if (rc)
127                 CERROR("%s: key failed when listing %s: rc = %d\n",
128                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
129         RETURN(rc);
130 }
131
132 /******************** DB functions *********************/
133
134 static inline int name_create(char **newname, char *prefix, char *suffix)
135 {
136         LASSERT(newname);
137         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
138         if (!*newname)
139                 return -ENOMEM;
140         sprintf(*newname, "%s%s", prefix, suffix);
141         return 0;
142 }
143
144 static inline void name_destroy(char **name)
145 {
146         if (*name)
147                 OBD_FREE(*name, strlen(*name) + 1);
148         *name = NULL;
149 }
150
151 struct mgs_fsdb_handler_data
152 {
153         struct fs_db   *fsdb;
154         __u32           ver;
155 };
156
157 /* from the (client) config log, figure out:
158         1. which ost's/mdt's are configured (by index)
159         2. what the last config step is
160         3. COMPAT_18 osc name
161 */
162 /* It might be better to have a separate db file, instead of parsing the info
163    out of the client log.  This is slow and potentially error-prone. */
164 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
165                             struct llog_rec_hdr *rec, void *data)
166 {
167         struct mgs_fsdb_handler_data *d = data;
168         struct fs_db *fsdb = d->fsdb;
169         int cfg_len = rec->lrh_len;
170         char *cfg_buf = (char*) (rec + 1);
171         struct lustre_cfg *lcfg;
172         __u32 index;
173         int rc = 0;
174         ENTRY;
175
176         if (rec->lrh_type != OBD_CFG_REC) {
177                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
178                 RETURN(-EINVAL);
179         }
180
181         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
182         if (rc) {
183                 CERROR("Insane cfg\n");
184                 RETURN(rc);
185         }
186
187         lcfg = (struct lustre_cfg *)cfg_buf;
188
189         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
190                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
191
192         /* Figure out ost indicies */
193         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
194         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
195             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
196                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
197                                        NULL, 10);
198                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
199                        lustre_cfg_string(lcfg, 1), index,
200                        lustre_cfg_string(lcfg, 2));
201                 set_bit(index, fsdb->fsdb_ost_index_map);
202         }
203
204         /* Figure out mdt indicies */
205         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
206         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
207             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
208                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
209                                        &index, NULL);
210                 if (rc != LDD_F_SV_TYPE_MDT) {
211                         CWARN("Unparsable MDC name %s, assuming index 0\n",
212                               lustre_cfg_string(lcfg, 0));
213                         index = 0;
214                 }
215                 rc = 0;
216                 CDEBUG(D_MGS, "MDT index is %u\n", index);
217                 set_bit(index, fsdb->fsdb_mdt_index_map);
218                 fsdb->fsdb_mdt_count ++;
219         }
220
221         /**
222          * figure out the old config. fsdb_gen = 0 means old log
223          * It is obsoleted and not supported anymore
224          */
225         if (fsdb->fsdb_gen == 0) {
226                 CERROR("Old config format is not supported\n");
227                 RETURN(-EINVAL);
228         }
229
230         /*
231          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
232          */
233         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
234             lcfg->lcfg_command == LCFG_ATTACH &&
235             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
236                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
237                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
238                         CWARN("MDT using 1.8 OSC name scheme\n");
239                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
240                 }
241         }
242
243         if (lcfg->lcfg_command == LCFG_MARKER) {
244                 struct cfg_marker *marker;
245                 marker = lustre_cfg_buf(lcfg, 1);
246
247                 d->ver = marker->cm_vers;
248
249                 /* Keep track of the latest marker step */
250                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
251         }
252
253         RETURN(rc);
254 }
255
256 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
257 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
258                                   struct mgs_device *mgs,
259                                   struct fs_db *fsdb)
260 {
261         char                            *logname;
262         struct llog_handle              *loghandle;
263         struct llog_ctxt                *ctxt;
264         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
265         int rc;
266
267         ENTRY;
268
269         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
270         LASSERT(ctxt != NULL);
271         rc = name_create(&logname, fsdb->fsdb_name, "-client");
272         if (rc)
273                 GOTO(out_put, rc);
274         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
275         if (rc)
276                 GOTO(out_pop, rc);
277
278         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
279         if (rc)
280                 GOTO(out_close, rc);
281
282         if (llog_get_size(loghandle) <= 1)
283                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
284
285         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
286         CDEBUG(D_INFO, "get_db = %d\n", rc);
287 out_close:
288         llog_close(env, loghandle);
289 out_pop:
290         name_destroy(&logname);
291 out_put:
292         llog_ctxt_put(ctxt);
293
294         RETURN(rc);
295 }
296
297 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
298 {
299         struct mgs_tgt_srpc_conf *tgtconf;
300
301         /* free target-specific rules */
302         while (fsdb->fsdb_srpc_tgt) {
303                 tgtconf = fsdb->fsdb_srpc_tgt;
304                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
305
306                 LASSERT(tgtconf->mtsc_tgt);
307
308                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
309                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
310                 OBD_FREE_PTR(tgtconf);
311         }
312
313         /* free general rules */
314         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
315 }
316
317 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
318 {
319         struct fs_db *fsdb;
320         cfs_list_t *tmp;
321
322         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
323                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
324                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
325                         return fsdb;
326         }
327         return NULL;
328 }
329
330 /* caller must hold the mgs->mgs_fs_db_lock */
331 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
332                                   struct mgs_device *mgs, char *fsname)
333 {
334         struct fs_db *fsdb;
335         int rc;
336         ENTRY;
337
338         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
339                 CERROR("fsname %s is too long\n", fsname);
340                 RETURN(NULL);
341         }
342
343         OBD_ALLOC_PTR(fsdb);
344         if (!fsdb)
345                 RETURN(NULL);
346
347         strcpy(fsdb->fsdb_name, fsname);
348         mutex_init(&fsdb->fsdb_mutex);
349         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
350         fsdb->fsdb_gen = 1;
351
352         if (strcmp(fsname, MGSSELF_NAME) == 0) {
353                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
354         } else {
355                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
356                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
357                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
358                         CERROR("No memory for index maps\n");
359                         GOTO(err, rc = -ENOMEM);
360                 }
361
362                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
363                 if (rc)
364                         GOTO(err, rc);
365                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
366                 if (rc)
367                         GOTO(err, rc);
368
369                 /* initialise data for NID table */
370                 mgs_ir_init_fs(env, mgs, fsdb);
371
372                 lproc_mgs_add_live(mgs, fsdb);
373         }
374
375         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
376
377         RETURN(fsdb);
378 err:
379         if (fsdb->fsdb_ost_index_map)
380                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
381         if (fsdb->fsdb_mdt_index_map)
382                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
383         name_destroy(&fsdb->fsdb_clilov);
384         name_destroy(&fsdb->fsdb_clilmv);
385         OBD_FREE_PTR(fsdb);
386         RETURN(NULL);
387 }
388
389 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
390 {
391         /* wait for anyone with the sem */
392         mutex_lock(&fsdb->fsdb_mutex);
393         lproc_mgs_del_live(mgs, fsdb);
394         cfs_list_del(&fsdb->fsdb_list);
395
396         /* deinitialize fsr */
397         mgs_ir_fini_fs(mgs, fsdb);
398
399         if (fsdb->fsdb_ost_index_map)
400                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
401         if (fsdb->fsdb_mdt_index_map)
402                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
403         name_destroy(&fsdb->fsdb_clilov);
404         name_destroy(&fsdb->fsdb_clilmv);
405         mgs_free_fsdb_srpc(fsdb);
406         mutex_unlock(&fsdb->fsdb_mutex);
407         OBD_FREE_PTR(fsdb);
408 }
409
410 int mgs_init_fsdb_list(struct mgs_device *mgs)
411 {
412         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
413         return 0;
414 }
415
416 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
417 {
418         struct fs_db *fsdb;
419         cfs_list_t *tmp, *tmp2;
420         mutex_lock(&mgs->mgs_mutex);
421         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
422                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
423                 mgs_free_fsdb(mgs, fsdb);
424         }
425         mutex_unlock(&mgs->mgs_mutex);
426         return 0;
427 }
428
429 int mgs_find_or_make_fsdb(const struct lu_env *env,
430                           struct mgs_device *mgs, char *name,
431                           struct fs_db **dbh)
432 {
433         struct fs_db *fsdb;
434         int rc = 0;
435
436         ENTRY;
437         mutex_lock(&mgs->mgs_mutex);
438         fsdb = mgs_find_fsdb(mgs, name);
439         if (fsdb) {
440                 mutex_unlock(&mgs->mgs_mutex);
441                 *dbh = fsdb;
442                 RETURN(0);
443         }
444
445         CDEBUG(D_MGS, "Creating new db\n");
446         fsdb = mgs_new_fsdb(env, mgs, name);
447         /* lock fsdb_mutex until the db is loaded from llogs */
448         if (fsdb)
449                 mutex_lock(&fsdb->fsdb_mutex);
450         mutex_unlock(&mgs->mgs_mutex);
451         if (!fsdb)
452                 RETURN(-ENOMEM);
453
454         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
455                 /* populate the db from the client llog */
456                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
457                 if (rc) {
458                         CERROR("Can't get db from client log %d\n", rc);
459                         GOTO(out_free, rc);
460                 }
461         }
462
463         /* populate srpc rules from params llog */
464         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
465         if (rc) {
466                 CERROR("Can't get db from params log %d\n", rc);
467                 GOTO(out_free, rc);
468         }
469
470         mutex_unlock(&fsdb->fsdb_mutex);
471         *dbh = fsdb;
472
473         RETURN(0);
474
475 out_free:
476         mutex_unlock(&fsdb->fsdb_mutex);
477         mgs_free_fsdb(mgs, fsdb);
478         return rc;
479 }
480
481 /* 1 = index in use
482    0 = index unused
483    -1= empty client log */
484 int mgs_check_index(const struct lu_env *env,
485                     struct mgs_device *mgs,
486                     struct mgs_target_info *mti)
487 {
488         struct fs_db *fsdb;
489         void *imap;
490         int rc = 0;
491         ENTRY;
492
493         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
494
495         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
496         if (rc) {
497                 CERROR("Can't get db for %s\n", mti->mti_fsname);
498                 RETURN(rc);
499         }
500
501         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
502                 RETURN(-1);
503
504         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
505                 imap = fsdb->fsdb_ost_index_map;
506         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
507                 imap = fsdb->fsdb_mdt_index_map;
508         else
509                 RETURN(-EINVAL);
510
511         if (test_bit(mti->mti_stripe_index, imap))
512                 RETURN(1);
513         RETURN(0);
514 }
515
516 static __inline__ int next_index(void *index_map, int map_len)
517 {
518         int i;
519         for (i = 0; i < map_len * 8; i++)
520                 if (!test_bit(i, index_map)) {
521                          return i;
522                  }
523         CERROR("max index %d exceeded.\n", i);
524         return -1;
525 }
526
527 /* Return codes:
528         0  newly marked as in use
529         <0 err
530         +EALREADY for update of an old index */
531 static int mgs_set_index(const struct lu_env *env,
532                          struct mgs_device *mgs,
533                          struct mgs_target_info *mti)
534 {
535         struct fs_db *fsdb;
536         void *imap;
537         int rc = 0;
538         ENTRY;
539
540         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
541         if (rc) {
542                 CERROR("Can't get db for %s\n", mti->mti_fsname);
543                 RETURN(rc);
544         }
545
546         mutex_lock(&fsdb->fsdb_mutex);
547         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
548                 imap = fsdb->fsdb_ost_index_map;
549         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
550                 imap = fsdb->fsdb_mdt_index_map;
551                 if (fsdb->fsdb_mdt_count >= MAX_MDT_COUNT) {
552                         LCONSOLE_ERROR_MSG(0x13f, "The max mdt count"
553                                            "is %d\n", (int)MAX_MDT_COUNT);
554                         GOTO(out_up, rc = -ERANGE);
555                 }
556         } else {
557                 GOTO(out_up, rc = -EINVAL);
558         }
559
560         if (mti->mti_flags & LDD_F_NEED_INDEX) {
561                 rc = next_index(imap, INDEX_MAP_SIZE);
562                 if (rc == -1)
563                         GOTO(out_up, rc = -ERANGE);
564                 mti->mti_stripe_index = rc;
565                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
566                         fsdb->fsdb_mdt_count ++;
567         }
568
569         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
570                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
571                                    "but the max index is %d.\n",
572                                    mti->mti_svname, mti->mti_stripe_index,
573                                    INDEX_MAP_SIZE * 8);
574                 GOTO(out_up, rc = -ERANGE);
575         }
576
577         if (test_bit(mti->mti_stripe_index, imap)) {
578                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
579                     !(mti->mti_flags & LDD_F_WRITECONF)) {
580                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
581                                            "%d, but that index is already in "
582                                            "use. Use --writeconf to force\n",
583                                            mti->mti_svname,
584                                            mti->mti_stripe_index);
585                         GOTO(out_up, rc = -EADDRINUSE);
586                 } else {
587                         CDEBUG(D_MGS, "Server %s updating index %d\n",
588                                mti->mti_svname, mti->mti_stripe_index);
589                         GOTO(out_up, rc = EALREADY);
590                 }
591         }
592
593         set_bit(mti->mti_stripe_index, imap);
594         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
595         mutex_unlock(&fsdb->fsdb_mutex);
596         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
597                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
598
599         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
600                mti->mti_stripe_index);
601
602         RETURN(0);
603 out_up:
604         mutex_unlock(&fsdb->fsdb_mutex);
605         return rc;
606 }
607
608 struct mgs_modify_lookup {
609         struct cfg_marker mml_marker;
610         int               mml_modified;
611 };
612
613 static int mgs_modify_handler(const struct lu_env *env,
614                               struct llog_handle *llh,
615                               struct llog_rec_hdr *rec, void *data)
616 {
617         struct mgs_modify_lookup *mml = data;
618         struct cfg_marker *marker;
619         struct lustre_cfg *lcfg = REC_DATA(rec);
620         int cfg_len = REC_DATA_LEN(rec);
621         int rc;
622         ENTRY;
623
624         if (rec->lrh_type != OBD_CFG_REC) {
625                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
626                 RETURN(-EINVAL);
627         }
628
629         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
630         if (rc) {
631                 CERROR("Insane cfg\n");
632                 RETURN(rc);
633         }
634
635         /* We only care about markers */
636         if (lcfg->lcfg_command != LCFG_MARKER)
637                 RETURN(0);
638
639         marker = lustre_cfg_buf(lcfg, 1);
640         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
641             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
642             !(marker->cm_flags & CM_SKIP)) {
643                 /* Found a non-skipped marker match */
644                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
645                        rec->lrh_index, marker->cm_step,
646                        marker->cm_flags, mml->mml_marker.cm_flags,
647                        marker->cm_tgtname, marker->cm_comment);
648                 /* Overwrite the old marker llog entry */
649                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
650                 marker->cm_flags |= mml->mml_marker.cm_flags;
651                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
652                 /* Header and tail are added back to lrh_len in
653                    llog_lvfs_write_rec */
654                 rec->lrh_len = cfg_len;
655                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
656                                 rec->lrh_index);
657                 if (!rc)
658                          mml->mml_modified++;
659         }
660
661         RETURN(rc);
662 }
663
664 /**
665  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
666  * Return code:
667  * 0 - modified successfully,
668  * 1 - no modification was done
669  * negative - error
670  */
671 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
672                       struct fs_db *fsdb, struct mgs_target_info *mti,
673                       char *logname, char *devname, char *comment, int flags)
674 {
675         struct llog_handle *loghandle;
676         struct llog_ctxt *ctxt;
677         struct mgs_modify_lookup *mml;
678         int rc;
679
680         ENTRY;
681
682         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
683         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
684                flags);
685
686         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
687         LASSERT(ctxt != NULL);
688         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
689         if (rc < 0) {
690                 if (rc == -ENOENT)
691                         rc = 0;
692                 GOTO(out_pop, rc);
693         }
694
695         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
696         if (rc)
697                 GOTO(out_close, rc);
698
699         if (llog_get_size(loghandle) <= 1)
700                 GOTO(out_close, rc = 0);
701
702         OBD_ALLOC_PTR(mml);
703         if (!mml)
704                 GOTO(out_close, rc = -ENOMEM);
705         strcpy(mml->mml_marker.cm_comment, comment);
706         strcpy(mml->mml_marker.cm_tgtname, devname);
707         /* Modify mostly means cancel */
708         mml->mml_marker.cm_flags = flags;
709         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
710         mml->mml_modified = 0;
711         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
712                           NULL);
713         if (!rc && !mml->mml_modified)
714                 rc = 1;
715         OBD_FREE_PTR(mml);
716
717 out_close:
718         llog_close(env, loghandle);
719 out_pop:
720         if (rc < 0)
721                 CERROR("%s: modify %s/%s failed: rc = %d\n",
722                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
723         llog_ctxt_put(ctxt);
724         RETURN(rc);
725 }
726
727 /** This structure is passed to mgs_replace_handler */
728 struct mgs_replace_uuid_lookup {
729         /* Nids are replaced for this target device */
730         struct mgs_target_info target;
731         /* Temporary modified llog */
732         struct llog_handle *temp_llh;
733         /* Flag is set if in target block*/
734         int in_target_device;
735         /* Nids already added. Just skip (multiple nids) */
736         int device_nids_added;
737         /* Flag is set if this block should not be copied */
738         int skip_it;
739 };
740
741 /**
742  * Check: a) if block should be skipped
743  * b) is it target block
744  *
745  * \param[in] lcfg
746  * \param[in] mrul
747  *
748  * \retval 0 should not to be skipped
749  * \retval 1 should to be skipped
750  */
751 static int check_markers(struct lustre_cfg *lcfg,
752                          struct mgs_replace_uuid_lookup *mrul)
753 {
754          struct cfg_marker *marker;
755
756         /* Track markers. Find given device */
757         if (lcfg->lcfg_command == LCFG_MARKER) {
758                 marker = lustre_cfg_buf(lcfg, 1);
759                 /* Clean llog from records marked as CM_EXCLUDE.
760                    CM_SKIP records are used for "active" command
761                    and can be restored if needed */
762                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
763                     (CM_EXCLUDE | CM_START)) {
764                         mrul->skip_it = 1;
765                         return 1;
766                 }
767
768                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
769                     (CM_EXCLUDE | CM_END)) {
770                         mrul->skip_it = 0;
771                         return 1;
772                 }
773
774                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
775                         LASSERT(!(marker->cm_flags & CM_START) ||
776                                 !(marker->cm_flags & CM_END));
777                         if (marker->cm_flags & CM_START) {
778                                 mrul->in_target_device = 1;
779                                 mrul->device_nids_added = 0;
780                         } else if (marker->cm_flags & CM_END)
781                                 mrul->in_target_device = 0;
782                 }
783         }
784
785         return 0;
786 }
787
788 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
789                        struct lustre_cfg *lcfg)
790 {
791         struct llog_rec_hdr      rec;
792         int                      buflen, rc;
793
794         if (!lcfg || !llh)
795                 return -ENOMEM;
796
797         LASSERT(llh->lgh_ctxt);
798
799         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
800                                 lcfg->lcfg_buflens);
801         rec.lrh_len = llog_data_len(buflen);
802         rec.lrh_type = OBD_CFG_REC;
803
804         /* idx = -1 means append */
805         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
806         if (rc)
807                 CERROR("failed %d\n", rc);
808         return rc;
809 }
810
811 static int record_base(const struct lu_env *env, struct llog_handle *llh,
812                      char *cfgname, lnet_nid_t nid, int cmd,
813                      char *s1, char *s2, char *s3, char *s4)
814 {
815         struct mgs_thread_info *mgi = mgs_env_info(env);
816         struct lustre_cfg     *lcfg;
817         int rc;
818
819         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
820                cmd, s1, s2, s3, s4);
821
822         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
823         if (s1)
824                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
825         if (s2)
826                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
827         if (s3)
828                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
829         if (s4)
830                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
831
832         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
833         if (!lcfg)
834                 return -ENOMEM;
835         lcfg->lcfg_nid = nid;
836
837         rc = record_lcfg(env, llh, lcfg);
838
839         lustre_cfg_free(lcfg);
840
841         if (rc) {
842                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
843                        cmd, s1, s2, s3, s4);
844         }
845         return rc;
846 }
847
848 static inline int record_add_uuid(const struct lu_env *env,
849                                   struct llog_handle *llh,
850                                   uint64_t nid, char *uuid)
851 {
852         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
853 }
854
855 static inline int record_add_conn(const struct lu_env *env,
856                                   struct llog_handle *llh,
857                                   char *devname, char *uuid)
858 {
859         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
860 }
861
862 static inline int record_attach(const struct lu_env *env,
863                                 struct llog_handle *llh, char *devname,
864                                 char *type, char *uuid)
865 {
866         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
867 }
868
869 static inline int record_setup(const struct lu_env *env,
870                                struct llog_handle *llh, char *devname,
871                                char *s1, char *s2, char *s3, char *s4)
872 {
873         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
874 }
875
876 /**
877  * \retval <0 record processing error
878  * \retval n record is processed. No need copy original one.
879  * \retval 0 record is not processed.
880  */
881 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
882                            struct mgs_replace_uuid_lookup *mrul)
883 {
884         int nids_added = 0;
885         lnet_nid_t nid;
886         char *ptr;
887         int rc;
888
889         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
890                 /* LCFG_ADD_UUID command found. Let's skip original command
891                    and add passed nids */
892                 ptr = mrul->target.mti_params;
893                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
894                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
895                                "device %s\n", libcfs_nid2str(nid),
896                                 mrul->target.mti_params,
897                                 mrul->target.mti_svname);
898                         rc = record_add_uuid(env,
899                                              mrul->temp_llh, nid,
900                                              mrul->target.mti_params);
901                         if (!rc)
902                                 nids_added++;
903                 }
904
905                 if (nids_added == 0) {
906                         CERROR("No new nids were added, nid %s with uuid %s, "
907                                "device %s\n", libcfs_nid2str(nid),
908                                mrul->target.mti_params,
909                                mrul->target.mti_svname);
910                         RETURN(-ENXIO);
911                 } else {
912                         mrul->device_nids_added = 1;
913                 }
914
915                 return nids_added;
916         }
917
918         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
919                 /* LCFG_SETUP command found. UUID should be changed */
920                 rc = record_setup(env,
921                                   mrul->temp_llh,
922                                   /* devname the same */
923                                   lustre_cfg_string(lcfg, 0),
924                                   /* s1 is not changed */
925                                   lustre_cfg_string(lcfg, 1),
926                                   /* new uuid should be
927                                   the full nidlist */
928                                   mrul->target.mti_params,
929                                   /* s3 is not changed */
930                                   lustre_cfg_string(lcfg, 3),
931                                   /* s4 is not changed */
932                                   lustre_cfg_string(lcfg, 4));
933                 return rc ? rc : 1;
934         }
935
936         /* Another commands in target device block */
937         return 0;
938 }
939
940 /**
941  * Handler that called for every record in llog.
942  * Records are processed in order they placed in llog.
943  *
944  * \param[in] llh       log to be processed
945  * \param[in] rec       current record
946  * \param[in] data      mgs_replace_uuid_lookup structure
947  *
948  * \retval 0    success
949  */
950 static int mgs_replace_handler(const struct lu_env *env,
951                                struct llog_handle *llh,
952                                struct llog_rec_hdr *rec,
953                                void *data)
954 {
955         struct llog_rec_hdr local_rec = *rec;
956         struct mgs_replace_uuid_lookup *mrul;
957         struct lustre_cfg *lcfg = REC_DATA(rec);
958         int cfg_len = REC_DATA_LEN(rec);
959         int rc;
960         ENTRY;
961
962         mrul = (struct mgs_replace_uuid_lookup *)data;
963
964         if (rec->lrh_type != OBD_CFG_REC) {
965                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
966                        rec->lrh_type, lcfg->lcfg_command,
967                        lustre_cfg_string(lcfg, 0),
968                        lustre_cfg_string(lcfg, 1));
969                 RETURN(-EINVAL);
970         }
971
972         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
973         if (rc) {
974                 /* Do not copy any invalidated records */
975                 GOTO(skip_out, rc = 0);
976         }
977
978         rc = check_markers(lcfg, mrul);
979         if (rc || mrul->skip_it)
980                 GOTO(skip_out, rc = 0);
981
982         /* Write to new log all commands outside target device block */
983         if (!mrul->in_target_device)
984                 GOTO(copy_out, rc = 0);
985
986         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
987            (failover nids) for this target, assuming that if then
988            primary is changing then so is the failover */
989         if (mrul->device_nids_added &&
990             (lcfg->lcfg_command == LCFG_ADD_UUID ||
991              lcfg->lcfg_command == LCFG_ADD_CONN))
992                 GOTO(skip_out, rc = 0);
993
994         rc = process_command(env, lcfg, mrul);
995         if (rc < 0)
996                 RETURN(rc);
997
998         if (rc)
999                 RETURN(0);
1000 copy_out:
1001         /* Record is placed in temporary llog as is */
1002         local_rec.lrh_len -= sizeof(*rec) + sizeof(struct llog_rec_tail);
1003         rc = llog_write(env, mrul->temp_llh, &local_rec, NULL, 0,
1004                         (void *)lcfg, -1);
1005
1006         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1007                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1008                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1009         RETURN(rc);
1010
1011 skip_out:
1012         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1013                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1014                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1015         RETURN(rc);
1016 }
1017
1018 static int mgs_backup_llog(const struct lu_env *env,
1019                            struct obd_device *mgs,
1020                            char *fsname, char *backup)
1021 {
1022         struct obd_uuid *uuid;
1023         struct llog_handle *orig_llh, *bak_llh;
1024         struct llog_ctxt *lctxt;
1025         int rc, rc2;
1026         ENTRY;
1027
1028         lctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1029         if (!lctxt) {
1030                 CERROR("%s: missing llog context\n", mgs->obd_name);
1031                 GOTO(out, rc = -EINVAL);
1032         }
1033
1034         /* Make sure there's no old backup log */
1035         rc = llog_erase(env, lctxt, NULL, backup);
1036         if (rc < 0 && rc != -ENOENT)
1037                 GOTO(out_put, rc);
1038
1039         /* open backup log */
1040         rc = llog_open_create(env, lctxt, &bak_llh, NULL, backup);
1041         if (rc) {
1042                 CERROR("%s: backup logfile open %s: rc = %d\n",
1043                        mgs->obd_name, backup, rc);
1044                 GOTO(out_put, rc);
1045         }
1046
1047         /* set the log header uuid */
1048         OBD_ALLOC_PTR(uuid);
1049         if (uuid == NULL)
1050                 GOTO(out_put, rc = -ENOMEM);
1051         obd_str2uuid(uuid, backup);
1052         rc = llog_init_handle(env, bak_llh, LLOG_F_IS_PLAIN, uuid);
1053         OBD_FREE_PTR(uuid);
1054         if (rc)
1055                 GOTO(out_close1, rc);
1056
1057         /* open original log */
1058         rc = llog_open(env, lctxt, &orig_llh, NULL, fsname,
1059                        LLOG_OPEN_EXISTS);
1060         if (rc < 0) {
1061                 if (rc == -ENOENT)
1062                         rc = 0;
1063                 GOTO(out_close1, rc);
1064         }
1065
1066         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, NULL);
1067         if (rc)
1068                 GOTO(out_close2, rc);
1069
1070         /* Copy remote log */
1071         rc = llog_process(env, orig_llh, llog_copy_handler,
1072                           (void *)bak_llh, NULL);
1073
1074 out_close2:
1075         rc2 = llog_close(env, orig_llh);
1076         if (!rc)
1077                 rc = rc2;
1078 out_close1:
1079         rc2 = llog_close(env, bak_llh);
1080         if (!rc)
1081                 rc = rc2;
1082 out_put:
1083         if (lctxt)
1084                 llog_ctxt_put(lctxt);
1085 out:
1086         if (rc)
1087                 CERROR("%s: Failed to backup log %s: rc = %d\n",
1088                        mgs->obd_name, fsname, rc);
1089         RETURN(rc);
1090 }
1091
1092 static int mgs_log_is_empty(const struct lu_env *env, struct mgs_device *mgs,
1093                             char *name);
1094
1095 static int mgs_replace_nids_log(const struct lu_env *env,
1096                                 struct obd_device *mgs, struct fs_db *fsdb,
1097                                 char *logname, char *devname, char *nids)
1098 {
1099         struct llog_handle *orig_llh, *backup_llh;
1100         struct llog_ctxt *ctxt;
1101         struct mgs_replace_uuid_lookup *mrul;
1102         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1103         char *backup;
1104         int rc, rc2;
1105         ENTRY;
1106
1107         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1108
1109         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1110         LASSERT(ctxt != NULL);
1111
1112         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1113                 /* Log is empty. Nothing to replace */
1114                 GOTO(out_put, rc = 0);
1115         }
1116
1117         OBD_ALLOC(backup, strlen(logname) + 5);
1118         if (backup == NULL)
1119                 GOTO(out_put, rc = -ENOMEM);
1120
1121         sprintf(backup, "%s.bak", logname);
1122
1123         rc = mgs_backup_llog(env, mgs, logname, backup);
1124         if (rc < 0) {
1125                 CERROR("%s: can't make backup for %s: rc = %d\n",
1126                        mgs->obd_name, logname, rc);
1127                 GOTO(out_free,rc);
1128         }
1129
1130         /* Now erase original log file. Connections are not allowed.
1131            Backup is already saved */
1132         rc = llog_erase(env, ctxt, NULL, logname);
1133         if (rc < 0 && rc != -ENOENT)
1134                 GOTO(out_free, rc);
1135
1136         /* open local log */
1137         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1138         if (rc)
1139                 GOTO(out_restore, rc);
1140
1141         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, NULL);
1142         if (rc)
1143                 GOTO(out_closel, rc);
1144
1145         /* open backup llog */
1146         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1147                        LLOG_OPEN_EXISTS);
1148         if (rc)
1149                 GOTO(out_closel, rc);
1150
1151         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1152         if (rc)
1153                 GOTO(out_close, rc);
1154
1155         if (llog_get_size(backup_llh) <= 1)
1156                 GOTO(out_close, rc = 0);
1157
1158         OBD_ALLOC_PTR(mrul);
1159         if (!mrul)
1160                 GOTO(out_close, rc = -ENOMEM);
1161         /* devname is only needed information to replace UUID records */
1162         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1163         /* parse nids later */
1164         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1165         /* Copy records to this temporary llog */
1166         mrul->temp_llh = orig_llh;
1167
1168         rc = llog_process(env, backup_llh, mgs_replace_handler,
1169                           (void *)mrul, NULL);
1170         OBD_FREE_PTR(mrul);
1171 out_close:
1172         rc2 = llog_close(NULL, backup_llh);
1173         if (!rc)
1174                 rc = rc2;
1175 out_closel:
1176         rc2 = llog_close(NULL, orig_llh);
1177         if (!rc)
1178                 rc = rc2;
1179
1180 out_restore:
1181         if (rc) {
1182                 CERROR("%s: llog should be restored: rc = %d\n",
1183                        mgs->obd_name, rc);
1184                 rc2 = mgs_backup_llog(env, mgs, backup, logname);
1185                 if (rc2 < 0)
1186                         CERROR("%s: can't restore backup %s: rc = %d\n",
1187                                mgs->obd_name, logname, rc2);
1188         }
1189
1190 out_free:
1191         OBD_FREE(backup, strlen(backup) + 5);
1192
1193 out_put:
1194         llog_ctxt_put(ctxt);
1195
1196         if (rc)
1197                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1198                        mgs->obd_name, logname, rc);
1199
1200         RETURN(rc);
1201 }
1202
1203 /**
1204  * Parse device name and get file system name and/or device index
1205  *
1206  * \param[in]   devname device name (ex. lustre-MDT0000)
1207  * \param[out]  fsname  file system name(optional)
1208  * \param[out]  index   device index(optional)
1209  *
1210  * \retval 0    success
1211  */
1212 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1213 {
1214         char *ptr;
1215         ENTRY;
1216
1217         /* Extract fsname */
1218         ptr = strrchr(devname, '-');
1219
1220         if (fsname) {
1221                 if (!ptr) {
1222                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1223                                devname);
1224                         RETURN(-EINVAL);
1225                 }
1226                 memset(fsname, 0, MTI_NAME_MAXLEN);
1227                 strncpy(fsname, devname, ptr - devname);
1228                 fsname[MTI_NAME_MAXLEN - 1] = 0;
1229         }
1230
1231         if (index) {
1232                 if (server_name2index(ptr, index, NULL) < 0) {
1233                         CDEBUG(D_MGS, "Device name with wrong index\n");
1234                         RETURN(-EINVAL);
1235                 }
1236         }
1237
1238         RETURN(0);
1239 }
1240
1241 static int only_mgs_is_running(struct obd_device *mgs_obd)
1242 {
1243         /* TDB: Is global variable with devices count exists? */
1244         int num_devices = get_devices_count();
1245         /* osd, MGS and MGC + self_export
1246            (wc -l /proc/fs/lustre/devices <= 2) && (num_exports <= 2) */
1247         return (num_devices <= 3) && (mgs_obd->obd_num_exports <= 2);
1248 }
1249
1250 static int name_create_mdt(char **logname, char *fsname, int i)
1251 {
1252         char mdt_index[9];
1253
1254         sprintf(mdt_index, "-MDT%04x", i);
1255         return name_create(logname, fsname, mdt_index);
1256 }
1257
1258 /**
1259  * Replace nids for \a device to \a nids values
1260  *
1261  * \param obd           MGS obd device
1262  * \param devname       nids need to be replaced for this device
1263  * (ex. lustre-OST0000)
1264  * \param nids          nids list (ex. nid1,nid2,nid3)
1265  *
1266  * \retval 0    success
1267  */
1268 int mgs_replace_nids(const struct lu_env *env,
1269                      struct mgs_device *mgs,
1270                      char *devname, char *nids)
1271 {
1272         /* Assume fsname is part of device name */
1273         char fsname[MTI_NAME_MAXLEN];
1274         int rc;
1275         __u32 index;
1276         char *logname;
1277         struct fs_db *fsdb;
1278         unsigned int i;
1279         int conn_state;
1280         struct obd_device *mgs_obd = mgs->mgs_obd;
1281         ENTRY;
1282
1283         /* We can only change NIDs if no other nodes are connected */
1284         spin_lock(&mgs_obd->obd_dev_lock);
1285         conn_state = mgs_obd->obd_no_conn;
1286         mgs_obd->obd_no_conn = 1;
1287         spin_unlock(&mgs_obd->obd_dev_lock);
1288
1289         /* We can not change nids if not only MGS is started */
1290         if (!only_mgs_is_running(mgs_obd)) {
1291                 CERROR("Only MGS is allowed to be started\n");
1292                 GOTO(out, rc = -EINPROGRESS);
1293         }
1294
1295         /* Get fsname and index*/
1296         rc = mgs_parse_devname(devname, fsname, &index);
1297         if (rc)
1298                 GOTO(out, rc);
1299
1300         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1301         if (rc) {
1302                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1303                 GOTO(out, rc);
1304         }
1305
1306         /* Process client llogs */
1307         name_create(&logname, fsname, "-client");
1308         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1309         name_destroy(&logname);
1310         if (rc) {
1311                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1312                        fsname, devname, rc);
1313                 GOTO(out, rc);
1314         }
1315
1316         /* Process MDT llogs */
1317         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1318                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1319                         continue;
1320                 name_create_mdt(&logname, fsname, i);
1321                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1322                 name_destroy(&logname);
1323                 if (rc)
1324                         GOTO(out, rc);
1325         }
1326
1327 out:
1328         spin_lock(&mgs_obd->obd_dev_lock);
1329         mgs_obd->obd_no_conn = conn_state;
1330         spin_unlock(&mgs_obd->obd_dev_lock);
1331
1332         RETURN(rc);
1333 }
1334
1335 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1336                             char *devname, struct lov_desc *desc)
1337 {
1338         struct mgs_thread_info *mgi = mgs_env_info(env);
1339         struct lustre_cfg *lcfg;
1340         int rc;
1341
1342         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1343         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1344         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1345         if (!lcfg)
1346                 return -ENOMEM;
1347         rc = record_lcfg(env, llh, lcfg);
1348
1349         lustre_cfg_free(lcfg);
1350         return rc;
1351 }
1352
1353 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1354                             char *devname, struct lmv_desc *desc)
1355 {
1356         struct mgs_thread_info *mgi = mgs_env_info(env);
1357         struct lustre_cfg *lcfg;
1358         int rc;
1359
1360         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1361         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1362         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1363
1364         rc = record_lcfg(env, llh, lcfg);
1365
1366         lustre_cfg_free(lcfg);
1367         return rc;
1368 }
1369
1370 static inline int record_mdc_add(const struct lu_env *env,
1371                                  struct llog_handle *llh,
1372                                  char *logname, char *mdcuuid,
1373                                  char *mdtuuid, char *index,
1374                                  char *gen)
1375 {
1376         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1377                            mdtuuid,index,gen,mdcuuid);
1378 }
1379
1380 static inline int record_lov_add(const struct lu_env *env,
1381                                  struct llog_handle *llh,
1382                                  char *lov_name, char *ost_uuid,
1383                                  char *index, char *gen)
1384 {
1385         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1386                            ost_uuid,index,gen,0);
1387 }
1388
1389 static inline int record_mount_opt(const struct lu_env *env,
1390                                    struct llog_handle *llh,
1391                                    char *profile, char *lov_name,
1392                                    char *mdc_name)
1393 {
1394         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1395                            profile,lov_name,mdc_name,0);
1396 }
1397
1398 static int record_marker(const struct lu_env *env,
1399                          struct llog_handle *llh,
1400                          struct fs_db *fsdb, __u32 flags,
1401                          char *tgtname, char *comment)
1402 {
1403         struct mgs_thread_info *mgi = mgs_env_info(env);
1404         struct lustre_cfg *lcfg;
1405         int rc;
1406
1407         if (flags & CM_START)
1408                 fsdb->fsdb_gen++;
1409         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1410         mgi->mgi_marker.cm_flags = flags;
1411         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1412         strncpy(mgi->mgi_marker.cm_tgtname, tgtname,
1413                 sizeof(mgi->mgi_marker.cm_tgtname));
1414         strncpy(mgi->mgi_marker.cm_comment, comment,
1415                 sizeof(mgi->mgi_marker.cm_comment));
1416         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1417         mgi->mgi_marker.cm_canceltime = 0;
1418         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1419         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1420                             sizeof(mgi->mgi_marker));
1421         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
1422         if (!lcfg)
1423                 return -ENOMEM;
1424         rc = record_lcfg(env, llh, lcfg);
1425
1426         lustre_cfg_free(lcfg);
1427         return rc;
1428 }
1429
1430 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1431                             struct llog_handle **llh, char *name)
1432 {
1433         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1434         struct llog_ctxt        *ctxt;
1435         int                      rc = 0;
1436
1437         if (*llh)
1438                 GOTO(out, rc = -EBUSY);
1439
1440         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1441         if (!ctxt)
1442                 GOTO(out, rc = -ENODEV);
1443         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1444
1445         rc = llog_open_create(env, ctxt, llh, NULL, name);
1446         if (rc)
1447                 GOTO(out_ctxt, rc);
1448         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1449         if (rc)
1450                 llog_close(env, *llh);
1451 out_ctxt:
1452         llog_ctxt_put(ctxt);
1453 out:
1454         if (rc) {
1455                 CERROR("%s: can't start log %s: rc = %d\n",
1456                        mgs->mgs_obd->obd_name, name, rc);
1457                 *llh = NULL;
1458         }
1459         RETURN(rc);
1460 }
1461
1462 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1463 {
1464         int rc;
1465
1466         rc = llog_close(env, *llh);
1467         *llh = NULL;
1468
1469         return rc;
1470 }
1471
1472 static int mgs_log_is_empty(const struct lu_env *env,
1473                             struct mgs_device *mgs, char *name)
1474 {
1475         struct llog_handle *llh;
1476         struct llog_ctxt *ctxt;
1477         int rc = 0;
1478
1479         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1480         LASSERT(ctxt != NULL);
1481         rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
1482         if (rc < 0) {
1483                 if (rc == -ENOENT)
1484                         rc = 0;
1485                 GOTO(out_ctxt, rc);
1486         }
1487
1488         llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
1489         if (rc)
1490                 GOTO(out_close, rc);
1491         rc = llog_get_size(llh);
1492
1493 out_close:
1494         llog_close(env, llh);
1495 out_ctxt:
1496         llog_ctxt_put(ctxt);
1497         /* header is record 1 */
1498         return (rc <= 1);
1499 }
1500
1501 /******************** config "macros" *********************/
1502
1503 /* write an lcfg directly into a log (with markers) */
1504 static int mgs_write_log_direct(const struct lu_env *env,
1505                                 struct mgs_device *mgs, struct fs_db *fsdb,
1506                                 char *logname, struct lustre_cfg *lcfg,
1507                                 char *devname, char *comment)
1508 {
1509         struct llog_handle *llh = NULL;
1510         int rc;
1511         ENTRY;
1512
1513         if (!lcfg)
1514                 RETURN(-ENOMEM);
1515
1516         rc = record_start_log(env, mgs, &llh, logname);
1517         if (rc)
1518                 RETURN(rc);
1519
1520         /* FIXME These should be a single journal transaction */
1521         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1522         if (rc)
1523                 GOTO(out_end, rc);
1524         rc = record_lcfg(env, llh, lcfg);
1525         if (rc)
1526                 GOTO(out_end, rc);
1527         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1528         if (rc)
1529                 GOTO(out_end, rc);
1530 out_end:
1531         record_end_log(env, &llh);
1532         RETURN(rc);
1533 }
1534
1535 /* write the lcfg in all logs for the given fs */
1536 int mgs_write_log_direct_all(const struct lu_env *env,
1537                              struct mgs_device *mgs,
1538                              struct fs_db *fsdb,
1539                              struct mgs_target_info *mti,
1540                              struct lustre_cfg *lcfg,
1541                              char *devname, char *comment,
1542                              int server_only)
1543 {
1544         cfs_list_t list;
1545         struct mgs_direntry *dirent, *n;
1546         char *fsname = mti->mti_fsname;
1547         char *logname;
1548         int rc = 0, len = strlen(fsname);
1549         ENTRY;
1550
1551         /* We need to set params for any future logs
1552            as well. FIXME Append this file to every new log.
1553            Actually, we should store as params (text), not llogs.  Or
1554            in a database. */
1555         rc = name_create(&logname, fsname, "-params");
1556         if (rc)
1557                 RETURN(rc);
1558         if (mgs_log_is_empty(env, mgs, logname)) {
1559                 struct llog_handle *llh = NULL;
1560                 rc = record_start_log(env, mgs, &llh, logname);
1561                 record_end_log(env, &llh);
1562         }
1563         name_destroy(&logname);
1564         if (rc)
1565                 RETURN(rc);
1566
1567         /* Find all the logs in the CONFIGS directory */
1568         rc = class_dentry_readdir(env, mgs, &list);
1569         if (rc)
1570                 RETURN(rc);
1571
1572         /* Could use fsdb index maps instead of directory listing */
1573         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1574                 cfs_list_del(&dirent->list);
1575                 /* don't write to sptlrpc rule log */
1576                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1577                         goto next;
1578
1579                 /* caller wants write server logs only */
1580                 if (server_only && strstr(dirent->name, "-client") != NULL)
1581                         goto next;
1582
1583                 if (strncmp(fsname, dirent->name, len) == 0) {
1584                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1585                         /* Erase any old settings of this same parameter */
1586                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1587                                         devname, comment, CM_SKIP);
1588                         if (rc < 0)
1589                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1590                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1591                         /* Write the new one */
1592                         if (lcfg) {
1593                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1594                                                           dirent->name,
1595                                                           lcfg, devname,
1596                                                           comment);
1597                                 if (rc)
1598                                         CERROR("%s: writing log %s: rc = %d\n",
1599                                                mgs->mgs_obd->obd_name,
1600                                                dirent->name, rc);
1601                         }
1602                 }
1603 next:
1604                 mgs_direntry_free(dirent);
1605         }
1606
1607         RETURN(rc);
1608 }
1609
1610 static int mgs_write_log_mdc_to_mdt(const struct lu_env *env,
1611                                     struct mgs_device *mgs,
1612                                     struct fs_db *fsdb,
1613                                     struct mgs_target_info *mti,
1614                                     char *logname);
1615 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1616                                     struct mgs_device *mgs,
1617                                     struct fs_db *fsdb,
1618                                     struct mgs_target_info *mti,
1619                                     char *logname, char *suffix, char *lovname,
1620                                     enum lustre_sec_part sec_part, int flags);
1621 static int name_create_mdt_and_lov(char **logname, char **lovname,
1622                                    struct fs_db *fsdb, int i);
1623
1624 static int add_param(char *params, char *key, char *val)
1625 {
1626         char *start = params + strlen(params);
1627         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1628         int keylen = 0;
1629
1630         if (key != NULL)
1631                 keylen = strlen(key);
1632         if (start + 1 + keylen + strlen(val) >= end) {
1633                 CERROR("params are too long: %s %s%s\n",
1634                        params, key != NULL ? key : "", val);
1635                 return -EINVAL;
1636         }
1637
1638         sprintf(start, " %s%s", key != NULL ? key : "", val);
1639         return 0;
1640 }
1641
1642 static int mgs_steal_llog_handler(const struct lu_env *env,
1643                                   struct llog_handle *llh,
1644                                   struct llog_rec_hdr *rec, void *data)
1645 {
1646         struct mgs_device *mgs;
1647         struct obd_device *obd;
1648         struct mgs_target_info *mti, *tmti;
1649         struct fs_db *fsdb;
1650         int cfg_len = rec->lrh_len;
1651         char *cfg_buf = (char*) (rec + 1);
1652         struct lustre_cfg *lcfg;
1653         int rc = 0;
1654         struct llog_handle *mdt_llh = NULL;
1655         static int got_an_osc_or_mdc = 0;
1656         /* 0: not found any osc/mdc;
1657            1: found osc;
1658            2: found mdc;
1659         */
1660         static int last_step = -1;
1661
1662         ENTRY;
1663
1664         mti = ((struct temp_comp*)data)->comp_mti;
1665         tmti = ((struct temp_comp*)data)->comp_tmti;
1666         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1667         obd = ((struct temp_comp *)data)->comp_obd;
1668         mgs = lu2mgs_dev(obd->obd_lu_dev);
1669         LASSERT(mgs);
1670
1671         if (rec->lrh_type != OBD_CFG_REC) {
1672                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1673                 RETURN(-EINVAL);
1674         }
1675
1676         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1677         if (rc) {
1678                 CERROR("Insane cfg\n");
1679                 RETURN(rc);
1680         }
1681
1682         lcfg = (struct lustre_cfg *)cfg_buf;
1683
1684         if (lcfg->lcfg_command == LCFG_MARKER) {
1685                 struct cfg_marker *marker;
1686                 marker = lustre_cfg_buf(lcfg, 1);
1687                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1688                     (marker->cm_flags & CM_START)){
1689                         got_an_osc_or_mdc = 1;
1690                         strncpy(tmti->mti_svname, marker->cm_tgtname,
1691                                 sizeof(tmti->mti_svname));
1692                         rc = record_start_log(env, mgs, &mdt_llh,
1693                                               mti->mti_svname);
1694                         if (rc)
1695                                 RETURN(rc);
1696                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1697                                            mti->mti_svname,"add osc(copied)");
1698                         record_end_log(env, &mdt_llh);
1699                         last_step = marker->cm_step;
1700                         RETURN(rc);
1701                 }
1702                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1703                     (marker->cm_flags & CM_END)){
1704                         LASSERT(last_step == marker->cm_step);
1705                         last_step = -1;
1706                         got_an_osc_or_mdc = 0;
1707                         rc = record_start_log(env, mgs, &mdt_llh,
1708                                               mti->mti_svname);
1709                         if (rc)
1710                                 RETURN(rc);
1711                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1712                                            mti->mti_svname,"add osc(copied)");
1713                         record_end_log(env, &mdt_llh);
1714                         RETURN(rc);
1715                 }
1716                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1717                     (marker->cm_flags & CM_START)){
1718                         got_an_osc_or_mdc = 2;
1719                         last_step = marker->cm_step;
1720                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1721                                strlen(marker->cm_tgtname));
1722
1723                         RETURN(rc);
1724                 }
1725                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1726                     (marker->cm_flags & CM_END)){
1727                         LASSERT(last_step == marker->cm_step);
1728                         last_step = -1;
1729                         got_an_osc_or_mdc = 0;
1730                         RETURN(rc);
1731                 }
1732         }
1733
1734         if (got_an_osc_or_mdc == 0 || last_step < 0)
1735                 RETURN(rc);
1736
1737         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1738                 uint64_t nodenid = lcfg->lcfg_nid;
1739
1740                 if (strlen(tmti->mti_uuid) == 0) {
1741                         /* target uuid not set, this config record is before
1742                          * LCFG_SETUP, this nid is one of target node nid.
1743                          */
1744                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1745                         tmti->mti_nid_count++;
1746                 } else {
1747                         /* failover node nid */
1748                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1749                                        libcfs_nid2str(nodenid));
1750                 }
1751
1752                 RETURN(rc);
1753         }
1754
1755         if (lcfg->lcfg_command == LCFG_SETUP) {
1756                 char *target;
1757
1758                 target = lustre_cfg_string(lcfg, 1);
1759                 memcpy(tmti->mti_uuid, target, strlen(target));
1760                 RETURN(rc);
1761         }
1762
1763         /* ignore client side sptlrpc_conf_log */
1764         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1765                 RETURN(rc);
1766
1767         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1768                 int index;
1769
1770                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1771                         RETURN (-EINVAL);
1772
1773                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1774                        strlen(mti->mti_fsname));
1775                 tmti->mti_stripe_index = index;
1776
1777                 rc = mgs_write_log_mdc_to_mdt(env, mgs, fsdb, tmti,
1778                                               mti->mti_svname);
1779                 memset(tmti, 0, sizeof(*tmti));
1780                 RETURN(rc);
1781         }
1782
1783         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1784                 int index;
1785                 char mdt_index[9];
1786                 char *logname, *lovname;
1787
1788                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1789                                              mti->mti_stripe_index);
1790                 if (rc)
1791                         RETURN(rc);
1792                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1793
1794                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1795                         name_destroy(&logname);
1796                         name_destroy(&lovname);
1797                         RETURN(-EINVAL);
1798                 }
1799
1800                 tmti->mti_stripe_index = index;
1801                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1802                                          mdt_index, lovname,
1803                                          LUSTRE_SP_MDT, 0);
1804                 name_destroy(&logname);
1805                 name_destroy(&lovname);
1806                 RETURN(rc);
1807         }
1808         RETURN(rc);
1809 }
1810
1811 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1812 /* stealed from mgs_get_fsdb_from_llog*/
1813 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1814                                               struct mgs_device *mgs,
1815                                               char *client_name,
1816                                               struct temp_comp* comp)
1817 {
1818         struct llog_handle *loghandle;
1819         struct mgs_target_info *tmti;
1820         struct llog_ctxt *ctxt;
1821         int rc;
1822
1823         ENTRY;
1824
1825         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1826         LASSERT(ctxt != NULL);
1827
1828         OBD_ALLOC_PTR(tmti);
1829         if (tmti == NULL)
1830                 GOTO(out_ctxt, rc = -ENOMEM);
1831
1832         comp->comp_tmti = tmti;
1833         comp->comp_obd = mgs->mgs_obd;
1834
1835         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1836                        LLOG_OPEN_EXISTS);
1837         if (rc < 0) {
1838                 if (rc == -ENOENT)
1839                         rc = 0;
1840                 GOTO(out_pop, rc);
1841         }
1842
1843         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1844         if (rc)
1845                 GOTO(out_close, rc);
1846
1847         rc = llog_process_or_fork(env, loghandle, mgs_steal_llog_handler,
1848                                   (void *)comp, NULL, false);
1849         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1850 out_close:
1851         llog_close(env, loghandle);
1852 out_pop:
1853         OBD_FREE_PTR(tmti);
1854 out_ctxt:
1855         llog_ctxt_put(ctxt);
1856         RETURN(rc);
1857 }
1858
1859 /* lmv is the second thing for client logs */
1860 /* copied from mgs_write_log_lov. Please refer to that.  */
1861 static int mgs_write_log_lmv(const struct lu_env *env,
1862                              struct mgs_device *mgs,
1863                              struct fs_db *fsdb,
1864                              struct mgs_target_info *mti,
1865                              char *logname, char *lmvname)
1866 {
1867         struct llog_handle *llh = NULL;
1868         struct lmv_desc *lmvdesc;
1869         char *uuid;
1870         int rc = 0;
1871         ENTRY;
1872
1873         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1874
1875         OBD_ALLOC_PTR(lmvdesc);
1876         if (lmvdesc == NULL)
1877                 RETURN(-ENOMEM);
1878         lmvdesc->ld_active_tgt_count = 0;
1879         lmvdesc->ld_tgt_count = 0;
1880         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1881         uuid = (char *)lmvdesc->ld_uuid.uuid;
1882
1883         rc = record_start_log(env, mgs, &llh, logname);
1884         if (rc)
1885                 GOTO(out_free, rc);
1886         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1887         if (rc)
1888                 GOTO(out_end, rc);
1889         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1890         if (rc)
1891                 GOTO(out_end, rc);
1892         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1893         if (rc)
1894                 GOTO(out_end, rc);
1895         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1896         if (rc)
1897                 GOTO(out_end, rc);
1898 out_end:
1899         record_end_log(env, &llh);
1900 out_free:
1901         OBD_FREE_PTR(lmvdesc);
1902         RETURN(rc);
1903 }
1904
1905 /* lov is the first thing in the mdt and client logs */
1906 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1907                              struct fs_db *fsdb, struct mgs_target_info *mti,
1908                              char *logname, char *lovname)
1909 {
1910         struct llog_handle *llh = NULL;
1911         struct lov_desc *lovdesc;
1912         char *uuid;
1913         int rc = 0;
1914         ENTRY;
1915
1916         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1917
1918         /*
1919         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1920         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1921               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1922         */
1923
1924         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1925         OBD_ALLOC_PTR(lovdesc);
1926         if (lovdesc == NULL)
1927                 RETURN(-ENOMEM);
1928         lovdesc->ld_magic = LOV_DESC_MAGIC;
1929         lovdesc->ld_tgt_count = 0;
1930         /* Defaults.  Can be changed later by lcfg config_param */
1931         lovdesc->ld_default_stripe_count = 1;
1932         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1933         lovdesc->ld_default_stripe_size = 1024 * 1024;
1934         lovdesc->ld_default_stripe_offset = -1;
1935         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1936         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1937         /* can these be the same? */
1938         uuid = (char *)lovdesc->ld_uuid.uuid;
1939
1940         /* This should always be the first entry in a log.
1941         rc = mgs_clear_log(obd, logname); */
1942         rc = record_start_log(env, mgs, &llh, logname);
1943         if (rc)
1944                 GOTO(out_free, rc);
1945         /* FIXME these should be a single journal transaction */
1946         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1947         if (rc)
1948                 GOTO(out_end, rc);
1949         rc = record_attach(env, llh, lovname, "lov", uuid);
1950         if (rc)
1951                 GOTO(out_end, rc);
1952         rc = record_lov_setup(env, llh, lovname, lovdesc);
1953         if (rc)
1954                 GOTO(out_end, rc);
1955         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1956         if (rc)
1957                 GOTO(out_end, rc);
1958         EXIT;
1959 out_end:
1960         record_end_log(env, &llh);
1961 out_free:
1962         OBD_FREE_PTR(lovdesc);
1963         return rc;
1964 }
1965
1966 /* add failnids to open log */
1967 static int mgs_write_log_failnids(const struct lu_env *env,
1968                                   struct mgs_target_info *mti,
1969                                   struct llog_handle *llh,
1970                                   char *cliname)
1971 {
1972         char *failnodeuuid = NULL;
1973         char *ptr = mti->mti_params;
1974         lnet_nid_t nid;
1975         int rc = 0;
1976
1977         /*
1978         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1979         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1980         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1981         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1982         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1983         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1984         */
1985
1986         /* Pull failnid info out of params string */
1987         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1988                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1989                         if (failnodeuuid == NULL) {
1990                                 /* We don't know the failover node name,
1991                                    so just use the first nid as the uuid */
1992                                 rc = name_create(&failnodeuuid,
1993                                                  libcfs_nid2str(nid), "");
1994                                 if (rc)
1995                                         return rc;
1996                         }
1997                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1998                                "client %s\n", libcfs_nid2str(nid),
1999                                failnodeuuid, cliname);
2000                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
2001                 }
2002                 if (failnodeuuid)
2003                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
2004         }
2005
2006         name_destroy(&failnodeuuid);
2007         return rc;
2008 }
2009
2010 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
2011                                     struct mgs_device *mgs,
2012                                     struct fs_db *fsdb,
2013                                     struct mgs_target_info *mti,
2014                                     char *logname, char *lmvname)
2015 {
2016         struct llog_handle *llh = NULL;
2017         char *mdcname = NULL;
2018         char *nodeuuid = NULL;
2019         char *mdcuuid = NULL;
2020         char *lmvuuid = NULL;
2021         char index[6];
2022         int i, rc;
2023         ENTRY;
2024
2025         if (mgs_log_is_empty(env, mgs, logname)) {
2026                 CERROR("log is empty! Logical error\n");
2027                 RETURN(-EINVAL);
2028         }
2029
2030         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
2031                mti->mti_svname, logname, lmvname);
2032
2033         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2034         if (rc)
2035                 RETURN(rc);
2036         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
2037         if (rc)
2038                 GOTO(out_free, rc);
2039         rc = name_create(&mdcuuid, mdcname, "_UUID");
2040         if (rc)
2041                 GOTO(out_free, rc);
2042         rc = name_create(&lmvuuid, lmvname, "_UUID");
2043         if (rc)
2044                 GOTO(out_free, rc);
2045
2046         rc = record_start_log(env, mgs, &llh, logname);
2047         if (rc)
2048                 GOTO(out_free, rc);
2049         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2050                            "add mdc");
2051         if (rc)
2052                 GOTO(out_end, rc);
2053         for (i = 0; i < mti->mti_nid_count; i++) {
2054                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2055                        libcfs_nid2str(mti->mti_nids[i]));
2056
2057                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2058                 if (rc)
2059                         GOTO(out_end, rc);
2060         }
2061
2062         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2063         if (rc)
2064                 GOTO(out_end, rc);
2065         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
2066         if (rc)
2067                 GOTO(out_end, rc);
2068         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2069         if (rc)
2070                 GOTO(out_end, rc);
2071         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2072         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2073                             index, "1");
2074         if (rc)
2075                 GOTO(out_end, rc);
2076         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2077                            "add mdc");
2078         if (rc)
2079                 GOTO(out_end, rc);
2080 out_end:
2081         record_end_log(env, &llh);
2082 out_free:
2083         name_destroy(&lmvuuid);
2084         name_destroy(&mdcuuid);
2085         name_destroy(&mdcname);
2086         name_destroy(&nodeuuid);
2087         RETURN(rc);
2088 }
2089
2090 /* add new mdc to already existent MDS */
2091 static int mgs_write_log_mdc_to_mdt(const struct lu_env *env,
2092                                     struct mgs_device *mgs,
2093                                     struct fs_db *fsdb,
2094                                     struct mgs_target_info *mti,
2095                                     char *logname)
2096 {
2097         struct llog_handle *llh = NULL;
2098         char *nodeuuid = NULL;
2099         char *mdcname = NULL;
2100         char *mdcuuid = NULL;
2101         char *mdtuuid = NULL;
2102         int idx = mti->mti_stripe_index;
2103         char index[9];
2104         int i, rc;
2105
2106         ENTRY;
2107         if (mgs_log_is_empty(env, mgs, logname)) {
2108                 CERROR("log is empty! Logical error\n");
2109                 RETURN (-EINVAL);
2110         }
2111
2112         CDEBUG(D_MGS, "adding mdc index %d to %s\n", idx, logname);
2113
2114         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2115         if (rc)
2116                 RETURN(rc);
2117         snprintf(index, sizeof(index), "-mdc%04x", idx);
2118         rc = name_create(&mdcname, logname, index);
2119         if (rc)
2120                 GOTO(out_free, rc);
2121         rc = name_create(&mdcuuid, mdcname, "_UUID");
2122         if (rc)
2123                 GOTO(out_free, rc);
2124         rc = name_create(&mdtuuid, logname, "_UUID");
2125         if (rc)
2126                 GOTO(out_free, rc);
2127
2128         rc = record_start_log(env, mgs, &llh, logname);
2129         if (rc)
2130                 GOTO(out_free, rc);
2131         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname, "add mdc");
2132         if (rc)
2133                 GOTO(out_end, rc);
2134         for (i = 0; i < mti->mti_nid_count; i++) {
2135                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2136                        libcfs_nid2str(mti->mti_nids[i]));
2137                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2138                 if (rc)
2139                         GOTO(out_end, rc);
2140         }
2141         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, mdcuuid);
2142         if (rc)
2143                 GOTO(out_end, rc);
2144         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
2145         if (rc)
2146                 GOTO(out_end, rc);
2147         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2148         if (rc)
2149                 GOTO(out_end, rc);
2150         snprintf(index, sizeof(index), "%d", idx);
2151
2152         rc = record_mdc_add(env, llh, logname, mdcuuid, mti->mti_uuid,
2153                             index, "1");
2154         if (rc)
2155                 GOTO(out_end, rc);
2156         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add mdc");
2157         if (rc)
2158                 GOTO(out_end, rc);
2159 out_end:
2160         record_end_log(env, &llh);
2161 out_free:
2162         name_destroy(&mdtuuid);
2163         name_destroy(&mdcuuid);
2164         name_destroy(&mdcname);
2165         name_destroy(&nodeuuid);
2166         RETURN(rc);
2167 }
2168
2169 static int mgs_write_log_mdt0(const struct lu_env *env,
2170                               struct mgs_device *mgs,
2171                               struct fs_db *fsdb,
2172                               struct mgs_target_info *mti)
2173 {
2174         char *log = mti->mti_svname;
2175         struct llog_handle *llh = NULL;
2176         char *uuid, *lovname;
2177         char mdt_index[6];
2178         char *ptr = mti->mti_params;
2179         int rc = 0, failout = 0;
2180         ENTRY;
2181
2182         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2183         if (uuid == NULL)
2184                 RETURN(-ENOMEM);
2185
2186         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2187                 failout = (strncmp(ptr, "failout", 7) == 0);
2188
2189         rc = name_create(&lovname, log, "-mdtlov");
2190         if (rc)
2191                 GOTO(out_free, rc);
2192         if (mgs_log_is_empty(env, mgs, log)) {
2193                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2194                 if (rc)
2195                         GOTO(out_lod, rc);
2196         }
2197
2198         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2199
2200         rc = record_start_log(env, mgs, &llh, log);
2201         if (rc)
2202                 GOTO(out_lod, rc);
2203
2204         /* add MDT itself */
2205
2206         /* FIXME this whole fn should be a single journal transaction */
2207         sprintf(uuid, "%s_UUID", log);
2208         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2209         if (rc)
2210                 GOTO(out_lod, rc);
2211         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2212         if (rc)
2213                 GOTO(out_end, rc);
2214         rc = record_mount_opt(env, llh, log, lovname, NULL);
2215         if (rc)
2216                 GOTO(out_end, rc);
2217         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2218                         failout ? "n" : "f");
2219         if (rc)
2220                 GOTO(out_end, rc);
2221         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2222         if (rc)
2223                 GOTO(out_end, rc);
2224 out_end:
2225         record_end_log(env, &llh);
2226 out_lod:
2227         name_destroy(&lovname);
2228 out_free:
2229         OBD_FREE(uuid, sizeof(struct obd_uuid));
2230         RETURN(rc);
2231 }
2232
2233 static int name_create_mdt_and_lov(char **logname, char **lovname,
2234                                    struct fs_db *fsdb, int i)
2235 {
2236         int rc;
2237
2238         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2239         if (rc)
2240                 return rc;
2241         /* COMPAT_180 */
2242         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2243                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2244         else
2245                 rc = name_create(lovname, *logname, "-mdtlov");
2246         if (rc) {
2247                 name_destroy(logname);
2248                 *logname = NULL;
2249         }
2250         return rc;
2251 }
2252
2253 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2254                                        struct fs_db *fsdb, int i)
2255 {
2256         char suffix[16];
2257
2258         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2259                 sprintf(suffix, "-osc");
2260         else
2261                 sprintf(suffix, "-osc-MDT%04x", i);
2262         return name_create(oscname, ostname, suffix);
2263 }
2264
2265 /* envelope method for all layers log */
2266 static int mgs_write_log_mdt(const struct lu_env *env,
2267                              struct mgs_device *mgs,
2268                              struct fs_db *fsdb,
2269                              struct mgs_target_info *mti)
2270 {
2271         struct mgs_thread_info *mgi = mgs_env_info(env);
2272         struct llog_handle *llh = NULL;
2273         char *cliname;
2274         int rc, i = 0;
2275         ENTRY;
2276
2277         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2278
2279         if (mti->mti_uuid[0] == '\0') {
2280                 /* Make up our own uuid */
2281                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2282                          "%s_UUID", mti->mti_svname);
2283         }
2284
2285         /* add mdt */
2286         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2287         if (rc)
2288                 RETURN(rc);
2289         /* Append the mdt info to the client log */
2290         rc = name_create(&cliname, mti->mti_fsname, "-client");
2291         if (rc)
2292                 RETURN(rc);
2293
2294         if (mgs_log_is_empty(env, mgs, cliname)) {
2295                 /* Start client log */
2296                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2297                                        fsdb->fsdb_clilov);
2298                 if (rc)
2299                         GOTO(out_free, rc);
2300                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2301                                        fsdb->fsdb_clilmv);
2302                 if (rc)
2303                         GOTO(out_free, rc);
2304         }
2305
2306         /*
2307         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2308         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2309         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2310         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2311         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2312         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2313         */
2314
2315                 /* copy client info about lov/lmv */
2316                 mgi->mgi_comp.comp_mti = mti;
2317                 mgi->mgi_comp.comp_fsdb = fsdb;
2318
2319                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2320                                                         &mgi->mgi_comp);
2321                 if (rc)
2322                         GOTO(out_free, rc);
2323                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2324                                               fsdb->fsdb_clilmv);
2325                 if (rc)
2326                         GOTO(out_free, rc);
2327
2328                 /* add mountopts */
2329                 rc = record_start_log(env, mgs, &llh, cliname);
2330                 if (rc)
2331                         GOTO(out_free, rc);
2332
2333                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2334                                    "mount opts");
2335                 if (rc)
2336                         GOTO(out_end, rc);
2337                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2338                                       fsdb->fsdb_clilmv);
2339                 if (rc)
2340                         GOTO(out_end, rc);
2341                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2342                                    "mount opts");
2343
2344         if (rc)
2345                 GOTO(out_end, rc);
2346         /* for_all_existing_mdt except current one */
2347         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2348                 char *mdtname;
2349                 if (i !=  mti->mti_stripe_index &&
2350                     test_bit(i,  fsdb->fsdb_mdt_index_map)) {
2351                         rc = name_create_mdt(&mdtname, mti->mti_fsname, i);
2352                         if (rc)
2353                                 GOTO(out_end, rc);
2354                         rc = mgs_write_log_mdc_to_mdt(env, mgs, fsdb, mti, mdtname);
2355                         name_destroy(&mdtname);
2356                         if (rc)
2357                                 GOTO(out_end, rc);
2358                 }
2359         }
2360 out_end:
2361         record_end_log(env, &llh);
2362 out_free:
2363         name_destroy(&cliname);
2364         RETURN(rc);
2365 }
2366
2367 /* Add the ost info to the client/mdt lov */
2368 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2369                                     struct mgs_device *mgs, struct fs_db *fsdb,
2370                                     struct mgs_target_info *mti,
2371                                     char *logname, char *suffix, char *lovname,
2372                                     enum lustre_sec_part sec_part, int flags)
2373 {
2374         struct llog_handle *llh = NULL;
2375         char *nodeuuid = NULL;
2376         char *oscname = NULL;
2377         char *oscuuid = NULL;
2378         char *lovuuid = NULL;
2379         char *svname = NULL;
2380         char index[6];
2381         int i, rc;
2382
2383         ENTRY;
2384         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2385                mti->mti_svname, logname);
2386
2387         if (mgs_log_is_empty(env, mgs, logname)) {
2388                 CERROR("log is empty! Logical error\n");
2389                 RETURN (-EINVAL);
2390         }
2391
2392         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2393         if (rc)
2394                 RETURN(rc);
2395         rc = name_create(&svname, mti->mti_svname, "-osc");
2396         if (rc)
2397                 GOTO(out_free, rc);
2398         /* for the system upgraded from old 1.8, keep using the old osc naming
2399          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2400         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2401                 rc = name_create(&oscname, svname, "");
2402         else
2403                 rc = name_create(&oscname, svname, suffix);
2404         if (rc)
2405                 GOTO(out_free, rc);
2406         rc = name_create(&oscuuid, oscname, "_UUID");
2407         if (rc)
2408                 GOTO(out_free, rc);
2409         rc = name_create(&lovuuid, lovname, "_UUID");
2410         if (rc)
2411                 GOTO(out_free, rc);
2412
2413         /*
2414         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2415         multihomed (#4)
2416         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2417         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2418         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2419         failover (#6,7)
2420         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2421         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2422         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2423         */
2424
2425         rc = record_start_log(env, mgs, &llh, logname);
2426         if (rc)
2427                 GOTO(out_free, rc);
2428
2429         /* FIXME these should be a single journal transaction */
2430         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2431                            "add osc");
2432         if (rc)
2433                 GOTO(out_end, rc);
2434
2435         /* NB: don't change record order, because upon MDT steal OSC config
2436          * from client, it treats all nids before LCFG_SETUP as target nids
2437          * (multiple interfaces), while nids after as failover node nids.
2438          * See mgs_steal_llog_handler() LCFG_ADD_UUID.
2439          */
2440         for (i = 0; i < mti->mti_nid_count; i++) {
2441                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2442                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2443                 if (rc)
2444                         GOTO(out_end, rc);
2445         }
2446         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2447         if (rc)
2448                 GOTO(out_end, rc);
2449         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2450         if (rc)
2451                 GOTO(out_end, rc);
2452         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2453         if (rc)
2454                 GOTO(out_end, rc);
2455         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2456         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2457         if (rc)
2458                 GOTO(out_end, rc);
2459         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2460                            "add osc");
2461         if (rc)
2462                 GOTO(out_end, rc);
2463 out_end:
2464         record_end_log(env, &llh);
2465 out_free:
2466         name_destroy(&lovuuid);
2467         name_destroy(&oscuuid);
2468         name_destroy(&oscname);
2469         name_destroy(&svname);
2470         name_destroy(&nodeuuid);
2471         RETURN(rc);
2472 }
2473
2474 static int mgs_write_log_ost(const struct lu_env *env,
2475                              struct mgs_device *mgs, struct fs_db *fsdb,
2476                              struct mgs_target_info *mti)
2477 {
2478         struct llog_handle *llh = NULL;
2479         char *logname, *lovname;
2480         char *ptr = mti->mti_params;
2481         int rc, flags = 0, failout = 0, i;
2482         ENTRY;
2483
2484         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2485
2486         /* The ost startup log */
2487
2488         /* If the ost log already exists, that means that someone reformatted
2489            the ost and it called target_add again. */
2490         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2491                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2492                                    "exists, yet the server claims it never "
2493                                    "registered. It may have been reformatted, "
2494                                    "or the index changed. writeconf the MDT to "
2495                                    "regenerate all logs.\n", mti->mti_svname);
2496                 RETURN(-EALREADY);
2497         }
2498
2499         /*
2500         attach obdfilter ost1 ost1_UUID
2501         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2502         */
2503         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2504                 failout = (strncmp(ptr, "failout", 7) == 0);
2505         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2506         if (rc)
2507                 RETURN(rc);
2508         /* FIXME these should be a single journal transaction */
2509         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2510         if (rc)
2511                 GOTO(out_end, rc);
2512         if (*mti->mti_uuid == '\0')
2513                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2514                          "%s_UUID", mti->mti_svname);
2515         rc = record_attach(env, llh, mti->mti_svname,
2516                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2517         if (rc)
2518                 GOTO(out_end, rc);
2519         rc = record_setup(env, llh, mti->mti_svname,
2520                           "dev"/*ignored*/, "type"/*ignored*/,
2521                           failout ? "n" : "f", 0/*options*/);
2522         if (rc)
2523                 GOTO(out_end, rc);
2524         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2525         if (rc)
2526                 GOTO(out_end, rc);
2527 out_end:
2528         record_end_log(env, &llh);
2529         if (rc)
2530                 RETURN(rc);
2531         /* We also have to update the other logs where this osc is part of
2532            the lov */
2533
2534         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2535                 /* If we're upgrading, the old mdt log already has our
2536                    entry. Let's do a fake one for fun. */
2537                 /* Note that we can't add any new failnids, since we don't
2538                    know the old osc names. */
2539                 flags = CM_SKIP | CM_UPGRADE146;
2540
2541         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2542                 /* If the update flag isn't set, don't update client/mdt
2543                    logs. */
2544                 flags |= CM_SKIP;
2545                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2546                               "the MDT first to regenerate it.\n",
2547                               mti->mti_svname);
2548         }
2549
2550         /* Add ost to all MDT lov defs */
2551         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2552                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2553                         char mdt_index[9];
2554
2555                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2556                                                      i);
2557                         if (rc)
2558                                 RETURN(rc);
2559                         sprintf(mdt_index, "-MDT%04x", i);
2560                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2561                                                       logname, mdt_index,
2562                                                       lovname, LUSTRE_SP_MDT,
2563                                                       flags);
2564                         name_destroy(&logname);
2565                         name_destroy(&lovname);
2566                         if (rc)
2567                                 RETURN(rc);
2568                 }
2569         }
2570
2571         /* Append ost info to the client log */
2572         rc = name_create(&logname, mti->mti_fsname, "-client");
2573         if (rc)
2574                 RETURN(rc);
2575         if (mgs_log_is_empty(env, mgs, logname)) {
2576                 /* Start client log */
2577                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2578                                        fsdb->fsdb_clilov);
2579                 if (rc)
2580                         GOTO(out_free, rc);
2581                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2582                                        fsdb->fsdb_clilmv);
2583                 if (rc)
2584                         GOTO(out_free, rc);
2585         }
2586         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2587                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2588 out_free:
2589         name_destroy(&logname);
2590         RETURN(rc);
2591 }
2592
2593 static __inline__ int mgs_param_empty(char *ptr)
2594 {
2595         char *tmp;
2596
2597         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2598                 return 1;
2599         return 0;
2600 }
2601
2602 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2603                                           struct mgs_device *mgs,
2604                                           struct fs_db *fsdb,
2605                                           struct mgs_target_info *mti,
2606                                           char *logname, char *cliname)
2607 {
2608         int rc;
2609         struct llog_handle *llh = NULL;
2610
2611         if (mgs_param_empty(mti->mti_params)) {
2612                 /* Remove _all_ failnids */
2613                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2614                                 mti->mti_svname, "add failnid", CM_SKIP);
2615                 return rc < 0 ? rc : 0;
2616         }
2617
2618         /* Otherwise failover nids are additive */
2619         rc = record_start_log(env, mgs, &llh, logname);
2620         if (rc)
2621                 return rc;
2622                 /* FIXME this should be a single journal transaction */
2623         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2624                            "add failnid");
2625         if (rc)
2626                 goto out_end;
2627         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2628         if (rc)
2629                 goto out_end;
2630         rc = record_marker(env, llh, fsdb, CM_END,
2631                            mti->mti_svname, "add failnid");
2632 out_end:
2633         record_end_log(env, &llh);
2634         return rc;
2635 }
2636
2637
2638 /* Add additional failnids to an existing log.
2639    The mdc/osc must have been added to logs first */
2640 /* tcp nids must be in dotted-quad ascii -
2641    we can't resolve hostnames from the kernel. */
2642 static int mgs_write_log_add_failnid(const struct lu_env *env,
2643                                      struct mgs_device *mgs,
2644                                      struct fs_db *fsdb,
2645                                      struct mgs_target_info *mti)
2646 {
2647         char *logname, *cliname;
2648         int rc;
2649         ENTRY;
2650
2651         /* FIXME we currently can't erase the failnids
2652          * given when a target first registers, since they aren't part of
2653          * an "add uuid" stanza */
2654
2655         /* Verify that we know about this target */
2656         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2657                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2658                                    "yet. It must be started before failnids "
2659                                    "can be added.\n", mti->mti_svname);
2660                 RETURN(-ENOENT);
2661         }
2662
2663         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2664         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2665                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2666         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2667                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2668         } else {
2669                 RETURN(-EINVAL);
2670         }
2671         if (rc)
2672                 RETURN(rc);
2673         /* Add failover nids to the client log */
2674         rc = name_create(&logname, mti->mti_fsname, "-client");
2675         if (rc) {
2676                 name_destroy(&cliname);
2677                 RETURN(rc);
2678         }
2679         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2680         name_destroy(&logname);
2681         name_destroy(&cliname);
2682         if (rc)
2683                 RETURN(rc);
2684
2685         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2686                 /* Add OST failover nids to the MDT logs as well */
2687                 int i;
2688
2689                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2690                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2691                                 continue;
2692                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2693                         if (rc)
2694                                 RETURN(rc);
2695                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2696                                                  fsdb, i);
2697                         if (rc) {
2698                                 name_destroy(&logname);
2699                                 RETURN(rc);
2700                         }
2701                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2702                                                             mti, logname,
2703                                                             cliname);
2704                         name_destroy(&cliname);
2705                         name_destroy(&logname);
2706                         if (rc)
2707                                 RETURN(rc);
2708                 }
2709         }
2710
2711         RETURN(rc);
2712 }
2713
2714 static int mgs_wlp_lcfg(const struct lu_env *env,
2715                         struct mgs_device *mgs, struct fs_db *fsdb,
2716                         struct mgs_target_info *mti,
2717                         char *logname, struct lustre_cfg_bufs *bufs,
2718                         char *tgtname, char *ptr)
2719 {
2720         char comment[MTI_NAME_MAXLEN];
2721         char *tmp;
2722         struct lustre_cfg *lcfg;
2723         int rc, del;
2724
2725         /* Erase any old settings of this same parameter */
2726         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2727         comment[MTI_NAME_MAXLEN - 1] = 0;
2728         /* But don't try to match the value. */
2729         if ((tmp = strchr(comment, '=')))
2730             *tmp = 0;
2731         /* FIXME we should skip settings that are the same as old values */
2732         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2733         if (rc < 0)
2734                 return rc;
2735         del = mgs_param_empty(ptr);
2736
2737         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2738                       "Sett" : "Modify", tgtname, comment, logname);
2739         if (del)
2740                 return rc;
2741
2742         lustre_cfg_bufs_reset(bufs, tgtname);
2743         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2744         lcfg = lustre_cfg_new(LCFG_PARAM, bufs);
2745         if (!lcfg)
2746                 return -ENOMEM;
2747         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2748         lustre_cfg_free(lcfg);
2749         return rc;
2750 }
2751
2752 /* write global variable settings into log */
2753 static int mgs_write_log_sys(const struct lu_env *env,
2754                              struct mgs_device *mgs, struct fs_db *fsdb,
2755                              struct mgs_target_info *mti, char *sys, char *ptr)
2756 {
2757         struct mgs_thread_info *mgi = mgs_env_info(env);
2758         struct lustre_cfg *lcfg;
2759         char *tmp, sep;
2760         int rc, cmd, convert = 1;
2761
2762         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2763                 cmd = LCFG_SET_TIMEOUT;
2764         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2765                 cmd = LCFG_SET_LDLM_TIMEOUT;
2766         /* Check for known params here so we can return error to lctl */
2767         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2768                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2769                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2770                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2771                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2772                 cmd = LCFG_PARAM;
2773         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2774                 convert = 0; /* Don't convert string value to integer */
2775                 cmd = LCFG_PARAM;
2776         } else {
2777                 return -EINVAL;
2778         }
2779
2780         if (mgs_param_empty(ptr))
2781                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2782         else
2783                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2784
2785         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2786         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2787         if (!convert && *tmp != '\0')
2788                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2789         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2790         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2791         /* truncate the comment to the parameter name */
2792         ptr = tmp - 1;
2793         sep = *ptr;
2794         *ptr = '\0';
2795         /* modify all servers and clients */
2796         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2797                                       *tmp == '\0' ? NULL : lcfg,
2798                                       mti->mti_fsname, sys, 0);
2799         if (rc == 0 && *tmp != '\0') {
2800                 switch (cmd) {
2801                 case LCFG_SET_TIMEOUT:
2802                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2803                                 class_process_config(lcfg);
2804                         break;
2805                 case LCFG_SET_LDLM_TIMEOUT:
2806                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2807                                 class_process_config(lcfg);
2808                         break;
2809                 default:
2810                         break;
2811                 }
2812         }
2813         *ptr = sep;
2814         lustre_cfg_free(lcfg);
2815         return rc;
2816 }
2817
2818 /* write quota settings into log */
2819 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2820                                struct fs_db *fsdb, struct mgs_target_info *mti,
2821                                char *quota, char *ptr)
2822 {
2823         struct mgs_thread_info  *mgi = mgs_env_info(env);
2824         struct lustre_cfg       *lcfg;
2825         char                    *tmp;
2826         char                     sep;
2827         int                      rc, cmd = LCFG_PARAM;
2828
2829         /* support only 'meta' and 'data' pools so far */
2830         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2831             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2832                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2833                        "& quota.ost are)\n", ptr);
2834                 return -EINVAL;
2835         }
2836
2837         if (*tmp == '\0') {
2838                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2839         } else {
2840                 CDEBUG(D_MGS, "global '%s'\n", quota);
2841
2842                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2843                     strcmp(tmp, "none") != 0) {
2844                         CERROR("enable option(%s) isn't supported\n", tmp);
2845                         return -EINVAL;
2846                 }
2847         }
2848
2849         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2850         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2851         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2852         /* truncate the comment to the parameter name */
2853         ptr = tmp - 1;
2854         sep = *ptr;
2855         *ptr = '\0';
2856
2857         /* XXX we duplicated quota enable information in all server
2858          *     config logs, it should be moved to a separate config
2859          *     log once we cleanup the config log for global param. */
2860         /* modify all servers */
2861         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2862                                       *tmp == '\0' ? NULL : lcfg,
2863                                       mti->mti_fsname, quota, 1);
2864         *ptr = sep;
2865         lustre_cfg_free(lcfg);
2866         return rc;
2867 }
2868
2869 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2870                                    struct mgs_device *mgs,
2871                                    struct fs_db *fsdb,
2872                                    struct mgs_target_info *mti,
2873                                    char *param)
2874 {
2875         struct mgs_thread_info *mgi = mgs_env_info(env);
2876         struct llog_handle     *llh = NULL;
2877         char                   *logname;
2878         char                   *comment, *ptr;
2879         struct lustre_cfg      *lcfg;
2880         int                     rc, len;
2881         ENTRY;
2882
2883         /* get comment */
2884         ptr = strchr(param, '=');
2885         LASSERT(ptr);
2886         len = ptr - param;
2887
2888         OBD_ALLOC(comment, len + 1);
2889         if (comment == NULL)
2890                 RETURN(-ENOMEM);
2891         strncpy(comment, param, len);
2892         comment[len] = '\0';
2893
2894         /* prepare lcfg */
2895         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2896         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2897         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2898         if (lcfg == NULL)
2899                 GOTO(out_comment, rc = -ENOMEM);
2900
2901         /* construct log name */
2902         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2903         if (rc)
2904                 GOTO(out_lcfg, rc);
2905
2906         if (mgs_log_is_empty(env, mgs, logname)) {
2907                 rc = record_start_log(env, mgs, &llh, logname);
2908                 if (rc)
2909                         GOTO(out, rc);
2910                 record_end_log(env, &llh);
2911         }
2912
2913         /* obsolete old one */
2914         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2915                         comment, CM_SKIP);
2916         if (rc < 0)
2917                 GOTO(out, rc);
2918         /* write the new one */
2919         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2920                                   mti->mti_svname, comment);
2921         if (rc)
2922                 CERROR("err %d writing log %s\n", rc, logname);
2923 out:
2924         name_destroy(&logname);
2925 out_lcfg:
2926         lustre_cfg_free(lcfg);
2927 out_comment:
2928         OBD_FREE(comment, len + 1);
2929         RETURN(rc);
2930 }
2931
2932 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2933                                         char *param)
2934 {
2935         char    *ptr;
2936
2937         /* disable the adjustable udesc parameter for now, i.e. use default
2938          * setting that client always ship udesc to MDT if possible. to enable
2939          * it simply remove the following line */
2940         goto error_out;
2941
2942         ptr = strchr(param, '=');
2943         if (ptr == NULL)
2944                 goto error_out;
2945         *ptr++ = '\0';
2946
2947         if (strcmp(param, PARAM_SRPC_UDESC))
2948                 goto error_out;
2949
2950         if (strcmp(ptr, "yes") == 0) {
2951                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2952                 CWARN("Enable user descriptor shipping from client to MDT\n");
2953         } else if (strcmp(ptr, "no") == 0) {
2954                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2955                 CWARN("Disable user descriptor shipping from client to MDT\n");
2956         } else {
2957                 *(ptr - 1) = '=';
2958                 goto error_out;
2959         }
2960         return 0;
2961
2962 error_out:
2963         CERROR("Invalid param: %s\n", param);
2964         return -EINVAL;
2965 }
2966
2967 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2968                                   const char *svname,
2969                                   char *param)
2970 {
2971         struct sptlrpc_rule      rule;
2972         struct sptlrpc_rule_set *rset;
2973         int                      rc;
2974         ENTRY;
2975
2976         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2977                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2978                 RETURN(-EINVAL);
2979         }
2980
2981         if (strncmp(param, PARAM_SRPC_UDESC,
2982                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2983                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2984         }
2985
2986         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2987                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2988                 RETURN(-EINVAL);
2989         }
2990
2991         param += sizeof(PARAM_SRPC_FLVR) - 1;
2992
2993         rc = sptlrpc_parse_rule(param, &rule);
2994         if (rc)
2995                 RETURN(rc);
2996
2997         /* mgs rules implies must be mgc->mgs */
2998         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2999                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3000                      rule.sr_from != LUSTRE_SP_ANY) ||
3001                     (rule.sr_to != LUSTRE_SP_MGS &&
3002                      rule.sr_to != LUSTRE_SP_ANY))
3003                         RETURN(-EINVAL);
3004         }
3005
3006         /* preapre room for this coming rule. svcname format should be:
3007          * - fsname: general rule
3008          * - fsname-tgtname: target-specific rule
3009          */
3010         if (strchr(svname, '-')) {
3011                 struct mgs_tgt_srpc_conf *tgtconf;
3012                 int                       found = 0;
3013
3014                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3015                      tgtconf = tgtconf->mtsc_next) {
3016                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3017                                 found = 1;
3018                                 break;
3019                         }
3020                 }
3021
3022                 if (!found) {
3023                         int name_len;
3024
3025                         OBD_ALLOC_PTR(tgtconf);
3026                         if (tgtconf == NULL)
3027                                 RETURN(-ENOMEM);
3028
3029                         name_len = strlen(svname);
3030
3031                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3032                         if (tgtconf->mtsc_tgt == NULL) {
3033                                 OBD_FREE_PTR(tgtconf);
3034                                 RETURN(-ENOMEM);
3035                         }
3036                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3037
3038                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3039                         fsdb->fsdb_srpc_tgt = tgtconf;
3040                 }
3041
3042                 rset = &tgtconf->mtsc_rset;
3043         } else {
3044                 rset = &fsdb->fsdb_srpc_gen;
3045         }
3046
3047         rc = sptlrpc_rule_set_merge(rset, &rule);
3048
3049         RETURN(rc);
3050 }
3051
3052 static int mgs_srpc_set_param(const struct lu_env *env,
3053                               struct mgs_device *mgs,
3054                               struct fs_db *fsdb,
3055                               struct mgs_target_info *mti,
3056                               char *param)
3057 {
3058         char                   *copy;
3059         int                     rc, copy_size;
3060         ENTRY;
3061
3062 #ifndef HAVE_GSS
3063         RETURN(-EINVAL);
3064 #endif
3065         /* keep a copy of original param, which could be destroied
3066          * during parsing */
3067         copy_size = strlen(param) + 1;
3068         OBD_ALLOC(copy, copy_size);
3069         if (copy == NULL)
3070                 return -ENOMEM;
3071         memcpy(copy, param, copy_size);
3072
3073         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3074         if (rc)
3075                 goto out_free;
3076
3077         /* previous steps guaranteed the syntax is correct */
3078         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3079         if (rc)
3080                 goto out_free;
3081
3082         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3083                 /*
3084                  * for mgs rules, make them effective immediately.
3085                  */
3086                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3087                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3088                                                  &fsdb->fsdb_srpc_gen);
3089         }
3090
3091 out_free:
3092         OBD_FREE(copy, copy_size);
3093         RETURN(rc);
3094 }
3095
3096 struct mgs_srpc_read_data {
3097         struct fs_db   *msrd_fsdb;
3098         int             msrd_skip;
3099 };
3100
3101 static int mgs_srpc_read_handler(const struct lu_env *env,
3102                                  struct llog_handle *llh,
3103                                  struct llog_rec_hdr *rec, void *data)
3104 {
3105         struct mgs_srpc_read_data *msrd = data;
3106         struct cfg_marker         *marker;
3107         struct lustre_cfg         *lcfg = REC_DATA(rec);
3108         char                      *svname, *param;
3109         int                        cfg_len, rc;
3110         ENTRY;
3111
3112         if (rec->lrh_type != OBD_CFG_REC) {
3113                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3114                 RETURN(-EINVAL);
3115         }
3116
3117         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
3118                   sizeof(struct llog_rec_tail);
3119
3120         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3121         if (rc) {
3122                 CERROR("Insane cfg\n");
3123                 RETURN(rc);
3124         }
3125
3126         if (lcfg->lcfg_command == LCFG_MARKER) {
3127                 marker = lustre_cfg_buf(lcfg, 1);
3128
3129                 if (marker->cm_flags & CM_START &&
3130                     marker->cm_flags & CM_SKIP)
3131                         msrd->msrd_skip = 1;
3132                 if (marker->cm_flags & CM_END)
3133                         msrd->msrd_skip = 0;
3134
3135                 RETURN(0);
3136         }
3137
3138         if (msrd->msrd_skip)
3139                 RETURN(0);
3140
3141         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3142                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3143                 RETURN(0);
3144         }
3145
3146         svname = lustre_cfg_string(lcfg, 0);
3147         if (svname == NULL) {
3148                 CERROR("svname is empty\n");
3149                 RETURN(0);
3150         }
3151
3152         param = lustre_cfg_string(lcfg, 1);
3153         if (param == NULL) {
3154                 CERROR("param is empty\n");
3155                 RETURN(0);
3156         }
3157
3158         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3159         if (rc)
3160                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3161
3162         RETURN(0);
3163 }
3164
3165 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3166                                 struct mgs_device *mgs,
3167                                 struct fs_db *fsdb)
3168 {
3169         struct llog_handle        *llh = NULL;
3170         struct llog_ctxt          *ctxt;
3171         char                      *logname;
3172         struct mgs_srpc_read_data  msrd;
3173         int                        rc;
3174         ENTRY;
3175
3176         /* construct log name */
3177         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3178         if (rc)
3179                 RETURN(rc);
3180
3181         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3182         LASSERT(ctxt != NULL);
3183
3184         if (mgs_log_is_empty(env, mgs, logname))
3185                 GOTO(out, rc = 0);
3186
3187         rc = llog_open(env, ctxt, &llh, NULL, logname,
3188                        LLOG_OPEN_EXISTS);
3189         if (rc < 0) {
3190                 if (rc == -ENOENT)
3191                         rc = 0;
3192                 GOTO(out, rc);
3193         }
3194
3195         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3196         if (rc)
3197                 GOTO(out_close, rc);
3198
3199         if (llog_get_size(llh) <= 1)
3200                 GOTO(out_close, rc = 0);
3201
3202         msrd.msrd_fsdb = fsdb;
3203         msrd.msrd_skip = 0;
3204
3205         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3206                           NULL);
3207
3208 out_close:
3209         llog_close(env, llh);
3210 out:
3211         llog_ctxt_put(ctxt);
3212         name_destroy(&logname);
3213
3214         if (rc)
3215                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3216         RETURN(rc);
3217 }
3218
3219 /* Permanent settings of all parameters by writing into the appropriate
3220  * configuration logs.
3221  * A parameter with null value ("<param>='\0'") means to erase it out of
3222  * the logs.
3223  */
3224 static int mgs_write_log_param(const struct lu_env *env,
3225                                struct mgs_device *mgs, struct fs_db *fsdb,
3226                                struct mgs_target_info *mti, char *ptr)
3227 {
3228         struct mgs_thread_info *mgi = mgs_env_info(env);
3229         char *logname;
3230         char *tmp;
3231         int rc = 0, rc2 = 0;
3232         ENTRY;
3233
3234         /* For various parameter settings, we have to figure out which logs
3235            care about them (e.g. both mdt and client for lov settings) */
3236         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3237
3238         /* The params are stored in MOUNT_DATA_FILE and modified via
3239            tunefs.lustre, or set using lctl conf_param */
3240
3241         /* Processed in lustre_start_mgc */
3242         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3243                 GOTO(end, rc);
3244
3245         /* Processed in ost/mdt */
3246         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3247                 GOTO(end, rc);
3248
3249         /* Processed in mgs_write_log_ost */
3250         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3251                 if (mti->mti_flags & LDD_F_PARAM) {
3252                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3253                                            "changed with tunefs.lustre"
3254                                            "and --writeconf\n", ptr);
3255                         rc = -EPERM;
3256                 }
3257                 GOTO(end, rc);
3258         }
3259
3260         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3261                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3262                 GOTO(end, rc);
3263         }
3264
3265         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3266                 /* Add a failover nidlist */
3267                 rc = 0;
3268                 /* We already processed failovers params for new
3269                    targets in mgs_write_log_target */
3270                 if (mti->mti_flags & LDD_F_PARAM) {
3271                         CDEBUG(D_MGS, "Adding failnode\n");
3272                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3273                 }
3274                 GOTO(end, rc);
3275         }
3276
3277         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3278                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3279                 GOTO(end, rc);
3280         }
3281
3282         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3283                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3284                 GOTO(end, rc);
3285         }
3286
3287         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3288                 /* active=0 means off, anything else means on */
3289                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3290                 int i;
3291
3292                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3293                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3294                                            "be (de)activated.\n",
3295                                            mti->mti_svname);
3296                         GOTO(end, rc = -EINVAL);
3297                 }
3298                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3299                               flag ? "de": "re", mti->mti_svname);
3300                 /* Modify clilov */
3301                 rc = name_create(&logname, mti->mti_fsname, "-client");
3302                 if (rc)
3303                         GOTO(end, rc);
3304                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3305                                 mti->mti_svname, "add osc", flag);
3306                 name_destroy(&logname);
3307                 if (rc)
3308                         goto active_err;
3309                 /* Modify mdtlov */
3310                 /* Add to all MDT logs for CMD */
3311                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3312                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3313                                 continue;
3314                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3315                         if (rc)
3316                                 GOTO(end, rc);
3317                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3318                                         mti->mti_svname, "add osc", flag);
3319                         name_destroy(&logname);
3320                         if (rc)
3321                                 goto active_err;
3322                 }
3323         active_err:
3324                 if (rc) {
3325                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3326                                            "log (%d). No permanent "
3327                                            "changes were made to the "
3328                                            "config log.\n",
3329                                            mti->mti_svname, rc);
3330                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3331                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3332                                                    " because the log"
3333                                                    "is in the old 1.4"
3334                                                    "style. Consider "
3335                                                    " --writeconf to "
3336                                                    "update the logs.\n");
3337                         GOTO(end, rc);
3338                 }
3339                 /* Fall through to osc proc for deactivating live OSC
3340                    on running MDT / clients. */
3341         }
3342         /* Below here, let obd's XXX_process_config methods handle it */
3343
3344         /* All lov. in proc */
3345         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3346                 char *mdtlovname;
3347
3348                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3349                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3350                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3351                                            "set on the MDT, not %s. "
3352                                            "Ignoring.\n",
3353                                            mti->mti_svname);
3354                         GOTO(end, rc = 0);
3355                 }
3356
3357                 /* Modify mdtlov */
3358                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3359                         GOTO(end, rc = -ENODEV);
3360
3361                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3362                                              mti->mti_stripe_index);
3363                 if (rc)
3364                         GOTO(end, rc);
3365                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3366                                   &mgi->mgi_bufs, mdtlovname, ptr);
3367                 name_destroy(&logname);
3368                 name_destroy(&mdtlovname);
3369                 if (rc)
3370                         GOTO(end, rc);
3371
3372                 /* Modify clilov */
3373                 rc = name_create(&logname, mti->mti_fsname, "-client");
3374                 if (rc)
3375                         GOTO(end, rc);
3376                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3377                                   fsdb->fsdb_clilov, ptr);
3378                 name_destroy(&logname);
3379                 GOTO(end, rc);
3380         }
3381
3382         /* All osc., mdc., llite. params in proc */
3383         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3384             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3385             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3386                 char *cname;
3387
3388                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3389                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3390                                            " cannot be modified. Consider"
3391                                            " updating the configuration with"
3392                                            " --writeconf\n",
3393                                            mti->mti_svname);
3394                         GOTO(end, rc = -EINVAL);
3395                 }
3396                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3397                         rc = name_create(&cname, mti->mti_fsname, "-client");
3398                         /* Add the client type to match the obdname in
3399                            class_config_llog_handler */
3400                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3401                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3402                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3403                         rc = name_create(&cname, mti->mti_svname, "-osc");
3404                 } else {
3405                         GOTO(end, rc = -EINVAL);
3406                 }
3407                 if (rc)
3408                         GOTO(end, rc);
3409
3410                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3411
3412                 /* Modify client */
3413                 rc = name_create(&logname, mti->mti_fsname, "-client");
3414                 if (rc) {
3415                         name_destroy(&cname);
3416                         GOTO(end, rc);
3417                 }
3418                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3419                                   cname, ptr);
3420
3421                 /* osc params affect the MDT as well */
3422                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3423                         int i;
3424
3425                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3426                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3427                                         continue;
3428                                 name_destroy(&cname);
3429                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3430                                                          fsdb, i);
3431                                 name_destroy(&logname);
3432                                 if (rc)
3433                                         break;
3434                                 rc = name_create_mdt(&logname,
3435                                                      mti->mti_fsname, i);
3436                                 if (rc)
3437                                         break;
3438                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3439                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3440                                                           mti, logname,
3441                                                           &mgi->mgi_bufs,
3442                                                           cname, ptr);
3443                                         if (rc)
3444                                                 break;
3445                                 }
3446                         }
3447                 }
3448                 name_destroy(&logname);
3449                 name_destroy(&cname);
3450                 GOTO(end, rc);
3451         }
3452
3453         /* All mdt. params in proc */
3454         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
3455                 int i;
3456                 __u32 idx;
3457
3458                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3459                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3460                             MTI_NAME_MAXLEN) == 0)
3461                         /* device is unspecified completely? */
3462                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3463                 else
3464                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3465                 if (rc < 0)
3466                         goto active_err;
3467                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3468                         goto active_err;
3469                 if (rc & LDD_F_SV_ALL) {
3470                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3471                                 if (!test_bit(i,
3472                                                   fsdb->fsdb_mdt_index_map))
3473                                         continue;
3474                                 rc = name_create_mdt(&logname,
3475                                                 mti->mti_fsname, i);
3476                                 if (rc)
3477                                         goto active_err;
3478                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3479                                                   logname, &mgi->mgi_bufs,
3480                                                   logname, ptr);
3481                                 name_destroy(&logname);
3482                                 if (rc)
3483                                         goto active_err;
3484                         }
3485                 } else {
3486                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3487                                           mti->mti_svname, &mgi->mgi_bufs,
3488                                           mti->mti_svname, ptr);
3489                         if (rc)
3490                                 goto active_err;
3491                 }
3492                 GOTO(end, rc);
3493         }
3494
3495         /* All mdd., ost. params in proc */
3496         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3497             (class_match_param(ptr, PARAM_OST, NULL) == 0)) {
3498                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3499                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3500                         GOTO(end, rc = -ENODEV);
3501
3502                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3503                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3504                 GOTO(end, rc);
3505         }
3506
3507         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3508         rc2 = -ENOSYS;
3509
3510 end:
3511         if (rc)
3512                 CERROR("err %d on param '%s'\n", rc, ptr);
3513
3514         RETURN(rc ?: rc2);
3515 }
3516
3517 /* Not implementing automatic failover nid addition at this time. */
3518 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3519                       struct mgs_target_info *mti)
3520 {
3521 #if 0
3522         struct fs_db *fsdb;
3523         int rc;
3524         ENTRY;
3525
3526         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3527         if (rc)
3528                 RETURN(rc);
3529
3530         if (mgs_log_is_empty(obd, mti->mti_svname))
3531                 /* should never happen */
3532                 RETURN(-ENOENT);
3533
3534         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3535
3536         /* FIXME We can just check mti->params to see if we're already in
3537            the failover list.  Modify mti->params for rewriting back at
3538            server_register_target(). */
3539
3540         mutex_lock(&fsdb->fsdb_mutex);
3541         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3542         mutex_unlock(&fsdb->fsdb_mutex);
3543
3544         RETURN(rc);
3545 #endif
3546         return 0;
3547 }
3548
3549 int mgs_write_log_target(const struct lu_env *env,
3550                          struct mgs_device *mgs,
3551                          struct mgs_target_info *mti,
3552                          struct fs_db *fsdb)
3553 {
3554         int rc = -EINVAL;
3555         char *buf, *params;
3556         ENTRY;
3557
3558         /* set/check the new target index */
3559         rc = mgs_set_index(env, mgs, mti);
3560         if (rc < 0) {
3561                 CERROR("Can't get index (%d)\n", rc);
3562                 RETURN(rc);
3563         }
3564
3565         if (rc == EALREADY) {
3566                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3567                               mti->mti_stripe_index, mti->mti_svname);
3568                 /* We would like to mark old log sections as invalid
3569                    and add new log sections in the client and mdt logs.
3570                    But if we add new sections, then live clients will
3571                    get repeat setup instructions for already running
3572                    osc's. So don't update the client/mdt logs. */
3573                 mti->mti_flags &= ~LDD_F_UPDATE;
3574         }
3575
3576         mutex_lock(&fsdb->fsdb_mutex);
3577
3578         if (mti->mti_flags &
3579             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3580                 /* Generate a log from scratch */
3581                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3582                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3583                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3584                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3585                 } else {
3586                         CERROR("Unknown target type %#x, can't create log for "
3587                                "%s\n", mti->mti_flags, mti->mti_svname);
3588                 }
3589                 if (rc) {
3590                         CERROR("Can't write logs for %s (%d)\n",
3591                                mti->mti_svname, rc);
3592                         GOTO(out_up, rc);
3593                 }
3594         } else {
3595                 /* Just update the params from tunefs in mgs_write_log_params */
3596                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3597                 mti->mti_flags |= LDD_F_PARAM;
3598         }
3599
3600         /* allocate temporary buffer, where class_get_next_param will
3601            make copy of a current  parameter */
3602         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3603         if (buf == NULL)
3604                 GOTO(out_up, rc = -ENOMEM);
3605         params = mti->mti_params;
3606         while (params != NULL) {
3607                 rc = class_get_next_param(&params, buf);
3608                 if (rc) {
3609                         if (rc == 1)
3610                                 /* there is no next parameter, that is
3611                                    not an error */
3612                                 rc = 0;
3613                         break;
3614                 }
3615                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3616                        params, buf);
3617                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3618                 if (rc)
3619                         break;
3620         }
3621
3622         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3623
3624 out_up:
3625         mutex_unlock(&fsdb->fsdb_mutex);
3626         RETURN(rc);
3627 }
3628
3629 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3630 {
3631         struct llog_ctxt        *ctxt;
3632         int                      rc = 0;
3633
3634         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3635         if (ctxt == NULL) {
3636                 CERROR("%s: MGS config context doesn't exist\n",
3637                        mgs->mgs_obd->obd_name);
3638                 rc = -ENODEV;
3639         } else {
3640                 rc = llog_erase(env, ctxt, NULL, name);
3641                 /* llog may not exist */
3642                 if (rc == -ENOENT)
3643                         rc = 0;
3644                 llog_ctxt_put(ctxt);
3645         }
3646
3647         if (rc)
3648                 CERROR("%s: failed to clear log %s: %d\n",
3649                        mgs->mgs_obd->obd_name, name, rc);
3650
3651         return rc;
3652 }
3653
3654 /* erase all logs for the given fs */
3655 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3656 {
3657         struct fs_db *fsdb;
3658         cfs_list_t list;
3659         struct mgs_direntry *dirent, *n;
3660         int rc, len = strlen(fsname);
3661         char *suffix;
3662         ENTRY;
3663
3664         /* Find all the logs in the CONFIGS directory */
3665         rc = class_dentry_readdir(env, mgs, &list);
3666         if (rc)
3667                 RETURN(rc);
3668
3669         mutex_lock(&mgs->mgs_mutex);
3670
3671         /* Delete the fs db */
3672         fsdb = mgs_find_fsdb(mgs, fsname);
3673         if (fsdb)
3674                 mgs_free_fsdb(mgs, fsdb);
3675
3676         mutex_unlock(&mgs->mgs_mutex);
3677
3678         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3679                 cfs_list_del(&dirent->list);
3680                 suffix = strrchr(dirent->name, '-');
3681                 if (suffix != NULL) {
3682                         if ((len == suffix - dirent->name) &&
3683                             (strncmp(fsname, dirent->name, len) == 0)) {
3684                                 CDEBUG(D_MGS, "Removing log %s\n",
3685                                        dirent->name);
3686                                 mgs_erase_log(env, mgs, dirent->name);
3687                         }
3688                 }
3689                 mgs_direntry_free(dirent);
3690         }
3691
3692         RETURN(rc);
3693 }
3694
3695 /* from llog_swab */
3696 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3697 {
3698         int i;
3699         ENTRY;
3700
3701         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3702         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3703
3704         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3705         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3706         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3707         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3708
3709         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3710         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3711                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3712                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3713                                i, lcfg->lcfg_buflens[i],
3714                                lustre_cfg_string(lcfg, i));
3715                 }
3716         EXIT;
3717 }
3718
3719 /* Set a permanent (config log) param for a target or fs
3720  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3721  *             buf1 contains the single parameter
3722  */
3723 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3724                  struct lustre_cfg *lcfg, char *fsname)
3725 {
3726         struct fs_db *fsdb;
3727         struct mgs_target_info *mti;
3728         char *devname, *param;
3729         char *ptr, *tmp;
3730         __u32 index;
3731         int rc = 0;
3732         ENTRY;
3733
3734         print_lustre_cfg(lcfg);
3735
3736         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3737         devname = lustre_cfg_string(lcfg, 0);
3738         param = lustre_cfg_string(lcfg, 1);
3739         if (!devname) {
3740                 /* Assume device name embedded in param:
3741                    lustre-OST0000.osc.max_dirty_mb=32 */
3742                 ptr = strchr(param, '.');
3743                 if (ptr) {
3744                         devname = param;
3745                         *ptr = 0;
3746                         param = ptr + 1;
3747                 }
3748         }
3749         if (!devname) {
3750                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3751                 RETURN(-ENOSYS);
3752         }
3753
3754         rc = mgs_parse_devname(devname, fsname, NULL);
3755         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3756                 /* param related to llite isn't allowed to set by OST or MDT */
3757                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3758                                    sizeof(PARAM_LLITE)) == 0)
3759                         RETURN(-EINVAL);
3760         } else {
3761                 /* assume devname is the fsname */
3762                 memset(fsname, 0, MTI_NAME_MAXLEN);
3763                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3764                 fsname[MTI_NAME_MAXLEN - 1] = 0;
3765         }
3766         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3767
3768         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3769         if (rc)
3770                 RETURN(rc);
3771         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3772             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3773                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3774                        "is '%s'\n", fsname, devname);
3775                 mgs_free_fsdb(mgs, fsdb);
3776                 RETURN(-EINVAL);
3777         }
3778
3779         /* Create a fake mti to hold everything */
3780         OBD_ALLOC_PTR(mti);
3781         if (!mti)
3782                 GOTO(out, rc = -ENOMEM);
3783         strncpy(mti->mti_fsname, fsname, MTI_NAME_MAXLEN);
3784         strncpy(mti->mti_svname, devname, MTI_NAME_MAXLEN);
3785         strncpy(mti->mti_params, param, sizeof(mti->mti_params));
3786         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3787         if (rc < 0)
3788                 /* Not a valid server; may be only fsname */
3789                 rc = 0;
3790         else
3791                 /* Strip -osc or -mdc suffix from svname */
3792                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3793                                      mti->mti_svname))
3794                         GOTO(out, rc = -EINVAL);
3795
3796         mti->mti_flags = rc | LDD_F_PARAM;
3797
3798         mutex_lock(&fsdb->fsdb_mutex);
3799         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3800         mutex_unlock(&fsdb->fsdb_mutex);
3801
3802         /*
3803          * Revoke lock so everyone updates.  Should be alright if
3804          * someone was already reading while we were updating the logs,
3805          * so we don't really need to hold the lock while we're
3806          * writing (above).
3807          */
3808         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3809 out:
3810         OBD_FREE_PTR(mti);
3811         RETURN(rc);
3812 }
3813
3814 static int mgs_write_log_pool(const struct lu_env *env,
3815                               struct mgs_device *mgs, char *logname,
3816                               struct fs_db *fsdb, char *lovname,
3817                               enum lcfg_command_type cmd,
3818                               char *poolname, char *fsname,
3819                               char *ostname, char *comment)
3820 {
3821         struct llog_handle *llh = NULL;
3822         int rc;
3823
3824         rc = record_start_log(env, mgs, &llh, logname);
3825         if (rc)
3826                 return rc;
3827         rc = record_marker(env, llh, fsdb, CM_START, lovname, comment);
3828         if (rc)
3829                 goto out;
3830         rc = record_base(env, llh, lovname, 0, cmd, poolname, fsname, ostname, 0);
3831         if (rc)
3832                 goto out;
3833         rc = record_marker(env, llh, fsdb, CM_END, lovname, comment);
3834 out:
3835         record_end_log(env, &llh);
3836         return rc;
3837 }
3838
3839 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3840                  enum lcfg_command_type cmd, char *fsname,
3841                  char *poolname, char *ostname)
3842 {
3843         struct fs_db *fsdb;
3844         char *lovname;
3845         char *logname;
3846         char *label = NULL, *canceled_label = NULL;
3847         int label_sz;
3848         struct mgs_target_info *mti = NULL;
3849         int rc, i;
3850         ENTRY;
3851
3852         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3853         if (rc) {
3854                 CERROR("Can't get db for %s\n", fsname);
3855                 RETURN(rc);
3856         }
3857         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3858                 CERROR("%s is not defined\n", fsname);
3859                 mgs_free_fsdb(mgs, fsdb);
3860                 RETURN(-EINVAL);
3861         }
3862
3863         label_sz = 10 + strlen(fsname) + strlen(poolname);
3864
3865         /* check if ostname match fsname */
3866         if (ostname != NULL) {
3867                 char *ptr;
3868
3869                 ptr = strrchr(ostname, '-');
3870                 if ((ptr == NULL) ||
3871                     (strncmp(fsname, ostname, ptr-ostname) != 0))
3872                         RETURN(-EINVAL);
3873                 label_sz += strlen(ostname);
3874         }
3875
3876         OBD_ALLOC(label, label_sz);
3877         if (label == NULL)
3878                 RETURN(-ENOMEM);
3879
3880         switch(cmd) {
3881         case LCFG_POOL_NEW:
3882                 sprintf(label,
3883                         "new %s.%s", fsname, poolname);
3884                 break;
3885         case LCFG_POOL_ADD:
3886                 sprintf(label,
3887                         "add %s.%s.%s", fsname, poolname, ostname);
3888                 break;
3889         case LCFG_POOL_REM:
3890                 OBD_ALLOC(canceled_label, label_sz);
3891                 if (canceled_label == NULL)
3892                         GOTO(out_label, rc = -ENOMEM);
3893                 sprintf(label,
3894                         "rem %s.%s.%s", fsname, poolname, ostname);
3895                 sprintf(canceled_label,
3896                         "add %s.%s.%s", fsname, poolname, ostname);
3897                 break;
3898         case LCFG_POOL_DEL:
3899                 OBD_ALLOC(canceled_label, label_sz);
3900                 if (canceled_label == NULL)
3901                         GOTO(out_label, rc = -ENOMEM);
3902                 sprintf(label,
3903                         "del %s.%s", fsname, poolname);
3904                 sprintf(canceled_label,
3905                         "new %s.%s", fsname, poolname);
3906                 break;
3907         default:
3908                 break;
3909         }
3910
3911         mutex_lock(&fsdb->fsdb_mutex);
3912
3913         if (canceled_label != NULL) {
3914                 OBD_ALLOC_PTR(mti);
3915                 if (mti == NULL)
3916                         GOTO(out_cancel, rc = -ENOMEM);
3917         }
3918
3919         /* write pool def to all MDT logs */
3920         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3921                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
3922                         rc = name_create_mdt_and_lov(&logname, &lovname,
3923                                                      fsdb, i);
3924                         if (rc) {
3925                                 mutex_unlock(&fsdb->fsdb_mutex);
3926                                 GOTO(out_mti, rc);
3927                         }
3928                         if (canceled_label != NULL) {
3929                                 strcpy(mti->mti_svname, "lov pool");
3930                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3931                                                 lovname, canceled_label,
3932                                                 CM_SKIP);
3933                         }
3934
3935                         if (rc >= 0)
3936                                 rc = mgs_write_log_pool(env, mgs, logname,
3937                                                         fsdb, lovname, cmd,
3938                                                         fsname, poolname,
3939                                                         ostname, label);
3940                         name_destroy(&logname);
3941                         name_destroy(&lovname);
3942                         if (rc) {
3943                                 mutex_unlock(&fsdb->fsdb_mutex);
3944                                 GOTO(out_mti, rc);
3945                         }
3946                 }
3947         }
3948
3949         rc = name_create(&logname, fsname, "-client");
3950         if (rc) {
3951                 mutex_unlock(&fsdb->fsdb_mutex);
3952                 GOTO(out_mti, rc);
3953         }
3954         if (canceled_label != NULL) {
3955                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3956                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
3957                 if (rc < 0) {
3958                         mutex_unlock(&fsdb->fsdb_mutex);
3959                         name_destroy(&logname);
3960                         GOTO(out_mti, rc);
3961                 }
3962         }
3963
3964         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
3965                                 cmd, fsname, poolname, ostname, label);
3966         mutex_unlock(&fsdb->fsdb_mutex);
3967         name_destroy(&logname);
3968         /* request for update */
3969         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3970
3971         EXIT;
3972 out_mti:
3973         if (mti != NULL)
3974                 OBD_FREE_PTR(mti);
3975 out_cancel:
3976         if (canceled_label != NULL)
3977                 OBD_FREE(canceled_label, label_sz);
3978 out_label:
3979         OBD_FREE(label, label_sz);
3980         return rc;
3981 }
3982
3983