Whamcloud - gitweb
70a83dc10f6ecf1a03ff20e2a7d1732b1be7e708
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_ioctl.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 /* Find all logs in CONFIG directory and link then into list */
59 int class_dentry_readdir(const struct lu_env *env,
60                          struct mgs_device *mgs, struct list_head *log_list)
61 {
62         struct dt_object    *dir = mgs->mgs_configs_dir;
63         const struct dt_it_ops *iops;
64         struct dt_it        *it;
65         struct mgs_direntry *de;
66         char                *key;
67         int                  rc, key_sz;
68
69         INIT_LIST_HEAD(log_list);
70
71         LASSERT(dir);
72         LASSERT(dir->do_index_ops);
73
74         iops = &dir->do_index_ops->dio_it;
75         it = iops->init(env, dir, LUDA_64BITHASH);
76         if (IS_ERR(it))
77                 RETURN(PTR_ERR(it));
78
79         rc = iops->load(env, it, 0);
80         if (rc <= 0)
81                 GOTO(fini, rc = 0);
82
83         /* main cycle */
84         do {
85                 key = (void *)iops->key(env, it);
86                 if (IS_ERR(key)) {
87                         CERROR("%s: key failed when listing %s: rc = %d\n",
88                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
89                                (int) PTR_ERR(key));
90                         goto next;
91                 }
92                 key_sz = iops->key_size(env, it);
93                 LASSERT(key_sz > 0);
94
95                 /* filter out "." and ".." entries */
96                 if (key[0] == '.') {
97                         if (key_sz == 1)
98                                 goto next;
99                         if (key_sz == 2 && key[1] == '.')
100                                 goto next;
101                 }
102
103                 de = mgs_direntry_alloc(key_sz + 1);
104                 if (de == NULL) {
105                         rc = -ENOMEM;
106                         break;
107                 }
108
109                 memcpy(de->mde_name, key, key_sz);
110                 de->mde_name[key_sz] = 0;
111
112                 list_add(&de->mde_list, log_list);
113
114 next:
115                 rc = iops->next(env, it);
116         } while (rc == 0);
117         rc = 0;
118
119         iops->put(env, it);
120
121 fini:
122         iops->fini(env, it);
123         if (rc)
124                 CERROR("%s: key failed when listing %s: rc = %d\n",
125                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
126         RETURN(rc);
127 }
128
129 /******************** DB functions *********************/
130
131 static inline int name_create(char **newname, char *prefix, char *suffix)
132 {
133         LASSERT(newname);
134         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
135         if (!*newname)
136                 return -ENOMEM;
137         sprintf(*newname, "%s%s", prefix, suffix);
138         return 0;
139 }
140
141 static inline void name_destroy(char **name)
142 {
143         if (*name)
144                 OBD_FREE(*name, strlen(*name) + 1);
145         *name = NULL;
146 }
147
148 struct mgs_fsdb_handler_data
149 {
150         struct fs_db   *fsdb;
151         __u32           ver;
152 };
153
154 /* from the (client) config log, figure out:
155         1. which ost's/mdt's are configured (by index)
156         2. what the last config step is
157         3. COMPAT_18 osc name
158 */
159 /* It might be better to have a separate db file, instead of parsing the info
160    out of the client log.  This is slow and potentially error-prone. */
161 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
162                             struct llog_rec_hdr *rec, void *data)
163 {
164         struct mgs_fsdb_handler_data *d = data;
165         struct fs_db *fsdb = d->fsdb;
166         int cfg_len = rec->lrh_len;
167         char *cfg_buf = (char*) (rec + 1);
168         struct lustre_cfg *lcfg;
169         __u32 index;
170         int rc = 0;
171         ENTRY;
172
173         if (rec->lrh_type != OBD_CFG_REC) {
174                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
175                 RETURN(-EINVAL);
176         }
177
178         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
179         if (rc) {
180                 CERROR("Insane cfg\n");
181                 RETURN(rc);
182         }
183
184         lcfg = (struct lustre_cfg *)cfg_buf;
185
186         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
187                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
188
189         /* Figure out ost indicies */
190         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
191         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
192             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
193                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
194                                        NULL, 10);
195                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
196                        lustre_cfg_string(lcfg, 1), index,
197                        lustre_cfg_string(lcfg, 2));
198                 set_bit(index, fsdb->fsdb_ost_index_map);
199         }
200
201         /* Figure out mdt indicies */
202         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
203         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
204             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
205                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
206                                        &index, NULL);
207                 if (rc != LDD_F_SV_TYPE_MDT) {
208                         CWARN("Unparsable MDC name %s, assuming index 0\n",
209                               lustre_cfg_string(lcfg, 0));
210                         index = 0;
211                 }
212                 rc = 0;
213                 CDEBUG(D_MGS, "MDT index is %u\n", index);
214                 set_bit(index, fsdb->fsdb_mdt_index_map);
215                 fsdb->fsdb_mdt_count ++;
216         }
217
218         /**
219          * figure out the old config. fsdb_gen = 0 means old log
220          * It is obsoleted and not supported anymore
221          */
222         if (fsdb->fsdb_gen == 0) {
223                 CERROR("Old config format is not supported\n");
224                 RETURN(-EINVAL);
225         }
226
227         /*
228          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
229          */
230         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
231             lcfg->lcfg_command == LCFG_ATTACH &&
232             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
233                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
234                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
235                         CWARN("MDT using 1.8 OSC name scheme\n");
236                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
237                 }
238         }
239
240         if (lcfg->lcfg_command == LCFG_MARKER) {
241                 struct cfg_marker *marker;
242                 marker = lustre_cfg_buf(lcfg, 1);
243
244                 d->ver = marker->cm_vers;
245
246                 /* Keep track of the latest marker step */
247                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
248         }
249
250         RETURN(rc);
251 }
252
253 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
254 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
255                                   struct mgs_device *mgs,
256                                   struct fs_db *fsdb)
257 {
258         char                            *logname;
259         struct llog_handle              *loghandle;
260         struct llog_ctxt                *ctxt;
261         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
262         int rc;
263
264         ENTRY;
265
266         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
267         LASSERT(ctxt != NULL);
268         rc = name_create(&logname, fsdb->fsdb_name, "-client");
269         if (rc)
270                 GOTO(out_put, rc);
271         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
272         if (rc)
273                 GOTO(out_pop, rc);
274
275         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
276         if (rc)
277                 GOTO(out_close, rc);
278
279         if (llog_get_size(loghandle) <= 1)
280                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
281
282         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
283         CDEBUG(D_INFO, "get_db = %d\n", rc);
284 out_close:
285         llog_close(env, loghandle);
286 out_pop:
287         name_destroy(&logname);
288 out_put:
289         llog_ctxt_put(ctxt);
290
291         RETURN(rc);
292 }
293
294 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
295 {
296         struct mgs_tgt_srpc_conf *tgtconf;
297
298         /* free target-specific rules */
299         while (fsdb->fsdb_srpc_tgt) {
300                 tgtconf = fsdb->fsdb_srpc_tgt;
301                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
302
303                 LASSERT(tgtconf->mtsc_tgt);
304
305                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
306                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
307                 OBD_FREE_PTR(tgtconf);
308         }
309
310         /* free general rules */
311         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
312 }
313
314 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
315 {
316         struct fs_db *fsdb;
317         struct list_head *tmp;
318
319         list_for_each(tmp, &mgs->mgs_fs_db_list) {
320                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
321                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
322                         return fsdb;
323         }
324         return NULL;
325 }
326
327 /* caller must hold the mgs->mgs_fs_db_lock */
328 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
329                                   struct mgs_device *mgs, char *fsname)
330 {
331         struct fs_db *fsdb;
332         int rc;
333         ENTRY;
334
335         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
336                 CERROR("fsname %s is too long\n", fsname);
337                 RETURN(ERR_PTR(-EINVAL));
338         }
339
340         OBD_ALLOC_PTR(fsdb);
341         if (!fsdb)
342                 RETURN(ERR_PTR(-ENOMEM));
343
344         strcpy(fsdb->fsdb_name, fsname);
345         mutex_init(&fsdb->fsdb_mutex);
346         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
347         fsdb->fsdb_gen = 1;
348
349         if (strcmp(fsname, MGSSELF_NAME) == 0) {
350                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
351                 fsdb->fsdb_mgs = mgs;
352         } else {
353                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
354                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
355                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
356                         CERROR("No memory for index maps\n");
357                         GOTO(err, rc = -ENOMEM);
358                 }
359
360                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
361                 if (rc)
362                         GOTO(err, rc);
363                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
364                 if (rc)
365                         GOTO(err, rc);
366
367                 /* initialise data for NID table */
368                 mgs_ir_init_fs(env, mgs, fsdb);
369
370                 lproc_mgs_add_live(mgs, fsdb);
371         }
372
373         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
374
375         RETURN(fsdb);
376 err:
377         if (fsdb->fsdb_ost_index_map)
378                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
379         if (fsdb->fsdb_mdt_index_map)
380                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
381         name_destroy(&fsdb->fsdb_clilov);
382         name_destroy(&fsdb->fsdb_clilmv);
383         OBD_FREE_PTR(fsdb);
384         RETURN(ERR_PTR(rc));
385 }
386
387 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
388 {
389         /* wait for anyone with the sem */
390         mutex_lock(&fsdb->fsdb_mutex);
391         lproc_mgs_del_live(mgs, fsdb);
392         list_del(&fsdb->fsdb_list);
393
394         /* deinitialize fsr */
395         mgs_ir_fini_fs(mgs, fsdb);
396
397         if (fsdb->fsdb_ost_index_map)
398                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
399         if (fsdb->fsdb_mdt_index_map)
400                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
401         name_destroy(&fsdb->fsdb_clilov);
402         name_destroy(&fsdb->fsdb_clilmv);
403         mgs_free_fsdb_srpc(fsdb);
404         mutex_unlock(&fsdb->fsdb_mutex);
405         OBD_FREE_PTR(fsdb);
406 }
407
408 int mgs_init_fsdb_list(struct mgs_device *mgs)
409 {
410         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
411         return 0;
412 }
413
414 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
415 {
416         struct fs_db *fsdb;
417         struct list_head *tmp, *tmp2;
418
419         mutex_lock(&mgs->mgs_mutex);
420         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
421                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
422                 mgs_free_fsdb(mgs, fsdb);
423         }
424         mutex_unlock(&mgs->mgs_mutex);
425         return 0;
426 }
427
428 int mgs_find_or_make_fsdb(const struct lu_env *env,
429                           struct mgs_device *mgs, char *name,
430                           struct fs_db **dbh)
431 {
432         struct fs_db *fsdb;
433         int rc = 0;
434
435         ENTRY;
436         mutex_lock(&mgs->mgs_mutex);
437         fsdb = mgs_find_fsdb(mgs, name);
438         if (fsdb) {
439                 mutex_unlock(&mgs->mgs_mutex);
440                 *dbh = fsdb;
441                 RETURN(0);
442         }
443
444         CDEBUG(D_MGS, "Creating new db\n");
445         fsdb = mgs_new_fsdb(env, mgs, name);
446         /* lock fsdb_mutex until the db is loaded from llogs */
447         if (!IS_ERR(fsdb))
448                 mutex_lock(&fsdb->fsdb_mutex);
449         mutex_unlock(&mgs->mgs_mutex);
450         if (IS_ERR(fsdb))
451                 RETURN(PTR_ERR(fsdb));
452
453         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
454                 /* populate the db from the client llog */
455                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
456                 if (rc) {
457                         CERROR("Can't get db from client log %d\n", rc);
458                         GOTO(out_free, rc);
459                 }
460         }
461
462         /* populate srpc rules from params llog */
463         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
464         if (rc) {
465                 CERROR("Can't get db from params log %d\n", rc);
466                 GOTO(out_free, rc);
467         }
468
469         mutex_unlock(&fsdb->fsdb_mutex);
470         *dbh = fsdb;
471
472         RETURN(0);
473
474 out_free:
475         mutex_unlock(&fsdb->fsdb_mutex);
476         mgs_free_fsdb(mgs, fsdb);
477         return rc;
478 }
479
480 /* 1 = index in use
481    0 = index unused
482    -1= empty client log */
483 int mgs_check_index(const struct lu_env *env,
484                     struct mgs_device *mgs,
485                     struct mgs_target_info *mti)
486 {
487         struct fs_db *fsdb;
488         void *imap;
489         int rc = 0;
490         ENTRY;
491
492         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
493
494         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
495         if (rc) {
496                 CERROR("Can't get db for %s\n", mti->mti_fsname);
497                 RETURN(rc);
498         }
499
500         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
501                 RETURN(-1);
502
503         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
504                 imap = fsdb->fsdb_ost_index_map;
505         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
506                 imap = fsdb->fsdb_mdt_index_map;
507         else
508                 RETURN(-EINVAL);
509
510         if (test_bit(mti->mti_stripe_index, imap))
511                 RETURN(1);
512         RETURN(0);
513 }
514
515 static __inline__ int next_index(void *index_map, int map_len)
516 {
517         int i;
518         for (i = 0; i < map_len * 8; i++)
519                 if (!test_bit(i, index_map)) {
520                          return i;
521                  }
522         CERROR("max index %d exceeded.\n", i);
523         return -1;
524 }
525
526 /* Return codes:
527         0  newly marked as in use
528         <0 err
529         +EALREADY for update of an old index */
530 static int mgs_set_index(const struct lu_env *env,
531                          struct mgs_device *mgs,
532                          struct mgs_target_info *mti)
533 {
534         struct fs_db *fsdb;
535         void *imap;
536         int rc = 0;
537         ENTRY;
538
539         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
540         if (rc) {
541                 CERROR("Can't get db for %s\n", mti->mti_fsname);
542                 RETURN(rc);
543         }
544
545         mutex_lock(&fsdb->fsdb_mutex);
546         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
547                 imap = fsdb->fsdb_ost_index_map;
548         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
549                 imap = fsdb->fsdb_mdt_index_map;
550         } else {
551                 GOTO(out_up, rc = -EINVAL);
552         }
553
554         if (mti->mti_flags & LDD_F_NEED_INDEX) {
555                 rc = next_index(imap, INDEX_MAP_SIZE);
556                 if (rc == -1)
557                         GOTO(out_up, rc = -ERANGE);
558                 mti->mti_stripe_index = rc;
559                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
560                         fsdb->fsdb_mdt_count ++;
561         }
562
563         /* the last index(0xffff) is reserved for default value. */
564         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
565                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
566                                    "but index must be less than %u.\n",
567                                    mti->mti_svname, mti->mti_stripe_index,
568                                    INDEX_MAP_SIZE * 8 - 1);
569                 GOTO(out_up, rc = -ERANGE);
570         }
571
572         if (test_bit(mti->mti_stripe_index, imap)) {
573                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
574                     !(mti->mti_flags & LDD_F_WRITECONF)) {
575                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
576                                            "%d, but that index is already in "
577                                            "use. Use --writeconf to force\n",
578                                            mti->mti_svname,
579                                            mti->mti_stripe_index);
580                         GOTO(out_up, rc = -EADDRINUSE);
581                 } else {
582                         CDEBUG(D_MGS, "Server %s updating index %d\n",
583                                mti->mti_svname, mti->mti_stripe_index);
584                         GOTO(out_up, rc = EALREADY);
585                 }
586         }
587
588         set_bit(mti->mti_stripe_index, imap);
589         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
590         mutex_unlock(&fsdb->fsdb_mutex);
591         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
592                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
593
594         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
595                mti->mti_stripe_index);
596
597         RETURN(0);
598 out_up:
599         mutex_unlock(&fsdb->fsdb_mutex);
600         return rc;
601 }
602
603 struct mgs_modify_lookup {
604         struct cfg_marker mml_marker;
605         int               mml_modified;
606 };
607
608 static int mgs_modify_handler(const struct lu_env *env,
609                               struct llog_handle *llh,
610                               struct llog_rec_hdr *rec, void *data)
611 {
612         struct mgs_modify_lookup *mml = data;
613         struct cfg_marker *marker;
614         struct lustre_cfg *lcfg = REC_DATA(rec);
615         int cfg_len = REC_DATA_LEN(rec);
616         int rc;
617         ENTRY;
618
619         if (rec->lrh_type != OBD_CFG_REC) {
620                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
621                 RETURN(-EINVAL);
622         }
623
624         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
625         if (rc) {
626                 CERROR("Insane cfg\n");
627                 RETURN(rc);
628         }
629
630         /* We only care about markers */
631         if (lcfg->lcfg_command != LCFG_MARKER)
632                 RETURN(0);
633
634         marker = lustre_cfg_buf(lcfg, 1);
635         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
636             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
637             !(marker->cm_flags & CM_SKIP)) {
638                 /* Found a non-skipped marker match */
639                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
640                        rec->lrh_index, marker->cm_step,
641                        marker->cm_flags, mml->mml_marker.cm_flags,
642                        marker->cm_tgtname, marker->cm_comment);
643                 /* Overwrite the old marker llog entry */
644                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
645                 marker->cm_flags |= mml->mml_marker.cm_flags;
646                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
647                 rc = llog_write(env, llh, rec, rec->lrh_index);
648                 if (!rc)
649                          mml->mml_modified++;
650         }
651
652         RETURN(rc);
653 }
654
655 /**
656  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
657  * Return code:
658  * 0 - modified successfully,
659  * 1 - no modification was done
660  * negative - error
661  */
662 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
663                       struct fs_db *fsdb, struct mgs_target_info *mti,
664                       char *logname, char *devname, char *comment, int flags)
665 {
666         struct llog_handle *loghandle;
667         struct llog_ctxt *ctxt;
668         struct mgs_modify_lookup *mml;
669         int rc;
670
671         ENTRY;
672
673         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
674         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
675                flags);
676
677         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
678         LASSERT(ctxt != NULL);
679         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
680         if (rc < 0) {
681                 if (rc == -ENOENT)
682                         rc = 0;
683                 GOTO(out_pop, rc);
684         }
685
686         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
687         if (rc)
688                 GOTO(out_close, rc);
689
690         if (llog_get_size(loghandle) <= 1)
691                 GOTO(out_close, rc = 0);
692
693         OBD_ALLOC_PTR(mml);
694         if (!mml)
695                 GOTO(out_close, rc = -ENOMEM);
696         if (strlcpy(mml->mml_marker.cm_comment, comment,
697                     sizeof(mml->mml_marker.cm_comment)) >=
698             sizeof(mml->mml_marker.cm_comment))
699                 GOTO(out_free, rc = -E2BIG);
700         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
701                     sizeof(mml->mml_marker.cm_tgtname)) >=
702             sizeof(mml->mml_marker.cm_tgtname))
703                 GOTO(out_free, rc = -E2BIG);
704         /* Modify mostly means cancel */
705         mml->mml_marker.cm_flags = flags;
706         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
707         mml->mml_modified = 0;
708         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
709                           NULL);
710         if (!rc && !mml->mml_modified)
711                 rc = 1;
712
713 out_free:
714         OBD_FREE_PTR(mml);
715
716 out_close:
717         llog_close(env, loghandle);
718 out_pop:
719         if (rc < 0)
720                 CERROR("%s: modify %s/%s failed: rc = %d\n",
721                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
722         llog_ctxt_put(ctxt);
723         RETURN(rc);
724 }
725
726 /** This structure is passed to mgs_replace_handler */
727 struct mgs_replace_uuid_lookup {
728         /* Nids are replaced for this target device */
729         struct mgs_target_info target;
730         /* Temporary modified llog */
731         struct llog_handle *temp_llh;
732         /* Flag is set if in target block*/
733         int in_target_device;
734         /* Nids already added. Just skip (multiple nids) */
735         int device_nids_added;
736         /* Flag is set if this block should not be copied */
737         int skip_it;
738 };
739
740 /**
741  * Check: a) if block should be skipped
742  * b) is it target block
743  *
744  * \param[in] lcfg
745  * \param[in] mrul
746  *
747  * \retval 0 should not to be skipped
748  * \retval 1 should to be skipped
749  */
750 static int check_markers(struct lustre_cfg *lcfg,
751                          struct mgs_replace_uuid_lookup *mrul)
752 {
753          struct cfg_marker *marker;
754
755         /* Track markers. Find given device */
756         if (lcfg->lcfg_command == LCFG_MARKER) {
757                 marker = lustre_cfg_buf(lcfg, 1);
758                 /* Clean llog from records marked as CM_EXCLUDE.
759                    CM_SKIP records are used for "active" command
760                    and can be restored if needed */
761                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
762                     (CM_EXCLUDE | CM_START)) {
763                         mrul->skip_it = 1;
764                         return 1;
765                 }
766
767                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
768                     (CM_EXCLUDE | CM_END)) {
769                         mrul->skip_it = 0;
770                         return 1;
771                 }
772
773                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
774                         LASSERT(!(marker->cm_flags & CM_START) ||
775                                 !(marker->cm_flags & CM_END));
776                         if (marker->cm_flags & CM_START) {
777                                 mrul->in_target_device = 1;
778                                 mrul->device_nids_added = 0;
779                         } else if (marker->cm_flags & CM_END)
780                                 mrul->in_target_device = 0;
781                 }
782         }
783
784         return 0;
785 }
786
787 static int record_base(const struct lu_env *env, struct llog_handle *llh,
788                      char *cfgname, lnet_nid_t nid, int cmd,
789                      char *s1, char *s2, char *s3, char *s4)
790 {
791         struct mgs_thread_info  *mgi = mgs_env_info(env);
792         struct llog_cfg_rec     *lcr;
793         int rc;
794
795         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
796                cmd, s1, s2, s3, s4);
797
798         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
799         if (s1)
800                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
801         if (s2)
802                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
803         if (s3)
804                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
805         if (s4)
806                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
807
808         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
809         if (lcr == NULL)
810                 return -ENOMEM;
811
812         lcr->lcr_cfg.lcfg_nid = nid;
813         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
814
815         lustre_cfg_rec_free(lcr);
816
817         if (rc < 0)
818                 CDEBUG(D_MGS,
819                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
820                        cfgname, cmd, s1, s2, s3, s4, rc);
821         return rc;
822 }
823
824 static inline int record_add_uuid(const struct lu_env *env,
825                                   struct llog_handle *llh,
826                                   uint64_t nid, char *uuid)
827 {
828         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
829                            NULL, NULL, NULL);
830 }
831
832 static inline int record_add_conn(const struct lu_env *env,
833                                   struct llog_handle *llh,
834                                   char *devname, char *uuid)
835 {
836         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
837                            NULL, NULL, NULL);
838 }
839
840 static inline int record_attach(const struct lu_env *env,
841                                 struct llog_handle *llh, char *devname,
842                                 char *type, char *uuid)
843 {
844         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
845                            NULL, NULL);
846 }
847
848 static inline int record_setup(const struct lu_env *env,
849                                struct llog_handle *llh, char *devname,
850                                char *s1, char *s2, char *s3, char *s4)
851 {
852         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
853 }
854
855 /**
856  * \retval <0 record processing error
857  * \retval n record is processed. No need copy original one.
858  * \retval 0 record is not processed.
859  */
860 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
861                            struct mgs_replace_uuid_lookup *mrul)
862 {
863         int nids_added = 0;
864         lnet_nid_t nid;
865         char *ptr;
866         int rc;
867
868         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
869                 /* LCFG_ADD_UUID command found. Let's skip original command
870                    and add passed nids */
871                 ptr = mrul->target.mti_params;
872                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
873                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
874                                "device %s\n", libcfs_nid2str(nid),
875                                 mrul->target.mti_params,
876                                 mrul->target.mti_svname);
877                         rc = record_add_uuid(env,
878                                              mrul->temp_llh, nid,
879                                              mrul->target.mti_params);
880                         if (!rc)
881                                 nids_added++;
882                 }
883
884                 if (nids_added == 0) {
885                         CERROR("No new nids were added, nid %s with uuid %s, "
886                                "device %s\n", libcfs_nid2str(nid),
887                                mrul->target.mti_params,
888                                mrul->target.mti_svname);
889                         RETURN(-ENXIO);
890                 } else {
891                         mrul->device_nids_added = 1;
892                 }
893
894                 return nids_added;
895         }
896
897         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
898                 /* LCFG_SETUP command found. UUID should be changed */
899                 rc = record_setup(env,
900                                   mrul->temp_llh,
901                                   /* devname the same */
902                                   lustre_cfg_string(lcfg, 0),
903                                   /* s1 is not changed */
904                                   lustre_cfg_string(lcfg, 1),
905                                   /* new uuid should be
906                                   the full nidlist */
907                                   mrul->target.mti_params,
908                                   /* s3 is not changed */
909                                   lustre_cfg_string(lcfg, 3),
910                                   /* s4 is not changed */
911                                   lustre_cfg_string(lcfg, 4));
912                 return rc ? rc : 1;
913         }
914
915         /* Another commands in target device block */
916         return 0;
917 }
918
919 /**
920  * Handler that called for every record in llog.
921  * Records are processed in order they placed in llog.
922  *
923  * \param[in] llh       log to be processed
924  * \param[in] rec       current record
925  * \param[in] data      mgs_replace_uuid_lookup structure
926  *
927  * \retval 0    success
928  */
929 static int mgs_replace_handler(const struct lu_env *env,
930                                struct llog_handle *llh,
931                                struct llog_rec_hdr *rec,
932                                void *data)
933 {
934         struct mgs_replace_uuid_lookup *mrul;
935         struct lustre_cfg *lcfg = REC_DATA(rec);
936         int cfg_len = REC_DATA_LEN(rec);
937         int rc;
938         ENTRY;
939
940         mrul = (struct mgs_replace_uuid_lookup *)data;
941
942         if (rec->lrh_type != OBD_CFG_REC) {
943                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
944                        rec->lrh_type, lcfg->lcfg_command,
945                        lustre_cfg_string(lcfg, 0),
946                        lustre_cfg_string(lcfg, 1));
947                 RETURN(-EINVAL);
948         }
949
950         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
951         if (rc) {
952                 /* Do not copy any invalidated records */
953                 GOTO(skip_out, rc = 0);
954         }
955
956         rc = check_markers(lcfg, mrul);
957         if (rc || mrul->skip_it)
958                 GOTO(skip_out, rc = 0);
959
960         /* Write to new log all commands outside target device block */
961         if (!mrul->in_target_device)
962                 GOTO(copy_out, rc = 0);
963
964         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
965            (failover nids) for this target, assuming that if then
966            primary is changing then so is the failover */
967         if (mrul->device_nids_added &&
968             (lcfg->lcfg_command == LCFG_ADD_UUID ||
969              lcfg->lcfg_command == LCFG_ADD_CONN))
970                 GOTO(skip_out, rc = 0);
971
972         rc = process_command(env, lcfg, mrul);
973         if (rc < 0)
974                 RETURN(rc);
975
976         if (rc)
977                 RETURN(0);
978 copy_out:
979         /* Record is placed in temporary llog as is */
980         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
981
982         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
983                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
984                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
985         RETURN(rc);
986
987 skip_out:
988         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
989                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
990                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
991         RETURN(rc);
992 }
993
994 static int mgs_log_is_empty(const struct lu_env *env,
995                             struct mgs_device *mgs, char *name)
996 {
997         struct llog_ctxt        *ctxt;
998         int                      rc;
999
1000         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1001         LASSERT(ctxt != NULL);
1002
1003         rc = llog_is_empty(env, ctxt, name);
1004         llog_ctxt_put(ctxt);
1005         return rc;
1006 }
1007
1008 static int mgs_replace_nids_log(const struct lu_env *env,
1009                                 struct obd_device *mgs, struct fs_db *fsdb,
1010                                 char *logname, char *devname, char *nids)
1011 {
1012         struct llog_handle *orig_llh, *backup_llh;
1013         struct llog_ctxt *ctxt;
1014         struct mgs_replace_uuid_lookup *mrul;
1015         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1016         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1017         char *backup;
1018         int rc, rc2;
1019         ENTRY;
1020
1021         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1022
1023         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1024         LASSERT(ctxt != NULL);
1025
1026         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1027                 /* Log is empty. Nothing to replace */
1028                 GOTO(out_put, rc = 0);
1029         }
1030
1031         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1032         if (backup == NULL)
1033                 GOTO(out_put, rc = -ENOMEM);
1034
1035         sprintf(backup, "%s.bak", logname);
1036
1037         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1038         if (rc == 0) {
1039                 /* Now erase original log file. Connections are not allowed.
1040                    Backup is already saved */
1041                 rc = llog_erase(env, ctxt, NULL, logname);
1042                 if (rc < 0)
1043                         GOTO(out_free, rc);
1044         } else if (rc != -ENOENT) {
1045                 CERROR("%s: can't make backup for %s: rc = %d\n",
1046                        mgs->obd_name, logname, rc);
1047                 GOTO(out_free,rc);
1048         }
1049
1050         /* open local log */
1051         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1052         if (rc)
1053                 GOTO(out_restore, rc);
1054
1055         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1056         if (rc)
1057                 GOTO(out_closel, rc);
1058
1059         /* open backup llog */
1060         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1061                        LLOG_OPEN_EXISTS);
1062         if (rc)
1063                 GOTO(out_closel, rc);
1064
1065         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1066         if (rc)
1067                 GOTO(out_close, rc);
1068
1069         if (llog_get_size(backup_llh) <= 1)
1070                 GOTO(out_close, rc = 0);
1071
1072         OBD_ALLOC_PTR(mrul);
1073         if (!mrul)
1074                 GOTO(out_close, rc = -ENOMEM);
1075         /* devname is only needed information to replace UUID records */
1076         strlcpy(mrul->target.mti_svname, devname,
1077                 sizeof(mrul->target.mti_svname));
1078         /* parse nids later */
1079         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1080         /* Copy records to this temporary llog */
1081         mrul->temp_llh = orig_llh;
1082
1083         rc = llog_process(env, backup_llh, mgs_replace_handler,
1084                           (void *)mrul, NULL);
1085         OBD_FREE_PTR(mrul);
1086 out_close:
1087         rc2 = llog_close(NULL, backup_llh);
1088         if (!rc)
1089                 rc = rc2;
1090 out_closel:
1091         rc2 = llog_close(NULL, orig_llh);
1092         if (!rc)
1093                 rc = rc2;
1094
1095 out_restore:
1096         if (rc) {
1097                 CERROR("%s: llog should be restored: rc = %d\n",
1098                        mgs->obd_name, rc);
1099                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1100                                   logname);
1101                 if (rc2 < 0)
1102                         CERROR("%s: can't restore backup %s: rc = %d\n",
1103                                mgs->obd_name, logname, rc2);
1104         }
1105
1106 out_free:
1107         OBD_FREE(backup, strlen(backup) + 1);
1108
1109 out_put:
1110         llog_ctxt_put(ctxt);
1111
1112         if (rc)
1113                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1114                        mgs->obd_name, logname, rc);
1115
1116         RETURN(rc);
1117 }
1118
1119 /**
1120  * Parse device name and get file system name and/or device index
1121  *
1122  * \param[in]   devname device name (ex. lustre-MDT0000)
1123  * \param[out]  fsname  file system name(optional)
1124  * \param[out]  index   device index(optional)
1125  *
1126  * \retval 0    success
1127  */
1128 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1129 {
1130         int rc;
1131         ENTRY;
1132
1133         /* Extract fsname */
1134         if (fsname) {
1135                 rc = server_name2fsname(devname, fsname, NULL);
1136                 if (rc < 0) {
1137                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1138                                devname);
1139                         RETURN(-EINVAL);
1140                 }
1141         }
1142
1143         if (index) {
1144                 rc = server_name2index(devname, index, NULL);
1145                 if (rc < 0) {
1146                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1147                                devname);
1148                         RETURN(-EINVAL);
1149                 }
1150         }
1151
1152         RETURN(0);
1153 }
1154
1155 /* This is only called during replace_nids */
1156 static int only_mgs_is_running(struct obd_device *mgs_obd)
1157 {
1158         /* TDB: Is global variable with devices count exists? */
1159         int num_devices = get_devices_count();
1160         int num_exports = 0;
1161         struct obd_export *exp;
1162
1163         spin_lock(&mgs_obd->obd_dev_lock);
1164         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1165                 /* skip self export */
1166                 if (exp == mgs_obd->obd_self_export)
1167                         continue;
1168                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1169                         continue;
1170
1171                 ++num_exports;
1172
1173                 CERROR("%s: node %s still connected during replace_nids "
1174                        "connect_flags:%llx\n",
1175                        mgs_obd->obd_name,
1176                        libcfs_nid2str(exp->exp_nid_stats->nid),
1177                        exp_connect_flags(exp));
1178
1179         }
1180         spin_unlock(&mgs_obd->obd_dev_lock);
1181
1182         /* osd, MGS and MGC + self_export
1183            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1184         return (num_devices <= 3) && (num_exports == 0);
1185 }
1186
1187 static int name_create_mdt(char **logname, char *fsname, int i)
1188 {
1189         char mdt_index[9];
1190
1191         sprintf(mdt_index, "-MDT%04x", i);
1192         return name_create(logname, fsname, mdt_index);
1193 }
1194
1195 /**
1196  * Replace nids for \a device to \a nids values
1197  *
1198  * \param obd           MGS obd device
1199  * \param devname       nids need to be replaced for this device
1200  * (ex. lustre-OST0000)
1201  * \param nids          nids list (ex. nid1,nid2,nid3)
1202  *
1203  * \retval 0    success
1204  */
1205 int mgs_replace_nids(const struct lu_env *env,
1206                      struct mgs_device *mgs,
1207                      char *devname, char *nids)
1208 {
1209         /* Assume fsname is part of device name */
1210         char fsname[MTI_NAME_MAXLEN];
1211         int rc;
1212         __u32 index;
1213         char *logname;
1214         struct fs_db *fsdb;
1215         unsigned int i;
1216         int conn_state;
1217         struct obd_device *mgs_obd = mgs->mgs_obd;
1218         ENTRY;
1219
1220         /* We can only change NIDs if no other nodes are connected */
1221         spin_lock(&mgs_obd->obd_dev_lock);
1222         conn_state = mgs_obd->obd_no_conn;
1223         mgs_obd->obd_no_conn = 1;
1224         spin_unlock(&mgs_obd->obd_dev_lock);
1225
1226         /* We can not change nids if not only MGS is started */
1227         if (!only_mgs_is_running(mgs_obd)) {
1228                 CERROR("Only MGS is allowed to be started\n");
1229                 GOTO(out, rc = -EINPROGRESS);
1230         }
1231
1232         /* Get fsname and index*/
1233         rc = mgs_parse_devname(devname, fsname, &index);
1234         if (rc)
1235                 GOTO(out, rc);
1236
1237         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1238         if (rc) {
1239                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1240                 GOTO(out, rc);
1241         }
1242
1243         /* Process client llogs */
1244         name_create(&logname, fsname, "-client");
1245         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1246         name_destroy(&logname);
1247         if (rc) {
1248                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1249                        fsname, devname, rc);
1250                 GOTO(out, rc);
1251         }
1252
1253         /* Process MDT llogs */
1254         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1255                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1256                         continue;
1257                 name_create_mdt(&logname, fsname, i);
1258                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1259                 name_destroy(&logname);
1260                 if (rc)
1261                         GOTO(out, rc);
1262         }
1263
1264 out:
1265         spin_lock(&mgs_obd->obd_dev_lock);
1266         mgs_obd->obd_no_conn = conn_state;
1267         spin_unlock(&mgs_obd->obd_dev_lock);
1268
1269         RETURN(rc);
1270 }
1271
1272 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1273                             char *devname, struct lov_desc *desc)
1274 {
1275         struct mgs_thread_info  *mgi = mgs_env_info(env);
1276         struct llog_cfg_rec     *lcr;
1277         int rc;
1278
1279         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1280         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1281         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1282         if (lcr == NULL)
1283                 return -ENOMEM;
1284
1285         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1286         lustre_cfg_rec_free(lcr);
1287         return rc;
1288 }
1289
1290 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1291                             char *devname, struct lmv_desc *desc)
1292 {
1293         struct mgs_thread_info  *mgi = mgs_env_info(env);
1294         struct llog_cfg_rec     *lcr;
1295         int rc;
1296
1297         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1298         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1299         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1300         if (lcr == NULL)
1301                 return -ENOMEM;
1302
1303         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1304         lustre_cfg_rec_free(lcr);
1305         return rc;
1306 }
1307
1308 static inline int record_mdc_add(const struct lu_env *env,
1309                                  struct llog_handle *llh,
1310                                  char *logname, char *mdcuuid,
1311                                  char *mdtuuid, char *index,
1312                                  char *gen)
1313 {
1314         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1315                            mdtuuid,index,gen,mdcuuid);
1316 }
1317
1318 static inline int record_lov_add(const struct lu_env *env,
1319                                  struct llog_handle *llh,
1320                                  char *lov_name, char *ost_uuid,
1321                                  char *index, char *gen)
1322 {
1323         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1324                            ost_uuid, index, gen, NULL);
1325 }
1326
1327 static inline int record_mount_opt(const struct lu_env *env,
1328                                    struct llog_handle *llh,
1329                                    char *profile, char *lov_name,
1330                                    char *mdc_name)
1331 {
1332         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1333                            profile, lov_name, mdc_name, NULL);
1334 }
1335
1336 static int record_marker(const struct lu_env *env,
1337                          struct llog_handle *llh,
1338                          struct fs_db *fsdb, __u32 flags,
1339                          char *tgtname, char *comment)
1340 {
1341         struct mgs_thread_info *mgi = mgs_env_info(env);
1342         struct llog_cfg_rec *lcr;
1343         int rc;
1344         int cplen = 0;
1345
1346         if (flags & CM_START)
1347                 fsdb->fsdb_gen++;
1348         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1349         mgi->mgi_marker.cm_flags = flags;
1350         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1351         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1352                         sizeof(mgi->mgi_marker.cm_tgtname));
1353         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1354                 return -E2BIG;
1355         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1356                         sizeof(mgi->mgi_marker.cm_comment));
1357         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1358                 return -E2BIG;
1359         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1360         mgi->mgi_marker.cm_canceltime = 0;
1361         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1362         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1363                             sizeof(mgi->mgi_marker));
1364         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1365         if (lcr == NULL)
1366                 return -ENOMEM;
1367
1368         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1369         lustre_cfg_rec_free(lcr);
1370         return rc;
1371 }
1372
1373 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1374                             struct llog_handle **llh, char *name)
1375 {
1376         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1377         struct llog_ctxt        *ctxt;
1378         int                      rc = 0;
1379         ENTRY;
1380
1381         if (*llh)
1382                 GOTO(out, rc = -EBUSY);
1383
1384         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1385         if (!ctxt)
1386                 GOTO(out, rc = -ENODEV);
1387         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1388
1389         rc = llog_open_create(env, ctxt, llh, NULL, name);
1390         if (rc)
1391                 GOTO(out_ctxt, rc);
1392         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1393         if (rc)
1394                 llog_close(env, *llh);
1395 out_ctxt:
1396         llog_ctxt_put(ctxt);
1397 out:
1398         if (rc) {
1399                 CERROR("%s: can't start log %s: rc = %d\n",
1400                        mgs->mgs_obd->obd_name, name, rc);
1401                 *llh = NULL;
1402         }
1403         RETURN(rc);
1404 }
1405
1406 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1407 {
1408         int rc;
1409
1410         rc = llog_close(env, *llh);
1411         *llh = NULL;
1412
1413         return rc;
1414 }
1415
1416 /******************** config "macros" *********************/
1417
1418 /* write an lcfg directly into a log (with markers) */
1419 static int mgs_write_log_direct(const struct lu_env *env,
1420                                 struct mgs_device *mgs, struct fs_db *fsdb,
1421                                 char *logname, struct llog_cfg_rec *lcr,
1422                                 char *devname, char *comment)
1423 {
1424         struct llog_handle *llh = NULL;
1425         int rc;
1426
1427         ENTRY;
1428
1429         rc = record_start_log(env, mgs, &llh, logname);
1430         if (rc)
1431                 RETURN(rc);
1432
1433         /* FIXME These should be a single journal transaction */
1434         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1435         if (rc)
1436                 GOTO(out_end, rc);
1437         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1438         if (rc)
1439                 GOTO(out_end, rc);
1440         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1441         if (rc)
1442                 GOTO(out_end, rc);
1443 out_end:
1444         record_end_log(env, &llh);
1445         RETURN(rc);
1446 }
1447
1448 /* write the lcfg in all logs for the given fs */
1449 static int mgs_write_log_direct_all(const struct lu_env *env,
1450                                     struct mgs_device *mgs,
1451                                     struct fs_db *fsdb,
1452                                     struct mgs_target_info *mti,
1453                                     struct llog_cfg_rec *lcr, char *devname,
1454                                     char *comment, int server_only)
1455 {
1456         struct list_head         log_list;
1457         struct mgs_direntry     *dirent, *n;
1458         char                    *fsname = mti->mti_fsname;
1459         int                      rc = 0, len = strlen(fsname);
1460
1461         ENTRY;
1462         /* Find all the logs in the CONFIGS directory */
1463         rc = class_dentry_readdir(env, mgs, &log_list);
1464         if (rc)
1465                 RETURN(rc);
1466
1467         /* Could use fsdb index maps instead of directory listing */
1468         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1469                 list_del_init(&dirent->mde_list);
1470                 /* don't write to sptlrpc rule log */
1471                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1472                         goto next;
1473
1474                 /* caller wants write server logs only */
1475                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1476                         goto next;
1477
1478                 if (strlen(dirent->mde_name) <= len ||
1479                     strncmp(fsname, dirent->mde_name, len) != 0 ||
1480                     dirent->mde_name[len] != '-')
1481                         goto next;
1482
1483                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1484                 /* Erase any old settings of this same parameter */
1485                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1486                                 devname, comment, CM_SKIP);
1487                 if (rc < 0)
1488                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1489                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1490                 if (lcr == NULL)
1491                         goto next;
1492                 /* Write the new one */
1493                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1494                                           lcr, devname, comment);
1495                 if (rc != 0)
1496                         CERROR("%s: writing log %s: rc = %d\n",
1497                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1498 next:
1499                 mgs_direntry_free(dirent);
1500         }
1501
1502         RETURN(rc);
1503 }
1504
1505 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1506                                     struct mgs_device *mgs,
1507                                     struct fs_db *fsdb,
1508                                     struct mgs_target_info *mti,
1509                                     int index, char *logname);
1510 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1511                                     struct mgs_device *mgs,
1512                                     struct fs_db *fsdb,
1513                                     struct mgs_target_info *mti,
1514                                     char *logname, char *suffix, char *lovname,
1515                                     enum lustre_sec_part sec_part, int flags);
1516 static int name_create_mdt_and_lov(char **logname, char **lovname,
1517                                    struct fs_db *fsdb, int i);
1518
1519 static int add_param(char *params, char *key, char *val)
1520 {
1521         char *start = params + strlen(params);
1522         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1523         int keylen = 0;
1524
1525         if (key != NULL)
1526                 keylen = strlen(key);
1527         if (start + 1 + keylen + strlen(val) >= end) {
1528                 CERROR("params are too long: %s %s%s\n",
1529                        params, key != NULL ? key : "", val);
1530                 return -EINVAL;
1531         }
1532
1533         sprintf(start, " %s%s", key != NULL ? key : "", val);
1534         return 0;
1535 }
1536
1537 /**
1538  * Walk through client config log record and convert the related records
1539  * into the target.
1540  **/
1541 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1542                                          struct llog_handle *llh,
1543                                          struct llog_rec_hdr *rec, void *data)
1544 {
1545         struct mgs_device *mgs;
1546         struct obd_device *obd;
1547         struct mgs_target_info *mti, *tmti;
1548         struct fs_db *fsdb;
1549         int cfg_len = rec->lrh_len;
1550         char *cfg_buf = (char*) (rec + 1);
1551         struct lustre_cfg *lcfg;
1552         int rc = 0;
1553         struct llog_handle *mdt_llh = NULL;
1554         static int got_an_osc_or_mdc = 0;
1555         /* 0: not found any osc/mdc;
1556            1: found osc;
1557            2: found mdc;
1558         */
1559         static int last_step = -1;
1560         int cplen = 0;
1561
1562         ENTRY;
1563
1564         mti = ((struct temp_comp*)data)->comp_mti;
1565         tmti = ((struct temp_comp*)data)->comp_tmti;
1566         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1567         obd = ((struct temp_comp *)data)->comp_obd;
1568         mgs = lu2mgs_dev(obd->obd_lu_dev);
1569         LASSERT(mgs);
1570
1571         if (rec->lrh_type != OBD_CFG_REC) {
1572                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1573                 RETURN(-EINVAL);
1574         }
1575
1576         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1577         if (rc) {
1578                 CERROR("Insane cfg\n");
1579                 RETURN(rc);
1580         }
1581
1582         lcfg = (struct lustre_cfg *)cfg_buf;
1583
1584         if (lcfg->lcfg_command == LCFG_MARKER) {
1585                 struct cfg_marker *marker;
1586                 marker = lustre_cfg_buf(lcfg, 1);
1587                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1588                     (marker->cm_flags & CM_START) &&
1589                      !(marker->cm_flags & CM_SKIP)) {
1590                         got_an_osc_or_mdc = 1;
1591                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1592                                         sizeof(tmti->mti_svname));
1593                         if (cplen >= sizeof(tmti->mti_svname))
1594                                 RETURN(-E2BIG);
1595                         rc = record_start_log(env, mgs, &mdt_llh,
1596                                               mti->mti_svname);
1597                         if (rc)
1598                                 RETURN(rc);
1599                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1600                                            mti->mti_svname, "add osc(copied)");
1601                         record_end_log(env, &mdt_llh);
1602                         last_step = marker->cm_step;
1603                         RETURN(rc);
1604                 }
1605                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1606                     (marker->cm_flags & CM_END) &&
1607                      !(marker->cm_flags & CM_SKIP)) {
1608                         LASSERT(last_step == marker->cm_step);
1609                         last_step = -1;
1610                         got_an_osc_or_mdc = 0;
1611                         memset(tmti, 0, sizeof(*tmti));
1612                         rc = record_start_log(env, mgs, &mdt_llh,
1613                                               mti->mti_svname);
1614                         if (rc)
1615                                 RETURN(rc);
1616                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1617                                            mti->mti_svname, "add osc(copied)");
1618                         record_end_log(env, &mdt_llh);
1619                         RETURN(rc);
1620                 }
1621                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1622                     (marker->cm_flags & CM_START) &&
1623                      !(marker->cm_flags & CM_SKIP)) {
1624                         got_an_osc_or_mdc = 2;
1625                         last_step = marker->cm_step;
1626                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1627                                strlen(marker->cm_tgtname));
1628
1629                         RETURN(rc);
1630                 }
1631                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1632                     (marker->cm_flags & CM_END) &&
1633                      !(marker->cm_flags & CM_SKIP)) {
1634                         LASSERT(last_step == marker->cm_step);
1635                         last_step = -1;
1636                         got_an_osc_or_mdc = 0;
1637                         memset(tmti, 0, sizeof(*tmti));
1638                         RETURN(rc);
1639                 }
1640         }
1641
1642         if (got_an_osc_or_mdc == 0 || last_step < 0)
1643                 RETURN(rc);
1644
1645         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1646                 __u64 nodenid = lcfg->lcfg_nid;
1647
1648                 if (strlen(tmti->mti_uuid) == 0) {
1649                         /* target uuid not set, this config record is before
1650                          * LCFG_SETUP, this nid is one of target node nid.
1651                          */
1652                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1653                         tmti->mti_nid_count++;
1654                 } else {
1655                         char nidstr[LNET_NIDSTR_SIZE];
1656
1657                         /* failover node nid */
1658                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
1659                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1660                                         nidstr);
1661                 }
1662
1663                 RETURN(rc);
1664         }
1665
1666         if (lcfg->lcfg_command == LCFG_SETUP) {
1667                 char *target;
1668
1669                 target = lustre_cfg_string(lcfg, 1);
1670                 memcpy(tmti->mti_uuid, target, strlen(target));
1671                 RETURN(rc);
1672         }
1673
1674         /* ignore client side sptlrpc_conf_log */
1675         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1676                 RETURN(rc);
1677
1678         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1679                 int index;
1680
1681                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1682                         RETURN (-EINVAL);
1683
1684                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1685                        strlen(mti->mti_fsname));
1686                 tmti->mti_stripe_index = index;
1687
1688                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1689                                               mti->mti_stripe_index,
1690                                               mti->mti_svname);
1691                 memset(tmti, 0, sizeof(*tmti));
1692                 RETURN(rc);
1693         }
1694
1695         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1696                 int index;
1697                 char mdt_index[9];
1698                 char *logname, *lovname;
1699
1700                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1701                                              mti->mti_stripe_index);
1702                 if (rc)
1703                         RETURN(rc);
1704                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1705
1706                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1707                         name_destroy(&logname);
1708                         name_destroy(&lovname);
1709                         RETURN(-EINVAL);
1710                 }
1711
1712                 tmti->mti_stripe_index = index;
1713                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1714                                          mdt_index, lovname,
1715                                          LUSTRE_SP_MDT, 0);
1716                 name_destroy(&logname);
1717                 name_destroy(&lovname);
1718                 RETURN(rc);
1719         }
1720         RETURN(rc);
1721 }
1722
1723 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1724 /* stealed from mgs_get_fsdb_from_llog*/
1725 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1726                                               struct mgs_device *mgs,
1727                                               char *client_name,
1728                                               struct temp_comp* comp)
1729 {
1730         struct llog_handle *loghandle;
1731         struct mgs_target_info *tmti;
1732         struct llog_ctxt *ctxt;
1733         int rc;
1734
1735         ENTRY;
1736
1737         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1738         LASSERT(ctxt != NULL);
1739
1740         OBD_ALLOC_PTR(tmti);
1741         if (tmti == NULL)
1742                 GOTO(out_ctxt, rc = -ENOMEM);
1743
1744         comp->comp_tmti = tmti;
1745         comp->comp_obd = mgs->mgs_obd;
1746
1747         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1748                        LLOG_OPEN_EXISTS);
1749         if (rc < 0) {
1750                 if (rc == -ENOENT)
1751                         rc = 0;
1752                 GOTO(out_pop, rc);
1753         }
1754
1755         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1756         if (rc)
1757                 GOTO(out_close, rc);
1758
1759         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1760                                   (void *)comp, NULL, false);
1761         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1762 out_close:
1763         llog_close(env, loghandle);
1764 out_pop:
1765         OBD_FREE_PTR(tmti);
1766 out_ctxt:
1767         llog_ctxt_put(ctxt);
1768         RETURN(rc);
1769 }
1770
1771 /* lmv is the second thing for client logs */
1772 /* copied from mgs_write_log_lov. Please refer to that.  */
1773 static int mgs_write_log_lmv(const struct lu_env *env,
1774                              struct mgs_device *mgs,
1775                              struct fs_db *fsdb,
1776                              struct mgs_target_info *mti,
1777                              char *logname, char *lmvname)
1778 {
1779         struct llog_handle *llh = NULL;
1780         struct lmv_desc *lmvdesc;
1781         char *uuid;
1782         int rc = 0;
1783         ENTRY;
1784
1785         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1786
1787         OBD_ALLOC_PTR(lmvdesc);
1788         if (lmvdesc == NULL)
1789                 RETURN(-ENOMEM);
1790         lmvdesc->ld_active_tgt_count = 0;
1791         lmvdesc->ld_tgt_count = 0;
1792         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1793         uuid = (char *)lmvdesc->ld_uuid.uuid;
1794
1795         rc = record_start_log(env, mgs, &llh, logname);
1796         if (rc)
1797                 GOTO(out_free, rc);
1798         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1799         if (rc)
1800                 GOTO(out_end, rc);
1801         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1802         if (rc)
1803                 GOTO(out_end, rc);
1804         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1805         if (rc)
1806                 GOTO(out_end, rc);
1807         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1808         if (rc)
1809                 GOTO(out_end, rc);
1810 out_end:
1811         record_end_log(env, &llh);
1812 out_free:
1813         OBD_FREE_PTR(lmvdesc);
1814         RETURN(rc);
1815 }
1816
1817 /* lov is the first thing in the mdt and client logs */
1818 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1819                              struct fs_db *fsdb, struct mgs_target_info *mti,
1820                              char *logname, char *lovname)
1821 {
1822         struct llog_handle *llh = NULL;
1823         struct lov_desc *lovdesc;
1824         char *uuid;
1825         int rc = 0;
1826         ENTRY;
1827
1828         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1829
1830         /*
1831         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1832         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1833               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1834         */
1835
1836         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1837         OBD_ALLOC_PTR(lovdesc);
1838         if (lovdesc == NULL)
1839                 RETURN(-ENOMEM);
1840         lovdesc->ld_magic = LOV_DESC_MAGIC;
1841         lovdesc->ld_tgt_count = 0;
1842         /* Defaults.  Can be changed later by lcfg config_param */
1843         lovdesc->ld_default_stripe_count = 1;
1844         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1845         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1846         lovdesc->ld_default_stripe_offset = -1;
1847         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1848         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1849         /* can these be the same? */
1850         uuid = (char *)lovdesc->ld_uuid.uuid;
1851
1852         /* This should always be the first entry in a log.
1853         rc = mgs_clear_log(obd, logname); */
1854         rc = record_start_log(env, mgs, &llh, logname);
1855         if (rc)
1856                 GOTO(out_free, rc);
1857         /* FIXME these should be a single journal transaction */
1858         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1859         if (rc)
1860                 GOTO(out_end, rc);
1861         rc = record_attach(env, llh, lovname, "lov", uuid);
1862         if (rc)
1863                 GOTO(out_end, rc);
1864         rc = record_lov_setup(env, llh, lovname, lovdesc);
1865         if (rc)
1866                 GOTO(out_end, rc);
1867         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1868         if (rc)
1869                 GOTO(out_end, rc);
1870         EXIT;
1871 out_end:
1872         record_end_log(env, &llh);
1873 out_free:
1874         OBD_FREE_PTR(lovdesc);
1875         return rc;
1876 }
1877
1878 /* add failnids to open log */
1879 static int mgs_write_log_failnids(const struct lu_env *env,
1880                                   struct mgs_target_info *mti,
1881                                   struct llog_handle *llh,
1882                                   char *cliname)
1883 {
1884         char *failnodeuuid = NULL;
1885         char *ptr = mti->mti_params;
1886         lnet_nid_t nid;
1887         int rc = 0;
1888
1889         /*
1890         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1891         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1892         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1893         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1894         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1895         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1896         */
1897
1898         /*
1899          * Pull failnid info out of params string, which may contain something
1900          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
1901          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
1902          * etc.  However, convert_hostnames() should have caught those.
1903          */
1904         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1905                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1906                         char nidstr[LNET_NIDSTR_SIZE];
1907
1908                         if (failnodeuuid == NULL) {
1909                                 /* We don't know the failover node name,
1910                                  * so just use the first nid as the uuid */
1911                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
1912                                 rc = name_create(&failnodeuuid, nidstr, "");
1913                                 if (rc != 0)
1914                                         return rc;
1915                         }
1916                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1917                                 "client %s\n",
1918                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
1919                                 failnodeuuid, cliname);
1920                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1921                         /*
1922                          * If *ptr is ':', we have added all NIDs for
1923                          * failnodeuuid.
1924                          */
1925                         if (*ptr == ':') {
1926                                 rc = record_add_conn(env, llh, cliname,
1927                                                      failnodeuuid);
1928                                 name_destroy(&failnodeuuid);
1929                                 failnodeuuid = NULL;
1930                         }
1931                 }
1932                 if (failnodeuuid) {
1933                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1934                         name_destroy(&failnodeuuid);
1935                         failnodeuuid = NULL;
1936                 }
1937         }
1938
1939         return rc;
1940 }
1941
1942 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1943                                     struct mgs_device *mgs,
1944                                     struct fs_db *fsdb,
1945                                     struct mgs_target_info *mti,
1946                                     char *logname, char *lmvname)
1947 {
1948         struct llog_handle *llh = NULL;
1949         char *mdcname = NULL;
1950         char *nodeuuid = NULL;
1951         char *mdcuuid = NULL;
1952         char *lmvuuid = NULL;
1953         char index[6];
1954         char nidstr[LNET_NIDSTR_SIZE];
1955         int i, rc;
1956         ENTRY;
1957
1958         if (mgs_log_is_empty(env, mgs, logname)) {
1959                 CERROR("log is empty! Logical error\n");
1960                 RETURN(-EINVAL);
1961         }
1962
1963         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1964                mti->mti_svname, logname, lmvname);
1965
1966         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
1967         rc = name_create(&nodeuuid, nidstr, "");
1968         if (rc)
1969                 RETURN(rc);
1970         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1971         if (rc)
1972                 GOTO(out_free, rc);
1973         rc = name_create(&mdcuuid, mdcname, "_UUID");
1974         if (rc)
1975                 GOTO(out_free, rc);
1976         rc = name_create(&lmvuuid, lmvname, "_UUID");
1977         if (rc)
1978                 GOTO(out_free, rc);
1979
1980         rc = record_start_log(env, mgs, &llh, logname);
1981         if (rc)
1982                 GOTO(out_free, rc);
1983         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1984                            "add mdc");
1985         if (rc)
1986                 GOTO(out_end, rc);
1987         for (i = 0; i < mti->mti_nid_count; i++) {
1988                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1989                         libcfs_nid2str_r(mti->mti_nids[i],
1990                                          nidstr, sizeof(nidstr)));
1991
1992                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1993                 if (rc)
1994                         GOTO(out_end, rc);
1995         }
1996
1997         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1998         if (rc)
1999                 GOTO(out_end, rc);
2000         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2001                           NULL, NULL);
2002         if (rc)
2003                 GOTO(out_end, rc);
2004         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2005         if (rc)
2006                 GOTO(out_end, rc);
2007         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2008         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2009                             index, "1");
2010         if (rc)
2011                 GOTO(out_end, rc);
2012         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2013                            "add mdc");
2014         if (rc)
2015                 GOTO(out_end, rc);
2016 out_end:
2017         record_end_log(env, &llh);
2018 out_free:
2019         name_destroy(&lmvuuid);
2020         name_destroy(&mdcuuid);
2021         name_destroy(&mdcname);
2022         name_destroy(&nodeuuid);
2023         RETURN(rc);
2024 }
2025
2026 static inline int name_create_lov(char **lovname, char *mdtname,
2027                                   struct fs_db *fsdb, int index)
2028 {
2029         /* COMPAT_180 */
2030         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2031                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2032         else
2033                 return name_create(lovname, mdtname, "-mdtlov");
2034 }
2035
2036 static int name_create_mdt_and_lov(char **logname, char **lovname,
2037                                    struct fs_db *fsdb, int i)
2038 {
2039         int rc;
2040
2041         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2042         if (rc)
2043                 return rc;
2044         /* COMPAT_180 */
2045         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2046                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2047         else
2048                 rc = name_create(lovname, *logname, "-mdtlov");
2049         if (rc) {
2050                 name_destroy(logname);
2051                 *logname = NULL;
2052         }
2053         return rc;
2054 }
2055
2056 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2057                                       struct fs_db *fsdb, int i)
2058 {
2059         char suffix[16];
2060
2061         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2062                 sprintf(suffix, "-osc");
2063         else
2064                 sprintf(suffix, "-osc-MDT%04x", i);
2065         return name_create(oscname, ostname, suffix);
2066 }
2067
2068 /* add new mdc to already existent MDS */
2069 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2070                                     struct mgs_device *mgs,
2071                                     struct fs_db *fsdb,
2072                                     struct mgs_target_info *mti,
2073                                     int mdt_index, char *logname)
2074 {
2075         struct llog_handle      *llh = NULL;
2076         char    *nodeuuid = NULL;
2077         char    *ospname = NULL;
2078         char    *lovuuid = NULL;
2079         char    *mdtuuid = NULL;
2080         char    *svname = NULL;
2081         char    *mdtname = NULL;
2082         char    *lovname = NULL;
2083         char    index_str[16];
2084         char    nidstr[LNET_NIDSTR_SIZE];
2085         int     i, rc;
2086
2087         ENTRY;
2088         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2089                 CERROR("log is empty! Logical error\n");
2090                 RETURN (-EINVAL);
2091         }
2092
2093         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2094                logname);
2095
2096         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2097         if (rc)
2098                 RETURN(rc);
2099
2100         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2101         rc = name_create(&nodeuuid, nidstr, "");
2102         if (rc)
2103                 GOTO(out_destory, rc);
2104
2105         rc = name_create(&svname, mdtname, "-osp");
2106         if (rc)
2107                 GOTO(out_destory, rc);
2108
2109         sprintf(index_str, "-MDT%04x", mdt_index);
2110         rc = name_create(&ospname, svname, index_str);
2111         if (rc)
2112                 GOTO(out_destory, rc);
2113
2114         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2115         if (rc)
2116                 GOTO(out_destory, rc);
2117
2118         rc = name_create(&lovuuid, lovname, "_UUID");
2119         if (rc)
2120                 GOTO(out_destory, rc);
2121
2122         rc = name_create(&mdtuuid, mdtname, "_UUID");
2123         if (rc)
2124                 GOTO(out_destory, rc);
2125
2126         rc = record_start_log(env, mgs, &llh, logname);
2127         if (rc)
2128                 GOTO(out_destory, rc);
2129
2130         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2131                            "add osp");
2132         if (rc)
2133                 GOTO(out_destory, rc);
2134
2135         for (i = 0; i < mti->mti_nid_count; i++) {
2136                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2137                         libcfs_nid2str_r(mti->mti_nids[i],
2138                                          nidstr, sizeof(nidstr)));
2139                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2140                 if (rc)
2141                         GOTO(out_end, rc);
2142         }
2143
2144         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2145         if (rc)
2146                 GOTO(out_end, rc);
2147
2148         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2149                           NULL, NULL);
2150         if (rc)
2151                 GOTO(out_end, rc);
2152
2153         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2154         if (rc)
2155                 GOTO(out_end, rc);
2156
2157         /* Add mdc(osp) to lod */
2158         snprintf(index_str, sizeof(index_str), "%d", mti->mti_stripe_index);
2159         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2160                          index_str, "1", NULL);
2161         if (rc)
2162                 GOTO(out_end, rc);
2163
2164         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2165         if (rc)
2166                 GOTO(out_end, rc);
2167
2168 out_end:
2169         record_end_log(env, &llh);
2170
2171 out_destory:
2172         name_destroy(&mdtuuid);
2173         name_destroy(&lovuuid);
2174         name_destroy(&lovname);
2175         name_destroy(&ospname);
2176         name_destroy(&svname);
2177         name_destroy(&nodeuuid);
2178         name_destroy(&mdtname);
2179         RETURN(rc);
2180 }
2181
2182 static int mgs_write_log_mdt0(const struct lu_env *env,
2183                               struct mgs_device *mgs,
2184                               struct fs_db *fsdb,
2185                               struct mgs_target_info *mti)
2186 {
2187         char *log = mti->mti_svname;
2188         struct llog_handle *llh = NULL;
2189         char *uuid, *lovname;
2190         char mdt_index[6];
2191         char *ptr = mti->mti_params;
2192         int rc = 0, failout = 0;
2193         ENTRY;
2194
2195         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2196         if (uuid == NULL)
2197                 RETURN(-ENOMEM);
2198
2199         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2200                 failout = (strncmp(ptr, "failout", 7) == 0);
2201
2202         rc = name_create(&lovname, log, "-mdtlov");
2203         if (rc)
2204                 GOTO(out_free, rc);
2205         if (mgs_log_is_empty(env, mgs, log)) {
2206                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2207                 if (rc)
2208                         GOTO(out_lod, rc);
2209         }
2210
2211         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2212
2213         rc = record_start_log(env, mgs, &llh, log);
2214         if (rc)
2215                 GOTO(out_lod, rc);
2216
2217         /* add MDT itself */
2218
2219         /* FIXME this whole fn should be a single journal transaction */
2220         sprintf(uuid, "%s_UUID", log);
2221         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2222         if (rc)
2223                 GOTO(out_lod, rc);
2224         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2225         if (rc)
2226                 GOTO(out_end, rc);
2227         rc = record_mount_opt(env, llh, log, lovname, NULL);
2228         if (rc)
2229                 GOTO(out_end, rc);
2230         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2231                         failout ? "n" : "f");
2232         if (rc)
2233                 GOTO(out_end, rc);
2234         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2235         if (rc)
2236                 GOTO(out_end, rc);
2237 out_end:
2238         record_end_log(env, &llh);
2239 out_lod:
2240         name_destroy(&lovname);
2241 out_free:
2242         OBD_FREE(uuid, sizeof(struct obd_uuid));
2243         RETURN(rc);
2244 }
2245
2246 /* envelope method for all layers log */
2247 static int mgs_write_log_mdt(const struct lu_env *env,
2248                              struct mgs_device *mgs,
2249                              struct fs_db *fsdb,
2250                              struct mgs_target_info *mti)
2251 {
2252         struct mgs_thread_info *mgi = mgs_env_info(env);
2253         struct llog_handle *llh = NULL;
2254         char *cliname;
2255         int rc, i = 0;
2256         ENTRY;
2257
2258         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2259
2260         if (mti->mti_uuid[0] == '\0') {
2261                 /* Make up our own uuid */
2262                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2263                          "%s_UUID", mti->mti_svname);
2264         }
2265
2266         /* add mdt */
2267         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2268         if (rc)
2269                 RETURN(rc);
2270         /* Append the mdt info to the client log */
2271         rc = name_create(&cliname, mti->mti_fsname, "-client");
2272         if (rc)
2273                 RETURN(rc);
2274
2275         if (mgs_log_is_empty(env, mgs, cliname)) {
2276                 /* Start client log */
2277                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2278                                        fsdb->fsdb_clilov);
2279                 if (rc)
2280                         GOTO(out_free, rc);
2281                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2282                                        fsdb->fsdb_clilmv);
2283                 if (rc)
2284                         GOTO(out_free, rc);
2285         }
2286
2287         /*
2288         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2289         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2290         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2291         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2292         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2293         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2294         */
2295
2296                 /* copy client info about lov/lmv */
2297                 mgi->mgi_comp.comp_mti = mti;
2298                 mgi->mgi_comp.comp_fsdb = fsdb;
2299
2300                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2301                                                         &mgi->mgi_comp);
2302                 if (rc)
2303                         GOTO(out_free, rc);
2304                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2305                                               fsdb->fsdb_clilmv);
2306                 if (rc)
2307                         GOTO(out_free, rc);
2308
2309                 /* add mountopts */
2310                 rc = record_start_log(env, mgs, &llh, cliname);
2311                 if (rc)
2312                         GOTO(out_free, rc);
2313
2314                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2315                                    "mount opts");
2316                 if (rc)
2317                         GOTO(out_end, rc);
2318                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2319                                       fsdb->fsdb_clilmv);
2320                 if (rc)
2321                         GOTO(out_end, rc);
2322                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2323                                    "mount opts");
2324
2325         if (rc)
2326                 GOTO(out_end, rc);
2327
2328         /* for_all_existing_mdt except current one */
2329         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2330                 if (i !=  mti->mti_stripe_index &&
2331                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2332                         char *logname;
2333
2334                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2335                         if (rc)
2336                                 GOTO(out_end, rc);
2337
2338                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2339                                                       i, logname);
2340                         name_destroy(&logname);
2341                         if (rc)
2342                                 GOTO(out_end, rc);
2343                 }
2344         }
2345 out_end:
2346         record_end_log(env, &llh);
2347 out_free:
2348         name_destroy(&cliname);
2349         RETURN(rc);
2350 }
2351
2352 /* Add the ost info to the client/mdt lov */
2353 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2354                                     struct mgs_device *mgs, struct fs_db *fsdb,
2355                                     struct mgs_target_info *mti,
2356                                     char *logname, char *suffix, char *lovname,
2357                                     enum lustre_sec_part sec_part, int flags)
2358 {
2359         struct llog_handle *llh = NULL;
2360         char *nodeuuid = NULL;
2361         char *oscname = NULL;
2362         char *oscuuid = NULL;
2363         char *lovuuid = NULL;
2364         char *svname = NULL;
2365         char index[6];
2366         char nidstr[LNET_NIDSTR_SIZE];
2367         int i, rc;
2368         ENTRY;
2369
2370         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2371                 mti->mti_svname, logname);
2372
2373         if (mgs_log_is_empty(env, mgs, logname)) {
2374                 CERROR("log is empty! Logical error\n");
2375                 RETURN(-EINVAL);
2376         }
2377
2378         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2379         rc = name_create(&nodeuuid, nidstr, "");
2380         if (rc)
2381                 RETURN(rc);
2382         rc = name_create(&svname, mti->mti_svname, "-osc");
2383         if (rc)
2384                 GOTO(out_free, rc);
2385
2386         /* for the system upgraded from old 1.8, keep using the old osc naming
2387          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2388         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2389                 rc = name_create(&oscname, svname, "");
2390         else
2391                 rc = name_create(&oscname, svname, suffix);
2392         if (rc)
2393                 GOTO(out_free, rc);
2394
2395         rc = name_create(&oscuuid, oscname, "_UUID");
2396         if (rc)
2397                 GOTO(out_free, rc);
2398         rc = name_create(&lovuuid, lovname, "_UUID");
2399         if (rc)
2400                 GOTO(out_free, rc);
2401
2402
2403         /*
2404         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2405         multihomed (#4)
2406         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2407         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2408         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2409         failover (#6,7)
2410         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2411         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2412         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2413         */
2414
2415         rc = record_start_log(env, mgs, &llh, logname);
2416         if (rc)
2417                 GOTO(out_free, rc);
2418
2419         /* FIXME these should be a single journal transaction */
2420         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2421                            "add osc");
2422         if (rc)
2423                 GOTO(out_end, rc);
2424
2425         /* NB: don't change record order, because upon MDT steal OSC config
2426          * from client, it treats all nids before LCFG_SETUP as target nids
2427          * (multiple interfaces), while nids after as failover node nids.
2428          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2429          */
2430         for (i = 0; i < mti->mti_nid_count; i++) {
2431                 CDEBUG(D_MGS, "add nid %s\n",
2432                         libcfs_nid2str_r(mti->mti_nids[i],
2433                                          nidstr, sizeof(nidstr)));
2434                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2435                 if (rc)
2436                         GOTO(out_end, rc);
2437         }
2438         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2439         if (rc)
2440                 GOTO(out_end, rc);
2441         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
2442                           NULL, NULL);
2443         if (rc)
2444                 GOTO(out_end, rc);
2445         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2446         if (rc)
2447                 GOTO(out_end, rc);
2448
2449         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2450
2451         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2452         if (rc)
2453                 GOTO(out_end, rc);
2454         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2455                            "add osc");
2456         if (rc)
2457                 GOTO(out_end, rc);
2458 out_end:
2459         record_end_log(env, &llh);
2460 out_free:
2461         name_destroy(&lovuuid);
2462         name_destroy(&oscuuid);
2463         name_destroy(&oscname);
2464         name_destroy(&svname);
2465         name_destroy(&nodeuuid);
2466         RETURN(rc);
2467 }
2468
2469 static int mgs_write_log_ost(const struct lu_env *env,
2470                              struct mgs_device *mgs, struct fs_db *fsdb,
2471                              struct mgs_target_info *mti)
2472 {
2473         struct llog_handle *llh = NULL;
2474         char *logname, *lovname;
2475         char *ptr = mti->mti_params;
2476         int rc, flags = 0, failout = 0, i;
2477         ENTRY;
2478
2479         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2480
2481         /* The ost startup log */
2482
2483         /* If the ost log already exists, that means that someone reformatted
2484            the ost and it called target_add again. */
2485         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2486                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2487                                    "exists, yet the server claims it never "
2488                                    "registered. It may have been reformatted, "
2489                                    "or the index changed. writeconf the MDT to "
2490                                    "regenerate all logs.\n", mti->mti_svname);
2491                 RETURN(-EALREADY);
2492         }
2493
2494         /*
2495         attach obdfilter ost1 ost1_UUID
2496         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2497         */
2498         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2499                 failout = (strncmp(ptr, "failout", 7) == 0);
2500         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2501         if (rc)
2502                 RETURN(rc);
2503         /* FIXME these should be a single journal transaction */
2504         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2505         if (rc)
2506                 GOTO(out_end, rc);
2507         if (*mti->mti_uuid == '\0')
2508                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2509                          "%s_UUID", mti->mti_svname);
2510         rc = record_attach(env, llh, mti->mti_svname,
2511                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2512         if (rc)
2513                 GOTO(out_end, rc);
2514         rc = record_setup(env, llh, mti->mti_svname,
2515                           "dev"/*ignored*/, "type"/*ignored*/,
2516                           failout ? "n" : "f", NULL/*options*/);
2517         if (rc)
2518                 GOTO(out_end, rc);
2519         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2520         if (rc)
2521                 GOTO(out_end, rc);
2522 out_end:
2523         record_end_log(env, &llh);
2524         if (rc)
2525                 RETURN(rc);
2526         /* We also have to update the other logs where this osc is part of
2527            the lov */
2528
2529         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2530                 /* If we're upgrading, the old mdt log already has our
2531                    entry. Let's do a fake one for fun. */
2532                 /* Note that we can't add any new failnids, since we don't
2533                    know the old osc names. */
2534                 flags = CM_SKIP | CM_UPGRADE146;
2535
2536         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2537                 /* If the update flag isn't set, don't update client/mdt
2538                    logs. */
2539                 flags |= CM_SKIP;
2540                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2541                               "the MDT first to regenerate it.\n",
2542                               mti->mti_svname);
2543         }
2544
2545         /* Add ost to all MDT lov defs */
2546         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2547                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2548                         char mdt_index[9];
2549
2550                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2551                                                      i);
2552                         if (rc)
2553                                 RETURN(rc);
2554                         sprintf(mdt_index, "-MDT%04x", i);
2555                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2556                                                       logname, mdt_index,
2557                                                       lovname, LUSTRE_SP_MDT,
2558                                                       flags);
2559                         name_destroy(&logname);
2560                         name_destroy(&lovname);
2561                         if (rc)
2562                                 RETURN(rc);
2563                 }
2564         }
2565
2566         /* Append ost info to the client log */
2567         rc = name_create(&logname, mti->mti_fsname, "-client");
2568         if (rc)
2569                 RETURN(rc);
2570         if (mgs_log_is_empty(env, mgs, logname)) {
2571                 /* Start client log */
2572                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2573                                        fsdb->fsdb_clilov);
2574                 if (rc)
2575                         GOTO(out_free, rc);
2576                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2577                                        fsdb->fsdb_clilmv);
2578                 if (rc)
2579                         GOTO(out_free, rc);
2580         }
2581         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2582                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
2583 out_free:
2584         name_destroy(&logname);
2585         RETURN(rc);
2586 }
2587
2588 static __inline__ int mgs_param_empty(char *ptr)
2589 {
2590         char *tmp;
2591
2592         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2593                 return 1;
2594         return 0;
2595 }
2596
2597 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2598                                           struct mgs_device *mgs,
2599                                           struct fs_db *fsdb,
2600                                           struct mgs_target_info *mti,
2601                                           char *logname, char *cliname)
2602 {
2603         int rc;
2604         struct llog_handle *llh = NULL;
2605
2606         if (mgs_param_empty(mti->mti_params)) {
2607                 /* Remove _all_ failnids */
2608                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2609                                 mti->mti_svname, "add failnid", CM_SKIP);
2610                 return rc < 0 ? rc : 0;
2611         }
2612
2613         /* Otherwise failover nids are additive */
2614         rc = record_start_log(env, mgs, &llh, logname);
2615         if (rc)
2616                 return rc;
2617                 /* FIXME this should be a single journal transaction */
2618         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2619                            "add failnid");
2620         if (rc)
2621                 goto out_end;
2622         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2623         if (rc)
2624                 goto out_end;
2625         rc = record_marker(env, llh, fsdb, CM_END,
2626                            mti->mti_svname, "add failnid");
2627 out_end:
2628         record_end_log(env, &llh);
2629         return rc;
2630 }
2631
2632
2633 /* Add additional failnids to an existing log.
2634    The mdc/osc must have been added to logs first */
2635 /* tcp nids must be in dotted-quad ascii -
2636    we can't resolve hostnames from the kernel. */
2637 static int mgs_write_log_add_failnid(const struct lu_env *env,
2638                                      struct mgs_device *mgs,
2639                                      struct fs_db *fsdb,
2640                                      struct mgs_target_info *mti)
2641 {
2642         char *logname, *cliname;
2643         int rc;
2644         ENTRY;
2645
2646         /* FIXME we currently can't erase the failnids
2647          * given when a target first registers, since they aren't part of
2648          * an "add uuid" stanza */
2649
2650         /* Verify that we know about this target */
2651         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2652                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2653                                    "yet. It must be started before failnids "
2654                                    "can be added.\n", mti->mti_svname);
2655                 RETURN(-ENOENT);
2656         }
2657
2658         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2659         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2660                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2661         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2662                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2663         } else {
2664                 RETURN(-EINVAL);
2665         }
2666         if (rc)
2667                 RETURN(rc);
2668         /* Add failover nids to the client log */
2669         rc = name_create(&logname, mti->mti_fsname, "-client");
2670         if (rc) {
2671                 name_destroy(&cliname);
2672                 RETURN(rc);
2673         }
2674         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2675         name_destroy(&logname);
2676         name_destroy(&cliname);
2677         if (rc)
2678                 RETURN(rc);
2679
2680         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2681                 /* Add OST failover nids to the MDT logs as well */
2682                 int i;
2683
2684                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2685                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2686                                 continue;
2687                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2688                         if (rc)
2689                                 RETURN(rc);
2690                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2691                                                  fsdb, i);
2692                         if (rc) {
2693                                 name_destroy(&logname);
2694                                 RETURN(rc);
2695                         }
2696                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2697                                                             mti, logname,
2698                                                             cliname);
2699                         name_destroy(&cliname);
2700                         name_destroy(&logname);
2701                         if (rc)
2702                                 RETURN(rc);
2703                 }
2704         }
2705
2706         RETURN(rc);
2707 }
2708
2709 static int mgs_wlp_lcfg(const struct lu_env *env,
2710                         struct mgs_device *mgs, struct fs_db *fsdb,
2711                         struct mgs_target_info *mti,
2712                         char *logname, struct lustre_cfg_bufs *bufs,
2713                         char *tgtname, char *ptr)
2714 {
2715         char comment[MTI_NAME_MAXLEN];
2716         char *tmp;
2717         struct llog_cfg_rec *lcr;
2718         int rc, del;
2719
2720         /* Erase any old settings of this same parameter */
2721         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2722         comment[MTI_NAME_MAXLEN - 1] = 0;
2723         /* But don't try to match the value. */
2724         tmp = strchr(comment, '=');
2725         if (tmp != NULL)
2726                 *tmp = 0;
2727         /* FIXME we should skip settings that are the same as old values */
2728         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2729         if (rc < 0)
2730                 return rc;
2731         del = mgs_param_empty(ptr);
2732
2733         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2734                       "Setting" : "Modifying", tgtname, comment, logname);
2735         if (del) {
2736                 /* mgs_modify() will return 1 if nothing had to be done */
2737                 if (rc == 1)
2738                         rc = 0;
2739                 return rc;
2740         }
2741
2742         lustre_cfg_bufs_reset(bufs, tgtname);
2743         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2744         if (mti->mti_flags & LDD_F_PARAM2)
2745                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2746
2747         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2748                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2749         if (lcr == NULL)
2750                 return -ENOMEM;
2751
2752         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2753                                   comment);
2754         lustre_cfg_rec_free(lcr);
2755         return rc;
2756 }
2757
2758 static int mgs_write_log_param2(const struct lu_env *env,
2759                                 struct mgs_device *mgs,
2760                                 struct fs_db *fsdb,
2761                                 struct mgs_target_info *mti, char *ptr)
2762 {
2763         struct lustre_cfg_bufs  bufs;
2764         int                     rc = 0;
2765         ENTRY;
2766
2767         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2768         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2769                           mti->mti_svname, ptr);
2770
2771         RETURN(rc);
2772 }
2773
2774 /* write global variable settings into log */
2775 static int mgs_write_log_sys(const struct lu_env *env,
2776                              struct mgs_device *mgs, struct fs_db *fsdb,
2777                              struct mgs_target_info *mti, char *sys, char *ptr)
2778 {
2779         struct mgs_thread_info  *mgi = mgs_env_info(env);
2780         struct lustre_cfg       *lcfg;
2781         struct llog_cfg_rec     *lcr;
2782         char *tmp, sep;
2783         int rc, cmd, convert = 1;
2784
2785         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2786                 cmd = LCFG_SET_TIMEOUT;
2787         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2788                 cmd = LCFG_SET_LDLM_TIMEOUT;
2789         /* Check for known params here so we can return error to lctl */
2790         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2791                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2792                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2793                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2794                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2795                 cmd = LCFG_PARAM;
2796         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2797                 convert = 0; /* Don't convert string value to integer */
2798                 cmd = LCFG_PARAM;
2799         } else {
2800                 return -EINVAL;
2801         }
2802
2803         if (mgs_param_empty(ptr))
2804                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2805         else
2806                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2807
2808         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2809         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2810         if (!convert && *tmp != '\0')
2811                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2812         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2813         if (lcr == NULL)
2814                 return -ENOMEM;
2815
2816         lcfg = &lcr->lcr_cfg;
2817         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2818         /* truncate the comment to the parameter name */
2819         ptr = tmp - 1;
2820         sep = *ptr;
2821         *ptr = '\0';
2822         /* modify all servers and clients */
2823         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2824                                       *tmp == '\0' ? NULL : lcr,
2825                                       mti->mti_fsname, sys, 0);
2826         if (rc == 0 && *tmp != '\0') {
2827                 switch (cmd) {
2828                 case LCFG_SET_TIMEOUT:
2829                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2830                                 class_process_config(lcfg);
2831                         break;
2832                 case LCFG_SET_LDLM_TIMEOUT:
2833                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2834                                 class_process_config(lcfg);
2835                         break;
2836                 default:
2837                         break;
2838                 }
2839         }
2840         *ptr = sep;
2841         lustre_cfg_rec_free(lcr);
2842         return rc;
2843 }
2844
2845 /* write quota settings into log */
2846 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2847                                struct fs_db *fsdb, struct mgs_target_info *mti,
2848                                char *quota, char *ptr)
2849 {
2850         struct mgs_thread_info  *mgi = mgs_env_info(env);
2851         struct llog_cfg_rec     *lcr;
2852         char                    *tmp;
2853         char                     sep;
2854         int                      rc, cmd = LCFG_PARAM;
2855
2856         /* support only 'meta' and 'data' pools so far */
2857         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2858             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2859                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2860                        "& quota.ost are)\n", ptr);
2861                 return -EINVAL;
2862         }
2863
2864         if (*tmp == '\0') {
2865                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2866         } else {
2867                 CDEBUG(D_MGS, "global '%s'\n", quota);
2868
2869                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2870                     strcmp(tmp, "none") != 0) {
2871                         CERROR("enable option(%s) isn't supported\n", tmp);
2872                         return -EINVAL;
2873                 }
2874         }
2875
2876         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2877         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2878         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2879         if (lcr == NULL)
2880                 return -ENOMEM;
2881
2882         /* truncate the comment to the parameter name */
2883         ptr = tmp - 1;
2884         sep = *ptr;
2885         *ptr = '\0';
2886
2887         /* XXX we duplicated quota enable information in all server
2888          *     config logs, it should be moved to a separate config
2889          *     log once we cleanup the config log for global param. */
2890         /* modify all servers */
2891         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2892                                       *tmp == '\0' ? NULL : lcr,
2893                                       mti->mti_fsname, quota, 1);
2894         *ptr = sep;
2895         lustre_cfg_rec_free(lcr);
2896         return rc < 0 ? rc : 0;
2897 }
2898
2899 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2900                                    struct mgs_device *mgs,
2901                                    struct fs_db *fsdb,
2902                                    struct mgs_target_info *mti,
2903                                    char *param)
2904 {
2905         struct mgs_thread_info  *mgi = mgs_env_info(env);
2906         struct llog_cfg_rec     *lcr;
2907         struct llog_handle      *llh = NULL;
2908         char                    *logname;
2909         char                    *comment, *ptr;
2910         int                      rc, len;
2911
2912         ENTRY;
2913
2914         /* get comment */
2915         ptr = strchr(param, '=');
2916         LASSERT(ptr != NULL);
2917         len = ptr - param;
2918
2919         OBD_ALLOC(comment, len + 1);
2920         if (comment == NULL)
2921                 RETURN(-ENOMEM);
2922         strncpy(comment, param, len);
2923         comment[len] = '\0';
2924
2925         /* prepare lcfg */
2926         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2927         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2928         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2929         if (lcr == NULL)
2930                 GOTO(out_comment, rc = -ENOMEM);
2931
2932         /* construct log name */
2933         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2934         if (rc < 0)
2935                 GOTO(out_lcfg, rc);
2936
2937         if (mgs_log_is_empty(env, mgs, logname)) {
2938                 rc = record_start_log(env, mgs, &llh, logname);
2939                 if (rc < 0)
2940                         GOTO(out, rc);
2941                 record_end_log(env, &llh);
2942         }
2943
2944         /* obsolete old one */
2945         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2946                         comment, CM_SKIP);
2947         if (rc < 0)
2948                 GOTO(out, rc);
2949         /* write the new one */
2950         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
2951                                   mti->mti_svname, comment);
2952         if (rc)
2953                 CERROR("%s: error writing log %s: rc = %d\n",
2954                        mgs->mgs_obd->obd_name, logname, rc);
2955 out:
2956         name_destroy(&logname);
2957 out_lcfg:
2958         lustre_cfg_rec_free(lcr);
2959 out_comment:
2960         OBD_FREE(comment, len + 1);
2961         RETURN(rc);
2962 }
2963
2964 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2965                                         char *param)
2966 {
2967         char    *ptr;
2968
2969         /* disable the adjustable udesc parameter for now, i.e. use default
2970          * setting that client always ship udesc to MDT if possible. to enable
2971          * it simply remove the following line */
2972         goto error_out;
2973
2974         ptr = strchr(param, '=');
2975         if (ptr == NULL)
2976                 goto error_out;
2977         *ptr++ = '\0';
2978
2979         if (strcmp(param, PARAM_SRPC_UDESC))
2980                 goto error_out;
2981
2982         if (strcmp(ptr, "yes") == 0) {
2983                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2984                 CWARN("Enable user descriptor shipping from client to MDT\n");
2985         } else if (strcmp(ptr, "no") == 0) {
2986                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2987                 CWARN("Disable user descriptor shipping from client to MDT\n");
2988         } else {
2989                 *(ptr - 1) = '=';
2990                 goto error_out;
2991         }
2992         return 0;
2993
2994 error_out:
2995         CERROR("Invalid param: %s\n", param);
2996         return -EINVAL;
2997 }
2998
2999 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3000                                   const char *svname,
3001                                   char *param)
3002 {
3003         struct sptlrpc_rule      rule;
3004         struct sptlrpc_rule_set *rset;
3005         int                      rc;
3006         ENTRY;
3007
3008         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3009                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3010                 RETURN(-EINVAL);
3011         }
3012
3013         if (strncmp(param, PARAM_SRPC_UDESC,
3014                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3015                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3016         }
3017
3018         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3019                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3020                 RETURN(-EINVAL);
3021         }
3022
3023         param += sizeof(PARAM_SRPC_FLVR) - 1;
3024
3025         rc = sptlrpc_parse_rule(param, &rule);
3026         if (rc)
3027                 RETURN(rc);
3028
3029         /* mgs rules implies must be mgc->mgs */
3030         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3031                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3032                      rule.sr_from != LUSTRE_SP_ANY) ||
3033                     (rule.sr_to != LUSTRE_SP_MGS &&
3034                      rule.sr_to != LUSTRE_SP_ANY))
3035                         RETURN(-EINVAL);
3036         }
3037
3038         /* preapre room for this coming rule. svcname format should be:
3039          * - fsname: general rule
3040          * - fsname-tgtname: target-specific rule
3041          */
3042         if (strchr(svname, '-')) {
3043                 struct mgs_tgt_srpc_conf *tgtconf;
3044                 int                       found = 0;
3045
3046                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3047                      tgtconf = tgtconf->mtsc_next) {
3048                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3049                                 found = 1;
3050                                 break;
3051                         }
3052                 }
3053
3054                 if (!found) {
3055                         int name_len;
3056
3057                         OBD_ALLOC_PTR(tgtconf);
3058                         if (tgtconf == NULL)
3059                                 RETURN(-ENOMEM);
3060
3061                         name_len = strlen(svname);
3062
3063                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3064                         if (tgtconf->mtsc_tgt == NULL) {
3065                                 OBD_FREE_PTR(tgtconf);
3066                                 RETURN(-ENOMEM);
3067                         }
3068                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3069
3070                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3071                         fsdb->fsdb_srpc_tgt = tgtconf;
3072                 }
3073
3074                 rset = &tgtconf->mtsc_rset;
3075         } else if (strcmp(svname, MGSSELF_NAME) == 0) {
3076                 /* put _mgs related srpc rule directly in mgs ruleset */
3077                 rset = &fsdb->fsdb_mgs->mgs_lut.lut_sptlrpc_rset;
3078         } else {
3079                 rset = &fsdb->fsdb_srpc_gen;
3080         }
3081
3082         rc = sptlrpc_rule_set_merge(rset, &rule);
3083
3084         RETURN(rc);
3085 }
3086
3087 static int mgs_srpc_set_param(const struct lu_env *env,
3088                               struct mgs_device *mgs,
3089                               struct fs_db *fsdb,
3090                               struct mgs_target_info *mti,
3091                               char *param)
3092 {
3093         char                   *copy;
3094         int                     rc, copy_size;
3095         ENTRY;
3096
3097 #ifndef HAVE_GSS
3098         RETURN(-EINVAL);
3099 #endif
3100         /* keep a copy of original param, which could be destroied
3101          * during parsing */
3102         copy_size = strlen(param) + 1;
3103         OBD_ALLOC(copy, copy_size);
3104         if (copy == NULL)
3105                 return -ENOMEM;
3106         memcpy(copy, param, copy_size);
3107
3108         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3109         if (rc)
3110                 goto out_free;
3111
3112         /* previous steps guaranteed the syntax is correct */
3113         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3114         if (rc)
3115                 goto out_free;
3116
3117         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3118                 /*
3119                  * for mgs rules, make them effective immediately.
3120                  */
3121                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3122                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3123                                                  &fsdb->fsdb_srpc_gen);
3124         }
3125
3126 out_free:
3127         OBD_FREE(copy, copy_size);
3128         RETURN(rc);
3129 }
3130
3131 struct mgs_srpc_read_data {
3132         struct fs_db   *msrd_fsdb;
3133         int             msrd_skip;
3134 };
3135
3136 static int mgs_srpc_read_handler(const struct lu_env *env,
3137                                  struct llog_handle *llh,
3138                                  struct llog_rec_hdr *rec, void *data)
3139 {
3140         struct mgs_srpc_read_data *msrd = data;
3141         struct cfg_marker         *marker;
3142         struct lustre_cfg         *lcfg = REC_DATA(rec);
3143         char                      *svname, *param;
3144         int                        cfg_len, rc;
3145         ENTRY;
3146
3147         if (rec->lrh_type != OBD_CFG_REC) {
3148                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3149                 RETURN(-EINVAL);
3150         }
3151
3152         cfg_len = REC_DATA_LEN(rec);
3153
3154         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3155         if (rc) {
3156                 CERROR("Insane cfg\n");
3157                 RETURN(rc);
3158         }
3159
3160         if (lcfg->lcfg_command == LCFG_MARKER) {
3161                 marker = lustre_cfg_buf(lcfg, 1);
3162
3163                 if (marker->cm_flags & CM_START &&
3164                     marker->cm_flags & CM_SKIP)
3165                         msrd->msrd_skip = 1;
3166                 if (marker->cm_flags & CM_END)
3167                         msrd->msrd_skip = 0;
3168
3169                 RETURN(0);
3170         }
3171
3172         if (msrd->msrd_skip)
3173                 RETURN(0);
3174
3175         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3176                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3177                 RETURN(0);
3178         }
3179
3180         svname = lustre_cfg_string(lcfg, 0);
3181         if (svname == NULL) {
3182                 CERROR("svname is empty\n");
3183                 RETURN(0);
3184         }
3185
3186         param = lustre_cfg_string(lcfg, 1);
3187         if (param == NULL) {
3188                 CERROR("param is empty\n");
3189                 RETURN(0);
3190         }
3191
3192         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3193         if (rc)
3194                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3195
3196         RETURN(0);
3197 }
3198
3199 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3200                                 struct mgs_device *mgs,
3201                                 struct fs_db *fsdb)
3202 {
3203         struct llog_handle        *llh = NULL;
3204         struct llog_ctxt          *ctxt;
3205         char                      *logname;
3206         struct mgs_srpc_read_data  msrd;
3207         int                        rc;
3208         ENTRY;
3209
3210         /* construct log name */
3211         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3212         if (rc)
3213                 RETURN(rc);
3214
3215         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3216         LASSERT(ctxt != NULL);
3217
3218         if (mgs_log_is_empty(env, mgs, logname))
3219                 GOTO(out, rc = 0);
3220
3221         rc = llog_open(env, ctxt, &llh, NULL, logname,
3222                        LLOG_OPEN_EXISTS);
3223         if (rc < 0) {
3224                 if (rc == -ENOENT)
3225                         rc = 0;
3226                 GOTO(out, rc);
3227         }
3228
3229         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3230         if (rc)
3231                 GOTO(out_close, rc);
3232
3233         if (llog_get_size(llh) <= 1)
3234                 GOTO(out_close, rc = 0);
3235
3236         msrd.msrd_fsdb = fsdb;
3237         msrd.msrd_skip = 0;
3238
3239         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3240                           NULL);
3241
3242 out_close:
3243         llog_close(env, llh);
3244 out:
3245         llog_ctxt_put(ctxt);
3246         name_destroy(&logname);
3247
3248         if (rc)
3249                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3250         RETURN(rc);
3251 }
3252
3253 /* Permanent settings of all parameters by writing into the appropriate
3254  * configuration logs.
3255  * A parameter with null value ("<param>='\0'") means to erase it out of
3256  * the logs.
3257  */
3258 static int mgs_write_log_param(const struct lu_env *env,
3259                                struct mgs_device *mgs, struct fs_db *fsdb,
3260                                struct mgs_target_info *mti, char *ptr)
3261 {
3262         struct mgs_thread_info *mgi = mgs_env_info(env);
3263         char *logname;
3264         char *tmp;
3265         int rc = 0, rc2 = 0;
3266         ENTRY;
3267
3268         /* For various parameter settings, we have to figure out which logs
3269            care about them (e.g. both mdt and client for lov settings) */
3270         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3271
3272         /* The params are stored in MOUNT_DATA_FILE and modified via
3273            tunefs.lustre, or set using lctl conf_param */
3274
3275         /* Processed in lustre_start_mgc */
3276         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3277                 GOTO(end, rc);
3278
3279         /* Processed in ost/mdt */
3280         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3281                 GOTO(end, rc);
3282
3283         /* Processed in mgs_write_log_ost */
3284         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3285                 if (mti->mti_flags & LDD_F_PARAM) {
3286                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3287                                            "changed with tunefs.lustre"
3288                                            "and --writeconf\n", ptr);
3289                         rc = -EPERM;
3290                 }
3291                 GOTO(end, rc);
3292         }
3293
3294         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3295                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3296                 GOTO(end, rc);
3297         }
3298
3299         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3300                 /* Add a failover nidlist */
3301                 rc = 0;
3302                 /* We already processed failovers params for new
3303                    targets in mgs_write_log_target */
3304                 if (mti->mti_flags & LDD_F_PARAM) {
3305                         CDEBUG(D_MGS, "Adding failnode\n");
3306                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3307                 }
3308                 GOTO(end, rc);
3309         }
3310
3311         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3312                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3313                 GOTO(end, rc);
3314         }
3315
3316         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3317                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3318                 GOTO(end, rc);
3319         }
3320
3321         if (class_match_param(ptr, PARAM_OSC PARAM_ACTIVE, &tmp) == 0 ||
3322             class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0) {
3323                 /* active=0 means off, anything else means on */
3324                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3325                 bool deactive_osc = memcmp(ptr, PARAM_OSC PARAM_ACTIVE,
3326                                           strlen(PARAM_OSC PARAM_ACTIVE)) == 0;
3327                 int i;
3328
3329                 if (!deactive_osc) {
3330                         __u32   index;
3331
3332                         rc = server_name2index(mti->mti_svname, &index, NULL);
3333                         if (rc < 0)
3334                                 GOTO(end, rc);
3335
3336                         if (index == 0) {
3337                                 LCONSOLE_ERROR_MSG(0x144, "%s: MDC0 can not be"
3338                                                    " (de)activated.\n",
3339                                                    mti->mti_svname);
3340                                 GOTO(end, rc = -EINVAL);
3341                         }
3342                 }
3343
3344                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3345                               flag ? "de" : "re", mti->mti_svname);
3346                 /* Modify clilov */
3347                 rc = name_create(&logname, mti->mti_fsname, "-client");
3348                 if (rc < 0)
3349                         GOTO(end, rc);
3350                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3351                                 mti->mti_svname,
3352                                 deactive_osc ? "add osc" : "add mdc", flag);
3353                 name_destroy(&logname);
3354                 if (rc < 0)
3355                         goto active_err;
3356
3357                 /* Modify mdtlov */
3358                 /* Add to all MDT logs for DNE */
3359                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3360                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3361                                 continue;
3362                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3363                         if (rc < 0)
3364                                 GOTO(end, rc);
3365                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3366                                         mti->mti_svname,
3367                                         deactive_osc ? "add osc" : "add osp",
3368                                         flag);
3369                         name_destroy(&logname);
3370                         if (rc < 0)
3371                                 goto active_err;
3372                 }
3373 active_err:
3374                 if (rc < 0) {
3375                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3376                                            "log (%d). No permanent "
3377                                            "changes were made to the "
3378                                            "config log.\n",
3379                                            mti->mti_svname, rc);
3380                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3381                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3382                                                    " because the log"
3383                                                    "is in the old 1.4"
3384                                                    "style. Consider "
3385                                                    " --writeconf to "
3386                                                    "update the logs.\n");
3387                         GOTO(end, rc);
3388                 }
3389                 /* Fall through to osc/mdc proc for deactivating live
3390                    OSC/OSP on running MDT / clients. */
3391         }
3392         /* Below here, let obd's XXX_process_config methods handle it */
3393
3394         /* All lov. in proc */
3395         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3396                 char *mdtlovname;
3397
3398                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3399                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3400                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3401                                            "set on the MDT, not %s. "
3402                                            "Ignoring.\n",
3403                                            mti->mti_svname);
3404                         GOTO(end, rc = 0);
3405                 }
3406
3407                 /* Modify mdtlov */
3408                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3409                         GOTO(end, rc = -ENODEV);
3410
3411                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3412                                              mti->mti_stripe_index);
3413                 if (rc)
3414                         GOTO(end, rc);
3415                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3416                                   &mgi->mgi_bufs, mdtlovname, ptr);
3417                 name_destroy(&logname);
3418                 name_destroy(&mdtlovname);
3419                 if (rc)
3420                         GOTO(end, rc);
3421
3422                 /* Modify clilov */
3423                 rc = name_create(&logname, mti->mti_fsname, "-client");
3424                 if (rc)
3425                         GOTO(end, rc);
3426                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3427                                   fsdb->fsdb_clilov, ptr);
3428                 name_destroy(&logname);
3429                 GOTO(end, rc);
3430         }
3431
3432         /* All osc., mdc., llite. params in proc */
3433         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3434             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3435             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3436                 char *cname;
3437
3438                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3439                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3440                                            " cannot be modified. Consider"
3441                                            " updating the configuration with"
3442                                            " --writeconf\n",
3443                                            mti->mti_svname);
3444                         GOTO(end, rc = -EINVAL);
3445                 }
3446                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3447                         rc = name_create(&cname, mti->mti_fsname, "-client");
3448                         /* Add the client type to match the obdname in
3449                            class_config_llog_handler */
3450                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3451                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3452                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3453                         rc = name_create(&cname, mti->mti_svname, "-osc");
3454                 } else {
3455                         GOTO(end, rc = -EINVAL);
3456                 }
3457                 if (rc)
3458                         GOTO(end, rc);
3459
3460                 /* Forbid direct update of llite root squash parameters.
3461                  * These parameters are indirectly set via the MDT settings.
3462                  * See (LU-1778) */
3463                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
3464                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3465                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3466                         LCONSOLE_ERROR("%s: root squash parameters can only "
3467                                 "be updated through MDT component\n",
3468                                 mti->mti_fsname);
3469                         name_destroy(&cname);
3470                         GOTO(end, rc = -EINVAL);
3471                 }
3472
3473                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3474
3475                 /* Modify client */
3476                 rc = name_create(&logname, mti->mti_fsname, "-client");
3477                 if (rc) {
3478                         name_destroy(&cname);
3479                         GOTO(end, rc);
3480                 }
3481                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3482                                   cname, ptr);
3483
3484                 /* osc params affect the MDT as well */
3485                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3486                         int i;
3487
3488                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3489                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3490                                         continue;
3491                                 name_destroy(&cname);
3492                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3493                                                          fsdb, i);
3494                                 name_destroy(&logname);
3495                                 if (rc)
3496                                         break;
3497                                 rc = name_create_mdt(&logname,
3498                                                      mti->mti_fsname, i);
3499                                 if (rc)
3500                                         break;
3501                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3502                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3503                                                           mti, logname,
3504                                                           &mgi->mgi_bufs,
3505                                                           cname, ptr);
3506                                         if (rc)
3507                                                 break;
3508                                 }
3509                         }
3510                 }
3511
3512                 /* For mdc activate/deactivate, it affects OSP on MDT as well */
3513                 if (class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0 &&
3514                     rc == 0) {
3515                         char suffix[16];
3516                         char *lodname = NULL;
3517                         char *param_str = NULL;
3518                         int i;
3519                         int index;
3520
3521                         /* replace mdc with osp */
3522                         memcpy(ptr, PARAM_OSP, strlen(PARAM_OSP));
3523                         rc = server_name2index(mti->mti_svname, &index, NULL);
3524                         if (rc < 0) {
3525                                 memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
3526                                 GOTO(end, rc);
3527                         }
3528
3529                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3530                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3531                                         continue;
3532
3533                                 if (i == index)
3534                                         continue;
3535
3536                                 name_destroy(&logname);
3537                                 rc = name_create_mdt(&logname, mti->mti_fsname,
3538                                                      i);
3539                                 if (rc < 0)
3540                                         break;
3541
3542                                 if (mgs_log_is_empty(env, mgs, logname))
3543                                         continue;
3544
3545                                 snprintf(suffix, sizeof(suffix), "-osp-MDT%04x",
3546                                          i);
3547                                 name_destroy(&cname);
3548                                 rc = name_create(&cname, mti->mti_svname,
3549                                                  suffix);
3550                                 if (rc < 0)
3551                                         break;
3552
3553                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3554                                                   &mgi->mgi_bufs, cname, ptr);
3555                                 if (rc < 0)
3556                                         break;
3557
3558                                 /* Add configuration log for noitfying LOD
3559                                  * to active/deactive the OSP. */
3560                                 name_destroy(&param_str);
3561                                 rc = name_create(&param_str, cname,
3562                                                  (*tmp == '0') ?  ".active=0" :
3563                                                  ".active=1");
3564                                 if (rc < 0)
3565                                         break;
3566
3567                                 name_destroy(&lodname);
3568                                 rc = name_create(&lodname, logname, "-mdtlov");
3569                                 if (rc < 0)
3570                                         break;
3571
3572                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3573                                                   &mgi->mgi_bufs, lodname,
3574                                                   param_str);
3575                                 if (rc < 0)
3576                                         break;
3577                         }
3578                         memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
3579                         name_destroy(&lodname);
3580                         name_destroy(&param_str);
3581                 }
3582
3583                 name_destroy(&logname);
3584                 name_destroy(&cname);
3585                 GOTO(end, rc);
3586         }
3587
3588         /* All mdt. params in proc */
3589         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
3590                 int i;
3591                 __u32 idx;
3592
3593                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3594                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3595                             MTI_NAME_MAXLEN) == 0)
3596                         /* device is unspecified completely? */
3597                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3598                 else
3599                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3600                 if (rc < 0)
3601                         goto active_err;
3602                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3603                         goto active_err;
3604                 if (rc & LDD_F_SV_ALL) {
3605                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3606                                 if (!test_bit(i,
3607                                                   fsdb->fsdb_mdt_index_map))
3608                                         continue;
3609                                 rc = name_create_mdt(&logname,
3610                                                 mti->mti_fsname, i);
3611                                 if (rc)
3612                                         goto active_err;
3613                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3614                                                   logname, &mgi->mgi_bufs,
3615                                                   logname, ptr);
3616                                 name_destroy(&logname);
3617                                 if (rc)
3618                                         goto active_err;
3619                         }
3620                 } else {
3621                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
3622                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
3623                                 LCONSOLE_ERROR("%s: root squash parameters "
3624                                         "cannot be applied to a single MDT\n",
3625                                         mti->mti_fsname);
3626                                 GOTO(end, rc = -EINVAL);
3627                         }
3628                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3629                                           mti->mti_svname, &mgi->mgi_bufs,
3630                                           mti->mti_svname, ptr);
3631                         if (rc)
3632                                 goto active_err;
3633                 }
3634
3635                 /* root squash settings are also applied to llite
3636                  * config log (see LU-1778) */
3637                 if (rc == 0 &&
3638                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3639                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3640                         char *cname;
3641                         char *ptr2;
3642
3643                         rc = name_create(&cname, mti->mti_fsname, "-client");
3644                         if (rc)
3645                                 GOTO(end, rc);
3646                         rc = name_create(&logname, mti->mti_fsname, "-client");
3647                         if (rc) {
3648                                 name_destroy(&cname);
3649                                 GOTO(end, rc);
3650                         }
3651                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
3652                         if (rc) {
3653                                 name_destroy(&cname);
3654                                 name_destroy(&logname);
3655                                 GOTO(end, rc);
3656                         }
3657                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3658                                           &mgi->mgi_bufs, cname, ptr2);
3659                         name_destroy(&ptr2);
3660                         name_destroy(&logname);
3661                         name_destroy(&cname);
3662                 }
3663                 GOTO(end, rc);
3664         }
3665
3666         /* All mdd., ost. and osd. params in proc */
3667         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3668             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3669             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3670                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3671                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3672                         GOTO(end, rc = -ENODEV);
3673
3674                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3675                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3676                 GOTO(end, rc);
3677         }
3678
3679         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3680         rc2 = -ENOSYS;
3681
3682 end:
3683         if (rc)
3684                 CERROR("err %d on param '%s'\n", rc, ptr);
3685
3686         RETURN(rc ?: rc2);
3687 }
3688
3689 /* Not implementing automatic failover nid addition at this time. */
3690 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3691                       struct mgs_target_info *mti)
3692 {
3693 #if 0
3694         struct fs_db *fsdb;
3695         int rc;
3696         ENTRY;
3697
3698         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3699         if (rc)
3700                 RETURN(rc);
3701
3702         if (mgs_log_is_empty(obd, mti->mti_svname))
3703                 /* should never happen */
3704                 RETURN(-ENOENT);
3705
3706         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3707
3708         /* FIXME We can just check mti->params to see if we're already in
3709            the failover list.  Modify mti->params for rewriting back at
3710            server_register_target(). */
3711
3712         mutex_lock(&fsdb->fsdb_mutex);
3713         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3714         mutex_unlock(&fsdb->fsdb_mutex);
3715         char    *buf, *params;
3716         int      rc = -EINVAL;
3717
3718         RETURN(rc);
3719 #endif
3720         return 0;
3721 }
3722
3723 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
3724                          struct mgs_target_info *mti, struct fs_db *fsdb)
3725 {
3726         char    *buf, *params;
3727         int      rc = -EINVAL;
3728
3729         ENTRY;
3730
3731         /* set/check the new target index */
3732         rc = mgs_set_index(env, mgs, mti);
3733         if (rc < 0)
3734                 RETURN(rc);
3735
3736         if (rc == EALREADY) {
3737                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3738                               mti->mti_stripe_index, mti->mti_svname);
3739                 /* We would like to mark old log sections as invalid
3740                    and add new log sections in the client and mdt logs.
3741                    But if we add new sections, then live clients will
3742                    get repeat setup instructions for already running
3743                    osc's. So don't update the client/mdt logs. */
3744                 mti->mti_flags &= ~LDD_F_UPDATE;
3745                 rc = 0;
3746         }
3747
3748         mutex_lock(&fsdb->fsdb_mutex);
3749
3750         if (mti->mti_flags &
3751             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3752                 /* Generate a log from scratch */
3753                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3754                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3755                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3756                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3757                 } else {
3758                         CERROR("Unknown target type %#x, can't create log for "
3759                                "%s\n", mti->mti_flags, mti->mti_svname);
3760                 }
3761                 if (rc) {
3762                         CERROR("Can't write logs for %s (%d)\n",
3763                                mti->mti_svname, rc);
3764                         GOTO(out_up, rc);
3765                 }
3766         } else {
3767                 /* Just update the params from tunefs in mgs_write_log_params */
3768                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3769                 mti->mti_flags |= LDD_F_PARAM;
3770         }
3771
3772         /* allocate temporary buffer, where class_get_next_param will
3773            make copy of a current  parameter */
3774         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3775         if (buf == NULL)
3776                 GOTO(out_up, rc = -ENOMEM);
3777         params = mti->mti_params;
3778         while (params != NULL) {
3779                 rc = class_get_next_param(&params, buf);
3780                 if (rc) {
3781                         if (rc == 1)
3782                                 /* there is no next parameter, that is
3783                                    not an error */
3784                                 rc = 0;
3785                         break;
3786                 }
3787                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3788                        params, buf);
3789                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3790                 if (rc)
3791                         break;
3792         }
3793
3794         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3795
3796 out_up:
3797         mutex_unlock(&fsdb->fsdb_mutex);
3798         RETURN(rc);
3799 }
3800
3801 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3802 {
3803         struct llog_ctxt        *ctxt;
3804         int                      rc = 0;
3805
3806         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3807         if (ctxt == NULL) {
3808                 CERROR("%s: MGS config context doesn't exist\n",
3809                        mgs->mgs_obd->obd_name);
3810                 rc = -ENODEV;
3811         } else {
3812                 rc = llog_erase(env, ctxt, NULL, name);
3813                 /* llog may not exist */
3814                 if (rc == -ENOENT)
3815                         rc = 0;
3816                 llog_ctxt_put(ctxt);
3817         }
3818
3819         if (rc)
3820                 CERROR("%s: failed to clear log %s: %d\n",
3821                        mgs->mgs_obd->obd_name, name, rc);
3822
3823         return rc;
3824 }
3825
3826 /* erase all logs for the given fs */
3827 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3828 {
3829         struct fs_db *fsdb;
3830         struct list_head log_list;
3831         struct mgs_direntry *dirent, *n;
3832         int rc, len = strlen(fsname);
3833         char *suffix;
3834         ENTRY;
3835
3836         /* Find all the logs in the CONFIGS directory */
3837         rc = class_dentry_readdir(env, mgs, &log_list);
3838         if (rc)
3839                 RETURN(rc);
3840
3841         mutex_lock(&mgs->mgs_mutex);
3842
3843         /* Delete the fs db */
3844         fsdb = mgs_find_fsdb(mgs, fsname);
3845         if (fsdb)
3846                 mgs_free_fsdb(mgs, fsdb);
3847
3848         mutex_unlock(&mgs->mgs_mutex);
3849
3850         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3851                 list_del_init(&dirent->mde_list);
3852                 suffix = strrchr(dirent->mde_name, '-');
3853                 if (suffix != NULL) {
3854                         if ((len == suffix - dirent->mde_name) &&
3855                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
3856                                 CDEBUG(D_MGS, "Removing log %s\n",
3857                                        dirent->mde_name);
3858                                 mgs_erase_log(env, mgs, dirent->mde_name);
3859                         }
3860                 }
3861                 mgs_direntry_free(dirent);
3862         }
3863
3864         RETURN(rc);
3865 }
3866
3867 /* list all logs for the given fs */
3868 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
3869                   struct obd_ioctl_data *data)
3870 {
3871         struct list_head         log_list;
3872         struct mgs_direntry     *dirent, *n;
3873         char                    *out, *suffix;
3874         int                      l, remains, rc;
3875
3876         ENTRY;
3877
3878         /* Find all the logs in the CONFIGS directory */
3879         rc = class_dentry_readdir(env, mgs, &log_list);
3880         if (rc)
3881                 RETURN(rc);
3882
3883         out = data->ioc_bulk;
3884         remains = data->ioc_inllen1;
3885         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3886                 list_del_init(&dirent->mde_list);
3887                 suffix = strrchr(dirent->mde_name, '-');
3888                 if (suffix != NULL) {
3889                         l = snprintf(out, remains, "config log: $%s\n",
3890                                      dirent->mde_name);
3891                         out += l;
3892                         remains -= l;
3893                 }
3894                 mgs_direntry_free(dirent);
3895                 if (remains <= 0)
3896                         break;
3897         }
3898         RETURN(rc);
3899 }
3900
3901 /* from llog_swab */
3902 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3903 {
3904         int i;
3905         ENTRY;
3906
3907         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3908         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3909
3910         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3911         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3912         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3913         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3914
3915         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3916         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3917                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3918                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3919                                i, lcfg->lcfg_buflens[i],
3920                                lustre_cfg_string(lcfg, i));
3921                 }
3922         EXIT;
3923 }
3924
3925 /* Setup _mgs fsdb and log
3926  */
3927 int mgs__mgs_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3928                           struct fs_db *fsdb)
3929 {
3930         int                     rc;
3931         ENTRY;
3932
3933         rc = mgs_find_or_make_fsdb(env, mgs, MGSSELF_NAME, &fsdb);
3934
3935         RETURN(rc);
3936 }
3937
3938 /* Setup params fsdb and log
3939  */
3940 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3941                           struct fs_db *fsdb)
3942 {
3943         struct llog_handle      *params_llh = NULL;
3944         int                     rc;
3945         ENTRY;
3946
3947         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
3948         if (fsdb != NULL) {
3949                 mutex_lock(&fsdb->fsdb_mutex);
3950                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
3951                 if (rc == 0)
3952                         rc = record_end_log(env, &params_llh);
3953                 mutex_unlock(&fsdb->fsdb_mutex);
3954         }
3955
3956         RETURN(rc);
3957 }
3958
3959 /* Cleanup params fsdb and log
3960  */
3961 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
3962 {
3963         return mgs_erase_logs(env, mgs, PARAMS_FILENAME);
3964 }
3965
3966 /* Set a permanent (config log) param for a target or fs
3967  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3968  *             buf1 contains the single parameter
3969  */
3970 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3971                  struct lustre_cfg *lcfg, char *fsname)
3972 {
3973         struct fs_db *fsdb;
3974         struct mgs_target_info *mti;
3975         char *devname, *param;
3976         char *ptr;
3977         const char *tmp;
3978         __u32 index;
3979         int rc = 0;
3980         ENTRY;
3981
3982         print_lustre_cfg(lcfg);
3983
3984         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3985         devname = lustre_cfg_string(lcfg, 0);
3986         param = lustre_cfg_string(lcfg, 1);
3987         if (!devname) {
3988                 /* Assume device name embedded in param:
3989                    lustre-OST0000.osc.max_dirty_mb=32 */
3990                 ptr = strchr(param, '.');
3991                 if (ptr) {
3992                         devname = param;
3993                         *ptr = 0;
3994                         param = ptr + 1;
3995                 }
3996         }
3997         if (!devname) {
3998                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3999                 RETURN(-ENOSYS);
4000         }
4001
4002         rc = mgs_parse_devname(devname, fsname, NULL);
4003         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
4004                 /* param related to llite isn't allowed to set by OST or MDT */
4005                 if (rc == 0 && strncmp(param, PARAM_LLITE,
4006                                        sizeof(PARAM_LLITE) - 1) == 0)
4007                         RETURN(-EINVAL);
4008         } else {
4009                 /* assume devname is the fsname */
4010                 strlcpy(fsname, devname, MTI_NAME_MAXLEN);
4011         }
4012         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
4013
4014         rc = mgs_find_or_make_fsdb(env, mgs,
4015                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
4016                                    PARAMS_FILENAME : fsname, &fsdb);
4017         if (rc)
4018                 RETURN(rc);
4019
4020         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
4021             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
4022             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4023                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
4024                        "is '%s'\n", fsname, devname);
4025                 mgs_free_fsdb(mgs, fsdb);
4026                 RETURN(-EINVAL);
4027         }
4028
4029         /* Create a fake mti to hold everything */
4030         OBD_ALLOC_PTR(mti);
4031         if (!mti)
4032                 GOTO(out, rc = -ENOMEM);
4033         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
4034             >= sizeof(mti->mti_fsname))
4035                 GOTO(out, rc = -E2BIG);
4036         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
4037             >= sizeof(mti->mti_svname))
4038                 GOTO(out, rc = -E2BIG);
4039         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
4040             >= sizeof(mti->mti_params))
4041                 GOTO(out, rc = -E2BIG);
4042         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
4043         if (rc < 0)
4044                 /* Not a valid server; may be only fsname */
4045                 rc = 0;
4046         else
4047                 /* Strip -osc or -mdc suffix from svname */
4048                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
4049                                      mti->mti_svname))
4050                         GOTO(out, rc = -EINVAL);
4051         /*
4052          * Revoke lock so everyone updates.  Should be alright if
4053          * someone was already reading while we were updating the logs,
4054          * so we don't really need to hold the lock while we're
4055          * writing (above).
4056          */
4057         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
4058                 mti->mti_flags = rc | LDD_F_PARAM2;
4059                 mutex_lock(&fsdb->fsdb_mutex);
4060                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
4061                 mutex_unlock(&fsdb->fsdb_mutex);
4062                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
4063         } else {
4064                 mti->mti_flags = rc | LDD_F_PARAM;
4065                 mutex_lock(&fsdb->fsdb_mutex);
4066                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
4067                 mutex_unlock(&fsdb->fsdb_mutex);
4068                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4069         }
4070
4071 out:
4072         OBD_FREE_PTR(mti);
4073         RETURN(rc);
4074 }
4075
4076 static int mgs_write_log_pool(const struct lu_env *env,
4077                               struct mgs_device *mgs, char *logname,
4078                               struct fs_db *fsdb, char *tgtname,
4079                               enum lcfg_command_type cmd,
4080                               char *fsname, char *poolname,
4081                               char *ostname, char *comment)
4082 {
4083         struct llog_handle *llh = NULL;
4084         int rc;
4085
4086         rc = record_start_log(env, mgs, &llh, logname);
4087         if (rc)
4088                 return rc;
4089         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
4090         if (rc)
4091                 goto out;
4092         rc = record_base(env, llh, tgtname, 0, cmd,
4093                          fsname, poolname, ostname, NULL);
4094         if (rc)
4095                 goto out;
4096         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
4097 out:
4098         record_end_log(env, &llh);
4099         return rc;
4100 }
4101
4102 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
4103                     enum lcfg_command_type cmd, const char *nodemap_name,
4104                     char *param)
4105 {
4106         lnet_nid_t      nid[2];
4107         __u32           idmap[2];
4108         bool            bool_switch;
4109         __u32           int_id;
4110         int             rc = 0;
4111         ENTRY;
4112
4113         switch (cmd) {
4114         case LCFG_NODEMAP_ADD:
4115                 rc = nodemap_add(nodemap_name);
4116                 break;
4117         case LCFG_NODEMAP_DEL:
4118                 rc = nodemap_del(nodemap_name);
4119                 break;
4120         case LCFG_NODEMAP_ADD_RANGE:
4121                 rc = nodemap_parse_range(param, nid);
4122                 if (rc != 0)
4123                         break;
4124                 rc = nodemap_add_range(nodemap_name, nid);
4125                 break;
4126         case LCFG_NODEMAP_DEL_RANGE:
4127                 rc = nodemap_parse_range(param, nid);
4128                 if (rc != 0)
4129                         break;
4130                 rc = nodemap_del_range(nodemap_name, nid);
4131                 break;
4132         case LCFG_NODEMAP_ADMIN:
4133                 bool_switch = simple_strtoul(param, NULL, 10);
4134                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
4135                 break;
4136         case LCFG_NODEMAP_TRUSTED:
4137                 bool_switch = simple_strtoul(param, NULL, 10);
4138                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
4139                 break;
4140         case LCFG_NODEMAP_SQUASH_UID:
4141                 int_id = simple_strtoul(param, NULL, 10);
4142                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
4143                 break;
4144         case LCFG_NODEMAP_SQUASH_GID:
4145                 int_id = simple_strtoul(param, NULL, 10);
4146                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
4147                 break;
4148         case LCFG_NODEMAP_ADD_UIDMAP:
4149         case LCFG_NODEMAP_ADD_GIDMAP:
4150                 rc = nodemap_parse_idmap(param, idmap);
4151                 if (rc != 0)
4152                         break;
4153                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
4154                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
4155                                                idmap);
4156                 else
4157                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
4158                                                idmap);
4159                 break;
4160         case LCFG_NODEMAP_DEL_UIDMAP:
4161         case LCFG_NODEMAP_DEL_GIDMAP:
4162                 rc = nodemap_parse_idmap(param, idmap);
4163                 if (rc != 0)
4164                         break;
4165                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
4166                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
4167                                                idmap);
4168                 else
4169                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
4170                                                idmap);
4171                 break;
4172         case LCFG_NODEMAP_SET_FILESET:
4173                 rc = nodemap_set_fileset(nodemap_name, param);
4174                 break;
4175         default:
4176                 rc = -EINVAL;
4177         }
4178
4179         RETURN(rc);
4180 }
4181
4182 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
4183                  enum lcfg_command_type cmd, char *fsname,
4184                  char *poolname, char *ostname)
4185 {
4186         struct fs_db *fsdb;
4187         char *lovname;
4188         char *logname;
4189         char *label = NULL, *canceled_label = NULL;
4190         int label_sz;
4191         struct mgs_target_info *mti = NULL;
4192         int rc, i;
4193         ENTRY;
4194
4195         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
4196         if (rc) {
4197                 CERROR("Can't get db for %s\n", fsname);
4198                 RETURN(rc);
4199         }
4200         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4201                 CERROR("%s is not defined\n", fsname);
4202                 mgs_free_fsdb(mgs, fsdb);
4203                 RETURN(-EINVAL);
4204         }
4205
4206         label_sz = 10 + strlen(fsname) + strlen(poolname);
4207
4208         /* check if ostname match fsname */
4209         if (ostname != NULL) {
4210                 char *ptr;
4211
4212                 ptr = strrchr(ostname, '-');
4213                 if ((ptr == NULL) ||
4214                     (strncmp(fsname, ostname, ptr-ostname) != 0))
4215                         RETURN(-EINVAL);
4216                 label_sz += strlen(ostname);
4217         }
4218
4219         OBD_ALLOC(label, label_sz);
4220         if (label == NULL)
4221                 RETURN(-ENOMEM);
4222
4223         switch(cmd) {
4224         case LCFG_POOL_NEW:
4225                 sprintf(label,
4226                         "new %s.%s", fsname, poolname);
4227                 break;
4228         case LCFG_POOL_ADD:
4229                 sprintf(label,
4230                         "add %s.%s.%s", fsname, poolname, ostname);
4231                 break;
4232         case LCFG_POOL_REM:
4233                 OBD_ALLOC(canceled_label, label_sz);
4234                 if (canceled_label == NULL)
4235                         GOTO(out_label, rc = -ENOMEM);
4236                 sprintf(label,
4237                         "rem %s.%s.%s", fsname, poolname, ostname);
4238                 sprintf(canceled_label,
4239                         "add %s.%s.%s", fsname, poolname, ostname);
4240                 break;
4241         case LCFG_POOL_DEL:
4242                 OBD_ALLOC(canceled_label, label_sz);
4243                 if (canceled_label == NULL)
4244                         GOTO(out_label, rc = -ENOMEM);
4245                 sprintf(label,
4246                         "del %s.%s", fsname, poolname);
4247                 sprintf(canceled_label,
4248                         "new %s.%s", fsname, poolname);
4249                 break;
4250         default:
4251                 break;
4252         }
4253
4254         if (canceled_label != NULL) {
4255                 OBD_ALLOC_PTR(mti);
4256                 if (mti == NULL)
4257                         GOTO(out_cancel, rc = -ENOMEM);
4258         }
4259
4260         mutex_lock(&fsdb->fsdb_mutex);
4261         /* write pool def to all MDT logs */
4262         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4263                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
4264                         rc = name_create_mdt_and_lov(&logname, &lovname,
4265                                                      fsdb, i);
4266                         if (rc) {
4267                                 mutex_unlock(&fsdb->fsdb_mutex);
4268                                 GOTO(out_mti, rc);
4269                         }
4270                         if (canceled_label != NULL) {
4271                                 strcpy(mti->mti_svname, "lov pool");
4272                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4273                                                 lovname, canceled_label,
4274                                                 CM_SKIP);
4275                         }
4276
4277                         if (rc >= 0)
4278                                 rc = mgs_write_log_pool(env, mgs, logname,
4279                                                         fsdb, lovname, cmd,
4280                                                         fsname, poolname,
4281                                                         ostname, label);
4282                         name_destroy(&logname);
4283                         name_destroy(&lovname);
4284                         if (rc) {
4285                                 mutex_unlock(&fsdb->fsdb_mutex);
4286                                 GOTO(out_mti, rc);
4287                         }
4288                 }
4289         }
4290
4291         rc = name_create(&logname, fsname, "-client");
4292         if (rc) {
4293                 mutex_unlock(&fsdb->fsdb_mutex);
4294                 GOTO(out_mti, rc);
4295         }
4296         if (canceled_label != NULL) {
4297                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4298                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4299                 if (rc < 0) {
4300                         mutex_unlock(&fsdb->fsdb_mutex);
4301                         name_destroy(&logname);
4302                         GOTO(out_mti, rc);
4303                 }
4304         }
4305
4306         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4307                                 cmd, fsname, poolname, ostname, label);
4308         mutex_unlock(&fsdb->fsdb_mutex);
4309         name_destroy(&logname);
4310         /* request for update */
4311         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4312
4313         EXIT;
4314 out_mti:
4315         if (mti != NULL)
4316                 OBD_FREE_PTR(mti);
4317 out_cancel:
4318         if (canceled_label != NULL)
4319                 OBD_FREE(canceled_label, label_sz);
4320 out_label:
4321         OBD_FREE(label, label_sz);
4322         return rc;
4323 }