Whamcloud - gitweb
7252ee582c620578c64a05cdcc900d69eb7be650
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_ioctl.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 /* Find all logs in CONFIG directory and link then into list */
59 int class_dentry_readdir(const struct lu_env *env,
60                          struct mgs_device *mgs, struct list_head *log_list)
61 {
62         struct dt_object    *dir = mgs->mgs_configs_dir;
63         const struct dt_it_ops *iops;
64         struct dt_it        *it;
65         struct mgs_direntry *de;
66         char                *key;
67         int                  rc, key_sz;
68
69         INIT_LIST_HEAD(log_list);
70
71         LASSERT(dir);
72         LASSERT(dir->do_index_ops);
73
74         iops = &dir->do_index_ops->dio_it;
75         it = iops->init(env, dir, LUDA_64BITHASH);
76         if (IS_ERR(it))
77                 RETURN(PTR_ERR(it));
78
79         rc = iops->load(env, it, 0);
80         if (rc <= 0)
81                 GOTO(fini, rc = 0);
82
83         /* main cycle */
84         do {
85                 key = (void *)iops->key(env, it);
86                 if (IS_ERR(key)) {
87                         CERROR("%s: key failed when listing %s: rc = %d\n",
88                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
89                                (int) PTR_ERR(key));
90                         goto next;
91                 }
92                 key_sz = iops->key_size(env, it);
93                 LASSERT(key_sz > 0);
94
95                 /* filter out "." and ".." entries */
96                 if (key[0] == '.') {
97                         if (key_sz == 1)
98                                 goto next;
99                         if (key_sz == 2 && key[1] == '.')
100                                 goto next;
101                 }
102
103                 de = mgs_direntry_alloc(key_sz + 1);
104                 if (de == NULL) {
105                         rc = -ENOMEM;
106                         break;
107                 }
108
109                 memcpy(de->mde_name, key, key_sz);
110                 de->mde_name[key_sz] = 0;
111
112                 list_add(&de->mde_list, log_list);
113
114 next:
115                 rc = iops->next(env, it);
116         } while (rc == 0);
117         rc = 0;
118
119         iops->put(env, it);
120
121 fini:
122         iops->fini(env, it);
123         if (rc)
124                 CERROR("%s: key failed when listing %s: rc = %d\n",
125                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
126         RETURN(rc);
127 }
128
129 /******************** DB functions *********************/
130
131 static inline int name_create(char **newname, char *prefix, char *suffix)
132 {
133         LASSERT(newname);
134         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
135         if (!*newname)
136                 return -ENOMEM;
137         sprintf(*newname, "%s%s", prefix, suffix);
138         return 0;
139 }
140
141 static inline void name_destroy(char **name)
142 {
143         if (*name)
144                 OBD_FREE(*name, strlen(*name) + 1);
145         *name = NULL;
146 }
147
148 struct mgs_fsdb_handler_data
149 {
150         struct fs_db   *fsdb;
151         __u32           ver;
152 };
153
154 /* from the (client) config log, figure out:
155         1. which ost's/mdt's are configured (by index)
156         2. what the last config step is
157         3. COMPAT_18 osc name
158 */
159 /* It might be better to have a separate db file, instead of parsing the info
160    out of the client log.  This is slow and potentially error-prone. */
161 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
162                             struct llog_rec_hdr *rec, void *data)
163 {
164         struct mgs_fsdb_handler_data *d = data;
165         struct fs_db *fsdb = d->fsdb;
166         int cfg_len = rec->lrh_len;
167         char *cfg_buf = (char*) (rec + 1);
168         struct lustre_cfg *lcfg;
169         __u32 index;
170         int rc = 0;
171         ENTRY;
172
173         if (rec->lrh_type != OBD_CFG_REC) {
174                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
175                 RETURN(-EINVAL);
176         }
177
178         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
179         if (rc) {
180                 CERROR("Insane cfg\n");
181                 RETURN(rc);
182         }
183
184         lcfg = (struct lustre_cfg *)cfg_buf;
185
186         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
187                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
188
189         /* Figure out ost indicies */
190         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
191         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
192             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
193                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
194                                        NULL, 10);
195                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
196                        lustre_cfg_string(lcfg, 1), index,
197                        lustre_cfg_string(lcfg, 2));
198                 set_bit(index, fsdb->fsdb_ost_index_map);
199         }
200
201         /* Figure out mdt indicies */
202         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
203         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
204             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
205                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
206                                        &index, NULL);
207                 if (rc != LDD_F_SV_TYPE_MDT) {
208                         CWARN("Unparsable MDC name %s, assuming index 0\n",
209                               lustre_cfg_string(lcfg, 0));
210                         index = 0;
211                 }
212                 rc = 0;
213                 CDEBUG(D_MGS, "MDT index is %u\n", index);
214                 set_bit(index, fsdb->fsdb_mdt_index_map);
215                 fsdb->fsdb_mdt_count ++;
216         }
217
218         /**
219          * figure out the old config. fsdb_gen = 0 means old log
220          * It is obsoleted and not supported anymore
221          */
222         if (fsdb->fsdb_gen == 0) {
223                 CERROR("Old config format is not supported\n");
224                 RETURN(-EINVAL);
225         }
226
227         /*
228          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
229          */
230         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
231             lcfg->lcfg_command == LCFG_ATTACH &&
232             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
233                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
234                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
235                         CWARN("MDT using 1.8 OSC name scheme\n");
236                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
237                 }
238         }
239
240         if (lcfg->lcfg_command == LCFG_MARKER) {
241                 struct cfg_marker *marker;
242                 marker = lustre_cfg_buf(lcfg, 1);
243
244                 d->ver = marker->cm_vers;
245
246                 /* Keep track of the latest marker step */
247                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
248         }
249
250         RETURN(rc);
251 }
252
253 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
254 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
255                                   struct mgs_device *mgs,
256                                   struct fs_db *fsdb)
257 {
258         char                            *logname;
259         struct llog_handle              *loghandle;
260         struct llog_ctxt                *ctxt;
261         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
262         int rc;
263
264         ENTRY;
265
266         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
267         LASSERT(ctxt != NULL);
268         rc = name_create(&logname, fsdb->fsdb_name, "-client");
269         if (rc)
270                 GOTO(out_put, rc);
271         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
272         if (rc)
273                 GOTO(out_pop, rc);
274
275         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
276         if (rc)
277                 GOTO(out_close, rc);
278
279         if (llog_get_size(loghandle) <= 1)
280                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
281
282         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
283         CDEBUG(D_INFO, "get_db = %d\n", rc);
284 out_close:
285         llog_close(env, loghandle);
286 out_pop:
287         name_destroy(&logname);
288 out_put:
289         llog_ctxt_put(ctxt);
290
291         RETURN(rc);
292 }
293
294 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
295 {
296         struct mgs_tgt_srpc_conf *tgtconf;
297
298         /* free target-specific rules */
299         while (fsdb->fsdb_srpc_tgt) {
300                 tgtconf = fsdb->fsdb_srpc_tgt;
301                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
302
303                 LASSERT(tgtconf->mtsc_tgt);
304
305                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
306                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
307                 OBD_FREE_PTR(tgtconf);
308         }
309
310         /* free general rules */
311         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
312 }
313
314 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
315 {
316         struct fs_db *fsdb;
317         struct list_head *tmp;
318
319         list_for_each(tmp, &mgs->mgs_fs_db_list) {
320                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
321                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
322                         return fsdb;
323         }
324         return NULL;
325 }
326
327 /* caller must hold the mgs->mgs_fs_db_lock */
328 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
329                                   struct mgs_device *mgs, char *fsname)
330 {
331         struct fs_db *fsdb;
332         int rc;
333         ENTRY;
334
335         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
336                 CERROR("fsname %s is too long\n", fsname);
337                 RETURN(ERR_PTR(-EINVAL));
338         }
339
340         OBD_ALLOC_PTR(fsdb);
341         if (!fsdb)
342                 RETURN(ERR_PTR(-ENOMEM));
343
344         strcpy(fsdb->fsdb_name, fsname);
345         mutex_init(&fsdb->fsdb_mutex);
346         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
347         fsdb->fsdb_gen = 1;
348
349         if (strcmp(fsname, MGSSELF_NAME) == 0) {
350                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
351                 fsdb->fsdb_mgs = mgs;
352         } else {
353                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
354                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
355                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
356                         CERROR("No memory for index maps\n");
357                         GOTO(err, rc = -ENOMEM);
358                 }
359
360                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
361                 if (rc)
362                         GOTO(err, rc);
363                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
364                 if (rc)
365                         GOTO(err, rc);
366
367                 /* initialise data for NID table */
368                 mgs_ir_init_fs(env, mgs, fsdb);
369
370                 lproc_mgs_add_live(mgs, fsdb);
371         }
372
373         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
374
375         RETURN(fsdb);
376 err:
377         if (fsdb->fsdb_ost_index_map)
378                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
379         if (fsdb->fsdb_mdt_index_map)
380                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
381         name_destroy(&fsdb->fsdb_clilov);
382         name_destroy(&fsdb->fsdb_clilmv);
383         OBD_FREE_PTR(fsdb);
384         RETURN(ERR_PTR(rc));
385 }
386
387 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
388 {
389         /* wait for anyone with the sem */
390         mutex_lock(&fsdb->fsdb_mutex);
391         lproc_mgs_del_live(mgs, fsdb);
392         list_del(&fsdb->fsdb_list);
393
394         /* deinitialize fsr */
395         mgs_ir_fini_fs(mgs, fsdb);
396
397         if (fsdb->fsdb_ost_index_map)
398                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
399         if (fsdb->fsdb_mdt_index_map)
400                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
401         name_destroy(&fsdb->fsdb_clilov);
402         name_destroy(&fsdb->fsdb_clilmv);
403         mgs_free_fsdb_srpc(fsdb);
404         mutex_unlock(&fsdb->fsdb_mutex);
405         OBD_FREE_PTR(fsdb);
406 }
407
408 int mgs_init_fsdb_list(struct mgs_device *mgs)
409 {
410         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
411         return 0;
412 }
413
414 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
415 {
416         struct fs_db *fsdb;
417         struct list_head *tmp, *tmp2;
418
419         mutex_lock(&mgs->mgs_mutex);
420         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
421                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
422                 mgs_free_fsdb(mgs, fsdb);
423         }
424         mutex_unlock(&mgs->mgs_mutex);
425         return 0;
426 }
427
428 int mgs_find_or_make_fsdb(const struct lu_env *env,
429                           struct mgs_device *mgs, char *name,
430                           struct fs_db **dbh)
431 {
432         struct fs_db *fsdb;
433         int rc = 0;
434
435         ENTRY;
436         mutex_lock(&mgs->mgs_mutex);
437         fsdb = mgs_find_fsdb(mgs, name);
438         if (fsdb) {
439                 mutex_unlock(&mgs->mgs_mutex);
440                 *dbh = fsdb;
441                 RETURN(0);
442         }
443
444         CDEBUG(D_MGS, "Creating new db\n");
445         fsdb = mgs_new_fsdb(env, mgs, name);
446         /* lock fsdb_mutex until the db is loaded from llogs */
447         if (!IS_ERR(fsdb))
448                 mutex_lock(&fsdb->fsdb_mutex);
449         mutex_unlock(&mgs->mgs_mutex);
450         if (IS_ERR(fsdb))
451                 RETURN(PTR_ERR(fsdb));
452
453         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
454                 /* populate the db from the client llog */
455                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
456                 if (rc) {
457                         CERROR("Can't get db from client log %d\n", rc);
458                         GOTO(out_free, rc);
459                 }
460         }
461
462         /* populate srpc rules from params llog */
463         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
464         if (rc) {
465                 CERROR("Can't get db from params log %d\n", rc);
466                 GOTO(out_free, rc);
467         }
468
469         mutex_unlock(&fsdb->fsdb_mutex);
470         *dbh = fsdb;
471
472         RETURN(0);
473
474 out_free:
475         mutex_unlock(&fsdb->fsdb_mutex);
476         mgs_free_fsdb(mgs, fsdb);
477         return rc;
478 }
479
480 /* 1 = index in use
481    0 = index unused
482    -1= empty client log */
483 int mgs_check_index(const struct lu_env *env,
484                     struct mgs_device *mgs,
485                     struct mgs_target_info *mti)
486 {
487         struct fs_db *fsdb;
488         void *imap;
489         int rc = 0;
490         ENTRY;
491
492         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
493
494         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
495         if (rc) {
496                 CERROR("Can't get db for %s\n", mti->mti_fsname);
497                 RETURN(rc);
498         }
499
500         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
501                 RETURN(-1);
502
503         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
504                 imap = fsdb->fsdb_ost_index_map;
505         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
506                 imap = fsdb->fsdb_mdt_index_map;
507         else
508                 RETURN(-EINVAL);
509
510         if (test_bit(mti->mti_stripe_index, imap))
511                 RETURN(1);
512         RETURN(0);
513 }
514
515 static __inline__ int next_index(void *index_map, int map_len)
516 {
517         int i;
518         for (i = 0; i < map_len * 8; i++)
519                 if (!test_bit(i, index_map)) {
520                          return i;
521                  }
522         CERROR("max index %d exceeded.\n", i);
523         return -1;
524 }
525
526 /* Return codes:
527         0  newly marked as in use
528         <0 err
529         +EALREADY for update of an old index */
530 static int mgs_set_index(const struct lu_env *env,
531                          struct mgs_device *mgs,
532                          struct mgs_target_info *mti)
533 {
534         struct fs_db *fsdb;
535         void *imap;
536         int rc = 0;
537         ENTRY;
538
539         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
540         if (rc) {
541                 CERROR("Can't get db for %s\n", mti->mti_fsname);
542                 RETURN(rc);
543         }
544
545         mutex_lock(&fsdb->fsdb_mutex);
546         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
547                 imap = fsdb->fsdb_ost_index_map;
548         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
549                 imap = fsdb->fsdb_mdt_index_map;
550         } else {
551                 GOTO(out_up, rc = -EINVAL);
552         }
553
554         if (mti->mti_flags & LDD_F_NEED_INDEX) {
555                 rc = next_index(imap, INDEX_MAP_SIZE);
556                 if (rc == -1)
557                         GOTO(out_up, rc = -ERANGE);
558                 mti->mti_stripe_index = rc;
559                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
560                         fsdb->fsdb_mdt_count ++;
561         }
562
563         /* the last index(0xffff) is reserved for default value. */
564         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
565                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
566                                    "but index must be less than %u.\n",
567                                    mti->mti_svname, mti->mti_stripe_index,
568                                    INDEX_MAP_SIZE * 8 - 1);
569                 GOTO(out_up, rc = -ERANGE);
570         }
571
572         if (test_bit(mti->mti_stripe_index, imap)) {
573                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
574                     !(mti->mti_flags & LDD_F_WRITECONF)) {
575                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
576                                            "%d, but that index is already in "
577                                            "use. Use --writeconf to force\n",
578                                            mti->mti_svname,
579                                            mti->mti_stripe_index);
580                         GOTO(out_up, rc = -EADDRINUSE);
581                 } else {
582                         CDEBUG(D_MGS, "Server %s updating index %d\n",
583                                mti->mti_svname, mti->mti_stripe_index);
584                         GOTO(out_up, rc = EALREADY);
585                 }
586         }
587
588         set_bit(mti->mti_stripe_index, imap);
589         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
590         mutex_unlock(&fsdb->fsdb_mutex);
591         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
592                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
593
594         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
595                mti->mti_stripe_index);
596
597         RETURN(0);
598 out_up:
599         mutex_unlock(&fsdb->fsdb_mutex);
600         return rc;
601 }
602
603 struct mgs_modify_lookup {
604         struct cfg_marker mml_marker;
605         int               mml_modified;
606 };
607
608 static int mgs_modify_handler(const struct lu_env *env,
609                               struct llog_handle *llh,
610                               struct llog_rec_hdr *rec, void *data)
611 {
612         struct mgs_modify_lookup *mml = data;
613         struct cfg_marker *marker;
614         struct lustre_cfg *lcfg = REC_DATA(rec);
615         int cfg_len = REC_DATA_LEN(rec);
616         int rc;
617         ENTRY;
618
619         if (rec->lrh_type != OBD_CFG_REC) {
620                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
621                 RETURN(-EINVAL);
622         }
623
624         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
625         if (rc) {
626                 CERROR("Insane cfg\n");
627                 RETURN(rc);
628         }
629
630         /* We only care about markers */
631         if (lcfg->lcfg_command != LCFG_MARKER)
632                 RETURN(0);
633
634         marker = lustre_cfg_buf(lcfg, 1);
635         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
636             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
637             !(marker->cm_flags & CM_SKIP)) {
638                 /* Found a non-skipped marker match */
639                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
640                        rec->lrh_index, marker->cm_step,
641                        marker->cm_flags, mml->mml_marker.cm_flags,
642                        marker->cm_tgtname, marker->cm_comment);
643                 /* Overwrite the old marker llog entry */
644                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
645                 marker->cm_flags |= mml->mml_marker.cm_flags;
646                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
647                 rc = llog_write(env, llh, rec, rec->lrh_index);
648                 if (!rc)
649                          mml->mml_modified++;
650         }
651
652         RETURN(rc);
653 }
654
655 /**
656  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
657  * Return code:
658  * 0 - modified successfully,
659  * 1 - no modification was done
660  * negative - error
661  */
662 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
663                       struct fs_db *fsdb, struct mgs_target_info *mti,
664                       char *logname, char *devname, char *comment, int flags)
665 {
666         struct llog_handle *loghandle;
667         struct llog_ctxt *ctxt;
668         struct mgs_modify_lookup *mml;
669         int rc;
670
671         ENTRY;
672
673         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
674         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
675                flags);
676
677         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
678         LASSERT(ctxt != NULL);
679         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
680         if (rc < 0) {
681                 if (rc == -ENOENT)
682                         rc = 0;
683                 GOTO(out_pop, rc);
684         }
685
686         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
687         if (rc)
688                 GOTO(out_close, rc);
689
690         if (llog_get_size(loghandle) <= 1)
691                 GOTO(out_close, rc = 0);
692
693         OBD_ALLOC_PTR(mml);
694         if (!mml)
695                 GOTO(out_close, rc = -ENOMEM);
696         if (strlcpy(mml->mml_marker.cm_comment, comment,
697                     sizeof(mml->mml_marker.cm_comment)) >=
698             sizeof(mml->mml_marker.cm_comment))
699                 GOTO(out_free, rc = -E2BIG);
700         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
701                     sizeof(mml->mml_marker.cm_tgtname)) >=
702             sizeof(mml->mml_marker.cm_tgtname))
703                 GOTO(out_free, rc = -E2BIG);
704         /* Modify mostly means cancel */
705         mml->mml_marker.cm_flags = flags;
706         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
707         mml->mml_modified = 0;
708         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
709                           NULL);
710         if (!rc && !mml->mml_modified)
711                 rc = 1;
712
713 out_free:
714         OBD_FREE_PTR(mml);
715
716 out_close:
717         llog_close(env, loghandle);
718 out_pop:
719         if (rc < 0)
720                 CERROR("%s: modify %s/%s failed: rc = %d\n",
721                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
722         llog_ctxt_put(ctxt);
723         RETURN(rc);
724 }
725
726 /** This structure is passed to mgs_replace_handler */
727 struct mgs_replace_uuid_lookup {
728         /* Nids are replaced for this target device */
729         struct mgs_target_info target;
730         /* Temporary modified llog */
731         struct llog_handle *temp_llh;
732         /* Flag is set if in target block*/
733         int in_target_device;
734         /* Nids already added. Just skip (multiple nids) */
735         int device_nids_added;
736         /* Flag is set if this block should not be copied */
737         int skip_it;
738 };
739
740 /**
741  * Check: a) if block should be skipped
742  * b) is it target block
743  *
744  * \param[in] lcfg
745  * \param[in] mrul
746  *
747  * \retval 0 should not to be skipped
748  * \retval 1 should to be skipped
749  */
750 static int check_markers(struct lustre_cfg *lcfg,
751                          struct mgs_replace_uuid_lookup *mrul)
752 {
753          struct cfg_marker *marker;
754
755         /* Track markers. Find given device */
756         if (lcfg->lcfg_command == LCFG_MARKER) {
757                 marker = lustre_cfg_buf(lcfg, 1);
758                 /* Clean llog from records marked as CM_EXCLUDE.
759                    CM_SKIP records are used for "active" command
760                    and can be restored if needed */
761                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
762                     (CM_EXCLUDE | CM_START)) {
763                         mrul->skip_it = 1;
764                         return 1;
765                 }
766
767                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
768                     (CM_EXCLUDE | CM_END)) {
769                         mrul->skip_it = 0;
770                         return 1;
771                 }
772
773                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
774                         LASSERT(!(marker->cm_flags & CM_START) ||
775                                 !(marker->cm_flags & CM_END));
776                         if (marker->cm_flags & CM_START) {
777                                 mrul->in_target_device = 1;
778                                 mrul->device_nids_added = 0;
779                         } else if (marker->cm_flags & CM_END)
780                                 mrul->in_target_device = 0;
781                 }
782         }
783
784         return 0;
785 }
786
787 static int record_base(const struct lu_env *env, struct llog_handle *llh,
788                      char *cfgname, lnet_nid_t nid, int cmd,
789                      char *s1, char *s2, char *s3, char *s4)
790 {
791         struct mgs_thread_info  *mgi = mgs_env_info(env);
792         struct llog_cfg_rec     *lcr;
793         int rc;
794
795         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
796                cmd, s1, s2, s3, s4);
797
798         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
799         if (s1)
800                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
801         if (s2)
802                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
803         if (s3)
804                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
805         if (s4)
806                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
807
808         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
809         if (lcr == NULL)
810                 return -ENOMEM;
811
812         lcr->lcr_cfg.lcfg_nid = nid;
813         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
814
815         lustre_cfg_rec_free(lcr);
816
817         if (rc < 0)
818                 CDEBUG(D_MGS,
819                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
820                        cfgname, cmd, s1, s2, s3, s4, rc);
821         return rc;
822 }
823
824 static inline int record_add_uuid(const struct lu_env *env,
825                                   struct llog_handle *llh,
826                                   uint64_t nid, char *uuid)
827 {
828         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
829                            NULL, NULL, NULL);
830 }
831
832 static inline int record_add_conn(const struct lu_env *env,
833                                   struct llog_handle *llh,
834                                   char *devname, char *uuid)
835 {
836         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
837                            NULL, NULL, NULL);
838 }
839
840 static inline int record_attach(const struct lu_env *env,
841                                 struct llog_handle *llh, char *devname,
842                                 char *type, char *uuid)
843 {
844         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
845                            NULL, NULL);
846 }
847
848 static inline int record_setup(const struct lu_env *env,
849                                struct llog_handle *llh, char *devname,
850                                char *s1, char *s2, char *s3, char *s4)
851 {
852         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
853 }
854
855 /**
856  * \retval <0 record processing error
857  * \retval n record is processed. No need copy original one.
858  * \retval 0 record is not processed.
859  */
860 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
861                            struct mgs_replace_uuid_lookup *mrul)
862 {
863         int nids_added = 0;
864         lnet_nid_t nid;
865         char *ptr;
866         int rc;
867
868         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
869                 /* LCFG_ADD_UUID command found. Let's skip original command
870                    and add passed nids */
871                 ptr = mrul->target.mti_params;
872                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
873                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
874                                "device %s\n", libcfs_nid2str(nid),
875                                 mrul->target.mti_params,
876                                 mrul->target.mti_svname);
877                         rc = record_add_uuid(env,
878                                              mrul->temp_llh, nid,
879                                              mrul->target.mti_params);
880                         if (!rc)
881                                 nids_added++;
882                 }
883
884                 if (nids_added == 0) {
885                         CERROR("No new nids were added, nid %s with uuid %s, "
886                                "device %s\n", libcfs_nid2str(nid),
887                                mrul->target.mti_params,
888                                mrul->target.mti_svname);
889                         RETURN(-ENXIO);
890                 } else {
891                         mrul->device_nids_added = 1;
892                 }
893
894                 return nids_added;
895         }
896
897         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
898                 /* LCFG_SETUP command found. UUID should be changed */
899                 rc = record_setup(env,
900                                   mrul->temp_llh,
901                                   /* devname the same */
902                                   lustre_cfg_string(lcfg, 0),
903                                   /* s1 is not changed */
904                                   lustre_cfg_string(lcfg, 1),
905                                   /* new uuid should be
906                                   the full nidlist */
907                                   mrul->target.mti_params,
908                                   /* s3 is not changed */
909                                   lustre_cfg_string(lcfg, 3),
910                                   /* s4 is not changed */
911                                   lustre_cfg_string(lcfg, 4));
912                 return rc ? rc : 1;
913         }
914
915         /* Another commands in target device block */
916         return 0;
917 }
918
919 /**
920  * Handler that called for every record in llog.
921  * Records are processed in order they placed in llog.
922  *
923  * \param[in] llh       log to be processed
924  * \param[in] rec       current record
925  * \param[in] data      mgs_replace_uuid_lookup structure
926  *
927  * \retval 0    success
928  */
929 static int mgs_replace_handler(const struct lu_env *env,
930                                struct llog_handle *llh,
931                                struct llog_rec_hdr *rec,
932                                void *data)
933 {
934         struct mgs_replace_uuid_lookup *mrul;
935         struct lustre_cfg *lcfg = REC_DATA(rec);
936         int cfg_len = REC_DATA_LEN(rec);
937         int rc;
938         ENTRY;
939
940         mrul = (struct mgs_replace_uuid_lookup *)data;
941
942         if (rec->lrh_type != OBD_CFG_REC) {
943                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
944                        rec->lrh_type, lcfg->lcfg_command,
945                        lustre_cfg_string(lcfg, 0),
946                        lustre_cfg_string(lcfg, 1));
947                 RETURN(-EINVAL);
948         }
949
950         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
951         if (rc) {
952                 /* Do not copy any invalidated records */
953                 GOTO(skip_out, rc = 0);
954         }
955
956         rc = check_markers(lcfg, mrul);
957         if (rc || mrul->skip_it)
958                 GOTO(skip_out, rc = 0);
959
960         /* Write to new log all commands outside target device block */
961         if (!mrul->in_target_device)
962                 GOTO(copy_out, rc = 0);
963
964         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
965            (failover nids) for this target, assuming that if then
966            primary is changing then so is the failover */
967         if (mrul->device_nids_added &&
968             (lcfg->lcfg_command == LCFG_ADD_UUID ||
969              lcfg->lcfg_command == LCFG_ADD_CONN))
970                 GOTO(skip_out, rc = 0);
971
972         rc = process_command(env, lcfg, mrul);
973         if (rc < 0)
974                 RETURN(rc);
975
976         if (rc)
977                 RETURN(0);
978 copy_out:
979         /* Record is placed in temporary llog as is */
980         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
981
982         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
983                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
984                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
985         RETURN(rc);
986
987 skip_out:
988         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
989                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
990                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
991         RETURN(rc);
992 }
993
994 static int mgs_log_is_empty(const struct lu_env *env,
995                             struct mgs_device *mgs, char *name)
996 {
997         struct llog_ctxt        *ctxt;
998         int                      rc;
999
1000         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1001         LASSERT(ctxt != NULL);
1002
1003         rc = llog_is_empty(env, ctxt, name);
1004         llog_ctxt_put(ctxt);
1005         return rc;
1006 }
1007
1008 static int mgs_replace_nids_log(const struct lu_env *env,
1009                                 struct obd_device *mgs, struct fs_db *fsdb,
1010                                 char *logname, char *devname, char *nids)
1011 {
1012         struct llog_handle *orig_llh, *backup_llh;
1013         struct llog_ctxt *ctxt;
1014         struct mgs_replace_uuid_lookup *mrul;
1015         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1016         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1017         char *backup;
1018         int rc, rc2;
1019         ENTRY;
1020
1021         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1022
1023         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1024         LASSERT(ctxt != NULL);
1025
1026         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1027                 /* Log is empty. Nothing to replace */
1028                 GOTO(out_put, rc = 0);
1029         }
1030
1031         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1032         if (backup == NULL)
1033                 GOTO(out_put, rc = -ENOMEM);
1034
1035         sprintf(backup, "%s.bak", logname);
1036
1037         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1038         if (rc == 0) {
1039                 /* Now erase original log file. Connections are not allowed.
1040                    Backup is already saved */
1041                 rc = llog_erase(env, ctxt, NULL, logname);
1042                 if (rc < 0)
1043                         GOTO(out_free, rc);
1044         } else if (rc != -ENOENT) {
1045                 CERROR("%s: can't make backup for %s: rc = %d\n",
1046                        mgs->obd_name, logname, rc);
1047                 GOTO(out_free,rc);
1048         }
1049
1050         /* open local log */
1051         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1052         if (rc)
1053                 GOTO(out_restore, rc);
1054
1055         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1056         if (rc)
1057                 GOTO(out_closel, rc);
1058
1059         /* open backup llog */
1060         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1061                        LLOG_OPEN_EXISTS);
1062         if (rc)
1063                 GOTO(out_closel, rc);
1064
1065         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1066         if (rc)
1067                 GOTO(out_close, rc);
1068
1069         if (llog_get_size(backup_llh) <= 1)
1070                 GOTO(out_close, rc = 0);
1071
1072         OBD_ALLOC_PTR(mrul);
1073         if (!mrul)
1074                 GOTO(out_close, rc = -ENOMEM);
1075         /* devname is only needed information to replace UUID records */
1076         strlcpy(mrul->target.mti_svname, devname,
1077                 sizeof(mrul->target.mti_svname));
1078         /* parse nids later */
1079         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1080         /* Copy records to this temporary llog */
1081         mrul->temp_llh = orig_llh;
1082
1083         rc = llog_process(env, backup_llh, mgs_replace_handler,
1084                           (void *)mrul, NULL);
1085         OBD_FREE_PTR(mrul);
1086 out_close:
1087         rc2 = llog_close(NULL, backup_llh);
1088         if (!rc)
1089                 rc = rc2;
1090 out_closel:
1091         rc2 = llog_close(NULL, orig_llh);
1092         if (!rc)
1093                 rc = rc2;
1094
1095 out_restore:
1096         if (rc) {
1097                 CERROR("%s: llog should be restored: rc = %d\n",
1098                        mgs->obd_name, rc);
1099                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1100                                   logname);
1101                 if (rc2 < 0)
1102                         CERROR("%s: can't restore backup %s: rc = %d\n",
1103                                mgs->obd_name, logname, rc2);
1104         }
1105
1106 out_free:
1107         OBD_FREE(backup, strlen(backup) + 1);
1108
1109 out_put:
1110         llog_ctxt_put(ctxt);
1111
1112         if (rc)
1113                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1114                        mgs->obd_name, logname, rc);
1115
1116         RETURN(rc);
1117 }
1118
1119 /**
1120  * Parse device name and get file system name and/or device index
1121  *
1122  * \param[in]   devname device name (ex. lustre-MDT0000)
1123  * \param[out]  fsname  file system name(optional)
1124  * \param[out]  index   device index(optional)
1125  *
1126  * \retval 0    success
1127  */
1128 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1129 {
1130         int rc;
1131         ENTRY;
1132
1133         /* Extract fsname */
1134         if (fsname) {
1135                 rc = server_name2fsname(devname, fsname, NULL);
1136                 if (rc < 0) {
1137                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1138                                devname);
1139                         RETURN(-EINVAL);
1140                 }
1141         }
1142
1143         if (index) {
1144                 rc = server_name2index(devname, index, NULL);
1145                 if (rc < 0) {
1146                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1147                                devname);
1148                         RETURN(-EINVAL);
1149                 }
1150         }
1151
1152         RETURN(0);
1153 }
1154
1155 /* This is only called during replace_nids */
1156 static int only_mgs_is_running(struct obd_device *mgs_obd)
1157 {
1158         /* TDB: Is global variable with devices count exists? */
1159         int num_devices = get_devices_count();
1160         int num_exports = 0;
1161         struct obd_export *exp;
1162
1163         spin_lock(&mgs_obd->obd_dev_lock);
1164         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1165                 /* skip self export */
1166                 if (exp == mgs_obd->obd_self_export)
1167                         continue;
1168                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1169                         continue;
1170
1171                 ++num_exports;
1172
1173                 CERROR("%s: node %s still connected during replace_nids "
1174                        "connect_flags:%llx\n",
1175                        mgs_obd->obd_name,
1176                        libcfs_nid2str(exp->exp_nid_stats->nid),
1177                        exp_connect_flags(exp));
1178
1179         }
1180         spin_unlock(&mgs_obd->obd_dev_lock);
1181
1182         /* osd, MGS and MGC + self_export
1183            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1184         return (num_devices <= 3) && (num_exports == 0);
1185 }
1186
1187 static int name_create_mdt(char **logname, char *fsname, int i)
1188 {
1189         char mdt_index[9];
1190
1191         sprintf(mdt_index, "-MDT%04x", i);
1192         return name_create(logname, fsname, mdt_index);
1193 }
1194
1195 /**
1196  * Replace nids for \a device to \a nids values
1197  *
1198  * \param obd           MGS obd device
1199  * \param devname       nids need to be replaced for this device
1200  * (ex. lustre-OST0000)
1201  * \param nids          nids list (ex. nid1,nid2,nid3)
1202  *
1203  * \retval 0    success
1204  */
1205 int mgs_replace_nids(const struct lu_env *env,
1206                      struct mgs_device *mgs,
1207                      char *devname, char *nids)
1208 {
1209         /* Assume fsname is part of device name */
1210         char fsname[MTI_NAME_MAXLEN];
1211         int rc;
1212         __u32 index;
1213         char *logname;
1214         struct fs_db *fsdb;
1215         unsigned int i;
1216         int conn_state;
1217         struct obd_device *mgs_obd = mgs->mgs_obd;
1218         ENTRY;
1219
1220         /* We can only change NIDs if no other nodes are connected */
1221         spin_lock(&mgs_obd->obd_dev_lock);
1222         conn_state = mgs_obd->obd_no_conn;
1223         mgs_obd->obd_no_conn = 1;
1224         spin_unlock(&mgs_obd->obd_dev_lock);
1225
1226         /* We can not change nids if not only MGS is started */
1227         if (!only_mgs_is_running(mgs_obd)) {
1228                 CERROR("Only MGS is allowed to be started\n");
1229                 GOTO(out, rc = -EINPROGRESS);
1230         }
1231
1232         /* Get fsname and index*/
1233         rc = mgs_parse_devname(devname, fsname, &index);
1234         if (rc)
1235                 GOTO(out, rc);
1236
1237         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1238         if (rc) {
1239                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1240                 GOTO(out, rc);
1241         }
1242
1243         /* Process client llogs */
1244         name_create(&logname, fsname, "-client");
1245         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1246         name_destroy(&logname);
1247         if (rc) {
1248                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1249                        fsname, devname, rc);
1250                 GOTO(out, rc);
1251         }
1252
1253         /* Process MDT llogs */
1254         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1255                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1256                         continue;
1257                 name_create_mdt(&logname, fsname, i);
1258                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1259                 name_destroy(&logname);
1260                 if (rc)
1261                         GOTO(out, rc);
1262         }
1263
1264 out:
1265         spin_lock(&mgs_obd->obd_dev_lock);
1266         mgs_obd->obd_no_conn = conn_state;
1267         spin_unlock(&mgs_obd->obd_dev_lock);
1268
1269         RETURN(rc);
1270 }
1271
1272 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1273                             char *devname, struct lov_desc *desc)
1274 {
1275         struct mgs_thread_info  *mgi = mgs_env_info(env);
1276         struct llog_cfg_rec     *lcr;
1277         int rc;
1278
1279         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1280         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1281         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1282         if (lcr == NULL)
1283                 return -ENOMEM;
1284
1285         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1286         lustre_cfg_rec_free(lcr);
1287         return rc;
1288 }
1289
1290 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1291                             char *devname, struct lmv_desc *desc)
1292 {
1293         struct mgs_thread_info  *mgi = mgs_env_info(env);
1294         struct llog_cfg_rec     *lcr;
1295         int rc;
1296
1297         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1298         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1299         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1300         if (lcr == NULL)
1301                 return -ENOMEM;
1302
1303         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1304         lustre_cfg_rec_free(lcr);
1305         return rc;
1306 }
1307
1308 static inline int record_mdc_add(const struct lu_env *env,
1309                                  struct llog_handle *llh,
1310                                  char *logname, char *mdcuuid,
1311                                  char *mdtuuid, char *index,
1312                                  char *gen)
1313 {
1314         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1315                            mdtuuid,index,gen,mdcuuid);
1316 }
1317
1318 static inline int record_lov_add(const struct lu_env *env,
1319                                  struct llog_handle *llh,
1320                                  char *lov_name, char *ost_uuid,
1321                                  char *index, char *gen)
1322 {
1323         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1324                            ost_uuid, index, gen, NULL);
1325 }
1326
1327 static inline int record_mount_opt(const struct lu_env *env,
1328                                    struct llog_handle *llh,
1329                                    char *profile, char *lov_name,
1330                                    char *mdc_name)
1331 {
1332         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1333                            profile, lov_name, mdc_name, NULL);
1334 }
1335
1336 static int record_marker(const struct lu_env *env,
1337                          struct llog_handle *llh,
1338                          struct fs_db *fsdb, __u32 flags,
1339                          char *tgtname, char *comment)
1340 {
1341         struct mgs_thread_info *mgi = mgs_env_info(env);
1342         struct llog_cfg_rec *lcr;
1343         int rc;
1344         int cplen = 0;
1345
1346         if (flags & CM_START)
1347                 fsdb->fsdb_gen++;
1348         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1349         mgi->mgi_marker.cm_flags = flags;
1350         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1351         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1352                         sizeof(mgi->mgi_marker.cm_tgtname));
1353         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1354                 return -E2BIG;
1355         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1356                         sizeof(mgi->mgi_marker.cm_comment));
1357         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1358                 return -E2BIG;
1359         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1360         mgi->mgi_marker.cm_canceltime = 0;
1361         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1362         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1363                             sizeof(mgi->mgi_marker));
1364         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1365         if (lcr == NULL)
1366                 return -ENOMEM;
1367
1368         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1369         lustre_cfg_rec_free(lcr);
1370         return rc;
1371 }
1372
1373 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1374                             struct llog_handle **llh, char *name)
1375 {
1376         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1377         struct llog_ctxt        *ctxt;
1378         int                      rc = 0;
1379         ENTRY;
1380
1381         if (*llh)
1382                 GOTO(out, rc = -EBUSY);
1383
1384         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1385         if (!ctxt)
1386                 GOTO(out, rc = -ENODEV);
1387         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1388
1389         rc = llog_open_create(env, ctxt, llh, NULL, name);
1390         if (rc)
1391                 GOTO(out_ctxt, rc);
1392         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1393         if (rc)
1394                 llog_close(env, *llh);
1395 out_ctxt:
1396         llog_ctxt_put(ctxt);
1397 out:
1398         if (rc) {
1399                 CERROR("%s: can't start log %s: rc = %d\n",
1400                        mgs->mgs_obd->obd_name, name, rc);
1401                 *llh = NULL;
1402         }
1403         RETURN(rc);
1404 }
1405
1406 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1407 {
1408         int rc;
1409
1410         rc = llog_close(env, *llh);
1411         *llh = NULL;
1412
1413         return rc;
1414 }
1415
1416 /******************** config "macros" *********************/
1417
1418 /* write an lcfg directly into a log (with markers) */
1419 static int mgs_write_log_direct(const struct lu_env *env,
1420                                 struct mgs_device *mgs, struct fs_db *fsdb,
1421                                 char *logname, struct llog_cfg_rec *lcr,
1422                                 char *devname, char *comment)
1423 {
1424         struct llog_handle *llh = NULL;
1425         int rc;
1426
1427         ENTRY;
1428
1429         rc = record_start_log(env, mgs, &llh, logname);
1430         if (rc)
1431                 RETURN(rc);
1432
1433         /* FIXME These should be a single journal transaction */
1434         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1435         if (rc)
1436                 GOTO(out_end, rc);
1437         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1438         if (rc)
1439                 GOTO(out_end, rc);
1440         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1441         if (rc)
1442                 GOTO(out_end, rc);
1443 out_end:
1444         record_end_log(env, &llh);
1445         RETURN(rc);
1446 }
1447
1448 /* write the lcfg in all logs for the given fs */
1449 static int mgs_write_log_direct_all(const struct lu_env *env,
1450                                     struct mgs_device *mgs,
1451                                     struct fs_db *fsdb,
1452                                     struct mgs_target_info *mti,
1453                                     struct llog_cfg_rec *lcr, char *devname,
1454                                     char *comment, int server_only)
1455 {
1456         struct list_head         log_list;
1457         struct mgs_direntry     *dirent, *n;
1458         char                    *fsname = mti->mti_fsname;
1459         int                      rc = 0, len = strlen(fsname);
1460
1461         ENTRY;
1462         /* Find all the logs in the CONFIGS directory */
1463         rc = class_dentry_readdir(env, mgs, &log_list);
1464         if (rc)
1465                 RETURN(rc);
1466
1467         /* Could use fsdb index maps instead of directory listing */
1468         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1469                 list_del_init(&dirent->mde_list);
1470                 /* don't write to sptlrpc rule log */
1471                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1472                         goto next;
1473
1474                 /* caller wants write server logs only */
1475                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1476                         goto next;
1477
1478                 if (strlen(dirent->mde_name) <= len ||
1479                     strncmp(fsname, dirent->mde_name, len) != 0 ||
1480                     dirent->mde_name[len] != '-')
1481                         goto next;
1482
1483                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1484                 /* Erase any old settings of this same parameter */
1485                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1486                                 devname, comment, CM_SKIP);
1487                 if (rc < 0)
1488                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1489                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1490                 if (lcr == NULL)
1491                         goto next;
1492                 /* Write the new one */
1493                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1494                                           lcr, devname, comment);
1495                 if (rc != 0)
1496                         CERROR("%s: writing log %s: rc = %d\n",
1497                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1498 next:
1499                 mgs_direntry_free(dirent);
1500         }
1501
1502         RETURN(rc);
1503 }
1504
1505 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1506                                     struct mgs_device *mgs,
1507                                     struct fs_db *fsdb,
1508                                     struct mgs_target_info *mti,
1509                                     int index, char *logname);
1510 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1511                                     struct mgs_device *mgs,
1512                                     struct fs_db *fsdb,
1513                                     struct mgs_target_info *mti,
1514                                     char *logname, char *suffix, char *lovname,
1515                                     enum lustre_sec_part sec_part, int flags);
1516 static int name_create_mdt_and_lov(char **logname, char **lovname,
1517                                    struct fs_db *fsdb, int i);
1518
1519 static int add_param(char *params, char *key, char *val)
1520 {
1521         char *start = params + strlen(params);
1522         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1523         int keylen = 0;
1524
1525         if (key != NULL)
1526                 keylen = strlen(key);
1527         if (start + 1 + keylen + strlen(val) >= end) {
1528                 CERROR("params are too long: %s %s%s\n",
1529                        params, key != NULL ? key : "", val);
1530                 return -EINVAL;
1531         }
1532
1533         sprintf(start, " %s%s", key != NULL ? key : "", val);
1534         return 0;
1535 }
1536
1537 /**
1538  * Walk through client config log record and convert the related records
1539  * into the target.
1540  **/
1541 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1542                                          struct llog_handle *llh,
1543                                          struct llog_rec_hdr *rec, void *data)
1544 {
1545         struct mgs_device *mgs;
1546         struct obd_device *obd;
1547         struct mgs_target_info *mti, *tmti;
1548         struct fs_db *fsdb;
1549         int cfg_len = rec->lrh_len;
1550         char *cfg_buf = (char*) (rec + 1);
1551         struct lustre_cfg *lcfg;
1552         int rc = 0;
1553         struct llog_handle *mdt_llh = NULL;
1554         static int got_an_osc_or_mdc = 0;
1555         /* 0: not found any osc/mdc;
1556            1: found osc;
1557            2: found mdc;
1558         */
1559         static int last_step = -1;
1560         int cplen = 0;
1561
1562         ENTRY;
1563
1564         mti = ((struct temp_comp*)data)->comp_mti;
1565         tmti = ((struct temp_comp*)data)->comp_tmti;
1566         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1567         obd = ((struct temp_comp *)data)->comp_obd;
1568         mgs = lu2mgs_dev(obd->obd_lu_dev);
1569         LASSERT(mgs);
1570
1571         if (rec->lrh_type != OBD_CFG_REC) {
1572                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1573                 RETURN(-EINVAL);
1574         }
1575
1576         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1577         if (rc) {
1578                 CERROR("Insane cfg\n");
1579                 RETURN(rc);
1580         }
1581
1582         lcfg = (struct lustre_cfg *)cfg_buf;
1583
1584         if (lcfg->lcfg_command == LCFG_MARKER) {
1585                 struct cfg_marker *marker;
1586                 marker = lustre_cfg_buf(lcfg, 1);
1587                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1588                     (marker->cm_flags & CM_START) &&
1589                      !(marker->cm_flags & CM_SKIP)) {
1590                         got_an_osc_or_mdc = 1;
1591                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1592                                         sizeof(tmti->mti_svname));
1593                         if (cplen >= sizeof(tmti->mti_svname))
1594                                 RETURN(-E2BIG);
1595                         rc = record_start_log(env, mgs, &mdt_llh,
1596                                               mti->mti_svname);
1597                         if (rc)
1598                                 RETURN(rc);
1599                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1600                                            mti->mti_svname, "add osc(copied)");
1601                         record_end_log(env, &mdt_llh);
1602                         last_step = marker->cm_step;
1603                         RETURN(rc);
1604                 }
1605                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1606                     (marker->cm_flags & CM_END) &&
1607                      !(marker->cm_flags & CM_SKIP)) {
1608                         LASSERT(last_step == marker->cm_step);
1609                         last_step = -1;
1610                         got_an_osc_or_mdc = 0;
1611                         memset(tmti, 0, sizeof(*tmti));
1612                         rc = record_start_log(env, mgs, &mdt_llh,
1613                                               mti->mti_svname);
1614                         if (rc)
1615                                 RETURN(rc);
1616                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1617                                            mti->mti_svname, "add osc(copied)");
1618                         record_end_log(env, &mdt_llh);
1619                         RETURN(rc);
1620                 }
1621                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1622                     (marker->cm_flags & CM_START) &&
1623                      !(marker->cm_flags & CM_SKIP)) {
1624                         got_an_osc_or_mdc = 2;
1625                         last_step = marker->cm_step;
1626                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1627                                strlen(marker->cm_tgtname));
1628
1629                         RETURN(rc);
1630                 }
1631                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1632                     (marker->cm_flags & CM_END) &&
1633                      !(marker->cm_flags & CM_SKIP)) {
1634                         LASSERT(last_step == marker->cm_step);
1635                         last_step = -1;
1636                         got_an_osc_or_mdc = 0;
1637                         memset(tmti, 0, sizeof(*tmti));
1638                         RETURN(rc);
1639                 }
1640         }
1641
1642         if (got_an_osc_or_mdc == 0 || last_step < 0)
1643                 RETURN(rc);
1644
1645         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1646                 __u64 nodenid = lcfg->lcfg_nid;
1647
1648                 if (strlen(tmti->mti_uuid) == 0) {
1649                         /* target uuid not set, this config record is before
1650                          * LCFG_SETUP, this nid is one of target node nid.
1651                          */
1652                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1653                         tmti->mti_nid_count++;
1654                 } else {
1655                         char nidstr[LNET_NIDSTR_SIZE];
1656
1657                         /* failover node nid */
1658                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
1659                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1660                                         nidstr);
1661                 }
1662
1663                 RETURN(rc);
1664         }
1665
1666         if (lcfg->lcfg_command == LCFG_SETUP) {
1667                 char *target;
1668
1669                 target = lustre_cfg_string(lcfg, 1);
1670                 memcpy(tmti->mti_uuid, target, strlen(target));
1671                 RETURN(rc);
1672         }
1673
1674         /* ignore client side sptlrpc_conf_log */
1675         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1676                 RETURN(rc);
1677
1678         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1679                 int index;
1680
1681                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1682                         RETURN (-EINVAL);
1683
1684                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1685                        strlen(mti->mti_fsname));
1686                 tmti->mti_stripe_index = index;
1687
1688                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1689                                               mti->mti_stripe_index,
1690                                               mti->mti_svname);
1691                 memset(tmti, 0, sizeof(*tmti));
1692                 RETURN(rc);
1693         }
1694
1695         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1696                 int index;
1697                 char mdt_index[9];
1698                 char *logname, *lovname;
1699
1700                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1701                                              mti->mti_stripe_index);
1702                 if (rc)
1703                         RETURN(rc);
1704                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1705
1706                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1707                         name_destroy(&logname);
1708                         name_destroy(&lovname);
1709                         RETURN(-EINVAL);
1710                 }
1711
1712                 tmti->mti_stripe_index = index;
1713                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1714                                          mdt_index, lovname,
1715                                          LUSTRE_SP_MDT, 0);
1716                 name_destroy(&logname);
1717                 name_destroy(&lovname);
1718                 RETURN(rc);
1719         }
1720         RETURN(rc);
1721 }
1722
1723 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1724 /* stealed from mgs_get_fsdb_from_llog*/
1725 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1726                                               struct mgs_device *mgs,
1727                                               char *client_name,
1728                                               struct temp_comp* comp)
1729 {
1730         struct llog_handle *loghandle;
1731         struct mgs_target_info *tmti;
1732         struct llog_ctxt *ctxt;
1733         int rc;
1734
1735         ENTRY;
1736
1737         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1738         LASSERT(ctxt != NULL);
1739
1740         OBD_ALLOC_PTR(tmti);
1741         if (tmti == NULL)
1742                 GOTO(out_ctxt, rc = -ENOMEM);
1743
1744         comp->comp_tmti = tmti;
1745         comp->comp_obd = mgs->mgs_obd;
1746
1747         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1748                        LLOG_OPEN_EXISTS);
1749         if (rc < 0) {
1750                 if (rc == -ENOENT)
1751                         rc = 0;
1752                 GOTO(out_pop, rc);
1753         }
1754
1755         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1756         if (rc)
1757                 GOTO(out_close, rc);
1758
1759         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1760                                   (void *)comp, NULL, false);
1761         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1762 out_close:
1763         llog_close(env, loghandle);
1764 out_pop:
1765         OBD_FREE_PTR(tmti);
1766 out_ctxt:
1767         llog_ctxt_put(ctxt);
1768         RETURN(rc);
1769 }
1770
1771 /* lmv is the second thing for client logs */
1772 /* copied from mgs_write_log_lov. Please refer to that.  */
1773 static int mgs_write_log_lmv(const struct lu_env *env,
1774                              struct mgs_device *mgs,
1775                              struct fs_db *fsdb,
1776                              struct mgs_target_info *mti,
1777                              char *logname, char *lmvname)
1778 {
1779         struct llog_handle *llh = NULL;
1780         struct lmv_desc *lmvdesc;
1781         char *uuid;
1782         int rc = 0;
1783         ENTRY;
1784
1785         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1786
1787         OBD_ALLOC_PTR(lmvdesc);
1788         if (lmvdesc == NULL)
1789                 RETURN(-ENOMEM);
1790         lmvdesc->ld_active_tgt_count = 0;
1791         lmvdesc->ld_tgt_count = 0;
1792         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1793         uuid = (char *)lmvdesc->ld_uuid.uuid;
1794
1795         rc = record_start_log(env, mgs, &llh, logname);
1796         if (rc)
1797                 GOTO(out_free, rc);
1798         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1799         if (rc)
1800                 GOTO(out_end, rc);
1801         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1802         if (rc)
1803                 GOTO(out_end, rc);
1804         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1805         if (rc)
1806                 GOTO(out_end, rc);
1807         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1808         if (rc)
1809                 GOTO(out_end, rc);
1810 out_end:
1811         record_end_log(env, &llh);
1812 out_free:
1813         OBD_FREE_PTR(lmvdesc);
1814         RETURN(rc);
1815 }
1816
1817 /* lov is the first thing in the mdt and client logs */
1818 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1819                              struct fs_db *fsdb, struct mgs_target_info *mti,
1820                              char *logname, char *lovname)
1821 {
1822         struct llog_handle *llh = NULL;
1823         struct lov_desc *lovdesc;
1824         char *uuid;
1825         int rc = 0;
1826         ENTRY;
1827
1828         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1829
1830         /*
1831         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1832         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1833               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1834         */
1835
1836         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1837         OBD_ALLOC_PTR(lovdesc);
1838         if (lovdesc == NULL)
1839                 RETURN(-ENOMEM);
1840         lovdesc->ld_magic = LOV_DESC_MAGIC;
1841         lovdesc->ld_tgt_count = 0;
1842         /* Defaults.  Can be changed later by lcfg config_param */
1843         lovdesc->ld_default_stripe_count = 1;
1844         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1845         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1846         lovdesc->ld_default_stripe_offset = -1;
1847         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1848         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1849         /* can these be the same? */
1850         uuid = (char *)lovdesc->ld_uuid.uuid;
1851
1852         /* This should always be the first entry in a log.
1853         rc = mgs_clear_log(obd, logname); */
1854         rc = record_start_log(env, mgs, &llh, logname);
1855         if (rc)
1856                 GOTO(out_free, rc);
1857         /* FIXME these should be a single journal transaction */
1858         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1859         if (rc)
1860                 GOTO(out_end, rc);
1861         rc = record_attach(env, llh, lovname, "lov", uuid);
1862         if (rc)
1863                 GOTO(out_end, rc);
1864         rc = record_lov_setup(env, llh, lovname, lovdesc);
1865         if (rc)
1866                 GOTO(out_end, rc);
1867         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1868         if (rc)
1869                 GOTO(out_end, rc);
1870         EXIT;
1871 out_end:
1872         record_end_log(env, &llh);
1873 out_free:
1874         OBD_FREE_PTR(lovdesc);
1875         return rc;
1876 }
1877
1878 /* add failnids to open log */
1879 static int mgs_write_log_failnids(const struct lu_env *env,
1880                                   struct mgs_target_info *mti,
1881                                   struct llog_handle *llh,
1882                                   char *cliname)
1883 {
1884         char *failnodeuuid = NULL;
1885         char *ptr = mti->mti_params;
1886         lnet_nid_t nid;
1887         int rc = 0;
1888
1889         /*
1890         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1891         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1892         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1893         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1894         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1895         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1896         */
1897
1898         /*
1899          * Pull failnid info out of params string, which may contain something
1900          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
1901          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
1902          * etc.  However, convert_hostnames() should have caught those.
1903          */
1904         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1905                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1906                         char nidstr[LNET_NIDSTR_SIZE];
1907
1908                         if (failnodeuuid == NULL) {
1909                                 /* We don't know the failover node name,
1910                                  * so just use the first nid as the uuid */
1911                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
1912                                 rc = name_create(&failnodeuuid, nidstr, "");
1913                                 if (rc != 0)
1914                                         return rc;
1915                         }
1916                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1917                                 "client %s\n",
1918                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
1919                                 failnodeuuid, cliname);
1920                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1921                         /*
1922                          * If *ptr is ':', we have added all NIDs for
1923                          * failnodeuuid.
1924                          */
1925                         if (*ptr == ':') {
1926                                 rc = record_add_conn(env, llh, cliname,
1927                                                      failnodeuuid);
1928                                 name_destroy(&failnodeuuid);
1929                                 failnodeuuid = NULL;
1930                         }
1931                 }
1932                 if (failnodeuuid) {
1933                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1934                         name_destroy(&failnodeuuid);
1935                         failnodeuuid = NULL;
1936                 }
1937         }
1938
1939         return rc;
1940 }
1941
1942 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1943                                     struct mgs_device *mgs,
1944                                     struct fs_db *fsdb,
1945                                     struct mgs_target_info *mti,
1946                                     char *logname, char *lmvname)
1947 {
1948         struct llog_handle *llh = NULL;
1949         char *mdcname = NULL;
1950         char *nodeuuid = NULL;
1951         char *mdcuuid = NULL;
1952         char *lmvuuid = NULL;
1953         char index[6];
1954         char nidstr[LNET_NIDSTR_SIZE];
1955         int i, rc;
1956         ENTRY;
1957
1958         if (mgs_log_is_empty(env, mgs, logname)) {
1959                 CERROR("log is empty! Logical error\n");
1960                 RETURN(-EINVAL);
1961         }
1962
1963         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1964                mti->mti_svname, logname, lmvname);
1965
1966         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
1967         rc = name_create(&nodeuuid, nidstr, "");
1968         if (rc)
1969                 RETURN(rc);
1970         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1971         if (rc)
1972                 GOTO(out_free, rc);
1973         rc = name_create(&mdcuuid, mdcname, "_UUID");
1974         if (rc)
1975                 GOTO(out_free, rc);
1976         rc = name_create(&lmvuuid, lmvname, "_UUID");
1977         if (rc)
1978                 GOTO(out_free, rc);
1979
1980         rc = record_start_log(env, mgs, &llh, logname);
1981         if (rc)
1982                 GOTO(out_free, rc);
1983         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1984                            "add mdc");
1985         if (rc)
1986                 GOTO(out_end, rc);
1987         for (i = 0; i < mti->mti_nid_count; i++) {
1988                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1989                         libcfs_nid2str_r(mti->mti_nids[i],
1990                                          nidstr, sizeof(nidstr)));
1991
1992                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1993                 if (rc)
1994                         GOTO(out_end, rc);
1995         }
1996
1997         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1998         if (rc)
1999                 GOTO(out_end, rc);
2000         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2001                           NULL, NULL);
2002         if (rc)
2003                 GOTO(out_end, rc);
2004         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2005         if (rc)
2006                 GOTO(out_end, rc);
2007         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2008         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2009                             index, "1");
2010         if (rc)
2011                 GOTO(out_end, rc);
2012         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2013                            "add mdc");
2014         if (rc)
2015                 GOTO(out_end, rc);
2016 out_end:
2017         record_end_log(env, &llh);
2018 out_free:
2019         name_destroy(&lmvuuid);
2020         name_destroy(&mdcuuid);
2021         name_destroy(&mdcname);
2022         name_destroy(&nodeuuid);
2023         RETURN(rc);
2024 }
2025
2026 static inline int name_create_lov(char **lovname, char *mdtname,
2027                                   struct fs_db *fsdb, int index)
2028 {
2029         /* COMPAT_180 */
2030         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2031                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2032         else
2033                 return name_create(lovname, mdtname, "-mdtlov");
2034 }
2035
2036 static int name_create_mdt_and_lov(char **logname, char **lovname,
2037                                    struct fs_db *fsdb, int i)
2038 {
2039         int rc;
2040
2041         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2042         if (rc)
2043                 return rc;
2044         /* COMPAT_180 */
2045         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2046                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2047         else
2048                 rc = name_create(lovname, *logname, "-mdtlov");
2049         if (rc) {
2050                 name_destroy(logname);
2051                 *logname = NULL;
2052         }
2053         return rc;
2054 }
2055
2056 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2057                                       struct fs_db *fsdb, int i)
2058 {
2059         char suffix[16];
2060
2061         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2062                 sprintf(suffix, "-osc");
2063         else
2064                 sprintf(suffix, "-osc-MDT%04x", i);
2065         return name_create(oscname, ostname, suffix);
2066 }
2067
2068 /* add new mdc to already existent MDS */
2069 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2070                                     struct mgs_device *mgs,
2071                                     struct fs_db *fsdb,
2072                                     struct mgs_target_info *mti,
2073                                     int mdt_index, char *logname)
2074 {
2075         struct llog_handle      *llh = NULL;
2076         char    *nodeuuid = NULL;
2077         char    *ospname = NULL;
2078         char    *lovuuid = NULL;
2079         char    *mdtuuid = NULL;
2080         char    *svname = NULL;
2081         char    *mdtname = NULL;
2082         char    *lovname = NULL;
2083         char    index_str[16];
2084         char    nidstr[LNET_NIDSTR_SIZE];
2085         int     i, rc;
2086
2087         ENTRY;
2088         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2089                 CERROR("log is empty! Logical error\n");
2090                 RETURN (-EINVAL);
2091         }
2092
2093         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2094                logname);
2095
2096         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2097         if (rc)
2098                 RETURN(rc);
2099
2100         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2101         rc = name_create(&nodeuuid, nidstr, "");
2102         if (rc)
2103                 GOTO(out_destory, rc);
2104
2105         rc = name_create(&svname, mdtname, "-osp");
2106         if (rc)
2107                 GOTO(out_destory, rc);
2108
2109         sprintf(index_str, "-MDT%04x", mdt_index);
2110         rc = name_create(&ospname, svname, index_str);
2111         if (rc)
2112                 GOTO(out_destory, rc);
2113
2114         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2115         if (rc)
2116                 GOTO(out_destory, rc);
2117
2118         rc = name_create(&lovuuid, lovname, "_UUID");
2119         if (rc)
2120                 GOTO(out_destory, rc);
2121
2122         rc = name_create(&mdtuuid, mdtname, "_UUID");
2123         if (rc)
2124                 GOTO(out_destory, rc);
2125
2126         rc = record_start_log(env, mgs, &llh, logname);
2127         if (rc)
2128                 GOTO(out_destory, rc);
2129
2130         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2131                            "add osp");
2132         if (rc)
2133                 GOTO(out_destory, rc);
2134
2135         for (i = 0; i < mti->mti_nid_count; i++) {
2136                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2137                         libcfs_nid2str_r(mti->mti_nids[i],
2138                                          nidstr, sizeof(nidstr)));
2139                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2140                 if (rc)
2141                         GOTO(out_end, rc);
2142         }
2143
2144         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2145         if (rc)
2146                 GOTO(out_end, rc);
2147
2148         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2149                           NULL, NULL);
2150         if (rc)
2151                 GOTO(out_end, rc);
2152
2153         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2154         if (rc)
2155                 GOTO(out_end, rc);
2156
2157         /* Add mdc(osp) to lod */
2158         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2159                  mti->mti_stripe_index);
2160         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2161                          index_str, "1", NULL);
2162         if (rc)
2163                 GOTO(out_end, rc);
2164
2165         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2166         if (rc)
2167                 GOTO(out_end, rc);
2168
2169 out_end:
2170         record_end_log(env, &llh);
2171
2172 out_destory:
2173         name_destroy(&mdtuuid);
2174         name_destroy(&lovuuid);
2175         name_destroy(&lovname);
2176         name_destroy(&ospname);
2177         name_destroy(&svname);
2178         name_destroy(&nodeuuid);
2179         name_destroy(&mdtname);
2180         RETURN(rc);
2181 }
2182
2183 static int mgs_write_log_mdt0(const struct lu_env *env,
2184                               struct mgs_device *mgs,
2185                               struct fs_db *fsdb,
2186                               struct mgs_target_info *mti)
2187 {
2188         char *log = mti->mti_svname;
2189         struct llog_handle *llh = NULL;
2190         char *uuid, *lovname;
2191         char mdt_index[6];
2192         char *ptr = mti->mti_params;
2193         int rc = 0, failout = 0;
2194         ENTRY;
2195
2196         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2197         if (uuid == NULL)
2198                 RETURN(-ENOMEM);
2199
2200         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2201                 failout = (strncmp(ptr, "failout", 7) == 0);
2202
2203         rc = name_create(&lovname, log, "-mdtlov");
2204         if (rc)
2205                 GOTO(out_free, rc);
2206         if (mgs_log_is_empty(env, mgs, log)) {
2207                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2208                 if (rc)
2209                         GOTO(out_lod, rc);
2210         }
2211
2212         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2213
2214         rc = record_start_log(env, mgs, &llh, log);
2215         if (rc)
2216                 GOTO(out_lod, rc);
2217
2218         /* add MDT itself */
2219
2220         /* FIXME this whole fn should be a single journal transaction */
2221         sprintf(uuid, "%s_UUID", log);
2222         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2223         if (rc)
2224                 GOTO(out_lod, rc);
2225         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2226         if (rc)
2227                 GOTO(out_end, rc);
2228         rc = record_mount_opt(env, llh, log, lovname, NULL);
2229         if (rc)
2230                 GOTO(out_end, rc);
2231         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2232                         failout ? "n" : "f");
2233         if (rc)
2234                 GOTO(out_end, rc);
2235         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2236         if (rc)
2237                 GOTO(out_end, rc);
2238 out_end:
2239         record_end_log(env, &llh);
2240 out_lod:
2241         name_destroy(&lovname);
2242 out_free:
2243         OBD_FREE(uuid, sizeof(struct obd_uuid));
2244         RETURN(rc);
2245 }
2246
2247 /* envelope method for all layers log */
2248 static int mgs_write_log_mdt(const struct lu_env *env,
2249                              struct mgs_device *mgs,
2250                              struct fs_db *fsdb,
2251                              struct mgs_target_info *mti)
2252 {
2253         struct mgs_thread_info *mgi = mgs_env_info(env);
2254         struct llog_handle *llh = NULL;
2255         char *cliname;
2256         int rc, i = 0;
2257         ENTRY;
2258
2259         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2260
2261         if (mti->mti_uuid[0] == '\0') {
2262                 /* Make up our own uuid */
2263                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2264                          "%s_UUID", mti->mti_svname);
2265         }
2266
2267         /* add mdt */
2268         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2269         if (rc)
2270                 RETURN(rc);
2271         /* Append the mdt info to the client log */
2272         rc = name_create(&cliname, mti->mti_fsname, "-client");
2273         if (rc)
2274                 RETURN(rc);
2275
2276         if (mgs_log_is_empty(env, mgs, cliname)) {
2277                 /* Start client log */
2278                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2279                                        fsdb->fsdb_clilov);
2280                 if (rc)
2281                         GOTO(out_free, rc);
2282                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2283                                        fsdb->fsdb_clilmv);
2284                 if (rc)
2285                         GOTO(out_free, rc);
2286         }
2287
2288         /*
2289         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2290         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2291         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2292         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2293         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2294         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2295         */
2296
2297                 /* copy client info about lov/lmv */
2298                 mgi->mgi_comp.comp_mti = mti;
2299                 mgi->mgi_comp.comp_fsdb = fsdb;
2300
2301                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2302                                                         &mgi->mgi_comp);
2303                 if (rc)
2304                         GOTO(out_free, rc);
2305                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2306                                               fsdb->fsdb_clilmv);
2307                 if (rc)
2308                         GOTO(out_free, rc);
2309
2310                 /* add mountopts */
2311                 rc = record_start_log(env, mgs, &llh, cliname);
2312                 if (rc)
2313                         GOTO(out_free, rc);
2314
2315                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2316                                    "mount opts");
2317                 if (rc)
2318                         GOTO(out_end, rc);
2319                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2320                                       fsdb->fsdb_clilmv);
2321                 if (rc)
2322                         GOTO(out_end, rc);
2323                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2324                                    "mount opts");
2325
2326         if (rc)
2327                 GOTO(out_end, rc);
2328
2329         /* for_all_existing_mdt except current one */
2330         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2331                 if (i !=  mti->mti_stripe_index &&
2332                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2333                         char *logname;
2334
2335                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2336                         if (rc)
2337                                 GOTO(out_end, rc);
2338
2339                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2340                                                       i, logname);
2341                         name_destroy(&logname);
2342                         if (rc)
2343                                 GOTO(out_end, rc);
2344                 }
2345         }
2346 out_end:
2347         record_end_log(env, &llh);
2348 out_free:
2349         name_destroy(&cliname);
2350         RETURN(rc);
2351 }
2352
2353 /* Add the ost info to the client/mdt lov */
2354 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2355                                     struct mgs_device *mgs, struct fs_db *fsdb,
2356                                     struct mgs_target_info *mti,
2357                                     char *logname, char *suffix, char *lovname,
2358                                     enum lustre_sec_part sec_part, int flags)
2359 {
2360         struct llog_handle *llh = NULL;
2361         char *nodeuuid = NULL;
2362         char *oscname = NULL;
2363         char *oscuuid = NULL;
2364         char *lovuuid = NULL;
2365         char *svname = NULL;
2366         char index[6];
2367         char nidstr[LNET_NIDSTR_SIZE];
2368         int i, rc;
2369         ENTRY;
2370
2371         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2372                 mti->mti_svname, logname);
2373
2374         if (mgs_log_is_empty(env, mgs, logname)) {
2375                 CERROR("log is empty! Logical error\n");
2376                 RETURN(-EINVAL);
2377         }
2378
2379         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2380         rc = name_create(&nodeuuid, nidstr, "");
2381         if (rc)
2382                 RETURN(rc);
2383         rc = name_create(&svname, mti->mti_svname, "-osc");
2384         if (rc)
2385                 GOTO(out_free, rc);
2386
2387         /* for the system upgraded from old 1.8, keep using the old osc naming
2388          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2389         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2390                 rc = name_create(&oscname, svname, "");
2391         else
2392                 rc = name_create(&oscname, svname, suffix);
2393         if (rc)
2394                 GOTO(out_free, rc);
2395
2396         rc = name_create(&oscuuid, oscname, "_UUID");
2397         if (rc)
2398                 GOTO(out_free, rc);
2399         rc = name_create(&lovuuid, lovname, "_UUID");
2400         if (rc)
2401                 GOTO(out_free, rc);
2402
2403
2404         /*
2405         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2406         multihomed (#4)
2407         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2408         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2409         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2410         failover (#6,7)
2411         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2412         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2413         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2414         */
2415
2416         rc = record_start_log(env, mgs, &llh, logname);
2417         if (rc)
2418                 GOTO(out_free, rc);
2419
2420         /* FIXME these should be a single journal transaction */
2421         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2422                            "add osc");
2423         if (rc)
2424                 GOTO(out_end, rc);
2425
2426         /* NB: don't change record order, because upon MDT steal OSC config
2427          * from client, it treats all nids before LCFG_SETUP as target nids
2428          * (multiple interfaces), while nids after as failover node nids.
2429          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2430          */
2431         for (i = 0; i < mti->mti_nid_count; i++) {
2432                 CDEBUG(D_MGS, "add nid %s\n",
2433                         libcfs_nid2str_r(mti->mti_nids[i],
2434                                          nidstr, sizeof(nidstr)));
2435                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2436                 if (rc)
2437                         GOTO(out_end, rc);
2438         }
2439         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2440         if (rc)
2441                 GOTO(out_end, rc);
2442         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
2443                           NULL, NULL);
2444         if (rc)
2445                 GOTO(out_end, rc);
2446         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2447         if (rc)
2448                 GOTO(out_end, rc);
2449
2450         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2451
2452         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2453         if (rc)
2454                 GOTO(out_end, rc);
2455         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2456                            "add osc");
2457         if (rc)
2458                 GOTO(out_end, rc);
2459 out_end:
2460         record_end_log(env, &llh);
2461 out_free:
2462         name_destroy(&lovuuid);
2463         name_destroy(&oscuuid);
2464         name_destroy(&oscname);
2465         name_destroy(&svname);
2466         name_destroy(&nodeuuid);
2467         RETURN(rc);
2468 }
2469
2470 static int mgs_write_log_ost(const struct lu_env *env,
2471                              struct mgs_device *mgs, struct fs_db *fsdb,
2472                              struct mgs_target_info *mti)
2473 {
2474         struct llog_handle *llh = NULL;
2475         char *logname, *lovname;
2476         char *ptr = mti->mti_params;
2477         int rc, flags = 0, failout = 0, i;
2478         ENTRY;
2479
2480         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2481
2482         /* The ost startup log */
2483
2484         /* If the ost log already exists, that means that someone reformatted
2485            the ost and it called target_add again. */
2486         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2487                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2488                                    "exists, yet the server claims it never "
2489                                    "registered. It may have been reformatted, "
2490                                    "or the index changed. writeconf the MDT to "
2491                                    "regenerate all logs.\n", mti->mti_svname);
2492                 RETURN(-EALREADY);
2493         }
2494
2495         /*
2496         attach obdfilter ost1 ost1_UUID
2497         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2498         */
2499         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2500                 failout = (strncmp(ptr, "failout", 7) == 0);
2501         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2502         if (rc)
2503                 RETURN(rc);
2504         /* FIXME these should be a single journal transaction */
2505         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2506         if (rc)
2507                 GOTO(out_end, rc);
2508         if (*mti->mti_uuid == '\0')
2509                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2510                          "%s_UUID", mti->mti_svname);
2511         rc = record_attach(env, llh, mti->mti_svname,
2512                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2513         if (rc)
2514                 GOTO(out_end, rc);
2515         rc = record_setup(env, llh, mti->mti_svname,
2516                           "dev"/*ignored*/, "type"/*ignored*/,
2517                           failout ? "n" : "f", NULL/*options*/);
2518         if (rc)
2519                 GOTO(out_end, rc);
2520         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2521         if (rc)
2522                 GOTO(out_end, rc);
2523 out_end:
2524         record_end_log(env, &llh);
2525         if (rc)
2526                 RETURN(rc);
2527         /* We also have to update the other logs where this osc is part of
2528            the lov */
2529
2530         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2531                 /* If we're upgrading, the old mdt log already has our
2532                    entry. Let's do a fake one for fun. */
2533                 /* Note that we can't add any new failnids, since we don't
2534                    know the old osc names. */
2535                 flags = CM_SKIP | CM_UPGRADE146;
2536
2537         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2538                 /* If the update flag isn't set, don't update client/mdt
2539                    logs. */
2540                 flags |= CM_SKIP;
2541                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2542                               "the MDT first to regenerate it.\n",
2543                               mti->mti_svname);
2544         }
2545
2546         /* Add ost to all MDT lov defs */
2547         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2548                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2549                         char mdt_index[9];
2550
2551                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2552                                                      i);
2553                         if (rc)
2554                                 RETURN(rc);
2555                         sprintf(mdt_index, "-MDT%04x", i);
2556                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2557                                                       logname, mdt_index,
2558                                                       lovname, LUSTRE_SP_MDT,
2559                                                       flags);
2560                         name_destroy(&logname);
2561                         name_destroy(&lovname);
2562                         if (rc)
2563                                 RETURN(rc);
2564                 }
2565         }
2566
2567         /* Append ost info to the client log */
2568         rc = name_create(&logname, mti->mti_fsname, "-client");
2569         if (rc)
2570                 RETURN(rc);
2571         if (mgs_log_is_empty(env, mgs, logname)) {
2572                 /* Start client log */
2573                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2574                                        fsdb->fsdb_clilov);
2575                 if (rc)
2576                         GOTO(out_free, rc);
2577                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2578                                        fsdb->fsdb_clilmv);
2579                 if (rc)
2580                         GOTO(out_free, rc);
2581         }
2582         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2583                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2584 out_free:
2585         name_destroy(&logname);
2586         RETURN(rc);
2587 }
2588
2589 static __inline__ int mgs_param_empty(char *ptr)
2590 {
2591         char *tmp;
2592
2593         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2594                 return 1;
2595         return 0;
2596 }
2597
2598 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2599                                           struct mgs_device *mgs,
2600                                           struct fs_db *fsdb,
2601                                           struct mgs_target_info *mti,
2602                                           char *logname, char *cliname)
2603 {
2604         int rc;
2605         struct llog_handle *llh = NULL;
2606
2607         if (mgs_param_empty(mti->mti_params)) {
2608                 /* Remove _all_ failnids */
2609                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2610                                 mti->mti_svname, "add failnid", CM_SKIP);
2611                 return rc < 0 ? rc : 0;
2612         }
2613
2614         /* Otherwise failover nids are additive */
2615         rc = record_start_log(env, mgs, &llh, logname);
2616         if (rc)
2617                 return rc;
2618                 /* FIXME this should be a single journal transaction */
2619         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2620                            "add failnid");
2621         if (rc)
2622                 goto out_end;
2623         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2624         if (rc)
2625                 goto out_end;
2626         rc = record_marker(env, llh, fsdb, CM_END,
2627                            mti->mti_svname, "add failnid");
2628 out_end:
2629         record_end_log(env, &llh);
2630         return rc;
2631 }
2632
2633
2634 /* Add additional failnids to an existing log.
2635    The mdc/osc must have been added to logs first */
2636 /* tcp nids must be in dotted-quad ascii -
2637    we can't resolve hostnames from the kernel. */
2638 static int mgs_write_log_add_failnid(const struct lu_env *env,
2639                                      struct mgs_device *mgs,
2640                                      struct fs_db *fsdb,
2641                                      struct mgs_target_info *mti)
2642 {
2643         char *logname, *cliname;
2644         int rc;
2645         ENTRY;
2646
2647         /* FIXME we currently can't erase the failnids
2648          * given when a target first registers, since they aren't part of
2649          * an "add uuid" stanza */
2650
2651         /* Verify that we know about this target */
2652         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2653                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2654                                    "yet. It must be started before failnids "
2655                                    "can be added.\n", mti->mti_svname);
2656                 RETURN(-ENOENT);
2657         }
2658
2659         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2660         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2661                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2662         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2663                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2664         } else {
2665                 RETURN(-EINVAL);
2666         }
2667         if (rc)
2668                 RETURN(rc);
2669         /* Add failover nids to the client log */
2670         rc = name_create(&logname, mti->mti_fsname, "-client");
2671         if (rc) {
2672                 name_destroy(&cliname);
2673                 RETURN(rc);
2674         }
2675         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2676         name_destroy(&logname);
2677         name_destroy(&cliname);
2678         if (rc)
2679                 RETURN(rc);
2680
2681         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2682                 /* Add OST failover nids to the MDT logs as well */
2683                 int i;
2684
2685                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2686                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2687                                 continue;
2688                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2689                         if (rc)
2690                                 RETURN(rc);
2691                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2692                                                  fsdb, i);
2693                         if (rc) {
2694                                 name_destroy(&logname);
2695                                 RETURN(rc);
2696                         }
2697                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2698                                                             mti, logname,
2699                                                             cliname);
2700                         name_destroy(&cliname);
2701                         name_destroy(&logname);
2702                         if (rc)
2703                                 RETURN(rc);
2704                 }
2705         }
2706
2707         RETURN(rc);
2708 }
2709
2710 static int mgs_wlp_lcfg(const struct lu_env *env,
2711                         struct mgs_device *mgs, struct fs_db *fsdb,
2712                         struct mgs_target_info *mti,
2713                         char *logname, struct lustre_cfg_bufs *bufs,
2714                         char *tgtname, char *ptr)
2715 {
2716         char comment[MTI_NAME_MAXLEN];
2717         char *tmp;
2718         struct llog_cfg_rec *lcr;
2719         int rc, del;
2720
2721         /* Erase any old settings of this same parameter */
2722         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2723         comment[MTI_NAME_MAXLEN - 1] = 0;
2724         /* But don't try to match the value. */
2725         tmp = strchr(comment, '=');
2726         if (tmp != NULL)
2727                 *tmp = 0;
2728         /* FIXME we should skip settings that are the same as old values */
2729         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2730         if (rc < 0)
2731                 return rc;
2732         del = mgs_param_empty(ptr);
2733
2734         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2735                       "Setting" : "Modifying", tgtname, comment, logname);
2736         if (del) {
2737                 /* mgs_modify() will return 1 if nothing had to be done */
2738                 if (rc == 1)
2739                         rc = 0;
2740                 return rc;
2741         }
2742
2743         lustre_cfg_bufs_reset(bufs, tgtname);
2744         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2745         if (mti->mti_flags & LDD_F_PARAM2)
2746                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2747
2748         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2749                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2750         if (lcr == NULL)
2751                 return -ENOMEM;
2752
2753         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2754                                   comment);
2755         lustre_cfg_rec_free(lcr);
2756         return rc;
2757 }
2758
2759 static int mgs_write_log_param2(const struct lu_env *env,
2760                                 struct mgs_device *mgs,
2761                                 struct fs_db *fsdb,
2762                                 struct mgs_target_info *mti, char *ptr)
2763 {
2764         struct lustre_cfg_bufs  bufs;
2765         int                     rc = 0;
2766         ENTRY;
2767
2768         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2769         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2770                           mti->mti_svname, ptr);
2771
2772         RETURN(rc);
2773 }
2774
2775 /* write global variable settings into log */
2776 static int mgs_write_log_sys(const struct lu_env *env,
2777                              struct mgs_device *mgs, struct fs_db *fsdb,
2778                              struct mgs_target_info *mti, char *sys, char *ptr)
2779 {
2780         struct mgs_thread_info  *mgi = mgs_env_info(env);
2781         struct lustre_cfg       *lcfg;
2782         struct llog_cfg_rec     *lcr;
2783         char *tmp, sep;
2784         int rc, cmd, convert = 1;
2785
2786         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2787                 cmd = LCFG_SET_TIMEOUT;
2788         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2789                 cmd = LCFG_SET_LDLM_TIMEOUT;
2790         /* Check for known params here so we can return error to lctl */
2791         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2792                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2793                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2794                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2795                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2796                 cmd = LCFG_PARAM;
2797         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2798                 convert = 0; /* Don't convert string value to integer */
2799                 cmd = LCFG_PARAM;
2800         } else {
2801                 return -EINVAL;
2802         }
2803
2804         if (mgs_param_empty(ptr))
2805                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2806         else
2807                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2808
2809         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2810         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2811         if (!convert && *tmp != '\0')
2812                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2813         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2814         if (lcr == NULL)
2815                 return -ENOMEM;
2816
2817         lcfg = &lcr->lcr_cfg;
2818         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2819         /* truncate the comment to the parameter name */
2820         ptr = tmp - 1;
2821         sep = *ptr;
2822         *ptr = '\0';
2823         /* modify all servers and clients */
2824         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2825                                       *tmp == '\0' ? NULL : lcr,
2826                                       mti->mti_fsname, sys, 0);
2827         if (rc == 0 && *tmp != '\0') {
2828                 switch (cmd) {
2829                 case LCFG_SET_TIMEOUT:
2830                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2831                                 class_process_config(lcfg);
2832                         break;
2833                 case LCFG_SET_LDLM_TIMEOUT:
2834                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2835                                 class_process_config(lcfg);
2836                         break;
2837                 default:
2838                         break;
2839                 }
2840         }
2841         *ptr = sep;
2842         lustre_cfg_rec_free(lcr);
2843         return rc;
2844 }
2845
2846 /* write quota settings into log */
2847 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2848                                struct fs_db *fsdb, struct mgs_target_info *mti,
2849                                char *quota, char *ptr)
2850 {
2851         struct mgs_thread_info  *mgi = mgs_env_info(env);
2852         struct llog_cfg_rec     *lcr;
2853         char                    *tmp;
2854         char                     sep;
2855         int                      rc, cmd = LCFG_PARAM;
2856
2857         /* support only 'meta' and 'data' pools so far */
2858         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2859             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2860                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2861                        "& quota.ost are)\n", ptr);
2862                 return -EINVAL;
2863         }
2864
2865         if (*tmp == '\0') {
2866                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2867         } else {
2868                 CDEBUG(D_MGS, "global '%s'\n", quota);
2869
2870                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2871                     strcmp(tmp, "none") != 0) {
2872                         CERROR("enable option(%s) isn't supported\n", tmp);
2873                         return -EINVAL;
2874                 }
2875         }
2876
2877         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2878         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2879         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2880         if (lcr == NULL)
2881                 return -ENOMEM;
2882
2883         /* truncate the comment to the parameter name */
2884         ptr = tmp - 1;
2885         sep = *ptr;
2886         *ptr = '\0';
2887
2888         /* XXX we duplicated quota enable information in all server
2889          *     config logs, it should be moved to a separate config
2890          *     log once we cleanup the config log for global param. */
2891         /* modify all servers */
2892         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2893                                       *tmp == '\0' ? NULL : lcr,
2894                                       mti->mti_fsname, quota, 1);
2895         *ptr = sep;
2896         lustre_cfg_rec_free(lcr);
2897         return rc < 0 ? rc : 0;
2898 }
2899
2900 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2901                                    struct mgs_device *mgs,
2902                                    struct fs_db *fsdb,
2903                                    struct mgs_target_info *mti,
2904                                    char *param)
2905 {
2906         struct mgs_thread_info  *mgi = mgs_env_info(env);
2907         struct llog_cfg_rec     *lcr;
2908         struct llog_handle      *llh = NULL;
2909         char                    *logname;
2910         char                    *comment, *ptr;
2911         int                      rc, len;
2912
2913         ENTRY;
2914
2915         /* get comment */
2916         ptr = strchr(param, '=');
2917         LASSERT(ptr != NULL);
2918         len = ptr - param;
2919
2920         OBD_ALLOC(comment, len + 1);
2921         if (comment == NULL)
2922                 RETURN(-ENOMEM);
2923         strncpy(comment, param, len);
2924         comment[len] = '\0';
2925
2926         /* prepare lcfg */
2927         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2928         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2929         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2930         if (lcr == NULL)
2931                 GOTO(out_comment, rc = -ENOMEM);
2932
2933         /* construct log name */
2934         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2935         if (rc < 0)
2936                 GOTO(out_lcfg, rc);
2937
2938         if (mgs_log_is_empty(env, mgs, logname)) {
2939                 rc = record_start_log(env, mgs, &llh, logname);
2940                 if (rc < 0)
2941                         GOTO(out, rc);
2942                 record_end_log(env, &llh);
2943         }
2944
2945         /* obsolete old one */
2946         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2947                         comment, CM_SKIP);
2948         if (rc < 0)
2949                 GOTO(out, rc);
2950         /* write the new one */
2951         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
2952                                   mti->mti_svname, comment);
2953         if (rc)
2954                 CERROR("%s: error writing log %s: rc = %d\n",
2955                        mgs->mgs_obd->obd_name, logname, rc);
2956 out:
2957         name_destroy(&logname);
2958 out_lcfg:
2959         lustre_cfg_rec_free(lcr);
2960 out_comment:
2961         OBD_FREE(comment, len + 1);
2962         RETURN(rc);
2963 }
2964
2965 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2966                                         char *param)
2967 {
2968         char    *ptr;
2969
2970         /* disable the adjustable udesc parameter for now, i.e. use default
2971          * setting that client always ship udesc to MDT if possible. to enable
2972          * it simply remove the following line */
2973         goto error_out;
2974
2975         ptr = strchr(param, '=');
2976         if (ptr == NULL)
2977                 goto error_out;
2978         *ptr++ = '\0';
2979
2980         if (strcmp(param, PARAM_SRPC_UDESC))
2981                 goto error_out;
2982
2983         if (strcmp(ptr, "yes") == 0) {
2984                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2985                 CWARN("Enable user descriptor shipping from client to MDT\n");
2986         } else if (strcmp(ptr, "no") == 0) {
2987                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2988                 CWARN("Disable user descriptor shipping from client to MDT\n");
2989         } else {
2990                 *(ptr - 1) = '=';
2991                 goto error_out;
2992         }
2993         return 0;
2994
2995 error_out:
2996         CERROR("Invalid param: %s\n", param);
2997         return -EINVAL;
2998 }
2999
3000 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3001                                   const char *svname,
3002                                   char *param)
3003 {
3004         struct sptlrpc_rule      rule;
3005         struct sptlrpc_rule_set *rset;
3006         int                      rc;
3007         ENTRY;
3008
3009         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3010                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3011                 RETURN(-EINVAL);
3012         }
3013
3014         if (strncmp(param, PARAM_SRPC_UDESC,
3015                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3016                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3017         }
3018
3019         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3020                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3021                 RETURN(-EINVAL);
3022         }
3023
3024         param += sizeof(PARAM_SRPC_FLVR) - 1;
3025
3026         rc = sptlrpc_parse_rule(param, &rule);
3027         if (rc)
3028                 RETURN(rc);
3029
3030         /* mgs rules implies must be mgc->mgs */
3031         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3032                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3033                      rule.sr_from != LUSTRE_SP_ANY) ||
3034                     (rule.sr_to != LUSTRE_SP_MGS &&
3035                      rule.sr_to != LUSTRE_SP_ANY))
3036                         RETURN(-EINVAL);
3037         }
3038
3039         /* preapre room for this coming rule. svcname format should be:
3040          * - fsname: general rule
3041          * - fsname-tgtname: target-specific rule
3042          */
3043         if (strchr(svname, '-')) {
3044                 struct mgs_tgt_srpc_conf *tgtconf;
3045                 int                       found = 0;
3046
3047                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3048                      tgtconf = tgtconf->mtsc_next) {
3049                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3050                                 found = 1;
3051                                 break;
3052                         }
3053                 }
3054
3055                 if (!found) {
3056                         int name_len;
3057
3058                         OBD_ALLOC_PTR(tgtconf);
3059                         if (tgtconf == NULL)
3060                                 RETURN(-ENOMEM);
3061
3062                         name_len = strlen(svname);
3063
3064                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3065                         if (tgtconf->mtsc_tgt == NULL) {
3066                                 OBD_FREE_PTR(tgtconf);
3067                                 RETURN(-ENOMEM);
3068                         }
3069                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3070
3071                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3072                         fsdb->fsdb_srpc_tgt = tgtconf;
3073                 }
3074
3075                 rset = &tgtconf->mtsc_rset;
3076         } else if (strcmp(svname, MGSSELF_NAME) == 0) {
3077                 /* put _mgs related srpc rule directly in mgs ruleset */
3078                 rset = &fsdb->fsdb_mgs->mgs_lut.lut_sptlrpc_rset;
3079         } else {
3080                 rset = &fsdb->fsdb_srpc_gen;
3081         }
3082
3083         rc = sptlrpc_rule_set_merge(rset, &rule);
3084
3085         RETURN(rc);
3086 }
3087
3088 static int mgs_srpc_set_param(const struct lu_env *env,
3089                               struct mgs_device *mgs,
3090                               struct fs_db *fsdb,
3091                               struct mgs_target_info *mti,
3092                               char *param)
3093 {
3094         char                   *copy;
3095         int                     rc, copy_size;
3096         ENTRY;
3097
3098 #ifndef HAVE_GSS
3099         RETURN(-EINVAL);
3100 #endif
3101         /* keep a copy of original param, which could be destroied
3102          * during parsing */
3103         copy_size = strlen(param) + 1;
3104         OBD_ALLOC(copy, copy_size);
3105         if (copy == NULL)
3106                 return -ENOMEM;
3107         memcpy(copy, param, copy_size);
3108
3109         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3110         if (rc)
3111                 goto out_free;
3112
3113         /* previous steps guaranteed the syntax is correct */
3114         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3115         if (rc)
3116                 goto out_free;
3117
3118         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3119                 /*
3120                  * for mgs rules, make them effective immediately.
3121                  */
3122                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3123                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3124                                                  &fsdb->fsdb_srpc_gen);
3125         }
3126
3127 out_free:
3128         OBD_FREE(copy, copy_size);
3129         RETURN(rc);
3130 }
3131
3132 struct mgs_srpc_read_data {
3133         struct fs_db   *msrd_fsdb;
3134         int             msrd_skip;
3135 };
3136
3137 static int mgs_srpc_read_handler(const struct lu_env *env,
3138                                  struct llog_handle *llh,
3139                                  struct llog_rec_hdr *rec, void *data)
3140 {
3141         struct mgs_srpc_read_data *msrd = data;
3142         struct cfg_marker         *marker;
3143         struct lustre_cfg         *lcfg = REC_DATA(rec);
3144         char                      *svname, *param;
3145         int                        cfg_len, rc;
3146         ENTRY;
3147
3148         if (rec->lrh_type != OBD_CFG_REC) {
3149                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3150                 RETURN(-EINVAL);
3151         }
3152
3153         cfg_len = REC_DATA_LEN(rec);
3154
3155         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3156         if (rc) {
3157                 CERROR("Insane cfg\n");
3158                 RETURN(rc);
3159         }
3160
3161         if (lcfg->lcfg_command == LCFG_MARKER) {
3162                 marker = lustre_cfg_buf(lcfg, 1);
3163
3164                 if (marker->cm_flags & CM_START &&
3165                     marker->cm_flags & CM_SKIP)
3166                         msrd->msrd_skip = 1;
3167                 if (marker->cm_flags & CM_END)
3168                         msrd->msrd_skip = 0;
3169
3170                 RETURN(0);
3171         }
3172
3173         if (msrd->msrd_skip)
3174                 RETURN(0);
3175
3176         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3177                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3178                 RETURN(0);
3179         }
3180
3181         svname = lustre_cfg_string(lcfg, 0);
3182         if (svname == NULL) {
3183                 CERROR("svname is empty\n");
3184                 RETURN(0);
3185         }
3186
3187         param = lustre_cfg_string(lcfg, 1);
3188         if (param == NULL) {
3189                 CERROR("param is empty\n");
3190                 RETURN(0);
3191         }
3192
3193         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3194         if (rc)
3195                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3196
3197         RETURN(0);
3198 }
3199
3200 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3201                                 struct mgs_device *mgs,
3202                                 struct fs_db *fsdb)
3203 {
3204         struct llog_handle        *llh = NULL;
3205         struct llog_ctxt          *ctxt;
3206         char                      *logname;
3207         struct mgs_srpc_read_data  msrd;
3208         int                        rc;
3209         ENTRY;
3210
3211         /* construct log name */
3212         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3213         if (rc)
3214                 RETURN(rc);
3215
3216         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3217         LASSERT(ctxt != NULL);
3218
3219         if (mgs_log_is_empty(env, mgs, logname))
3220                 GOTO(out, rc = 0);
3221
3222         rc = llog_open(env, ctxt, &llh, NULL, logname,
3223                        LLOG_OPEN_EXISTS);
3224         if (rc < 0) {
3225                 if (rc == -ENOENT)
3226                         rc = 0;
3227                 GOTO(out, rc);
3228         }
3229
3230         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3231         if (rc)
3232                 GOTO(out_close, rc);
3233
3234         if (llog_get_size(llh) <= 1)
3235                 GOTO(out_close, rc = 0);
3236
3237         msrd.msrd_fsdb = fsdb;
3238         msrd.msrd_skip = 0;
3239
3240         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3241                           NULL);
3242
3243 out_close:
3244         llog_close(env, llh);
3245 out:
3246         llog_ctxt_put(ctxt);
3247         name_destroy(&logname);
3248
3249         if (rc)
3250                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3251         RETURN(rc);
3252 }
3253
3254 /* Permanent settings of all parameters by writing into the appropriate
3255  * configuration logs.
3256  * A parameter with null value ("<param>='\0'") means to erase it out of
3257  * the logs.
3258  */
3259 static int mgs_write_log_param(const struct lu_env *env,
3260                                struct mgs_device *mgs, struct fs_db *fsdb,
3261                                struct mgs_target_info *mti, char *ptr)
3262 {
3263         struct mgs_thread_info *mgi = mgs_env_info(env);
3264         char *logname;
3265         char *tmp;
3266         int rc = 0, rc2 = 0;
3267         ENTRY;
3268
3269         /* For various parameter settings, we have to figure out which logs
3270            care about them (e.g. both mdt and client for lov settings) */
3271         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3272
3273         /* The params are stored in MOUNT_DATA_FILE and modified via
3274            tunefs.lustre, or set using lctl conf_param */
3275
3276         /* Processed in lustre_start_mgc */
3277         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3278                 GOTO(end, rc);
3279
3280         /* Processed in ost/mdt */
3281         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3282                 GOTO(end, rc);
3283
3284         /* Processed in mgs_write_log_ost */
3285         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3286                 if (mti->mti_flags & LDD_F_PARAM) {
3287                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3288                                            "changed with tunefs.lustre"
3289                                            "and --writeconf\n", ptr);
3290                         rc = -EPERM;
3291                 }
3292                 GOTO(end, rc);
3293         }
3294
3295         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3296                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3297                 GOTO(end, rc);
3298         }
3299
3300         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3301                 /* Add a failover nidlist */
3302                 rc = 0;
3303                 /* We already processed failovers params for new
3304                    targets in mgs_write_log_target */
3305                 if (mti->mti_flags & LDD_F_PARAM) {
3306                         CDEBUG(D_MGS, "Adding failnode\n");
3307                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3308                 }
3309                 GOTO(end, rc);
3310         }
3311
3312         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3313                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3314                 GOTO(end, rc);
3315         }
3316
3317         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3318                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3319                 GOTO(end, rc);
3320         }
3321
3322         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3323                 /* active=0 means off, anything else means on */
3324                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3325                 int i;
3326
3327                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3328                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3329                                            "be (de)activated.\n",
3330                                            mti->mti_svname);
3331                         GOTO(end, rc = -EINVAL);
3332                 }
3333                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3334                               flag ? "de": "re", mti->mti_svname);
3335                 /* Modify clilov */
3336                 rc = name_create(&logname, mti->mti_fsname, "-client");
3337                 if (rc)
3338                         GOTO(end, rc);
3339                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3340                                 mti->mti_svname, "add osc", flag);
3341                 name_destroy(&logname);
3342                 if (rc)
3343                         goto active_err;
3344                 /* Modify mdtlov */
3345                 /* Add to all MDT logs for CMD */
3346                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3347                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3348                                 continue;
3349                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3350                         if (rc)
3351                                 GOTO(end, rc);
3352                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3353                                         mti->mti_svname, "add osc", flag);
3354                         name_destroy(&logname);
3355                         if (rc)
3356                                 goto active_err;
3357                 }
3358         active_err:
3359                 if (rc) {
3360                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3361                                            "log (%d). No permanent "
3362                                            "changes were made to the "
3363                                            "config log.\n",
3364                                            mti->mti_svname, rc);
3365                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3366                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3367                                                    " because the log"
3368                                                    "is in the old 1.4"
3369                                                    "style. Consider "
3370                                                    " --writeconf to "
3371                                                    "update the logs.\n");
3372                         GOTO(end, rc);
3373                 }
3374                 /* Fall through to osc proc for deactivating live OSC
3375                    on running MDT / clients. */
3376         }
3377         /* Below here, let obd's XXX_process_config methods handle it */
3378
3379         /* All lov. in proc */
3380         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3381                 char *mdtlovname;
3382
3383                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3384                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3385                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3386                                            "set on the MDT, not %s. "
3387                                            "Ignoring.\n",
3388                                            mti->mti_svname);
3389                         GOTO(end, rc = 0);
3390                 }
3391
3392                 /* Modify mdtlov */
3393                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3394                         GOTO(end, rc = -ENODEV);
3395
3396                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3397                                              mti->mti_stripe_index);
3398                 if (rc)
3399                         GOTO(end, rc);
3400                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3401                                   &mgi->mgi_bufs, mdtlovname, ptr);
3402                 name_destroy(&logname);
3403                 name_destroy(&mdtlovname);
3404                 if (rc)
3405                         GOTO(end, rc);
3406
3407                 /* Modify clilov */
3408                 rc = name_create(&logname, mti->mti_fsname, "-client");
3409                 if (rc)
3410                         GOTO(end, rc);
3411                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3412                                   fsdb->fsdb_clilov, ptr);
3413                 name_destroy(&logname);
3414                 GOTO(end, rc);
3415         }
3416
3417         /* All osc., mdc., llite. params in proc */
3418         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3419             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3420             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3421                 char *cname;
3422
3423                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3424                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3425                                            " cannot be modified. Consider"
3426                                            " updating the configuration with"
3427                                            " --writeconf\n",
3428                                            mti->mti_svname);
3429                         GOTO(end, rc = -EINVAL);
3430                 }
3431                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3432                         rc = name_create(&cname, mti->mti_fsname, "-client");
3433                         /* Add the client type to match the obdname in
3434                            class_config_llog_handler */
3435                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3436                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3437                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3438                         rc = name_create(&cname, mti->mti_svname, "-osc");
3439                 } else {
3440                         GOTO(end, rc = -EINVAL);
3441                 }
3442                 if (rc)
3443                         GOTO(end, rc);
3444
3445                 /* Forbid direct update of llite root squash parameters.
3446                  * These parameters are indirectly set via the MDT settings.
3447                  * See (LU-1778) */
3448                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
3449                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3450                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3451                         LCONSOLE_ERROR("%s: root squash parameters can only "
3452                                 "be updated through MDT component\n",
3453                                 mti->mti_fsname);
3454                         name_destroy(&cname);
3455                         GOTO(end, rc = -EINVAL);
3456                 }
3457
3458                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3459
3460                 /* Modify client */
3461                 rc = name_create(&logname, mti->mti_fsname, "-client");
3462                 if (rc) {
3463                         name_destroy(&cname);
3464                         GOTO(end, rc);
3465                 }
3466                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3467                                   cname, ptr);
3468
3469                 /* osc params affect the MDT as well */
3470                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3471                         int i;
3472
3473                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3474                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3475                                         continue;
3476                                 name_destroy(&cname);
3477                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3478                                                          fsdb, i);
3479                                 name_destroy(&logname);
3480                                 if (rc)
3481                                         break;
3482                                 rc = name_create_mdt(&logname,
3483                                                      mti->mti_fsname, i);
3484                                 if (rc)
3485                                         break;
3486                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3487                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3488                                                           mti, logname,
3489                                                           &mgi->mgi_bufs,
3490                                                           cname, ptr);
3491                                         if (rc)
3492                                                 break;
3493                                 }
3494                         }
3495                 }
3496                 name_destroy(&logname);
3497                 name_destroy(&cname);
3498                 GOTO(end, rc);
3499         }
3500
3501         /* All mdt. params in proc */
3502         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
3503                 int i;
3504                 __u32 idx;
3505
3506                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3507                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3508                             MTI_NAME_MAXLEN) == 0)
3509                         /* device is unspecified completely? */
3510                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3511                 else
3512                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3513                 if (rc < 0)
3514                         goto active_err;
3515                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3516                         goto active_err;
3517                 if (rc & LDD_F_SV_ALL) {
3518                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3519                                 if (!test_bit(i,
3520                                                   fsdb->fsdb_mdt_index_map))
3521                                         continue;
3522                                 rc = name_create_mdt(&logname,
3523                                                 mti->mti_fsname, i);
3524                                 if (rc)
3525                                         goto active_err;
3526                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3527                                                   logname, &mgi->mgi_bufs,
3528                                                   logname, ptr);
3529                                 name_destroy(&logname);
3530                                 if (rc)
3531                                         goto active_err;
3532                         }
3533                 } else {
3534                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
3535                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
3536                                 LCONSOLE_ERROR("%s: root squash parameters "
3537                                         "cannot be applied to a single MDT\n",
3538                                         mti->mti_fsname);
3539                                 GOTO(end, rc = -EINVAL);
3540                         }
3541                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3542                                           mti->mti_svname, &mgi->mgi_bufs,
3543                                           mti->mti_svname, ptr);
3544                         if (rc)
3545                                 goto active_err;
3546                 }
3547
3548                 /* root squash settings are also applied to llite
3549                  * config log (see LU-1778) */
3550                 if (rc == 0 &&
3551                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3552                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3553                         char *cname;
3554                         char *ptr2;
3555
3556                         rc = name_create(&cname, mti->mti_fsname, "-client");
3557                         if (rc)
3558                                 GOTO(end, rc);
3559                         rc = name_create(&logname, mti->mti_fsname, "-client");
3560                         if (rc) {
3561                                 name_destroy(&cname);
3562                                 GOTO(end, rc);
3563                         }
3564                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
3565                         if (rc) {
3566                                 name_destroy(&cname);
3567                                 name_destroy(&logname);
3568                                 GOTO(end, rc);
3569                         }
3570                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3571                                           &mgi->mgi_bufs, cname, ptr2);
3572                         name_destroy(&ptr2);
3573                         name_destroy(&logname);
3574                         name_destroy(&cname);
3575                 }
3576                 GOTO(end, rc);
3577         }
3578
3579         /* All mdd., ost. and osd. params in proc */
3580         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3581             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3582             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3583                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3584                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3585                         GOTO(end, rc = -ENODEV);
3586
3587                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3588                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3589                 GOTO(end, rc);
3590         }
3591
3592         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3593         rc2 = -ENOSYS;
3594
3595 end:
3596         if (rc)
3597                 CERROR("err %d on param '%s'\n", rc, ptr);
3598
3599         RETURN(rc ?: rc2);
3600 }
3601
3602 /* Not implementing automatic failover nid addition at this time. */
3603 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3604                       struct mgs_target_info *mti)
3605 {
3606 #if 0
3607         struct fs_db *fsdb;
3608         int rc;
3609         ENTRY;
3610
3611         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3612         if (rc)
3613                 RETURN(rc);
3614
3615         if (mgs_log_is_empty(obd, mti->mti_svname))
3616                 /* should never happen */
3617                 RETURN(-ENOENT);
3618
3619         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3620
3621         /* FIXME We can just check mti->params to see if we're already in
3622            the failover list.  Modify mti->params for rewriting back at
3623            server_register_target(). */
3624
3625         mutex_lock(&fsdb->fsdb_mutex);
3626         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3627         mutex_unlock(&fsdb->fsdb_mutex);
3628         char    *buf, *params;
3629         int      rc = -EINVAL;
3630
3631         RETURN(rc);
3632 #endif
3633         return 0;
3634 }
3635
3636 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
3637                          struct mgs_target_info *mti, struct fs_db *fsdb)
3638 {
3639         char    *buf, *params;
3640         int      rc = -EINVAL;
3641
3642         ENTRY;
3643
3644         /* set/check the new target index */
3645         rc = mgs_set_index(env, mgs, mti);
3646         if (rc < 0)
3647                 RETURN(rc);
3648
3649         if (rc == EALREADY) {
3650                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3651                               mti->mti_stripe_index, mti->mti_svname);
3652                 /* We would like to mark old log sections as invalid
3653                    and add new log sections in the client and mdt logs.
3654                    But if we add new sections, then live clients will
3655                    get repeat setup instructions for already running
3656                    osc's. So don't update the client/mdt logs. */
3657                 mti->mti_flags &= ~LDD_F_UPDATE;
3658                 rc = 0;
3659         }
3660
3661         mutex_lock(&fsdb->fsdb_mutex);
3662
3663         if (mti->mti_flags &
3664             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3665                 /* Generate a log from scratch */
3666                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3667                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3668                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3669                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3670                 } else {
3671                         CERROR("Unknown target type %#x, can't create log for "
3672                                "%s\n", mti->mti_flags, mti->mti_svname);
3673                 }
3674                 if (rc) {
3675                         CERROR("Can't write logs for %s (%d)\n",
3676                                mti->mti_svname, rc);
3677                         GOTO(out_up, rc);
3678                 }
3679         } else {
3680                 /* Just update the params from tunefs in mgs_write_log_params */
3681                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3682                 mti->mti_flags |= LDD_F_PARAM;
3683         }
3684
3685         /* allocate temporary buffer, where class_get_next_param will
3686            make copy of a current  parameter */
3687         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3688         if (buf == NULL)
3689                 GOTO(out_up, rc = -ENOMEM);
3690         params = mti->mti_params;
3691         while (params != NULL) {
3692                 rc = class_get_next_param(&params, buf);
3693                 if (rc) {
3694                         if (rc == 1)
3695                                 /* there is no next parameter, that is
3696                                    not an error */
3697                                 rc = 0;
3698                         break;
3699                 }
3700                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3701                        params, buf);
3702                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3703                 if (rc)
3704                         break;
3705         }
3706
3707         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3708
3709 out_up:
3710         mutex_unlock(&fsdb->fsdb_mutex);
3711         RETURN(rc);
3712 }
3713
3714 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3715 {
3716         struct llog_ctxt        *ctxt;
3717         int                      rc = 0;
3718
3719         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3720         if (ctxt == NULL) {
3721                 CERROR("%s: MGS config context doesn't exist\n",
3722                        mgs->mgs_obd->obd_name);
3723                 rc = -ENODEV;
3724         } else {
3725                 rc = llog_erase(env, ctxt, NULL, name);
3726                 /* llog may not exist */
3727                 if (rc == -ENOENT)
3728                         rc = 0;
3729                 llog_ctxt_put(ctxt);
3730         }
3731
3732         if (rc)
3733                 CERROR("%s: failed to clear log %s: %d\n",
3734                        mgs->mgs_obd->obd_name, name, rc);
3735
3736         return rc;
3737 }
3738
3739 /* erase all logs for the given fs */
3740 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3741 {
3742         struct fs_db *fsdb;
3743         struct list_head log_list;
3744         struct mgs_direntry *dirent, *n;
3745         int rc, len = strlen(fsname);
3746         char *suffix;
3747         ENTRY;
3748
3749         /* Find all the logs in the CONFIGS directory */
3750         rc = class_dentry_readdir(env, mgs, &log_list);
3751         if (rc)
3752                 RETURN(rc);
3753
3754         mutex_lock(&mgs->mgs_mutex);
3755
3756         /* Delete the fs db */
3757         fsdb = mgs_find_fsdb(mgs, fsname);
3758         if (fsdb)
3759                 mgs_free_fsdb(mgs, fsdb);
3760
3761         mutex_unlock(&mgs->mgs_mutex);
3762
3763         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3764                 list_del_init(&dirent->mde_list);
3765                 suffix = strrchr(dirent->mde_name, '-');
3766                 if (suffix != NULL) {
3767                         if ((len == suffix - dirent->mde_name) &&
3768                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
3769                                 CDEBUG(D_MGS, "Removing log %s\n",
3770                                        dirent->mde_name);
3771                                 mgs_erase_log(env, mgs, dirent->mde_name);
3772                         }
3773                 }
3774                 mgs_direntry_free(dirent);
3775         }
3776
3777         RETURN(rc);
3778 }
3779
3780 /* list all logs for the given fs */
3781 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
3782                   struct obd_ioctl_data *data)
3783 {
3784         struct list_head         log_list;
3785         struct mgs_direntry     *dirent, *n;
3786         char                    *out, *suffix;
3787         int                      l, remains, rc;
3788
3789         ENTRY;
3790
3791         /* Find all the logs in the CONFIGS directory */
3792         rc = class_dentry_readdir(env, mgs, &log_list);
3793         if (rc)
3794                 RETURN(rc);
3795
3796         out = data->ioc_bulk;
3797         remains = data->ioc_inllen1;
3798         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3799                 list_del_init(&dirent->mde_list);
3800                 suffix = strrchr(dirent->mde_name, '-');
3801                 if (suffix != NULL) {
3802                         l = snprintf(out, remains, "config log: $%s\n",
3803                                      dirent->mde_name);
3804                         out += l;
3805                         remains -= l;
3806                 }
3807                 mgs_direntry_free(dirent);
3808                 if (remains <= 0)
3809                         break;
3810         }
3811         RETURN(rc);
3812 }
3813
3814 /* from llog_swab */
3815 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3816 {
3817         int i;
3818         ENTRY;
3819
3820         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3821         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3822
3823         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3824         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3825         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3826         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3827
3828         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3829         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3830                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3831                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3832                                i, lcfg->lcfg_buflens[i],
3833                                lustre_cfg_string(lcfg, i));
3834                 }
3835         EXIT;
3836 }
3837
3838 /* Setup _mgs fsdb and log
3839  */
3840 int mgs__mgs_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3841                           struct fs_db *fsdb)
3842 {
3843         int                     rc;
3844         ENTRY;
3845
3846         rc = mgs_find_or_make_fsdb(env, mgs, MGSSELF_NAME, &fsdb);
3847
3848         RETURN(rc);
3849 }
3850
3851 /* Setup params fsdb and log
3852  */
3853 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3854                           struct fs_db *fsdb)
3855 {
3856         struct llog_handle      *params_llh = NULL;
3857         int                     rc;
3858         ENTRY;
3859
3860         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
3861         if (fsdb != NULL) {
3862                 mutex_lock(&fsdb->fsdb_mutex);
3863                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
3864                 if (rc == 0)
3865                         rc = record_end_log(env, &params_llh);
3866                 mutex_unlock(&fsdb->fsdb_mutex);
3867         }
3868
3869         RETURN(rc);
3870 }
3871
3872 /* Cleanup params fsdb and log
3873  */
3874 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
3875 {
3876         return mgs_erase_logs(env, mgs, PARAMS_FILENAME);
3877 }
3878
3879 /* Set a permanent (config log) param for a target or fs
3880  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3881  *             buf1 contains the single parameter
3882  */
3883 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3884                  struct lustre_cfg *lcfg, char *fsname)
3885 {
3886         struct fs_db *fsdb;
3887         struct mgs_target_info *mti;
3888         char *devname, *param;
3889         char *ptr;
3890         const char *tmp;
3891         __u32 index;
3892         int rc = 0;
3893         ENTRY;
3894
3895         print_lustre_cfg(lcfg);
3896
3897         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3898         devname = lustre_cfg_string(lcfg, 0);
3899         param = lustre_cfg_string(lcfg, 1);
3900         if (!devname) {
3901                 /* Assume device name embedded in param:
3902                    lustre-OST0000.osc.max_dirty_mb=32 */
3903                 ptr = strchr(param, '.');
3904                 if (ptr) {
3905                         devname = param;
3906                         *ptr = 0;
3907                         param = ptr + 1;
3908                 }
3909         }
3910         if (!devname) {
3911                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3912                 RETURN(-ENOSYS);
3913         }
3914
3915         rc = mgs_parse_devname(devname, fsname, NULL);
3916         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3917                 /* param related to llite isn't allowed to set by OST or MDT */
3918                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3919                                        sizeof(PARAM_LLITE) - 1) == 0)
3920                         RETURN(-EINVAL);
3921         } else {
3922                 /* assume devname is the fsname */
3923                 strlcpy(fsname, devname, MTI_NAME_MAXLEN);
3924         }
3925         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3926
3927         rc = mgs_find_or_make_fsdb(env, mgs,
3928                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
3929                                    PARAMS_FILENAME : fsname, &fsdb);
3930         if (rc)
3931                 RETURN(rc);
3932
3933         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
3934             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3935             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3936                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3937                        "is '%s'\n", fsname, devname);
3938                 mgs_free_fsdb(mgs, fsdb);
3939                 RETURN(-EINVAL);
3940         }
3941
3942         /* Create a fake mti to hold everything */
3943         OBD_ALLOC_PTR(mti);
3944         if (!mti)
3945                 GOTO(out, rc = -ENOMEM);
3946         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3947             >= sizeof(mti->mti_fsname))
3948                 GOTO(out, rc = -E2BIG);
3949         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3950             >= sizeof(mti->mti_svname))
3951                 GOTO(out, rc = -E2BIG);
3952         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3953             >= sizeof(mti->mti_params))
3954                 GOTO(out, rc = -E2BIG);
3955         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3956         if (rc < 0)
3957                 /* Not a valid server; may be only fsname */
3958                 rc = 0;
3959         else
3960                 /* Strip -osc or -mdc suffix from svname */
3961                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3962                                      mti->mti_svname))
3963                         GOTO(out, rc = -EINVAL);
3964         /*
3965          * Revoke lock so everyone updates.  Should be alright if
3966          * someone was already reading while we were updating the logs,
3967          * so we don't really need to hold the lock while we're
3968          * writing (above).
3969          */
3970         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
3971                 mti->mti_flags = rc | LDD_F_PARAM2;
3972                 mutex_lock(&fsdb->fsdb_mutex);
3973                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
3974                 mutex_unlock(&fsdb->fsdb_mutex);
3975                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
3976         } else {
3977                 mti->mti_flags = rc | LDD_F_PARAM;
3978                 mutex_lock(&fsdb->fsdb_mutex);
3979                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3980                 mutex_unlock(&fsdb->fsdb_mutex);
3981                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3982         }
3983
3984 out:
3985         OBD_FREE_PTR(mti);
3986         RETURN(rc);
3987 }
3988
3989 static int mgs_write_log_pool(const struct lu_env *env,
3990                               struct mgs_device *mgs, char *logname,
3991                               struct fs_db *fsdb, char *tgtname,
3992                               enum lcfg_command_type cmd,
3993                               char *fsname, char *poolname,
3994                               char *ostname, char *comment)
3995 {
3996         struct llog_handle *llh = NULL;
3997         int rc;
3998
3999         rc = record_start_log(env, mgs, &llh, logname);
4000         if (rc)
4001                 return rc;
4002         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
4003         if (rc)
4004                 goto out;
4005         rc = record_base(env, llh, tgtname, 0, cmd,
4006                          fsname, poolname, ostname, NULL);
4007         if (rc)
4008                 goto out;
4009         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
4010 out:
4011         record_end_log(env, &llh);
4012         return rc;
4013 }
4014
4015 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
4016                     enum lcfg_command_type cmd, const char *nodemap_name,
4017                     char *param)
4018 {
4019         lnet_nid_t      nid[2];
4020         __u32           idmap[2];
4021         bool            bool_switch;
4022         __u32           int_id;
4023         int             rc = 0;
4024         ENTRY;
4025
4026         switch (cmd) {
4027         case LCFG_NODEMAP_ADD:
4028                 rc = nodemap_add(nodemap_name);
4029                 break;
4030         case LCFG_NODEMAP_DEL:
4031                 rc = nodemap_del(nodemap_name);
4032                 break;
4033         case LCFG_NODEMAP_ADD_RANGE:
4034                 rc = nodemap_parse_range(param, nid);
4035                 if (rc != 0)
4036                         break;
4037                 rc = nodemap_add_range(nodemap_name, nid);
4038                 break;
4039         case LCFG_NODEMAP_DEL_RANGE:
4040                 rc = nodemap_parse_range(param, nid);
4041                 if (rc != 0)
4042                         break;
4043                 rc = nodemap_del_range(nodemap_name, nid);
4044                 break;
4045         case LCFG_NODEMAP_ADMIN:
4046                 bool_switch = simple_strtoul(param, NULL, 10);
4047                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
4048                 break;
4049         case LCFG_NODEMAP_TRUSTED:
4050                 bool_switch = simple_strtoul(param, NULL, 10);
4051                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
4052                 break;
4053         case LCFG_NODEMAP_SQUASH_UID:
4054                 int_id = simple_strtoul(param, NULL, 10);
4055                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
4056                 break;
4057         case LCFG_NODEMAP_SQUASH_GID:
4058                 int_id = simple_strtoul(param, NULL, 10);
4059                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
4060                 break;
4061         case LCFG_NODEMAP_ADD_UIDMAP:
4062         case LCFG_NODEMAP_ADD_GIDMAP:
4063                 rc = nodemap_parse_idmap(param, idmap);
4064                 if (rc != 0)
4065                         break;
4066                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
4067                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
4068                                                idmap);
4069                 else
4070                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
4071                                                idmap);
4072                 break;
4073         case LCFG_NODEMAP_DEL_UIDMAP:
4074         case LCFG_NODEMAP_DEL_GIDMAP:
4075                 rc = nodemap_parse_idmap(param, idmap);
4076                 if (rc != 0)
4077                         break;
4078                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
4079                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
4080                                                idmap);
4081                 else
4082                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
4083                                                idmap);
4084                 break;
4085         default:
4086                 rc = -EINVAL;
4087         }
4088
4089         RETURN(rc);
4090 }
4091
4092 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
4093                  enum lcfg_command_type cmd, char *fsname,
4094                  char *poolname, char *ostname)
4095 {
4096         struct fs_db *fsdb;
4097         char *lovname;
4098         char *logname;
4099         char *label = NULL, *canceled_label = NULL;
4100         int label_sz;
4101         struct mgs_target_info *mti = NULL;
4102         int rc, i;
4103         ENTRY;
4104
4105         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
4106         if (rc) {
4107                 CERROR("Can't get db for %s\n", fsname);
4108                 RETURN(rc);
4109         }
4110         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4111                 CERROR("%s is not defined\n", fsname);
4112                 mgs_free_fsdb(mgs, fsdb);
4113                 RETURN(-EINVAL);
4114         }
4115
4116         label_sz = 10 + strlen(fsname) + strlen(poolname);
4117
4118         /* check if ostname match fsname */
4119         if (ostname != NULL) {
4120                 char *ptr;
4121
4122                 ptr = strrchr(ostname, '-');
4123                 if ((ptr == NULL) ||
4124                     (strncmp(fsname, ostname, ptr-ostname) != 0))
4125                         RETURN(-EINVAL);
4126                 label_sz += strlen(ostname);
4127         }
4128
4129         OBD_ALLOC(label, label_sz);
4130         if (label == NULL)
4131                 RETURN(-ENOMEM);
4132
4133         switch(cmd) {
4134         case LCFG_POOL_NEW:
4135                 sprintf(label,
4136                         "new %s.%s", fsname, poolname);
4137                 break;
4138         case LCFG_POOL_ADD:
4139                 sprintf(label,
4140                         "add %s.%s.%s", fsname, poolname, ostname);
4141                 break;
4142         case LCFG_POOL_REM:
4143                 OBD_ALLOC(canceled_label, label_sz);
4144                 if (canceled_label == NULL)
4145                         GOTO(out_label, rc = -ENOMEM);
4146                 sprintf(label,
4147                         "rem %s.%s.%s", fsname, poolname, ostname);
4148                 sprintf(canceled_label,
4149                         "add %s.%s.%s", fsname, poolname, ostname);
4150                 break;
4151         case LCFG_POOL_DEL:
4152                 OBD_ALLOC(canceled_label, label_sz);
4153                 if (canceled_label == NULL)
4154                         GOTO(out_label, rc = -ENOMEM);
4155                 sprintf(label,
4156                         "del %s.%s", fsname, poolname);
4157                 sprintf(canceled_label,
4158                         "new %s.%s", fsname, poolname);
4159                 break;
4160         default:
4161                 break;
4162         }
4163
4164         if (canceled_label != NULL) {
4165                 OBD_ALLOC_PTR(mti);
4166                 if (mti == NULL)
4167                         GOTO(out_cancel, rc = -ENOMEM);
4168         }
4169
4170         mutex_lock(&fsdb->fsdb_mutex);
4171         /* write pool def to all MDT logs */
4172         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4173                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
4174                         rc = name_create_mdt_and_lov(&logname, &lovname,
4175                                                      fsdb, i);
4176                         if (rc) {
4177                                 mutex_unlock(&fsdb->fsdb_mutex);
4178                                 GOTO(out_mti, rc);
4179                         }
4180                         if (canceled_label != NULL) {
4181                                 strcpy(mti->mti_svname, "lov pool");
4182                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4183                                                 lovname, canceled_label,
4184                                                 CM_SKIP);
4185                         }
4186
4187                         if (rc >= 0)
4188                                 rc = mgs_write_log_pool(env, mgs, logname,
4189                                                         fsdb, lovname, cmd,
4190                                                         fsname, poolname,
4191                                                         ostname, label);
4192                         name_destroy(&logname);
4193                         name_destroy(&lovname);
4194                         if (rc) {
4195                                 mutex_unlock(&fsdb->fsdb_mutex);
4196                                 GOTO(out_mti, rc);
4197                         }
4198                 }
4199         }
4200
4201         rc = name_create(&logname, fsname, "-client");
4202         if (rc) {
4203                 mutex_unlock(&fsdb->fsdb_mutex);
4204                 GOTO(out_mti, rc);
4205         }
4206         if (canceled_label != NULL) {
4207                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4208                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4209                 if (rc < 0) {
4210                         mutex_unlock(&fsdb->fsdb_mutex);
4211                         name_destroy(&logname);
4212                         GOTO(out_mti, rc);
4213                 }
4214         }
4215
4216         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4217                                 cmd, fsname, poolname, ostname, label);
4218         mutex_unlock(&fsdb->fsdb_mutex);
4219         name_destroy(&logname);
4220         /* request for update */
4221         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4222
4223         EXIT;
4224 out_mti:
4225         if (mti != NULL)
4226                 OBD_FREE_PTR(mti);
4227 out_cancel:
4228         if (canceled_label != NULL)
4229                 OBD_FREE(canceled_label, label_sz);
4230 out_label:
4231         OBD_FREE(label, label_sz);
4232         return rc;
4233 }