Whamcloud - gitweb
32a422af8693e49cbcbc4fd0a6c8e96355bc7fae
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         if (!dt_try_as_dir(env, dir))
71                 GOTO(out, rc = -ENOTDIR);
72
73         LASSERT(dir);
74         LASSERT(dir->do_index_ops);
75
76         iops = &dir->do_index_ops->dio_it;
77         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
78         if (IS_ERR(it))
79                 RETURN(PTR_ERR(it));
80
81         rc = iops->load(env, it, 0);
82         if (rc <= 0)
83                 GOTO(fini, rc = 0);
84
85         /* main cycle */
86         do {
87                 key = (void *)iops->key(env, it);
88                 if (IS_ERR(key)) {
89                         CERROR("%s: key failed when listing %s: rc = %d\n",
90                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
91                                (int) PTR_ERR(key));
92                         goto next;
93                 }
94                 key_sz = iops->key_size(env, it);
95                 LASSERT(key_sz > 0);
96
97                 /* filter out "." and ".." entries */
98                 if (key[0] == '.') {
99                         if (key_sz == 1)
100                                 goto next;
101                         if (key_sz == 2 && key[1] == '.')
102                                 goto next;
103                 }
104
105                 de = mgs_direntry_alloc(key_sz + 1);
106                 if (de == NULL) {
107                         rc = -ENOMEM;
108                         break;
109                 }
110
111                 memcpy(de->name, key, key_sz);
112                 de->name[key_sz] = 0;
113
114                 cfs_list_add(&de->list, list);
115
116 next:
117                 rc = iops->next(env, it);
118         } while (rc == 0);
119         rc = 0;
120
121         iops->put(env, it);
122
123 fini:
124         iops->fini(env, it);
125 out:
126         if (rc)
127                 CERROR("%s: key failed when listing %s: rc = %d\n",
128                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
129         RETURN(rc);
130 }
131
132 /******************** DB functions *********************/
133
134 static inline int name_create(char **newname, char *prefix, char *suffix)
135 {
136         LASSERT(newname);
137         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
138         if (!*newname)
139                 return -ENOMEM;
140         sprintf(*newname, "%s%s", prefix, suffix);
141         return 0;
142 }
143
144 static inline void name_destroy(char **name)
145 {
146         if (*name)
147                 OBD_FREE(*name, strlen(*name) + 1);
148         *name = NULL;
149 }
150
151 struct mgs_fsdb_handler_data
152 {
153         struct fs_db   *fsdb;
154         __u32           ver;
155 };
156
157 /* from the (client) config log, figure out:
158         1. which ost's/mdt's are configured (by index)
159         2. what the last config step is
160         3. COMPAT_18 osc name
161 */
162 /* It might be better to have a separate db file, instead of parsing the info
163    out of the client log.  This is slow and potentially error-prone. */
164 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
165                             struct llog_rec_hdr *rec, void *data)
166 {
167         struct mgs_fsdb_handler_data *d = data;
168         struct fs_db *fsdb = d->fsdb;
169         int cfg_len = rec->lrh_len;
170         char *cfg_buf = (char*) (rec + 1);
171         struct lustre_cfg *lcfg;
172         __u32 index;
173         int rc = 0;
174         ENTRY;
175
176         if (rec->lrh_type != OBD_CFG_REC) {
177                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
178                 RETURN(-EINVAL);
179         }
180
181         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
182         if (rc) {
183                 CERROR("Insane cfg\n");
184                 RETURN(rc);
185         }
186
187         lcfg = (struct lustre_cfg *)cfg_buf;
188
189         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
190                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
191
192         /* Figure out ost indicies */
193         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
194         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
195             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
196                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
197                                        NULL, 10);
198                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
199                        lustre_cfg_string(lcfg, 1), index,
200                        lustre_cfg_string(lcfg, 2));
201                 set_bit(index, fsdb->fsdb_ost_index_map);
202         }
203
204         /* Figure out mdt indicies */
205         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
206         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
207             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
208                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
209                                        &index, NULL);
210                 if (rc != LDD_F_SV_TYPE_MDT) {
211                         CWARN("Unparsable MDC name %s, assuming index 0\n",
212                               lustre_cfg_string(lcfg, 0));
213                         index = 0;
214                 }
215                 rc = 0;
216                 CDEBUG(D_MGS, "MDT index is %u\n", index);
217                 set_bit(index, fsdb->fsdb_mdt_index_map);
218                 fsdb->fsdb_mdt_count ++;
219         }
220
221         /**
222          * figure out the old config. fsdb_gen = 0 means old log
223          * It is obsoleted and not supported anymore
224          */
225         if (fsdb->fsdb_gen == 0) {
226                 CERROR("Old config format is not supported\n");
227                 RETURN(-EINVAL);
228         }
229
230         /*
231          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
232          */
233         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
234             lcfg->lcfg_command == LCFG_ATTACH &&
235             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
236                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
237                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
238                         CWARN("MDT using 1.8 OSC name scheme\n");
239                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
240                 }
241         }
242
243         if (lcfg->lcfg_command == LCFG_MARKER) {
244                 struct cfg_marker *marker;
245                 marker = lustre_cfg_buf(lcfg, 1);
246
247                 d->ver = marker->cm_vers;
248
249                 /* Keep track of the latest marker step */
250                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
251         }
252
253         RETURN(rc);
254 }
255
256 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
257 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
258                                   struct mgs_device *mgs,
259                                   struct fs_db *fsdb)
260 {
261         char                            *logname;
262         struct llog_handle              *loghandle;
263         struct llog_ctxt                *ctxt;
264         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
265         int rc;
266
267         ENTRY;
268
269         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
270         LASSERT(ctxt != NULL);
271         rc = name_create(&logname, fsdb->fsdb_name, "-client");
272         if (rc)
273                 GOTO(out_put, rc);
274         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
275         if (rc)
276                 GOTO(out_pop, rc);
277
278         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
279         if (rc)
280                 GOTO(out_close, rc);
281
282         if (llog_get_size(loghandle) <= 1)
283                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
284
285         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
286         CDEBUG(D_INFO, "get_db = %d\n", rc);
287 out_close:
288         llog_close(env, loghandle);
289 out_pop:
290         name_destroy(&logname);
291 out_put:
292         llog_ctxt_put(ctxt);
293
294         RETURN(rc);
295 }
296
297 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
298 {
299         struct mgs_tgt_srpc_conf *tgtconf;
300
301         /* free target-specific rules */
302         while (fsdb->fsdb_srpc_tgt) {
303                 tgtconf = fsdb->fsdb_srpc_tgt;
304                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
305
306                 LASSERT(tgtconf->mtsc_tgt);
307
308                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
309                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
310                 OBD_FREE_PTR(tgtconf);
311         }
312
313         /* free general rules */
314         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
315 }
316
317 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
318 {
319         struct fs_db *fsdb;
320         cfs_list_t *tmp;
321
322         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
323                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
324                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
325                         return fsdb;
326         }
327         return NULL;
328 }
329
330 /* caller must hold the mgs->mgs_fs_db_lock */
331 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
332                                   struct mgs_device *mgs, char *fsname)
333 {
334         struct fs_db *fsdb;
335         int rc;
336         ENTRY;
337
338         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
339                 CERROR("fsname %s is too long\n", fsname);
340                 RETURN(NULL);
341         }
342
343         OBD_ALLOC_PTR(fsdb);
344         if (!fsdb)
345                 RETURN(NULL);
346
347         strcpy(fsdb->fsdb_name, fsname);
348         mutex_init(&fsdb->fsdb_mutex);
349         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
350         fsdb->fsdb_gen = 1;
351
352         if (strcmp(fsname, MGSSELF_NAME) == 0) {
353                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
354         } else {
355                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
356                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
357                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
358                         CERROR("No memory for index maps\n");
359                         GOTO(err, rc = -ENOMEM);
360                 }
361
362                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
363                 if (rc)
364                         GOTO(err, rc);
365                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
366                 if (rc)
367                         GOTO(err, rc);
368
369                 /* initialise data for NID table */
370                 mgs_ir_init_fs(env, mgs, fsdb);
371
372                 lproc_mgs_add_live(mgs, fsdb);
373         }
374
375         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
376
377         RETURN(fsdb);
378 err:
379         if (fsdb->fsdb_ost_index_map)
380                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
381         if (fsdb->fsdb_mdt_index_map)
382                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
383         name_destroy(&fsdb->fsdb_clilov);
384         name_destroy(&fsdb->fsdb_clilmv);
385         OBD_FREE_PTR(fsdb);
386         RETURN(NULL);
387 }
388
389 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
390 {
391         /* wait for anyone with the sem */
392         mutex_lock(&fsdb->fsdb_mutex);
393         lproc_mgs_del_live(mgs, fsdb);
394         cfs_list_del(&fsdb->fsdb_list);
395
396         /* deinitialize fsr */
397         mgs_ir_fini_fs(mgs, fsdb);
398
399         if (fsdb->fsdb_ost_index_map)
400                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
401         if (fsdb->fsdb_mdt_index_map)
402                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
403         name_destroy(&fsdb->fsdb_clilov);
404         name_destroy(&fsdb->fsdb_clilmv);
405         mgs_free_fsdb_srpc(fsdb);
406         mutex_unlock(&fsdb->fsdb_mutex);
407         OBD_FREE_PTR(fsdb);
408 }
409
410 int mgs_init_fsdb_list(struct mgs_device *mgs)
411 {
412         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
413         return 0;
414 }
415
416 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
417 {
418         struct fs_db *fsdb;
419         cfs_list_t *tmp, *tmp2;
420         mutex_lock(&mgs->mgs_mutex);
421         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
422                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
423                 mgs_free_fsdb(mgs, fsdb);
424         }
425         mutex_unlock(&mgs->mgs_mutex);
426         return 0;
427 }
428
429 int mgs_find_or_make_fsdb(const struct lu_env *env,
430                           struct mgs_device *mgs, char *name,
431                           struct fs_db **dbh)
432 {
433         struct fs_db *fsdb;
434         int rc = 0;
435
436         ENTRY;
437         mutex_lock(&mgs->mgs_mutex);
438         fsdb = mgs_find_fsdb(mgs, name);
439         if (fsdb) {
440                 mutex_unlock(&mgs->mgs_mutex);
441                 *dbh = fsdb;
442                 RETURN(0);
443         }
444
445         CDEBUG(D_MGS, "Creating new db\n");
446         fsdb = mgs_new_fsdb(env, mgs, name);
447         /* lock fsdb_mutex until the db is loaded from llogs */
448         if (fsdb)
449                 mutex_lock(&fsdb->fsdb_mutex);
450         mutex_unlock(&mgs->mgs_mutex);
451         if (!fsdb)
452                 RETURN(-ENOMEM);
453
454         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
455                 /* populate the db from the client llog */
456                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
457                 if (rc) {
458                         CERROR("Can't get db from client log %d\n", rc);
459                         GOTO(out_free, rc);
460                 }
461         }
462
463         /* populate srpc rules from params llog */
464         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
465         if (rc) {
466                 CERROR("Can't get db from params log %d\n", rc);
467                 GOTO(out_free, rc);
468         }
469
470         mutex_unlock(&fsdb->fsdb_mutex);
471         *dbh = fsdb;
472
473         RETURN(0);
474
475 out_free:
476         mutex_unlock(&fsdb->fsdb_mutex);
477         mgs_free_fsdb(mgs, fsdb);
478         return rc;
479 }
480
481 /* 1 = index in use
482    0 = index unused
483    -1= empty client log */
484 int mgs_check_index(const struct lu_env *env,
485                     struct mgs_device *mgs,
486                     struct mgs_target_info *mti)
487 {
488         struct fs_db *fsdb;
489         void *imap;
490         int rc = 0;
491         ENTRY;
492
493         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
494
495         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
496         if (rc) {
497                 CERROR("Can't get db for %s\n", mti->mti_fsname);
498                 RETURN(rc);
499         }
500
501         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
502                 RETURN(-1);
503
504         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
505                 imap = fsdb->fsdb_ost_index_map;
506         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
507                 imap = fsdb->fsdb_mdt_index_map;
508         else
509                 RETURN(-EINVAL);
510
511         if (test_bit(mti->mti_stripe_index, imap))
512                 RETURN(1);
513         RETURN(0);
514 }
515
516 static __inline__ int next_index(void *index_map, int map_len)
517 {
518         int i;
519         for (i = 0; i < map_len * 8; i++)
520                 if (!test_bit(i, index_map)) {
521                          return i;
522                  }
523         CERROR("max index %d exceeded.\n", i);
524         return -1;
525 }
526
527 /* Return codes:
528         0  newly marked as in use
529         <0 err
530         +EALREADY for update of an old index */
531 static int mgs_set_index(const struct lu_env *env,
532                          struct mgs_device *mgs,
533                          struct mgs_target_info *mti)
534 {
535         struct fs_db *fsdb;
536         void *imap;
537         int rc = 0;
538         ENTRY;
539
540         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
541         if (rc) {
542                 CERROR("Can't get db for %s\n", mti->mti_fsname);
543                 RETURN(rc);
544         }
545
546         mutex_lock(&fsdb->fsdb_mutex);
547         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
548                 imap = fsdb->fsdb_ost_index_map;
549         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
550                 imap = fsdb->fsdb_mdt_index_map;
551         } else {
552                 GOTO(out_up, rc = -EINVAL);
553         }
554
555         if (mti->mti_flags & LDD_F_NEED_INDEX) {
556                 rc = next_index(imap, INDEX_MAP_SIZE);
557                 if (rc == -1)
558                         GOTO(out_up, rc = -ERANGE);
559                 mti->mti_stripe_index = rc;
560                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
561                         fsdb->fsdb_mdt_count ++;
562         }
563
564         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
565                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
566                                    "but the max index is %d.\n",
567                                    mti->mti_svname, mti->mti_stripe_index,
568                                    INDEX_MAP_SIZE * 8);
569                 GOTO(out_up, rc = -ERANGE);
570         }
571
572         if (test_bit(mti->mti_stripe_index, imap)) {
573                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
574                     !(mti->mti_flags & LDD_F_WRITECONF)) {
575                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
576                                            "%d, but that index is already in "
577                                            "use. Use --writeconf to force\n",
578                                            mti->mti_svname,
579                                            mti->mti_stripe_index);
580                         GOTO(out_up, rc = -EADDRINUSE);
581                 } else {
582                         CDEBUG(D_MGS, "Server %s updating index %d\n",
583                                mti->mti_svname, mti->mti_stripe_index);
584                         GOTO(out_up, rc = EALREADY);
585                 }
586         }
587
588         set_bit(mti->mti_stripe_index, imap);
589         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
590         mutex_unlock(&fsdb->fsdb_mutex);
591         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
592                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
593
594         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
595                mti->mti_stripe_index);
596
597         RETURN(0);
598 out_up:
599         mutex_unlock(&fsdb->fsdb_mutex);
600         return rc;
601 }
602
603 struct mgs_modify_lookup {
604         struct cfg_marker mml_marker;
605         int               mml_modified;
606 };
607
608 static int mgs_modify_handler(const struct lu_env *env,
609                               struct llog_handle *llh,
610                               struct llog_rec_hdr *rec, void *data)
611 {
612         struct mgs_modify_lookup *mml = data;
613         struct cfg_marker *marker;
614         struct lustre_cfg *lcfg = REC_DATA(rec);
615         int cfg_len = REC_DATA_LEN(rec);
616         int rc;
617         ENTRY;
618
619         if (rec->lrh_type != OBD_CFG_REC) {
620                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
621                 RETURN(-EINVAL);
622         }
623
624         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
625         if (rc) {
626                 CERROR("Insane cfg\n");
627                 RETURN(rc);
628         }
629
630         /* We only care about markers */
631         if (lcfg->lcfg_command != LCFG_MARKER)
632                 RETURN(0);
633
634         marker = lustre_cfg_buf(lcfg, 1);
635         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
636             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
637             !(marker->cm_flags & CM_SKIP)) {
638                 /* Found a non-skipped marker match */
639                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
640                        rec->lrh_index, marker->cm_step,
641                        marker->cm_flags, mml->mml_marker.cm_flags,
642                        marker->cm_tgtname, marker->cm_comment);
643                 /* Overwrite the old marker llog entry */
644                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
645                 marker->cm_flags |= mml->mml_marker.cm_flags;
646                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
647                 /* Header and tail are added back to lrh_len in
648                    llog_lvfs_write_rec */
649                 rec->lrh_len = cfg_len;
650                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
651                                 rec->lrh_index);
652                 if (!rc)
653                          mml->mml_modified++;
654         }
655
656         RETURN(rc);
657 }
658
659 /**
660  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
661  * Return code:
662  * 0 - modified successfully,
663  * 1 - no modification was done
664  * negative - error
665  */
666 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
667                       struct fs_db *fsdb, struct mgs_target_info *mti,
668                       char *logname, char *devname, char *comment, int flags)
669 {
670         struct llog_handle *loghandle;
671         struct llog_ctxt *ctxt;
672         struct mgs_modify_lookup *mml;
673         int rc;
674
675         ENTRY;
676
677         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
678         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
679                flags);
680
681         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
682         LASSERT(ctxt != NULL);
683         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
684         if (rc < 0) {
685                 if (rc == -ENOENT)
686                         rc = 0;
687                 GOTO(out_pop, rc);
688         }
689
690         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
691         if (rc)
692                 GOTO(out_close, rc);
693
694         if (llog_get_size(loghandle) <= 1)
695                 GOTO(out_close, rc = 0);
696
697         OBD_ALLOC_PTR(mml);
698         if (!mml)
699                 GOTO(out_close, rc = -ENOMEM);
700         strcpy(mml->mml_marker.cm_comment, comment);
701         strcpy(mml->mml_marker.cm_tgtname, devname);
702         /* Modify mostly means cancel */
703         mml->mml_marker.cm_flags = flags;
704         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
705         mml->mml_modified = 0;
706         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
707                           NULL);
708         if (!rc && !mml->mml_modified)
709                 rc = 1;
710         OBD_FREE_PTR(mml);
711
712 out_close:
713         llog_close(env, loghandle);
714 out_pop:
715         if (rc < 0)
716                 CERROR("%s: modify %s/%s failed: rc = %d\n",
717                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
718         llog_ctxt_put(ctxt);
719         RETURN(rc);
720 }
721
722 /** This structure is passed to mgs_replace_handler */
723 struct mgs_replace_uuid_lookup {
724         /* Nids are replaced for this target device */
725         struct mgs_target_info target;
726         /* Temporary modified llog */
727         struct llog_handle *temp_llh;
728         /* Flag is set if in target block*/
729         int in_target_device;
730         /* Nids already added. Just skip (multiple nids) */
731         int device_nids_added;
732         /* Flag is set if this block should not be copied */
733         int skip_it;
734 };
735
736 /**
737  * Check: a) if block should be skipped
738  * b) is it target block
739  *
740  * \param[in] lcfg
741  * \param[in] mrul
742  *
743  * \retval 0 should not to be skipped
744  * \retval 1 should to be skipped
745  */
746 static int check_markers(struct lustre_cfg *lcfg,
747                          struct mgs_replace_uuid_lookup *mrul)
748 {
749          struct cfg_marker *marker;
750
751         /* Track markers. Find given device */
752         if (lcfg->lcfg_command == LCFG_MARKER) {
753                 marker = lustre_cfg_buf(lcfg, 1);
754                 /* Clean llog from records marked as CM_EXCLUDE.
755                    CM_SKIP records are used for "active" command
756                    and can be restored if needed */
757                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
758                     (CM_EXCLUDE | CM_START)) {
759                         mrul->skip_it = 1;
760                         return 1;
761                 }
762
763                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
764                     (CM_EXCLUDE | CM_END)) {
765                         mrul->skip_it = 0;
766                         return 1;
767                 }
768
769                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
770                         LASSERT(!(marker->cm_flags & CM_START) ||
771                                 !(marker->cm_flags & CM_END));
772                         if (marker->cm_flags & CM_START) {
773                                 mrul->in_target_device = 1;
774                                 mrul->device_nids_added = 0;
775                         } else if (marker->cm_flags & CM_END)
776                                 mrul->in_target_device = 0;
777                 }
778         }
779
780         return 0;
781 }
782
783 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
784                        struct lustre_cfg *lcfg)
785 {
786         struct llog_rec_hdr      rec;
787         int                      buflen, rc;
788
789         if (!lcfg || !llh)
790                 return -ENOMEM;
791
792         LASSERT(llh->lgh_ctxt);
793
794         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
795                                 lcfg->lcfg_buflens);
796         rec.lrh_len = llog_data_len(buflen);
797         rec.lrh_type = OBD_CFG_REC;
798
799         /* idx = -1 means append */
800         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
801         if (rc)
802                 CERROR("failed %d\n", rc);
803         return rc;
804 }
805
806 static int record_base(const struct lu_env *env, struct llog_handle *llh,
807                      char *cfgname, lnet_nid_t nid, int cmd,
808                      char *s1, char *s2, char *s3, char *s4)
809 {
810         struct mgs_thread_info *mgi = mgs_env_info(env);
811         struct lustre_cfg     *lcfg;
812         int rc;
813
814         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
815                cmd, s1, s2, s3, s4);
816
817         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
818         if (s1)
819                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
820         if (s2)
821                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
822         if (s3)
823                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
824         if (s4)
825                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
826
827         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
828         if (!lcfg)
829                 return -ENOMEM;
830         lcfg->lcfg_nid = nid;
831
832         rc = record_lcfg(env, llh, lcfg);
833
834         lustre_cfg_free(lcfg);
835
836         if (rc) {
837                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
838                        cmd, s1, s2, s3, s4);
839         }
840         return rc;
841 }
842
843 static inline int record_add_uuid(const struct lu_env *env,
844                                   struct llog_handle *llh,
845                                   uint64_t nid, char *uuid)
846 {
847         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
848 }
849
850 static inline int record_add_conn(const struct lu_env *env,
851                                   struct llog_handle *llh,
852                                   char *devname, char *uuid)
853 {
854         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
855 }
856
857 static inline int record_attach(const struct lu_env *env,
858                                 struct llog_handle *llh, char *devname,
859                                 char *type, char *uuid)
860 {
861         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
862 }
863
864 static inline int record_setup(const struct lu_env *env,
865                                struct llog_handle *llh, char *devname,
866                                char *s1, char *s2, char *s3, char *s4)
867 {
868         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
869 }
870
871 /**
872  * \retval <0 record processing error
873  * \retval n record is processed. No need copy original one.
874  * \retval 0 record is not processed.
875  */
876 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
877                            struct mgs_replace_uuid_lookup *mrul)
878 {
879         int nids_added = 0;
880         lnet_nid_t nid;
881         char *ptr;
882         int rc;
883
884         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
885                 /* LCFG_ADD_UUID command found. Let's skip original command
886                    and add passed nids */
887                 ptr = mrul->target.mti_params;
888                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
889                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
890                                "device %s\n", libcfs_nid2str(nid),
891                                 mrul->target.mti_params,
892                                 mrul->target.mti_svname);
893                         rc = record_add_uuid(env,
894                                              mrul->temp_llh, nid,
895                                              mrul->target.mti_params);
896                         if (!rc)
897                                 nids_added++;
898                 }
899
900                 if (nids_added == 0) {
901                         CERROR("No new nids were added, nid %s with uuid %s, "
902                                "device %s\n", libcfs_nid2str(nid),
903                                mrul->target.mti_params,
904                                mrul->target.mti_svname);
905                         RETURN(-ENXIO);
906                 } else {
907                         mrul->device_nids_added = 1;
908                 }
909
910                 return nids_added;
911         }
912
913         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
914                 /* LCFG_SETUP command found. UUID should be changed */
915                 rc = record_setup(env,
916                                   mrul->temp_llh,
917                                   /* devname the same */
918                                   lustre_cfg_string(lcfg, 0),
919                                   /* s1 is not changed */
920                                   lustre_cfg_string(lcfg, 1),
921                                   /* new uuid should be
922                                   the full nidlist */
923                                   mrul->target.mti_params,
924                                   /* s3 is not changed */
925                                   lustre_cfg_string(lcfg, 3),
926                                   /* s4 is not changed */
927                                   lustre_cfg_string(lcfg, 4));
928                 return rc ? rc : 1;
929         }
930
931         /* Another commands in target device block */
932         return 0;
933 }
934
935 /**
936  * Handler that called for every record in llog.
937  * Records are processed in order they placed in llog.
938  *
939  * \param[in] llh       log to be processed
940  * \param[in] rec       current record
941  * \param[in] data      mgs_replace_uuid_lookup structure
942  *
943  * \retval 0    success
944  */
945 static int mgs_replace_handler(const struct lu_env *env,
946                                struct llog_handle *llh,
947                                struct llog_rec_hdr *rec,
948                                void *data)
949 {
950         struct llog_rec_hdr local_rec = *rec;
951         struct mgs_replace_uuid_lookup *mrul;
952         struct lustre_cfg *lcfg = REC_DATA(rec);
953         int cfg_len = REC_DATA_LEN(rec);
954         int rc;
955         ENTRY;
956
957         mrul = (struct mgs_replace_uuid_lookup *)data;
958
959         if (rec->lrh_type != OBD_CFG_REC) {
960                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
961                        rec->lrh_type, lcfg->lcfg_command,
962                        lustre_cfg_string(lcfg, 0),
963                        lustre_cfg_string(lcfg, 1));
964                 RETURN(-EINVAL);
965         }
966
967         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
968         if (rc) {
969                 /* Do not copy any invalidated records */
970                 GOTO(skip_out, rc = 0);
971         }
972
973         rc = check_markers(lcfg, mrul);
974         if (rc || mrul->skip_it)
975                 GOTO(skip_out, rc = 0);
976
977         /* Write to new log all commands outside target device block */
978         if (!mrul->in_target_device)
979                 GOTO(copy_out, rc = 0);
980
981         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
982            (failover nids) for this target, assuming that if then
983            primary is changing then so is the failover */
984         if (mrul->device_nids_added &&
985             (lcfg->lcfg_command == LCFG_ADD_UUID ||
986              lcfg->lcfg_command == LCFG_ADD_CONN))
987                 GOTO(skip_out, rc = 0);
988
989         rc = process_command(env, lcfg, mrul);
990         if (rc < 0)
991                 RETURN(rc);
992
993         if (rc)
994                 RETURN(0);
995 copy_out:
996         /* Record is placed in temporary llog as is */
997         local_rec.lrh_len -= sizeof(*rec) + sizeof(struct llog_rec_tail);
998         rc = llog_write(env, mrul->temp_llh, &local_rec, NULL, 0,
999                         (void *)lcfg, -1);
1000
1001         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1002                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1003                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1004         RETURN(rc);
1005
1006 skip_out:
1007         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1008                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1009                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1010         RETURN(rc);
1011 }
1012
1013 static int mgs_backup_llog(const struct lu_env *env,
1014                            struct obd_device *mgs,
1015                            char *fsname, char *backup)
1016 {
1017         struct obd_uuid *uuid;
1018         struct llog_handle *orig_llh, *bak_llh;
1019         struct llog_ctxt *lctxt;
1020         int rc, rc2;
1021         ENTRY;
1022
1023         lctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1024         if (!lctxt) {
1025                 CERROR("%s: missing llog context\n", mgs->obd_name);
1026                 GOTO(out, rc = -EINVAL);
1027         }
1028
1029         /* Make sure there's no old backup log */
1030         rc = llog_erase(env, lctxt, NULL, backup);
1031         if (rc < 0 && rc != -ENOENT)
1032                 GOTO(out_put, rc);
1033
1034         /* open backup log */
1035         rc = llog_open_create(env, lctxt, &bak_llh, NULL, backup);
1036         if (rc) {
1037                 CERROR("%s: backup logfile open %s: rc = %d\n",
1038                        mgs->obd_name, backup, rc);
1039                 GOTO(out_put, rc);
1040         }
1041
1042         /* set the log header uuid */
1043         OBD_ALLOC_PTR(uuid);
1044         if (uuid == NULL)
1045                 GOTO(out_put, rc = -ENOMEM);
1046         obd_str2uuid(uuid, backup);
1047         rc = llog_init_handle(env, bak_llh, LLOG_F_IS_PLAIN, uuid);
1048         OBD_FREE_PTR(uuid);
1049         if (rc)
1050                 GOTO(out_close1, rc);
1051
1052         /* open original log */
1053         rc = llog_open(env, lctxt, &orig_llh, NULL, fsname,
1054                        LLOG_OPEN_EXISTS);
1055         if (rc < 0) {
1056                 if (rc == -ENOENT)
1057                         rc = 0;
1058                 GOTO(out_close1, rc);
1059         }
1060
1061         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, NULL);
1062         if (rc)
1063                 GOTO(out_close2, rc);
1064
1065         /* Copy remote log */
1066         rc = llog_process(env, orig_llh, llog_copy_handler,
1067                           (void *)bak_llh, NULL);
1068
1069 out_close2:
1070         rc2 = llog_close(env, orig_llh);
1071         if (!rc)
1072                 rc = rc2;
1073 out_close1:
1074         rc2 = llog_close(env, bak_llh);
1075         if (!rc)
1076                 rc = rc2;
1077 out_put:
1078         if (lctxt)
1079                 llog_ctxt_put(lctxt);
1080 out:
1081         if (rc)
1082                 CERROR("%s: Failed to backup log %s: rc = %d\n",
1083                        mgs->obd_name, fsname, rc);
1084         RETURN(rc);
1085 }
1086
1087 static int mgs_log_is_empty(const struct lu_env *env, struct mgs_device *mgs,
1088                             char *name);
1089
1090 static int mgs_replace_nids_log(const struct lu_env *env,
1091                                 struct obd_device *mgs, struct fs_db *fsdb,
1092                                 char *logname, char *devname, char *nids)
1093 {
1094         struct llog_handle *orig_llh, *backup_llh;
1095         struct llog_ctxt *ctxt;
1096         struct mgs_replace_uuid_lookup *mrul;
1097         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1098         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1099         char *backup;
1100         int rc, rc2;
1101         ENTRY;
1102
1103         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1104
1105         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1106         LASSERT(ctxt != NULL);
1107
1108         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1109                 /* Log is empty. Nothing to replace */
1110                 GOTO(out_put, rc = 0);
1111         }
1112
1113         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1114         if (backup == NULL)
1115                 GOTO(out_put, rc = -ENOMEM);
1116
1117         sprintf(backup, "%s.bak", logname);
1118
1119         rc = mgs_backup_llog(env, mgs, logname, backup);
1120         if (rc < 0) {
1121                 CERROR("%s: can't make backup for %s: rc = %d\n",
1122                        mgs->obd_name, logname, rc);
1123                 GOTO(out_free,rc);
1124         }
1125
1126         /* Now erase original log file. Connections are not allowed.
1127            Backup is already saved */
1128         rc = llog_erase(env, ctxt, NULL, logname);
1129         if (rc < 0 && rc != -ENOENT)
1130                 GOTO(out_free, rc);
1131
1132         /* open local log */
1133         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1134         if (rc)
1135                 GOTO(out_restore, rc);
1136
1137         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1138         if (rc)
1139                 GOTO(out_closel, rc);
1140
1141         /* open backup llog */
1142         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1143                        LLOG_OPEN_EXISTS);
1144         if (rc)
1145                 GOTO(out_closel, rc);
1146
1147         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1148         if (rc)
1149                 GOTO(out_close, rc);
1150
1151         if (llog_get_size(backup_llh) <= 1)
1152                 GOTO(out_close, rc = 0);
1153
1154         OBD_ALLOC_PTR(mrul);
1155         if (!mrul)
1156                 GOTO(out_close, rc = -ENOMEM);
1157         /* devname is only needed information to replace UUID records */
1158         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1159         /* parse nids later */
1160         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1161         /* Copy records to this temporary llog */
1162         mrul->temp_llh = orig_llh;
1163
1164         rc = llog_process(env, backup_llh, mgs_replace_handler,
1165                           (void *)mrul, NULL);
1166         OBD_FREE_PTR(mrul);
1167 out_close:
1168         rc2 = llog_close(NULL, backup_llh);
1169         if (!rc)
1170                 rc = rc2;
1171 out_closel:
1172         rc2 = llog_close(NULL, orig_llh);
1173         if (!rc)
1174                 rc = rc2;
1175
1176 out_restore:
1177         if (rc) {
1178                 CERROR("%s: llog should be restored: rc = %d\n",
1179                        mgs->obd_name, rc);
1180                 rc2 = mgs_backup_llog(env, mgs, backup, logname);
1181                 if (rc2 < 0)
1182                         CERROR("%s: can't restore backup %s: rc = %d\n",
1183                                mgs->obd_name, logname, rc2);
1184         }
1185
1186 out_free:
1187         OBD_FREE(backup, strlen(backup) + 1);
1188
1189 out_put:
1190         llog_ctxt_put(ctxt);
1191
1192         if (rc)
1193                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1194                        mgs->obd_name, logname, rc);
1195
1196         RETURN(rc);
1197 }
1198
1199 /**
1200  * Parse device name and get file system name and/or device index
1201  *
1202  * \param[in]   devname device name (ex. lustre-MDT0000)
1203  * \param[out]  fsname  file system name(optional)
1204  * \param[out]  index   device index(optional)
1205  *
1206  * \retval 0    success
1207  */
1208 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1209 {
1210         char *ptr;
1211         ENTRY;
1212
1213         /* Extract fsname */
1214         ptr = strrchr(devname, '-');
1215
1216         if (fsname) {
1217                 if (!ptr) {
1218                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1219                                devname);
1220                         RETURN(-EINVAL);
1221                 }
1222                 memset(fsname, 0, MTI_NAME_MAXLEN);
1223                 strncpy(fsname, devname, ptr - devname);
1224                 fsname[MTI_NAME_MAXLEN - 1] = 0;
1225         }
1226
1227         if (index) {
1228                 if (server_name2index(ptr, index, NULL) < 0) {
1229                         CDEBUG(D_MGS, "Device name with wrong index\n");
1230                         RETURN(-EINVAL);
1231                 }
1232         }
1233
1234         RETURN(0);
1235 }
1236
1237 static int only_mgs_is_running(struct obd_device *mgs_obd)
1238 {
1239         /* TDB: Is global variable with devices count exists? */
1240         int num_devices = get_devices_count();
1241         /* osd, MGS and MGC + self_export
1242            (wc -l /proc/fs/lustre/devices <= 2) && (num_exports <= 2) */
1243         return (num_devices <= 3) && (mgs_obd->obd_num_exports <= 2);
1244 }
1245
1246 static int name_create_mdt(char **logname, char *fsname, int i)
1247 {
1248         char mdt_index[9];
1249
1250         sprintf(mdt_index, "-MDT%04x", i);
1251         return name_create(logname, fsname, mdt_index);
1252 }
1253
1254 /**
1255  * Replace nids for \a device to \a nids values
1256  *
1257  * \param obd           MGS obd device
1258  * \param devname       nids need to be replaced for this device
1259  * (ex. lustre-OST0000)
1260  * \param nids          nids list (ex. nid1,nid2,nid3)
1261  *
1262  * \retval 0    success
1263  */
1264 int mgs_replace_nids(const struct lu_env *env,
1265                      struct mgs_device *mgs,
1266                      char *devname, char *nids)
1267 {
1268         /* Assume fsname is part of device name */
1269         char fsname[MTI_NAME_MAXLEN];
1270         int rc;
1271         __u32 index;
1272         char *logname;
1273         struct fs_db *fsdb;
1274         unsigned int i;
1275         int conn_state;
1276         struct obd_device *mgs_obd = mgs->mgs_obd;
1277         ENTRY;
1278
1279         /* We can only change NIDs if no other nodes are connected */
1280         spin_lock(&mgs_obd->obd_dev_lock);
1281         conn_state = mgs_obd->obd_no_conn;
1282         mgs_obd->obd_no_conn = 1;
1283         spin_unlock(&mgs_obd->obd_dev_lock);
1284
1285         /* We can not change nids if not only MGS is started */
1286         if (!only_mgs_is_running(mgs_obd)) {
1287                 CERROR("Only MGS is allowed to be started\n");
1288                 GOTO(out, rc = -EINPROGRESS);
1289         }
1290
1291         /* Get fsname and index*/
1292         rc = mgs_parse_devname(devname, fsname, &index);
1293         if (rc)
1294                 GOTO(out, rc);
1295
1296         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1297         if (rc) {
1298                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1299                 GOTO(out, rc);
1300         }
1301
1302         /* Process client llogs */
1303         name_create(&logname, fsname, "-client");
1304         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1305         name_destroy(&logname);
1306         if (rc) {
1307                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1308                        fsname, devname, rc);
1309                 GOTO(out, rc);
1310         }
1311
1312         /* Process MDT llogs */
1313         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1314                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1315                         continue;
1316                 name_create_mdt(&logname, fsname, i);
1317                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1318                 name_destroy(&logname);
1319                 if (rc)
1320                         GOTO(out, rc);
1321         }
1322
1323 out:
1324         spin_lock(&mgs_obd->obd_dev_lock);
1325         mgs_obd->obd_no_conn = conn_state;
1326         spin_unlock(&mgs_obd->obd_dev_lock);
1327
1328         RETURN(rc);
1329 }
1330
1331 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1332                             char *devname, struct lov_desc *desc)
1333 {
1334         struct mgs_thread_info *mgi = mgs_env_info(env);
1335         struct lustre_cfg *lcfg;
1336         int rc;
1337
1338         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1339         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1340         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1341         if (!lcfg)
1342                 return -ENOMEM;
1343         rc = record_lcfg(env, llh, lcfg);
1344
1345         lustre_cfg_free(lcfg);
1346         return rc;
1347 }
1348
1349 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1350                             char *devname, struct lmv_desc *desc)
1351 {
1352         struct mgs_thread_info *mgi = mgs_env_info(env);
1353         struct lustre_cfg *lcfg;
1354         int rc;
1355
1356         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1357         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1358         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1359
1360         rc = record_lcfg(env, llh, lcfg);
1361
1362         lustre_cfg_free(lcfg);
1363         return rc;
1364 }
1365
1366 static inline int record_mdc_add(const struct lu_env *env,
1367                                  struct llog_handle *llh,
1368                                  char *logname, char *mdcuuid,
1369                                  char *mdtuuid, char *index,
1370                                  char *gen)
1371 {
1372         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1373                            mdtuuid,index,gen,mdcuuid);
1374 }
1375
1376 static inline int record_lov_add(const struct lu_env *env,
1377                                  struct llog_handle *llh,
1378                                  char *lov_name, char *ost_uuid,
1379                                  char *index, char *gen)
1380 {
1381         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1382                            ost_uuid, index, gen, 0);
1383 }
1384
1385 static inline int record_mount_opt(const struct lu_env *env,
1386                                    struct llog_handle *llh,
1387                                    char *profile, char *lov_name,
1388                                    char *mdc_name)
1389 {
1390         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1391                            profile,lov_name,mdc_name,0);
1392 }
1393
1394 static int record_marker(const struct lu_env *env,
1395                          struct llog_handle *llh,
1396                          struct fs_db *fsdb, __u32 flags,
1397                          char *tgtname, char *comment)
1398 {
1399         struct mgs_thread_info *mgi = mgs_env_info(env);
1400         struct lustre_cfg *lcfg;
1401         int rc;
1402         int cplen = 0;
1403
1404         if (flags & CM_START)
1405                 fsdb->fsdb_gen++;
1406         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1407         mgi->mgi_marker.cm_flags = flags;
1408         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1409         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1410                         sizeof(mgi->mgi_marker.cm_tgtname));
1411         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1412                 return -E2BIG;
1413         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1414                         sizeof(mgi->mgi_marker.cm_comment));
1415         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1416                 return -E2BIG;
1417         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1418         mgi->mgi_marker.cm_canceltime = 0;
1419         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1420         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1421                             sizeof(mgi->mgi_marker));
1422         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
1423         if (!lcfg)
1424                 return -ENOMEM;
1425         rc = record_lcfg(env, llh, lcfg);
1426
1427         lustre_cfg_free(lcfg);
1428         return rc;
1429 }
1430
1431 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1432                             struct llog_handle **llh, char *name)
1433 {
1434         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1435         struct llog_ctxt        *ctxt;
1436         int                      rc = 0;
1437
1438         if (*llh)
1439                 GOTO(out, rc = -EBUSY);
1440
1441         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1442         if (!ctxt)
1443                 GOTO(out, rc = -ENODEV);
1444         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1445
1446         rc = llog_open_create(env, ctxt, llh, NULL, name);
1447         if (rc)
1448                 GOTO(out_ctxt, rc);
1449         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1450         if (rc)
1451                 llog_close(env, *llh);
1452 out_ctxt:
1453         llog_ctxt_put(ctxt);
1454 out:
1455         if (rc) {
1456                 CERROR("%s: can't start log %s: rc = %d\n",
1457                        mgs->mgs_obd->obd_name, name, rc);
1458                 *llh = NULL;
1459         }
1460         RETURN(rc);
1461 }
1462
1463 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1464 {
1465         int rc;
1466
1467         rc = llog_close(env, *llh);
1468         *llh = NULL;
1469
1470         return rc;
1471 }
1472
1473 static int mgs_log_is_empty(const struct lu_env *env,
1474                             struct mgs_device *mgs, char *name)
1475 {
1476         struct llog_handle *llh;
1477         struct llog_ctxt *ctxt;
1478         int rc = 0;
1479
1480         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1481         LASSERT(ctxt != NULL);
1482         rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
1483         if (rc < 0) {
1484                 if (rc == -ENOENT)
1485                         rc = 0;
1486                 GOTO(out_ctxt, rc);
1487         }
1488
1489         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
1490         if (rc)
1491                 GOTO(out_close, rc);
1492         rc = llog_get_size(llh);
1493
1494 out_close:
1495         llog_close(env, llh);
1496 out_ctxt:
1497         llog_ctxt_put(ctxt);
1498         /* header is record 1 */
1499         return (rc <= 1);
1500 }
1501
1502 /******************** config "macros" *********************/
1503
1504 /* write an lcfg directly into a log (with markers) */
1505 static int mgs_write_log_direct(const struct lu_env *env,
1506                                 struct mgs_device *mgs, struct fs_db *fsdb,
1507                                 char *logname, struct lustre_cfg *lcfg,
1508                                 char *devname, char *comment)
1509 {
1510         struct llog_handle *llh = NULL;
1511         int rc;
1512         ENTRY;
1513
1514         if (!lcfg)
1515                 RETURN(-ENOMEM);
1516
1517         rc = record_start_log(env, mgs, &llh, logname);
1518         if (rc)
1519                 RETURN(rc);
1520
1521         /* FIXME These should be a single journal transaction */
1522         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1523         if (rc)
1524                 GOTO(out_end, rc);
1525         rc = record_lcfg(env, llh, lcfg);
1526         if (rc)
1527                 GOTO(out_end, rc);
1528         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1529         if (rc)
1530                 GOTO(out_end, rc);
1531 out_end:
1532         record_end_log(env, &llh);
1533         RETURN(rc);
1534 }
1535
1536 /* write the lcfg in all logs for the given fs */
1537 int mgs_write_log_direct_all(const struct lu_env *env,
1538                              struct mgs_device *mgs,
1539                              struct fs_db *fsdb,
1540                              struct mgs_target_info *mti,
1541                              struct lustre_cfg *lcfg,
1542                              char *devname, char *comment,
1543                              int server_only)
1544 {
1545         cfs_list_t list;
1546         struct mgs_direntry *dirent, *n;
1547         char *fsname = mti->mti_fsname;
1548         char *logname;
1549         int rc = 0, len = strlen(fsname);
1550         ENTRY;
1551
1552         /* We need to set params for any future logs
1553            as well. FIXME Append this file to every new log.
1554            Actually, we should store as params (text), not llogs.  Or
1555            in a database. */
1556         rc = name_create(&logname, fsname, "-params");
1557         if (rc)
1558                 RETURN(rc);
1559         if (mgs_log_is_empty(env, mgs, logname)) {
1560                 struct llog_handle *llh = NULL;
1561                 rc = record_start_log(env, mgs, &llh, logname);
1562                 record_end_log(env, &llh);
1563         }
1564         name_destroy(&logname);
1565         if (rc)
1566                 RETURN(rc);
1567
1568         /* Find all the logs in the CONFIGS directory */
1569         rc = class_dentry_readdir(env, mgs, &list);
1570         if (rc)
1571                 RETURN(rc);
1572
1573         /* Could use fsdb index maps instead of directory listing */
1574         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1575                 cfs_list_del(&dirent->list);
1576                 /* don't write to sptlrpc rule log */
1577                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1578                         goto next;
1579
1580                 /* caller wants write server logs only */
1581                 if (server_only && strstr(dirent->name, "-client") != NULL)
1582                         goto next;
1583
1584                 if (strncmp(fsname, dirent->name, len) == 0) {
1585                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1586                         /* Erase any old settings of this same parameter */
1587                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1588                                         devname, comment, CM_SKIP);
1589                         if (rc < 0)
1590                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1591                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1592                         /* Write the new one */
1593                         if (lcfg) {
1594                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1595                                                           dirent->name,
1596                                                           lcfg, devname,
1597                                                           comment);
1598                                 if (rc)
1599                                         CERROR("%s: writing log %s: rc = %d\n",
1600                                                mgs->mgs_obd->obd_name,
1601                                                dirent->name, rc);
1602                         }
1603                 }
1604 next:
1605                 mgs_direntry_free(dirent);
1606         }
1607
1608         RETURN(rc);
1609 }
1610
1611 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1612                                     struct mgs_device *mgs,
1613                                     struct fs_db *fsdb,
1614                                     struct mgs_target_info *mti,
1615                                     int index, char *logname);
1616 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1617                                     struct mgs_device *mgs,
1618                                     struct fs_db *fsdb,
1619                                     struct mgs_target_info *mti,
1620                                     char *logname, char *suffix, char *lovname,
1621                                     enum lustre_sec_part sec_part, int flags);
1622 static int name_create_mdt_and_lov(char **logname, char **lovname,
1623                                    struct fs_db *fsdb, int i);
1624
1625 static int add_param(char *params, char *key, char *val)
1626 {
1627         char *start = params + strlen(params);
1628         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1629         int keylen = 0;
1630
1631         if (key != NULL)
1632                 keylen = strlen(key);
1633         if (start + 1 + keylen + strlen(val) >= end) {
1634                 CERROR("params are too long: %s %s%s\n",
1635                        params, key != NULL ? key : "", val);
1636                 return -EINVAL;
1637         }
1638
1639         sprintf(start, " %s%s", key != NULL ? key : "", val);
1640         return 0;
1641 }
1642
1643 /**
1644  * Walk through client config log record and convert the related records
1645  * into the target.
1646  **/
1647 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1648                                          struct llog_handle *llh,
1649                                          struct llog_rec_hdr *rec, void *data)
1650 {
1651         struct mgs_device *mgs;
1652         struct obd_device *obd;
1653         struct mgs_target_info *mti, *tmti;
1654         struct fs_db *fsdb;
1655         int cfg_len = rec->lrh_len;
1656         char *cfg_buf = (char*) (rec + 1);
1657         struct lustre_cfg *lcfg;
1658         int rc = 0;
1659         struct llog_handle *mdt_llh = NULL;
1660         static int got_an_osc_or_mdc = 0;
1661         /* 0: not found any osc/mdc;
1662            1: found osc;
1663            2: found mdc;
1664         */
1665         static int last_step = -1;
1666         int cplen = 0;
1667
1668         ENTRY;
1669
1670         mti = ((struct temp_comp*)data)->comp_mti;
1671         tmti = ((struct temp_comp*)data)->comp_tmti;
1672         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1673         obd = ((struct temp_comp *)data)->comp_obd;
1674         mgs = lu2mgs_dev(obd->obd_lu_dev);
1675         LASSERT(mgs);
1676
1677         if (rec->lrh_type != OBD_CFG_REC) {
1678                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1679                 RETURN(-EINVAL);
1680         }
1681
1682         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1683         if (rc) {
1684                 CERROR("Insane cfg\n");
1685                 RETURN(rc);
1686         }
1687
1688         lcfg = (struct lustre_cfg *)cfg_buf;
1689
1690         if (lcfg->lcfg_command == LCFG_MARKER) {
1691                 struct cfg_marker *marker;
1692                 marker = lustre_cfg_buf(lcfg, 1);
1693                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1694                     (marker->cm_flags & CM_START) &&
1695                      !(marker->cm_flags & CM_SKIP)) {
1696                         got_an_osc_or_mdc = 1;
1697                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1698                                         sizeof(tmti->mti_svname));
1699                         if (cplen >= sizeof(tmti->mti_svname))
1700                                 RETURN(-E2BIG);
1701                         rc = record_start_log(env, mgs, &mdt_llh,
1702                                               mti->mti_svname);
1703                         if (rc)
1704                                 RETURN(rc);
1705                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1706                                            mti->mti_svname, "add osc(copied)");
1707                         record_end_log(env, &mdt_llh);
1708                         last_step = marker->cm_step;
1709                         RETURN(rc);
1710                 }
1711                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1712                     (marker->cm_flags & CM_END) &&
1713                      !(marker->cm_flags & CM_SKIP)) {
1714                         LASSERT(last_step == marker->cm_step);
1715                         last_step = -1;
1716                         got_an_osc_or_mdc = 0;
1717                         memset(tmti, 0, sizeof(*tmti));
1718                         rc = record_start_log(env, mgs, &mdt_llh,
1719                                               mti->mti_svname);
1720                         if (rc)
1721                                 RETURN(rc);
1722                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1723                                            mti->mti_svname, "add osc(copied)");
1724                         record_end_log(env, &mdt_llh);
1725                         RETURN(rc);
1726                 }
1727                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1728                     (marker->cm_flags & CM_START) &&
1729                      !(marker->cm_flags & CM_SKIP)) {
1730                         got_an_osc_or_mdc = 2;
1731                         last_step = marker->cm_step;
1732                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1733                                strlen(marker->cm_tgtname));
1734
1735                         RETURN(rc);
1736                 }
1737                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1738                     (marker->cm_flags & CM_END) &&
1739                      !(marker->cm_flags & CM_SKIP)) {
1740                         LASSERT(last_step == marker->cm_step);
1741                         last_step = -1;
1742                         got_an_osc_or_mdc = 0;
1743                         memset(tmti, 0, sizeof(*tmti));
1744                         RETURN(rc);
1745                 }
1746         }
1747
1748         if (got_an_osc_or_mdc == 0 || last_step < 0)
1749                 RETURN(rc);
1750
1751         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1752                 uint64_t nodenid = lcfg->lcfg_nid;
1753
1754                 if (strlen(tmti->mti_uuid) == 0) {
1755                         /* target uuid not set, this config record is before
1756                          * LCFG_SETUP, this nid is one of target node nid.
1757                          */
1758                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1759                         tmti->mti_nid_count++;
1760                 } else {
1761                         /* failover node nid */
1762                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1763                                        libcfs_nid2str(nodenid));
1764                 }
1765
1766                 RETURN(rc);
1767         }
1768
1769         if (lcfg->lcfg_command == LCFG_SETUP) {
1770                 char *target;
1771
1772                 target = lustre_cfg_string(lcfg, 1);
1773                 memcpy(tmti->mti_uuid, target, strlen(target));
1774                 RETURN(rc);
1775         }
1776
1777         /* ignore client side sptlrpc_conf_log */
1778         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1779                 RETURN(rc);
1780
1781         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1782                 int index;
1783
1784                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1785                         RETURN (-EINVAL);
1786
1787                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1788                        strlen(mti->mti_fsname));
1789                 tmti->mti_stripe_index = index;
1790
1791                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1792                                               mti->mti_stripe_index,
1793                                               mti->mti_svname);
1794                 memset(tmti, 0, sizeof(*tmti));
1795                 RETURN(rc);
1796         }
1797
1798         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1799                 int index;
1800                 char mdt_index[9];
1801                 char *logname, *lovname;
1802
1803                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1804                                              mti->mti_stripe_index);
1805                 if (rc)
1806                         RETURN(rc);
1807                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1808
1809                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1810                         name_destroy(&logname);
1811                         name_destroy(&lovname);
1812                         RETURN(-EINVAL);
1813                 }
1814
1815                 tmti->mti_stripe_index = index;
1816                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1817                                          mdt_index, lovname,
1818                                          LUSTRE_SP_MDT, 0);
1819                 name_destroy(&logname);
1820                 name_destroy(&lovname);
1821                 RETURN(rc);
1822         }
1823         RETURN(rc);
1824 }
1825
1826 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1827 /* stealed from mgs_get_fsdb_from_llog*/
1828 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1829                                               struct mgs_device *mgs,
1830                                               char *client_name,
1831                                               struct temp_comp* comp)
1832 {
1833         struct llog_handle *loghandle;
1834         struct mgs_target_info *tmti;
1835         struct llog_ctxt *ctxt;
1836         int rc;
1837
1838         ENTRY;
1839
1840         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1841         LASSERT(ctxt != NULL);
1842
1843         OBD_ALLOC_PTR(tmti);
1844         if (tmti == NULL)
1845                 GOTO(out_ctxt, rc = -ENOMEM);
1846
1847         comp->comp_tmti = tmti;
1848         comp->comp_obd = mgs->mgs_obd;
1849
1850         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1851                        LLOG_OPEN_EXISTS);
1852         if (rc < 0) {
1853                 if (rc == -ENOENT)
1854                         rc = 0;
1855                 GOTO(out_pop, rc);
1856         }
1857
1858         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1859         if (rc)
1860                 GOTO(out_close, rc);
1861
1862         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1863                                   (void *)comp, NULL, false);
1864         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1865 out_close:
1866         llog_close(env, loghandle);
1867 out_pop:
1868         OBD_FREE_PTR(tmti);
1869 out_ctxt:
1870         llog_ctxt_put(ctxt);
1871         RETURN(rc);
1872 }
1873
1874 /* lmv is the second thing for client logs */
1875 /* copied from mgs_write_log_lov. Please refer to that.  */
1876 static int mgs_write_log_lmv(const struct lu_env *env,
1877                              struct mgs_device *mgs,
1878                              struct fs_db *fsdb,
1879                              struct mgs_target_info *mti,
1880                              char *logname, char *lmvname)
1881 {
1882         struct llog_handle *llh = NULL;
1883         struct lmv_desc *lmvdesc;
1884         char *uuid;
1885         int rc = 0;
1886         ENTRY;
1887
1888         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1889
1890         OBD_ALLOC_PTR(lmvdesc);
1891         if (lmvdesc == NULL)
1892                 RETURN(-ENOMEM);
1893         lmvdesc->ld_active_tgt_count = 0;
1894         lmvdesc->ld_tgt_count = 0;
1895         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1896         uuid = (char *)lmvdesc->ld_uuid.uuid;
1897
1898         rc = record_start_log(env, mgs, &llh, logname);
1899         if (rc)
1900                 GOTO(out_free, rc);
1901         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1902         if (rc)
1903                 GOTO(out_end, rc);
1904         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1905         if (rc)
1906                 GOTO(out_end, rc);
1907         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1908         if (rc)
1909                 GOTO(out_end, rc);
1910         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1911         if (rc)
1912                 GOTO(out_end, rc);
1913 out_end:
1914         record_end_log(env, &llh);
1915 out_free:
1916         OBD_FREE_PTR(lmvdesc);
1917         RETURN(rc);
1918 }
1919
1920 /* lov is the first thing in the mdt and client logs */
1921 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1922                              struct fs_db *fsdb, struct mgs_target_info *mti,
1923                              char *logname, char *lovname)
1924 {
1925         struct llog_handle *llh = NULL;
1926         struct lov_desc *lovdesc;
1927         char *uuid;
1928         int rc = 0;
1929         ENTRY;
1930
1931         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1932
1933         /*
1934         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1935         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1936               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1937         */
1938
1939         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1940         OBD_ALLOC_PTR(lovdesc);
1941         if (lovdesc == NULL)
1942                 RETURN(-ENOMEM);
1943         lovdesc->ld_magic = LOV_DESC_MAGIC;
1944         lovdesc->ld_tgt_count = 0;
1945         /* Defaults.  Can be changed later by lcfg config_param */
1946         lovdesc->ld_default_stripe_count = 1;
1947         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1948         lovdesc->ld_default_stripe_size = 1024 * 1024;
1949         lovdesc->ld_default_stripe_offset = -1;
1950         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1951         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1952         /* can these be the same? */
1953         uuid = (char *)lovdesc->ld_uuid.uuid;
1954
1955         /* This should always be the first entry in a log.
1956         rc = mgs_clear_log(obd, logname); */
1957         rc = record_start_log(env, mgs, &llh, logname);
1958         if (rc)
1959                 GOTO(out_free, rc);
1960         /* FIXME these should be a single journal transaction */
1961         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1962         if (rc)
1963                 GOTO(out_end, rc);
1964         rc = record_attach(env, llh, lovname, "lov", uuid);
1965         if (rc)
1966                 GOTO(out_end, rc);
1967         rc = record_lov_setup(env, llh, lovname, lovdesc);
1968         if (rc)
1969                 GOTO(out_end, rc);
1970         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1971         if (rc)
1972                 GOTO(out_end, rc);
1973         EXIT;
1974 out_end:
1975         record_end_log(env, &llh);
1976 out_free:
1977         OBD_FREE_PTR(lovdesc);
1978         return rc;
1979 }
1980
1981 /* add failnids to open log */
1982 static int mgs_write_log_failnids(const struct lu_env *env,
1983                                   struct mgs_target_info *mti,
1984                                   struct llog_handle *llh,
1985                                   char *cliname)
1986 {
1987         char *failnodeuuid = NULL;
1988         char *ptr = mti->mti_params;
1989         lnet_nid_t nid;
1990         int rc = 0;
1991
1992         /*
1993         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1994         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1995         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1996         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1997         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1998         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1999         */
2000
2001         /* Pull failnid info out of params string */
2002         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
2003                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
2004                         if (failnodeuuid == NULL) {
2005                                 /* We don't know the failover node name,
2006                                    so just use the first nid as the uuid */
2007                                 rc = name_create(&failnodeuuid,
2008                                                  libcfs_nid2str(nid), "");
2009                                 if (rc)
2010                                         return rc;
2011                         }
2012                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
2013                                "client %s\n", libcfs_nid2str(nid),
2014                                failnodeuuid, cliname);
2015                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
2016                 }
2017                 if (failnodeuuid)
2018                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
2019         }
2020
2021         name_destroy(&failnodeuuid);
2022         return rc;
2023 }
2024
2025 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
2026                                     struct mgs_device *mgs,
2027                                     struct fs_db *fsdb,
2028                                     struct mgs_target_info *mti,
2029                                     char *logname, char *lmvname)
2030 {
2031         struct llog_handle *llh = NULL;
2032         char *mdcname = NULL;
2033         char *nodeuuid = NULL;
2034         char *mdcuuid = NULL;
2035         char *lmvuuid = NULL;
2036         char index[6];
2037         int i, rc;
2038         ENTRY;
2039
2040         if (mgs_log_is_empty(env, mgs, logname)) {
2041                 CERROR("log is empty! Logical error\n");
2042                 RETURN(-EINVAL);
2043         }
2044
2045         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
2046                mti->mti_svname, logname, lmvname);
2047
2048         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2049         if (rc)
2050                 RETURN(rc);
2051         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
2052         if (rc)
2053                 GOTO(out_free, rc);
2054         rc = name_create(&mdcuuid, mdcname, "_UUID");
2055         if (rc)
2056                 GOTO(out_free, rc);
2057         rc = name_create(&lmvuuid, lmvname, "_UUID");
2058         if (rc)
2059                 GOTO(out_free, rc);
2060
2061         rc = record_start_log(env, mgs, &llh, logname);
2062         if (rc)
2063                 GOTO(out_free, rc);
2064         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2065                            "add mdc");
2066         if (rc)
2067                 GOTO(out_end, rc);
2068         for (i = 0; i < mti->mti_nid_count; i++) {
2069                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2070                        libcfs_nid2str(mti->mti_nids[i]));
2071
2072                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2073                 if (rc)
2074                         GOTO(out_end, rc);
2075         }
2076
2077         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2078         if (rc)
2079                 GOTO(out_end, rc);
2080         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
2081         if (rc)
2082                 GOTO(out_end, rc);
2083         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2084         if (rc)
2085                 GOTO(out_end, rc);
2086         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2087         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2088                             index, "1");
2089         if (rc)
2090                 GOTO(out_end, rc);
2091         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2092                            "add mdc");
2093         if (rc)
2094                 GOTO(out_end, rc);
2095 out_end:
2096         record_end_log(env, &llh);
2097 out_free:
2098         name_destroy(&lmvuuid);
2099         name_destroy(&mdcuuid);
2100         name_destroy(&mdcname);
2101         name_destroy(&nodeuuid);
2102         RETURN(rc);
2103 }
2104
2105 static inline int name_create_lov(char **lovname, char *mdtname,
2106                                   struct fs_db *fsdb, int index)
2107 {
2108         /* COMPAT_180 */
2109         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2110                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2111         else
2112                 return name_create(lovname, mdtname, "-mdtlov");
2113 }
2114
2115 static int name_create_mdt_and_lov(char **logname, char **lovname,
2116                                    struct fs_db *fsdb, int i)
2117 {
2118         int rc;
2119
2120         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2121         if (rc)
2122                 return rc;
2123         /* COMPAT_180 */
2124         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2125                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2126         else
2127                 rc = name_create(lovname, *logname, "-mdtlov");
2128         if (rc) {
2129                 name_destroy(logname);
2130                 *logname = NULL;
2131         }
2132         return rc;
2133 }
2134
2135 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2136                                       struct fs_db *fsdb, int i)
2137 {
2138         char suffix[16];
2139
2140         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2141                 sprintf(suffix, "-osc");
2142         else
2143                 sprintf(suffix, "-osc-MDT%04x", i);
2144         return name_create(oscname, ostname, suffix);
2145 }
2146
2147 /* add new mdc to already existent MDS */
2148 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2149                                     struct mgs_device *mgs,
2150                                     struct fs_db *fsdb,
2151                                     struct mgs_target_info *mti,
2152                                     int mdt_index, char *logname)
2153 {
2154         struct llog_handle      *llh = NULL;
2155         char    *nodeuuid = NULL;
2156         char    *ospname = NULL;
2157         char    *lovuuid = NULL;
2158         char    *mdtuuid = NULL;
2159         char    *svname = NULL;
2160         char    *mdtname = NULL;
2161         char    *lovname = NULL;
2162         char    index_str[16];
2163         int     i, rc;
2164
2165         ENTRY;
2166         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2167                 CERROR("log is empty! Logical error\n");
2168                 RETURN (-EINVAL);
2169         }
2170
2171         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2172                logname);
2173
2174         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2175         if (rc)
2176                 RETURN(rc);
2177
2178         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2179         if (rc)
2180                 GOTO(out_destory, rc);
2181
2182         rc = name_create(&svname, mdtname, "-osp");
2183         if (rc)
2184                 GOTO(out_destory, rc);
2185
2186         sprintf(index_str, "-MDT%04x", mdt_index);
2187         rc = name_create(&ospname, svname, index_str);
2188         if (rc)
2189                 GOTO(out_destory, rc);
2190
2191         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2192         if (rc)
2193                 GOTO(out_destory, rc);
2194
2195         rc = name_create(&lovuuid, lovname, "_UUID");
2196         if (rc)
2197                 GOTO(out_destory, rc);
2198
2199         rc = name_create(&mdtuuid, mdtname, "_UUID");
2200         if (rc)
2201                 GOTO(out_destory, rc);
2202
2203         rc = record_start_log(env, mgs, &llh, logname);
2204         if (rc)
2205                 GOTO(out_destory, rc);
2206
2207         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2208                            "add osp");
2209         if (rc)
2210                 GOTO(out_destory, rc);
2211
2212         for (i = 0; i < mti->mti_nid_count; i++) {
2213                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2214                        libcfs_nid2str(mti->mti_nids[i]));
2215                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2216                 if (rc)
2217                         GOTO(out_end, rc);
2218         }
2219
2220         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2221         if (rc)
2222                 GOTO(out_end, rc);
2223
2224         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2225                           NULL, NULL);
2226         if (rc)
2227                 GOTO(out_end, rc);
2228
2229         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2230         if (rc)
2231                 GOTO(out_end, rc);
2232
2233         /* Add mdc(osp) to lod */
2234         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2235                  mti->mti_stripe_index);
2236         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2237                          index_str, "1", NULL);
2238         if (rc)
2239                 GOTO(out_end, rc);
2240
2241         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2242         if (rc)
2243                 GOTO(out_end, rc);
2244
2245 out_end:
2246         record_end_log(env, &llh);
2247
2248 out_destory:
2249         name_destroy(&mdtuuid);
2250         name_destroy(&lovuuid);
2251         name_destroy(&lovname);
2252         name_destroy(&ospname);
2253         name_destroy(&svname);
2254         name_destroy(&nodeuuid);
2255         name_destroy(&mdtname);
2256         RETURN(rc);
2257 }
2258
2259 static int mgs_write_log_mdt0(const struct lu_env *env,
2260                               struct mgs_device *mgs,
2261                               struct fs_db *fsdb,
2262                               struct mgs_target_info *mti)
2263 {
2264         char *log = mti->mti_svname;
2265         struct llog_handle *llh = NULL;
2266         char *uuid, *lovname;
2267         char mdt_index[6];
2268         char *ptr = mti->mti_params;
2269         int rc = 0, failout = 0;
2270         ENTRY;
2271
2272         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2273         if (uuid == NULL)
2274                 RETURN(-ENOMEM);
2275
2276         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2277                 failout = (strncmp(ptr, "failout", 7) == 0);
2278
2279         rc = name_create(&lovname, log, "-mdtlov");
2280         if (rc)
2281                 GOTO(out_free, rc);
2282         if (mgs_log_is_empty(env, mgs, log)) {
2283                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2284                 if (rc)
2285                         GOTO(out_lod, rc);
2286         }
2287
2288         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2289
2290         rc = record_start_log(env, mgs, &llh, log);
2291         if (rc)
2292                 GOTO(out_lod, rc);
2293
2294         /* add MDT itself */
2295
2296         /* FIXME this whole fn should be a single journal transaction */
2297         sprintf(uuid, "%s_UUID", log);
2298         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2299         if (rc)
2300                 GOTO(out_lod, rc);
2301         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2302         if (rc)
2303                 GOTO(out_end, rc);
2304         rc = record_mount_opt(env, llh, log, lovname, NULL);
2305         if (rc)
2306                 GOTO(out_end, rc);
2307         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2308                         failout ? "n" : "f");
2309         if (rc)
2310                 GOTO(out_end, rc);
2311         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2312         if (rc)
2313                 GOTO(out_end, rc);
2314 out_end:
2315         record_end_log(env, &llh);
2316 out_lod:
2317         name_destroy(&lovname);
2318 out_free:
2319         OBD_FREE(uuid, sizeof(struct obd_uuid));
2320         RETURN(rc);
2321 }
2322
2323 /* envelope method for all layers log */
2324 static int mgs_write_log_mdt(const struct lu_env *env,
2325                              struct mgs_device *mgs,
2326                              struct fs_db *fsdb,
2327                              struct mgs_target_info *mti)
2328 {
2329         struct mgs_thread_info *mgi = mgs_env_info(env);
2330         struct llog_handle *llh = NULL;
2331         char *cliname;
2332         int rc, i = 0;
2333         ENTRY;
2334
2335         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2336
2337         if (mti->mti_uuid[0] == '\0') {
2338                 /* Make up our own uuid */
2339                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2340                          "%s_UUID", mti->mti_svname);
2341         }
2342
2343         /* add mdt */
2344         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2345         if (rc)
2346                 RETURN(rc);
2347         /* Append the mdt info to the client log */
2348         rc = name_create(&cliname, mti->mti_fsname, "-client");
2349         if (rc)
2350                 RETURN(rc);
2351
2352         if (mgs_log_is_empty(env, mgs, cliname)) {
2353                 /* Start client log */
2354                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2355                                        fsdb->fsdb_clilov);
2356                 if (rc)
2357                         GOTO(out_free, rc);
2358                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2359                                        fsdb->fsdb_clilmv);
2360                 if (rc)
2361                         GOTO(out_free, rc);
2362         }
2363
2364         /*
2365         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2366         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2367         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2368         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2369         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2370         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2371         */
2372
2373                 /* copy client info about lov/lmv */
2374                 mgi->mgi_comp.comp_mti = mti;
2375                 mgi->mgi_comp.comp_fsdb = fsdb;
2376
2377                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2378                                                         &mgi->mgi_comp);
2379                 if (rc)
2380                         GOTO(out_free, rc);
2381                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2382                                               fsdb->fsdb_clilmv);
2383                 if (rc)
2384                         GOTO(out_free, rc);
2385
2386                 /* add mountopts */
2387                 rc = record_start_log(env, mgs, &llh, cliname);
2388                 if (rc)
2389                         GOTO(out_free, rc);
2390
2391                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2392                                    "mount opts");
2393                 if (rc)
2394                         GOTO(out_end, rc);
2395                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2396                                       fsdb->fsdb_clilmv);
2397                 if (rc)
2398                         GOTO(out_end, rc);
2399                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2400                                    "mount opts");
2401
2402         if (rc)
2403                 GOTO(out_end, rc);
2404
2405         /* for_all_existing_mdt except current one */
2406         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2407                 if (i !=  mti->mti_stripe_index &&
2408                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2409                         char *logname;
2410
2411                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2412                         if (rc)
2413                                 GOTO(out_end, rc);
2414
2415                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2416                                                       i, logname);
2417                         name_destroy(&logname);
2418                         if (rc)
2419                                 GOTO(out_end, rc);
2420                 }
2421         }
2422 out_end:
2423         record_end_log(env, &llh);
2424 out_free:
2425         name_destroy(&cliname);
2426         RETURN(rc);
2427 }
2428
2429 /* Add the ost info to the client/mdt lov */
2430 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2431                                     struct mgs_device *mgs, struct fs_db *fsdb,
2432                                     struct mgs_target_info *mti,
2433                                     char *logname, char *suffix, char *lovname,
2434                                     enum lustre_sec_part sec_part, int flags)
2435 {
2436         struct llog_handle *llh = NULL;
2437         char *nodeuuid = NULL;
2438         char *oscname = NULL;
2439         char *oscuuid = NULL;
2440         char *lovuuid = NULL;
2441         char *svname = NULL;
2442         char index[6];
2443         int i, rc;
2444
2445         ENTRY;
2446         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2447                mti->mti_svname, logname);
2448
2449         if (mgs_log_is_empty(env, mgs, logname)) {
2450                 CERROR("log is empty! Logical error\n");
2451                 RETURN (-EINVAL);
2452         }
2453
2454         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2455         if (rc)
2456                 RETURN(rc);
2457         rc = name_create(&svname, mti->mti_svname, "-osc");
2458         if (rc)
2459                 GOTO(out_free, rc);
2460
2461         /* for the system upgraded from old 1.8, keep using the old osc naming
2462          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2463         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2464                 rc = name_create(&oscname, svname, "");
2465         else
2466                 rc = name_create(&oscname, svname, suffix);
2467         if (rc)
2468                 GOTO(out_free, rc);
2469
2470         rc = name_create(&oscuuid, oscname, "_UUID");
2471         if (rc)
2472                 GOTO(out_free, rc);
2473         rc = name_create(&lovuuid, lovname, "_UUID");
2474         if (rc)
2475                 GOTO(out_free, rc);
2476
2477
2478         /*
2479         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2480         multihomed (#4)
2481         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2482         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2483         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2484         failover (#6,7)
2485         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2486         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2487         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2488         */
2489
2490         rc = record_start_log(env, mgs, &llh, logname);
2491         if (rc)
2492                 GOTO(out_free, rc);
2493
2494         /* FIXME these should be a single journal transaction */
2495         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2496                            "add osc");
2497         if (rc)
2498                 GOTO(out_end, rc);
2499
2500         /* NB: don't change record order, because upon MDT steal OSC config
2501          * from client, it treats all nids before LCFG_SETUP as target nids
2502          * (multiple interfaces), while nids after as failover node nids.
2503          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2504          */
2505         for (i = 0; i < mti->mti_nid_count; i++) {
2506                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2507                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2508                 if (rc)
2509                         GOTO(out_end, rc);
2510         }
2511         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2512         if (rc)
2513                 GOTO(out_end, rc);
2514         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2515         if (rc)
2516                 GOTO(out_end, rc);
2517         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2518         if (rc)
2519                 GOTO(out_end, rc);
2520
2521         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2522
2523         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2524         if (rc)
2525                 GOTO(out_end, rc);
2526         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2527                            "add osc");
2528         if (rc)
2529                 GOTO(out_end, rc);
2530 out_end:
2531         record_end_log(env, &llh);
2532 out_free:
2533         name_destroy(&lovuuid);
2534         name_destroy(&oscuuid);
2535         name_destroy(&oscname);
2536         name_destroy(&svname);
2537         name_destroy(&nodeuuid);
2538         RETURN(rc);
2539 }
2540
2541 static int mgs_write_log_ost(const struct lu_env *env,
2542                              struct mgs_device *mgs, struct fs_db *fsdb,
2543                              struct mgs_target_info *mti)
2544 {
2545         struct llog_handle *llh = NULL;
2546         char *logname, *lovname;
2547         char *ptr = mti->mti_params;
2548         int rc, flags = 0, failout = 0, i;
2549         ENTRY;
2550
2551         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2552
2553         /* The ost startup log */
2554
2555         /* If the ost log already exists, that means that someone reformatted
2556            the ost and it called target_add again. */
2557         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2558                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2559                                    "exists, yet the server claims it never "
2560                                    "registered. It may have been reformatted, "
2561                                    "or the index changed. writeconf the MDT to "
2562                                    "regenerate all logs.\n", mti->mti_svname);
2563                 RETURN(-EALREADY);
2564         }
2565
2566         /*
2567         attach obdfilter ost1 ost1_UUID
2568         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2569         */
2570         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2571                 failout = (strncmp(ptr, "failout", 7) == 0);
2572         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2573         if (rc)
2574                 RETURN(rc);
2575         /* FIXME these should be a single journal transaction */
2576         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2577         if (rc)
2578                 GOTO(out_end, rc);
2579         if (*mti->mti_uuid == '\0')
2580                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2581                          "%s_UUID", mti->mti_svname);
2582         rc = record_attach(env, llh, mti->mti_svname,
2583                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2584         if (rc)
2585                 GOTO(out_end, rc);
2586         rc = record_setup(env, llh, mti->mti_svname,
2587                           "dev"/*ignored*/, "type"/*ignored*/,
2588                           failout ? "n" : "f", 0/*options*/);
2589         if (rc)
2590                 GOTO(out_end, rc);
2591         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2592         if (rc)
2593                 GOTO(out_end, rc);
2594 out_end:
2595         record_end_log(env, &llh);
2596         if (rc)
2597                 RETURN(rc);
2598         /* We also have to update the other logs where this osc is part of
2599            the lov */
2600
2601         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2602                 /* If we're upgrading, the old mdt log already has our
2603                    entry. Let's do a fake one for fun. */
2604                 /* Note that we can't add any new failnids, since we don't
2605                    know the old osc names. */
2606                 flags = CM_SKIP | CM_UPGRADE146;
2607
2608         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2609                 /* If the update flag isn't set, don't update client/mdt
2610                    logs. */
2611                 flags |= CM_SKIP;
2612                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2613                               "the MDT first to regenerate it.\n",
2614                               mti->mti_svname);
2615         }
2616
2617         /* Add ost to all MDT lov defs */
2618         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2619                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2620                         char mdt_index[9];
2621
2622                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2623                                                      i);
2624                         if (rc)
2625                                 RETURN(rc);
2626                         sprintf(mdt_index, "-MDT%04x", i);
2627                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2628                                                       logname, mdt_index,
2629                                                       lovname, LUSTRE_SP_MDT,
2630                                                       flags);
2631                         name_destroy(&logname);
2632                         name_destroy(&lovname);
2633                         if (rc)
2634                                 RETURN(rc);
2635                 }
2636         }
2637
2638         /* Append ost info to the client log */
2639         rc = name_create(&logname, mti->mti_fsname, "-client");
2640         if (rc)
2641                 RETURN(rc);
2642         if (mgs_log_is_empty(env, mgs, logname)) {
2643                 /* Start client log */
2644                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2645                                        fsdb->fsdb_clilov);
2646                 if (rc)
2647                         GOTO(out_free, rc);
2648                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2649                                        fsdb->fsdb_clilmv);
2650                 if (rc)
2651                         GOTO(out_free, rc);
2652         }
2653         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2654                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2655 out_free:
2656         name_destroy(&logname);
2657         RETURN(rc);
2658 }
2659
2660 static __inline__ int mgs_param_empty(char *ptr)
2661 {
2662         char *tmp;
2663
2664         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2665                 return 1;
2666         return 0;
2667 }
2668
2669 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2670                                           struct mgs_device *mgs,
2671                                           struct fs_db *fsdb,
2672                                           struct mgs_target_info *mti,
2673                                           char *logname, char *cliname)
2674 {
2675         int rc;
2676         struct llog_handle *llh = NULL;
2677
2678         if (mgs_param_empty(mti->mti_params)) {
2679                 /* Remove _all_ failnids */
2680                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2681                                 mti->mti_svname, "add failnid", CM_SKIP);
2682                 return rc < 0 ? rc : 0;
2683         }
2684
2685         /* Otherwise failover nids are additive */
2686         rc = record_start_log(env, mgs, &llh, logname);
2687         if (rc)
2688                 return rc;
2689                 /* FIXME this should be a single journal transaction */
2690         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2691                            "add failnid");
2692         if (rc)
2693                 goto out_end;
2694         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2695         if (rc)
2696                 goto out_end;
2697         rc = record_marker(env, llh, fsdb, CM_END,
2698                            mti->mti_svname, "add failnid");
2699 out_end:
2700         record_end_log(env, &llh);
2701         return rc;
2702 }
2703
2704
2705 /* Add additional failnids to an existing log.
2706    The mdc/osc must have been added to logs first */
2707 /* tcp nids must be in dotted-quad ascii -
2708    we can't resolve hostnames from the kernel. */
2709 static int mgs_write_log_add_failnid(const struct lu_env *env,
2710                                      struct mgs_device *mgs,
2711                                      struct fs_db *fsdb,
2712                                      struct mgs_target_info *mti)
2713 {
2714         char *logname, *cliname;
2715         int rc;
2716         ENTRY;
2717
2718         /* FIXME we currently can't erase the failnids
2719          * given when a target first registers, since they aren't part of
2720          * an "add uuid" stanza */
2721
2722         /* Verify that we know about this target */
2723         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2724                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2725                                    "yet. It must be started before failnids "
2726                                    "can be added.\n", mti->mti_svname);
2727                 RETURN(-ENOENT);
2728         }
2729
2730         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2731         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2732                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2733         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2734                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2735         } else {
2736                 RETURN(-EINVAL);
2737         }
2738         if (rc)
2739                 RETURN(rc);
2740         /* Add failover nids to the client log */
2741         rc = name_create(&logname, mti->mti_fsname, "-client");
2742         if (rc) {
2743                 name_destroy(&cliname);
2744                 RETURN(rc);
2745         }
2746         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2747         name_destroy(&logname);
2748         name_destroy(&cliname);
2749         if (rc)
2750                 RETURN(rc);
2751
2752         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2753                 /* Add OST failover nids to the MDT logs as well */
2754                 int i;
2755
2756                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2757                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2758                                 continue;
2759                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2760                         if (rc)
2761                                 RETURN(rc);
2762                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2763                                                  fsdb, i);
2764                         if (rc) {
2765                                 name_destroy(&logname);
2766                                 RETURN(rc);
2767                         }
2768                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2769                                                             mti, logname,
2770                                                             cliname);
2771                         name_destroy(&cliname);
2772                         name_destroy(&logname);
2773                         if (rc)
2774                                 RETURN(rc);
2775                 }
2776         }
2777
2778         RETURN(rc);
2779 }
2780
2781 static int mgs_wlp_lcfg(const struct lu_env *env,
2782                         struct mgs_device *mgs, struct fs_db *fsdb,
2783                         struct mgs_target_info *mti,
2784                         char *logname, struct lustre_cfg_bufs *bufs,
2785                         char *tgtname, char *ptr)
2786 {
2787         char comment[MTI_NAME_MAXLEN];
2788         char *tmp;
2789         struct lustre_cfg *lcfg;
2790         int rc, del;
2791
2792         /* Erase any old settings of this same parameter */
2793         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2794         comment[MTI_NAME_MAXLEN - 1] = 0;
2795         /* But don't try to match the value. */
2796         if ((tmp = strchr(comment, '=')))
2797             *tmp = 0;
2798         /* FIXME we should skip settings that are the same as old values */
2799         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2800         if (rc < 0)
2801                 return rc;
2802         del = mgs_param_empty(ptr);
2803
2804         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2805                       "Sett" : "Modify", tgtname, comment, logname);
2806         if (del)
2807                 return rc;
2808
2809         lustre_cfg_bufs_reset(bufs, tgtname);
2810         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2811         lcfg = lustre_cfg_new(LCFG_PARAM, bufs);
2812         if (!lcfg)
2813                 return -ENOMEM;
2814         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2815         lustre_cfg_free(lcfg);
2816         return rc;
2817 }
2818
2819 /* write global variable settings into log */
2820 static int mgs_write_log_sys(const struct lu_env *env,
2821                              struct mgs_device *mgs, struct fs_db *fsdb,
2822                              struct mgs_target_info *mti, char *sys, char *ptr)
2823 {
2824         struct mgs_thread_info *mgi = mgs_env_info(env);
2825         struct lustre_cfg *lcfg;
2826         char *tmp, sep;
2827         int rc, cmd, convert = 1;
2828
2829         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2830                 cmd = LCFG_SET_TIMEOUT;
2831         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2832                 cmd = LCFG_SET_LDLM_TIMEOUT;
2833         /* Check for known params here so we can return error to lctl */
2834         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2835                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2836                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2837                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2838                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2839                 cmd = LCFG_PARAM;
2840         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2841                 convert = 0; /* Don't convert string value to integer */
2842                 cmd = LCFG_PARAM;
2843         } else {
2844                 return -EINVAL;
2845         }
2846
2847         if (mgs_param_empty(ptr))
2848                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2849         else
2850                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2851
2852         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2853         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2854         if (!convert && *tmp != '\0')
2855                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2856         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2857         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2858         /* truncate the comment to the parameter name */
2859         ptr = tmp - 1;
2860         sep = *ptr;
2861         *ptr = '\0';
2862         /* modify all servers and clients */
2863         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2864                                       *tmp == '\0' ? NULL : lcfg,
2865                                       mti->mti_fsname, sys, 0);
2866         if (rc == 0 && *tmp != '\0') {
2867                 switch (cmd) {
2868                 case LCFG_SET_TIMEOUT:
2869                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2870                                 class_process_config(lcfg);
2871                         break;
2872                 case LCFG_SET_LDLM_TIMEOUT:
2873                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2874                                 class_process_config(lcfg);
2875                         break;
2876                 default:
2877                         break;
2878                 }
2879         }
2880         *ptr = sep;
2881         lustre_cfg_free(lcfg);
2882         return rc;
2883 }
2884
2885 /* write quota settings into log */
2886 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2887                                struct fs_db *fsdb, struct mgs_target_info *mti,
2888                                char *quota, char *ptr)
2889 {
2890         struct mgs_thread_info  *mgi = mgs_env_info(env);
2891         struct lustre_cfg       *lcfg;
2892         char                    *tmp;
2893         char                     sep;
2894         int                      rc, cmd = LCFG_PARAM;
2895
2896         /* support only 'meta' and 'data' pools so far */
2897         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2898             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2899                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2900                        "& quota.ost are)\n", ptr);
2901                 return -EINVAL;
2902         }
2903
2904         if (*tmp == '\0') {
2905                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2906         } else {
2907                 CDEBUG(D_MGS, "global '%s'\n", quota);
2908
2909                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2910                     strcmp(tmp, "none") != 0) {
2911                         CERROR("enable option(%s) isn't supported\n", tmp);
2912                         return -EINVAL;
2913                 }
2914         }
2915
2916         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2917         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2918         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2919         /* truncate the comment to the parameter name */
2920         ptr = tmp - 1;
2921         sep = *ptr;
2922         *ptr = '\0';
2923
2924         /* XXX we duplicated quota enable information in all server
2925          *     config logs, it should be moved to a separate config
2926          *     log once we cleanup the config log for global param. */
2927         /* modify all servers */
2928         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2929                                       *tmp == '\0' ? NULL : lcfg,
2930                                       mti->mti_fsname, quota, 1);
2931         *ptr = sep;
2932         lustre_cfg_free(lcfg);
2933         return rc < 0 ? rc : 0;
2934 }
2935
2936 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2937                                    struct mgs_device *mgs,
2938                                    struct fs_db *fsdb,
2939                                    struct mgs_target_info *mti,
2940                                    char *param)
2941 {
2942         struct mgs_thread_info *mgi = mgs_env_info(env);
2943         struct llog_handle     *llh = NULL;
2944         char                   *logname;
2945         char                   *comment, *ptr;
2946         struct lustre_cfg      *lcfg;
2947         int                     rc, len;
2948         ENTRY;
2949
2950         /* get comment */
2951         ptr = strchr(param, '=');
2952         LASSERT(ptr);
2953         len = ptr - param;
2954
2955         OBD_ALLOC(comment, len + 1);
2956         if (comment == NULL)
2957                 RETURN(-ENOMEM);
2958         strncpy(comment, param, len);
2959         comment[len] = '\0';
2960
2961         /* prepare lcfg */
2962         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2963         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2964         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2965         if (lcfg == NULL)
2966                 GOTO(out_comment, rc = -ENOMEM);
2967
2968         /* construct log name */
2969         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2970         if (rc)
2971                 GOTO(out_lcfg, rc);
2972
2973         if (mgs_log_is_empty(env, mgs, logname)) {
2974                 rc = record_start_log(env, mgs, &llh, logname);
2975                 if (rc)
2976                         GOTO(out, rc);
2977                 record_end_log(env, &llh);
2978         }
2979
2980         /* obsolete old one */
2981         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2982                         comment, CM_SKIP);
2983         if (rc < 0)
2984                 GOTO(out, rc);
2985         /* write the new one */
2986         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2987                                   mti->mti_svname, comment);
2988         if (rc)
2989                 CERROR("err %d writing log %s\n", rc, logname);
2990 out:
2991         name_destroy(&logname);
2992 out_lcfg:
2993         lustre_cfg_free(lcfg);
2994 out_comment:
2995         OBD_FREE(comment, len + 1);
2996         RETURN(rc);
2997 }
2998
2999 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
3000                                         char *param)
3001 {
3002         char    *ptr;
3003
3004         /* disable the adjustable udesc parameter for now, i.e. use default
3005          * setting that client always ship udesc to MDT if possible. to enable
3006          * it simply remove the following line */
3007         goto error_out;
3008
3009         ptr = strchr(param, '=');
3010         if (ptr == NULL)
3011                 goto error_out;
3012         *ptr++ = '\0';
3013
3014         if (strcmp(param, PARAM_SRPC_UDESC))
3015                 goto error_out;
3016
3017         if (strcmp(ptr, "yes") == 0) {
3018                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3019                 CWARN("Enable user descriptor shipping from client to MDT\n");
3020         } else if (strcmp(ptr, "no") == 0) {
3021                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3022                 CWARN("Disable user descriptor shipping from client to MDT\n");
3023         } else {
3024                 *(ptr - 1) = '=';
3025                 goto error_out;
3026         }
3027         return 0;
3028
3029 error_out:
3030         CERROR("Invalid param: %s\n", param);
3031         return -EINVAL;
3032 }
3033
3034 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3035                                   const char *svname,
3036                                   char *param)
3037 {
3038         struct sptlrpc_rule      rule;
3039         struct sptlrpc_rule_set *rset;
3040         int                      rc;
3041         ENTRY;
3042
3043         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3044                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3045                 RETURN(-EINVAL);
3046         }
3047
3048         if (strncmp(param, PARAM_SRPC_UDESC,
3049                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3050                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3051         }
3052
3053         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3054                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3055                 RETURN(-EINVAL);
3056         }
3057
3058         param += sizeof(PARAM_SRPC_FLVR) - 1;
3059
3060         rc = sptlrpc_parse_rule(param, &rule);
3061         if (rc)
3062                 RETURN(rc);
3063
3064         /* mgs rules implies must be mgc->mgs */
3065         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3066                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3067                      rule.sr_from != LUSTRE_SP_ANY) ||
3068                     (rule.sr_to != LUSTRE_SP_MGS &&
3069                      rule.sr_to != LUSTRE_SP_ANY))
3070                         RETURN(-EINVAL);
3071         }
3072
3073         /* preapre room for this coming rule. svcname format should be:
3074          * - fsname: general rule
3075          * - fsname-tgtname: target-specific rule
3076          */
3077         if (strchr(svname, '-')) {
3078                 struct mgs_tgt_srpc_conf *tgtconf;
3079                 int                       found = 0;
3080
3081                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3082                      tgtconf = tgtconf->mtsc_next) {
3083                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3084                                 found = 1;
3085                                 break;
3086                         }
3087                 }
3088
3089                 if (!found) {
3090                         int name_len;
3091
3092                         OBD_ALLOC_PTR(tgtconf);
3093                         if (tgtconf == NULL)
3094                                 RETURN(-ENOMEM);
3095
3096                         name_len = strlen(svname);
3097
3098                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3099                         if (tgtconf->mtsc_tgt == NULL) {
3100                                 OBD_FREE_PTR(tgtconf);
3101                                 RETURN(-ENOMEM);
3102                         }
3103                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3104
3105                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3106                         fsdb->fsdb_srpc_tgt = tgtconf;
3107                 }
3108
3109                 rset = &tgtconf->mtsc_rset;
3110         } else {
3111                 rset = &fsdb->fsdb_srpc_gen;
3112         }
3113
3114         rc = sptlrpc_rule_set_merge(rset, &rule);
3115
3116         RETURN(rc);
3117 }
3118
3119 static int mgs_srpc_set_param(const struct lu_env *env,
3120                               struct mgs_device *mgs,
3121                               struct fs_db *fsdb,
3122                               struct mgs_target_info *mti,
3123                               char *param)
3124 {
3125         char                   *copy;
3126         int                     rc, copy_size;
3127         ENTRY;
3128
3129 #ifndef HAVE_GSS
3130         RETURN(-EINVAL);
3131 #endif
3132         /* keep a copy of original param, which could be destroied
3133          * during parsing */
3134         copy_size = strlen(param) + 1;
3135         OBD_ALLOC(copy, copy_size);
3136         if (copy == NULL)
3137                 return -ENOMEM;
3138         memcpy(copy, param, copy_size);
3139
3140         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3141         if (rc)
3142                 goto out_free;
3143
3144         /* previous steps guaranteed the syntax is correct */
3145         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3146         if (rc)
3147                 goto out_free;
3148
3149         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3150                 /*
3151                  * for mgs rules, make them effective immediately.
3152                  */
3153                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3154                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3155                                                  &fsdb->fsdb_srpc_gen);
3156         }
3157
3158 out_free:
3159         OBD_FREE(copy, copy_size);
3160         RETURN(rc);
3161 }
3162
3163 struct mgs_srpc_read_data {
3164         struct fs_db   *msrd_fsdb;
3165         int             msrd_skip;
3166 };
3167
3168 static int mgs_srpc_read_handler(const struct lu_env *env,
3169                                  struct llog_handle *llh,
3170                                  struct llog_rec_hdr *rec, void *data)
3171 {
3172         struct mgs_srpc_read_data *msrd = data;
3173         struct cfg_marker         *marker;
3174         struct lustre_cfg         *lcfg = REC_DATA(rec);
3175         char                      *svname, *param;
3176         int                        cfg_len, rc;
3177         ENTRY;
3178
3179         if (rec->lrh_type != OBD_CFG_REC) {
3180                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3181                 RETURN(-EINVAL);
3182         }
3183
3184         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
3185                   sizeof(struct llog_rec_tail);
3186
3187         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3188         if (rc) {
3189                 CERROR("Insane cfg\n");
3190                 RETURN(rc);
3191         }
3192
3193         if (lcfg->lcfg_command == LCFG_MARKER) {
3194                 marker = lustre_cfg_buf(lcfg, 1);
3195
3196                 if (marker->cm_flags & CM_START &&
3197                     marker->cm_flags & CM_SKIP)
3198                         msrd->msrd_skip = 1;
3199                 if (marker->cm_flags & CM_END)
3200                         msrd->msrd_skip = 0;
3201
3202                 RETURN(0);
3203         }
3204
3205         if (msrd->msrd_skip)
3206                 RETURN(0);
3207
3208         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3209                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3210                 RETURN(0);
3211         }
3212
3213         svname = lustre_cfg_string(lcfg, 0);
3214         if (svname == NULL) {
3215                 CERROR("svname is empty\n");
3216                 RETURN(0);
3217         }
3218
3219         param = lustre_cfg_string(lcfg, 1);
3220         if (param == NULL) {
3221                 CERROR("param is empty\n");
3222                 RETURN(0);
3223         }
3224
3225         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3226         if (rc)
3227                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3228
3229         RETURN(0);
3230 }
3231
3232 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3233                                 struct mgs_device *mgs,
3234                                 struct fs_db *fsdb)
3235 {
3236         struct llog_handle        *llh = NULL;
3237         struct llog_ctxt          *ctxt;
3238         char                      *logname;
3239         struct mgs_srpc_read_data  msrd;
3240         int                        rc;
3241         ENTRY;
3242
3243         /* construct log name */
3244         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3245         if (rc)
3246                 RETURN(rc);
3247
3248         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3249         LASSERT(ctxt != NULL);
3250
3251         if (mgs_log_is_empty(env, mgs, logname))
3252                 GOTO(out, rc = 0);
3253
3254         rc = llog_open(env, ctxt, &llh, NULL, logname,
3255                        LLOG_OPEN_EXISTS);
3256         if (rc < 0) {
3257                 if (rc == -ENOENT)
3258                         rc = 0;
3259                 GOTO(out, rc);
3260         }
3261
3262         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3263         if (rc)
3264                 GOTO(out_close, rc);
3265
3266         if (llog_get_size(llh) <= 1)
3267                 GOTO(out_close, rc = 0);
3268
3269         msrd.msrd_fsdb = fsdb;
3270         msrd.msrd_skip = 0;
3271
3272         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3273                           NULL);
3274
3275 out_close:
3276         llog_close(env, llh);
3277 out:
3278         llog_ctxt_put(ctxt);
3279         name_destroy(&logname);
3280
3281         if (rc)
3282                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3283         RETURN(rc);
3284 }
3285
3286 /* Permanent settings of all parameters by writing into the appropriate
3287  * configuration logs.
3288  * A parameter with null value ("<param>='\0'") means to erase it out of
3289  * the logs.
3290  */
3291 static int mgs_write_log_param(const struct lu_env *env,
3292                                struct mgs_device *mgs, struct fs_db *fsdb,
3293                                struct mgs_target_info *mti, char *ptr)
3294 {
3295         struct mgs_thread_info *mgi = mgs_env_info(env);
3296         char *logname;
3297         char *tmp;
3298         int rc = 0, rc2 = 0;
3299         ENTRY;
3300
3301         /* For various parameter settings, we have to figure out which logs
3302            care about them (e.g. both mdt and client for lov settings) */
3303         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3304
3305         /* The params are stored in MOUNT_DATA_FILE and modified via
3306            tunefs.lustre, or set using lctl conf_param */
3307
3308         /* Processed in lustre_start_mgc */
3309         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3310                 GOTO(end, rc);
3311
3312         /* Processed in ost/mdt */
3313         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3314                 GOTO(end, rc);
3315
3316         /* Processed in mgs_write_log_ost */
3317         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3318                 if (mti->mti_flags & LDD_F_PARAM) {
3319                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3320                                            "changed with tunefs.lustre"
3321                                            "and --writeconf\n", ptr);
3322                         rc = -EPERM;
3323                 }
3324                 GOTO(end, rc);
3325         }
3326
3327         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3328                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3329                 GOTO(end, rc);
3330         }
3331
3332         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3333                 /* Add a failover nidlist */
3334                 rc = 0;
3335                 /* We already processed failovers params for new
3336                    targets in mgs_write_log_target */
3337                 if (mti->mti_flags & LDD_F_PARAM) {
3338                         CDEBUG(D_MGS, "Adding failnode\n");
3339                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3340                 }
3341                 GOTO(end, rc);
3342         }
3343
3344         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3345                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3346                 GOTO(end, rc);
3347         }
3348
3349         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3350                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3351                 GOTO(end, rc);
3352         }
3353
3354         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3355                 /* active=0 means off, anything else means on */
3356                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3357                 int i;
3358
3359                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3360                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3361                                            "be (de)activated.\n",
3362                                            mti->mti_svname);
3363                         GOTO(end, rc = -EINVAL);
3364                 }
3365                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3366                               flag ? "de": "re", mti->mti_svname);
3367                 /* Modify clilov */
3368                 rc = name_create(&logname, mti->mti_fsname, "-client");
3369                 if (rc)
3370                         GOTO(end, rc);
3371                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3372                                 mti->mti_svname, "add osc", flag);
3373                 name_destroy(&logname);
3374                 if (rc)
3375                         goto active_err;
3376                 /* Modify mdtlov */
3377                 /* Add to all MDT logs for CMD */
3378                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3379                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3380                                 continue;
3381                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3382                         if (rc)
3383                                 GOTO(end, rc);
3384                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3385                                         mti->mti_svname, "add osc", flag);
3386                         name_destroy(&logname);
3387                         if (rc)
3388                                 goto active_err;
3389                 }
3390         active_err:
3391                 if (rc) {
3392                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3393                                            "log (%d). No permanent "
3394                                            "changes were made to the "
3395                                            "config log.\n",
3396                                            mti->mti_svname, rc);
3397                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3398                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3399                                                    " because the log"
3400                                                    "is in the old 1.4"
3401                                                    "style. Consider "
3402                                                    " --writeconf to "
3403                                                    "update the logs.\n");
3404                         GOTO(end, rc);
3405                 }
3406                 /* Fall through to osc proc for deactivating live OSC
3407                    on running MDT / clients. */
3408         }
3409         /* Below here, let obd's XXX_process_config methods handle it */
3410
3411         /* All lov. in proc */
3412         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3413                 char *mdtlovname;
3414
3415                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3416                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3417                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3418                                            "set on the MDT, not %s. "
3419                                            "Ignoring.\n",
3420                                            mti->mti_svname);
3421                         GOTO(end, rc = 0);
3422                 }
3423
3424                 /* Modify mdtlov */
3425                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3426                         GOTO(end, rc = -ENODEV);
3427
3428                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3429                                              mti->mti_stripe_index);
3430                 if (rc)
3431                         GOTO(end, rc);
3432                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3433                                   &mgi->mgi_bufs, mdtlovname, ptr);
3434                 name_destroy(&logname);
3435                 name_destroy(&mdtlovname);
3436                 if (rc)
3437                         GOTO(end, rc);
3438
3439                 /* Modify clilov */
3440                 rc = name_create(&logname, mti->mti_fsname, "-client");
3441                 if (rc)
3442                         GOTO(end, rc);
3443                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3444                                   fsdb->fsdb_clilov, ptr);
3445                 name_destroy(&logname);
3446                 GOTO(end, rc);
3447         }
3448
3449         /* All osc., mdc., llite. params in proc */
3450         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3451             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3452             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3453                 char *cname;
3454
3455                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3456                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3457                                            " cannot be modified. Consider"
3458                                            " updating the configuration with"
3459                                            " --writeconf\n",
3460                                            mti->mti_svname);
3461                         GOTO(end, rc = -EINVAL);
3462                 }
3463                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3464                         rc = name_create(&cname, mti->mti_fsname, "-client");
3465                         /* Add the client type to match the obdname in
3466                            class_config_llog_handler */
3467                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3468                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3469                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3470                         rc = name_create(&cname, mti->mti_svname, "-osc");
3471                 } else {
3472                         GOTO(end, rc = -EINVAL);
3473                 }
3474                 if (rc)
3475                         GOTO(end, rc);
3476
3477                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3478
3479                 /* Modify client */
3480                 rc = name_create(&logname, mti->mti_fsname, "-client");
3481                 if (rc) {
3482                         name_destroy(&cname);
3483                         GOTO(end, rc);
3484                 }
3485                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3486                                   cname, ptr);
3487
3488                 /* osc params affect the MDT as well */
3489                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3490                         int i;
3491
3492                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3493                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3494                                         continue;
3495                                 name_destroy(&cname);
3496                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3497                                                          fsdb, i);
3498                                 name_destroy(&logname);
3499                                 if (rc)
3500                                         break;
3501                                 rc = name_create_mdt(&logname,
3502                                                      mti->mti_fsname, i);
3503                                 if (rc)
3504                                         break;
3505                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3506                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3507                                                           mti, logname,
3508                                                           &mgi->mgi_bufs,
3509                                                           cname, ptr);
3510                                         if (rc)
3511                                                 break;
3512                                 }
3513                         }
3514                 }
3515                 name_destroy(&logname);
3516                 name_destroy(&cname);
3517                 GOTO(end, rc);
3518         }
3519
3520         /* All mdt. params in proc */
3521         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
3522                 int i;
3523                 __u32 idx;
3524
3525                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3526                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3527                             MTI_NAME_MAXLEN) == 0)
3528                         /* device is unspecified completely? */
3529                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3530                 else
3531                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3532                 if (rc < 0)
3533                         goto active_err;
3534                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3535                         goto active_err;
3536                 if (rc & LDD_F_SV_ALL) {
3537                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3538                                 if (!test_bit(i,
3539                                                   fsdb->fsdb_mdt_index_map))
3540                                         continue;
3541                                 rc = name_create_mdt(&logname,
3542                                                 mti->mti_fsname, i);
3543                                 if (rc)
3544                                         goto active_err;
3545                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3546                                                   logname, &mgi->mgi_bufs,
3547                                                   logname, ptr);
3548                                 name_destroy(&logname);
3549                                 if (rc)
3550                                         goto active_err;
3551                         }
3552                 } else {
3553                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3554                                           mti->mti_svname, &mgi->mgi_bufs,
3555                                           mti->mti_svname, ptr);
3556                         if (rc)
3557                                 goto active_err;
3558                 }
3559                 GOTO(end, rc);
3560         }
3561
3562         /* All mdd., ost. params in proc */
3563         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3564             (class_match_param(ptr, PARAM_OST, NULL) == 0)) {
3565                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3566                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3567                         GOTO(end, rc = -ENODEV);
3568
3569                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3570                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3571                 GOTO(end, rc);
3572         }
3573
3574         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3575         rc2 = -ENOSYS;
3576
3577 end:
3578         if (rc)
3579                 CERROR("err %d on param '%s'\n", rc, ptr);
3580
3581         RETURN(rc ?: rc2);
3582 }
3583
3584 /* Not implementing automatic failover nid addition at this time. */
3585 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3586                       struct mgs_target_info *mti)
3587 {
3588 #if 0
3589         struct fs_db *fsdb;
3590         int rc;
3591         ENTRY;
3592
3593         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3594         if (rc)
3595                 RETURN(rc);
3596
3597         if (mgs_log_is_empty(obd, mti->mti_svname))
3598                 /* should never happen */
3599                 RETURN(-ENOENT);
3600
3601         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3602
3603         /* FIXME We can just check mti->params to see if we're already in
3604            the failover list.  Modify mti->params for rewriting back at
3605            server_register_target(). */
3606
3607         mutex_lock(&fsdb->fsdb_mutex);
3608         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3609         mutex_unlock(&fsdb->fsdb_mutex);
3610
3611         RETURN(rc);
3612 #endif
3613         return 0;
3614 }
3615
3616 int mgs_write_log_target(const struct lu_env *env,
3617                          struct mgs_device *mgs,
3618                          struct mgs_target_info *mti,
3619                          struct fs_db *fsdb)
3620 {
3621         int rc = -EINVAL;
3622         char *buf, *params;
3623         ENTRY;
3624
3625         /* set/check the new target index */
3626         rc = mgs_set_index(env, mgs, mti);
3627         if (rc < 0) {
3628                 CERROR("Can't get index (%d)\n", rc);
3629                 RETURN(rc);
3630         }
3631
3632         if (rc == EALREADY) {
3633                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3634                               mti->mti_stripe_index, mti->mti_svname);
3635                 /* We would like to mark old log sections as invalid
3636                    and add new log sections in the client and mdt logs.
3637                    But if we add new sections, then live clients will
3638                    get repeat setup instructions for already running
3639                    osc's. So don't update the client/mdt logs. */
3640                 mti->mti_flags &= ~LDD_F_UPDATE;
3641                 rc = 0;
3642         }
3643
3644         mutex_lock(&fsdb->fsdb_mutex);
3645
3646         if (mti->mti_flags &
3647             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3648                 /* Generate a log from scratch */
3649                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3650                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3651                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3652                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3653                 } else {
3654                         CERROR("Unknown target type %#x, can't create log for "
3655                                "%s\n", mti->mti_flags, mti->mti_svname);
3656                 }
3657                 if (rc) {
3658                         CERROR("Can't write logs for %s (%d)\n",
3659                                mti->mti_svname, rc);
3660                         GOTO(out_up, rc);
3661                 }
3662         } else {
3663                 /* Just update the params from tunefs in mgs_write_log_params */
3664                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3665                 mti->mti_flags |= LDD_F_PARAM;
3666         }
3667
3668         /* allocate temporary buffer, where class_get_next_param will
3669            make copy of a current  parameter */
3670         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3671         if (buf == NULL)
3672                 GOTO(out_up, rc = -ENOMEM);
3673         params = mti->mti_params;
3674         while (params != NULL) {
3675                 rc = class_get_next_param(&params, buf);
3676                 if (rc) {
3677                         if (rc == 1)
3678                                 /* there is no next parameter, that is
3679                                    not an error */
3680                                 rc = 0;
3681                         break;
3682                 }
3683                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3684                        params, buf);
3685                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3686                 if (rc)
3687                         break;
3688         }
3689
3690         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3691
3692 out_up:
3693         mutex_unlock(&fsdb->fsdb_mutex);
3694         RETURN(rc);
3695 }
3696
3697 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3698 {
3699         struct llog_ctxt        *ctxt;
3700         int                      rc = 0;
3701
3702         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3703         if (ctxt == NULL) {
3704                 CERROR("%s: MGS config context doesn't exist\n",
3705                        mgs->mgs_obd->obd_name);
3706                 rc = -ENODEV;
3707         } else {
3708                 rc = llog_erase(env, ctxt, NULL, name);
3709                 /* llog may not exist */
3710                 if (rc == -ENOENT)
3711                         rc = 0;
3712                 llog_ctxt_put(ctxt);
3713         }
3714
3715         if (rc)
3716                 CERROR("%s: failed to clear log %s: %d\n",
3717                        mgs->mgs_obd->obd_name, name, rc);
3718
3719         return rc;
3720 }
3721
3722 /* erase all logs for the given fs */
3723 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3724 {
3725         struct fs_db *fsdb;
3726         cfs_list_t list;
3727         struct mgs_direntry *dirent, *n;
3728         int rc, len = strlen(fsname);
3729         char *suffix;
3730         ENTRY;
3731
3732         /* Find all the logs in the CONFIGS directory */
3733         rc = class_dentry_readdir(env, mgs, &list);
3734         if (rc)
3735                 RETURN(rc);
3736
3737         mutex_lock(&mgs->mgs_mutex);
3738
3739         /* Delete the fs db */
3740         fsdb = mgs_find_fsdb(mgs, fsname);
3741         if (fsdb)
3742                 mgs_free_fsdb(mgs, fsdb);
3743
3744         mutex_unlock(&mgs->mgs_mutex);
3745
3746         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3747                 cfs_list_del(&dirent->list);
3748                 suffix = strrchr(dirent->name, '-');
3749                 if (suffix != NULL) {
3750                         if ((len == suffix - dirent->name) &&
3751                             (strncmp(fsname, dirent->name, len) == 0)) {
3752                                 CDEBUG(D_MGS, "Removing log %s\n",
3753                                        dirent->name);
3754                                 mgs_erase_log(env, mgs, dirent->name);
3755                         }
3756                 }
3757                 mgs_direntry_free(dirent);
3758         }
3759
3760         RETURN(rc);
3761 }
3762
3763 /* from llog_swab */
3764 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3765 {
3766         int i;
3767         ENTRY;
3768
3769         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3770         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3771
3772         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3773         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3774         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3775         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3776
3777         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3778         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3779                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3780                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3781                                i, lcfg->lcfg_buflens[i],
3782                                lustre_cfg_string(lcfg, i));
3783                 }
3784         EXIT;
3785 }
3786
3787 /* Set a permanent (config log) param for a target or fs
3788  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3789  *             buf1 contains the single parameter
3790  */
3791 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3792                  struct lustre_cfg *lcfg, char *fsname)
3793 {
3794         struct fs_db *fsdb;
3795         struct mgs_target_info *mti;
3796         char *devname, *param;
3797         char *ptr;
3798         const char *tmp;
3799         __u32 index;
3800         int rc = 0;
3801         ENTRY;
3802
3803         print_lustre_cfg(lcfg);
3804
3805         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3806         devname = lustre_cfg_string(lcfg, 0);
3807         param = lustre_cfg_string(lcfg, 1);
3808         if (!devname) {
3809                 /* Assume device name embedded in param:
3810                    lustre-OST0000.osc.max_dirty_mb=32 */
3811                 ptr = strchr(param, '.');
3812                 if (ptr) {
3813                         devname = param;
3814                         *ptr = 0;
3815                         param = ptr + 1;
3816                 }
3817         }
3818         if (!devname) {
3819                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3820                 RETURN(-ENOSYS);
3821         }
3822
3823         rc = mgs_parse_devname(devname, fsname, NULL);
3824         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3825                 /* param related to llite isn't allowed to set by OST or MDT */
3826                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3827                                    sizeof(PARAM_LLITE)) == 0)
3828                         RETURN(-EINVAL);
3829         } else {
3830                 /* assume devname is the fsname */
3831                 memset(fsname, 0, MTI_NAME_MAXLEN);
3832                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3833                 fsname[MTI_NAME_MAXLEN - 1] = 0;
3834         }
3835         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3836
3837         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3838         if (rc)
3839                 RETURN(rc);
3840         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3841             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3842                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3843                        "is '%s'\n", fsname, devname);
3844                 mgs_free_fsdb(mgs, fsdb);
3845                 RETURN(-EINVAL);
3846         }
3847
3848         /* Create a fake mti to hold everything */
3849         OBD_ALLOC_PTR(mti);
3850         if (!mti)
3851                 GOTO(out, rc = -ENOMEM);
3852         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3853             >= sizeof(mti->mti_fsname))
3854                 GOTO(out, rc = -E2BIG);
3855         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3856             >= sizeof(mti->mti_svname))
3857                 GOTO(out, rc = -E2BIG);
3858         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3859             >= sizeof(mti->mti_params))
3860                 GOTO(out, rc = -E2BIG);
3861         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3862         if (rc < 0)
3863                 /* Not a valid server; may be only fsname */
3864                 rc = 0;
3865         else
3866                 /* Strip -osc or -mdc suffix from svname */
3867                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3868                                      mti->mti_svname))
3869                         GOTO(out, rc = -EINVAL);
3870
3871         mti->mti_flags = rc | LDD_F_PARAM;
3872
3873         mutex_lock(&fsdb->fsdb_mutex);
3874         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3875         mutex_unlock(&fsdb->fsdb_mutex);
3876
3877         /*
3878          * Revoke lock so everyone updates.  Should be alright if
3879          * someone was already reading while we were updating the logs,
3880          * so we don't really need to hold the lock while we're
3881          * writing (above).
3882          */
3883         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3884 out:
3885         OBD_FREE_PTR(mti);
3886         RETURN(rc);
3887 }
3888
3889 static int mgs_write_log_pool(const struct lu_env *env,
3890                               struct mgs_device *mgs, char *logname,
3891                               struct fs_db *fsdb, char *lovname,
3892                               enum lcfg_command_type cmd,
3893                               char *poolname, char *fsname,
3894                               char *ostname, char *comment)
3895 {
3896         struct llog_handle *llh = NULL;
3897         int rc;
3898
3899         rc = record_start_log(env, mgs, &llh, logname);
3900         if (rc)
3901                 return rc;
3902         rc = record_marker(env, llh, fsdb, CM_START, lovname, comment);
3903         if (rc)
3904                 goto out;
3905         rc = record_base(env, llh, lovname, 0, cmd, poolname, fsname, ostname, 0);
3906         if (rc)
3907                 goto out;
3908         rc = record_marker(env, llh, fsdb, CM_END, lovname, comment);
3909 out:
3910         record_end_log(env, &llh);
3911         return rc;
3912 }
3913
3914 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3915                  enum lcfg_command_type cmd, char *fsname,
3916                  char *poolname, char *ostname)
3917 {
3918         struct fs_db *fsdb;
3919         char *lovname;
3920         char *logname;
3921         char *label = NULL, *canceled_label = NULL;
3922         int label_sz;
3923         struct mgs_target_info *mti = NULL;
3924         int rc, i;
3925         ENTRY;
3926
3927         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3928         if (rc) {
3929                 CERROR("Can't get db for %s\n", fsname);
3930                 RETURN(rc);
3931         }
3932         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3933                 CERROR("%s is not defined\n", fsname);
3934                 mgs_free_fsdb(mgs, fsdb);
3935                 RETURN(-EINVAL);
3936         }
3937
3938         label_sz = 10 + strlen(fsname) + strlen(poolname);
3939
3940         /* check if ostname match fsname */
3941         if (ostname != NULL) {
3942                 char *ptr;
3943
3944                 ptr = strrchr(ostname, '-');
3945                 if ((ptr == NULL) ||
3946                     (strncmp(fsname, ostname, ptr-ostname) != 0))
3947                         RETURN(-EINVAL);
3948                 label_sz += strlen(ostname);
3949         }
3950
3951         OBD_ALLOC(label, label_sz);
3952         if (label == NULL)
3953                 RETURN(-ENOMEM);
3954
3955         switch(cmd) {
3956         case LCFG_POOL_NEW:
3957                 sprintf(label,
3958                         "new %s.%s", fsname, poolname);
3959                 break;
3960         case LCFG_POOL_ADD:
3961                 sprintf(label,
3962                         "add %s.%s.%s", fsname, poolname, ostname);
3963                 break;
3964         case LCFG_POOL_REM:
3965                 OBD_ALLOC(canceled_label, label_sz);
3966                 if (canceled_label == NULL)
3967                         GOTO(out_label, rc = -ENOMEM);
3968                 sprintf(label,
3969                         "rem %s.%s.%s", fsname, poolname, ostname);
3970                 sprintf(canceled_label,
3971                         "add %s.%s.%s", fsname, poolname, ostname);
3972                 break;
3973         case LCFG_POOL_DEL:
3974                 OBD_ALLOC(canceled_label, label_sz);
3975                 if (canceled_label == NULL)
3976                         GOTO(out_label, rc = -ENOMEM);
3977                 sprintf(label,
3978                         "del %s.%s", fsname, poolname);
3979                 sprintf(canceled_label,
3980                         "new %s.%s", fsname, poolname);
3981                 break;
3982         default:
3983                 break;
3984         }
3985
3986         mutex_lock(&fsdb->fsdb_mutex);
3987
3988         if (canceled_label != NULL) {
3989                 OBD_ALLOC_PTR(mti);
3990                 if (mti == NULL)
3991                         GOTO(out_cancel, rc = -ENOMEM);
3992         }
3993
3994         /* write pool def to all MDT logs */
3995         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3996                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
3997                         rc = name_create_mdt_and_lov(&logname, &lovname,
3998                                                      fsdb, i);
3999                         if (rc) {
4000                                 mutex_unlock(&fsdb->fsdb_mutex);
4001                                 GOTO(out_mti, rc);
4002                         }
4003                         if (canceled_label != NULL) {
4004                                 strcpy(mti->mti_svname, "lov pool");
4005                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4006                                                 lovname, canceled_label,
4007                                                 CM_SKIP);
4008                         }
4009
4010                         if (rc >= 0)
4011                                 rc = mgs_write_log_pool(env, mgs, logname,
4012                                                         fsdb, lovname, cmd,
4013                                                         fsname, poolname,
4014                                                         ostname, label);
4015                         name_destroy(&logname);
4016                         name_destroy(&lovname);
4017                         if (rc) {
4018                                 mutex_unlock(&fsdb->fsdb_mutex);
4019                                 GOTO(out_mti, rc);
4020                         }
4021                 }
4022         }
4023
4024         rc = name_create(&logname, fsname, "-client");
4025         if (rc) {
4026                 mutex_unlock(&fsdb->fsdb_mutex);
4027                 GOTO(out_mti, rc);
4028         }
4029         if (canceled_label != NULL) {
4030                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4031                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4032                 if (rc < 0) {
4033                         mutex_unlock(&fsdb->fsdb_mutex);
4034                         name_destroy(&logname);
4035                         GOTO(out_mti, rc);
4036                 }
4037         }
4038
4039         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4040                                 cmd, fsname, poolname, ostname, label);
4041         mutex_unlock(&fsdb->fsdb_mutex);
4042         name_destroy(&logname);
4043         /* request for update */
4044         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4045
4046         EXIT;
4047 out_mti:
4048         if (mti != NULL)
4049                 OBD_FREE_PTR(mti);
4050 out_cancel:
4051         if (canceled_label != NULL)
4052                 OBD_FREE(canceled_label, label_sz);
4053 out_label:
4054         OBD_FREE(label, label_sz);
4055         return rc;
4056 }
4057
4058