Whamcloud - gitweb
LU-1778 llite: fix inconsistencies of root squash feature
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_ioctl.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 /* Find all logs in CONFIG directory and link then into list */
59 int class_dentry_readdir(const struct lu_env *env,
60                          struct mgs_device *mgs, struct list_head *log_list)
61 {
62         struct dt_object    *dir = mgs->mgs_configs_dir;
63         const struct dt_it_ops *iops;
64         struct dt_it        *it;
65         struct mgs_direntry *de;
66         char                *key;
67         int                  rc, key_sz;
68
69         INIT_LIST_HEAD(log_list);
70
71         LASSERT(dir);
72         LASSERT(dir->do_index_ops);
73
74         iops = &dir->do_index_ops->dio_it;
75         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
76         if (IS_ERR(it))
77                 RETURN(PTR_ERR(it));
78
79         rc = iops->load(env, it, 0);
80         if (rc <= 0)
81                 GOTO(fini, rc = 0);
82
83         /* main cycle */
84         do {
85                 key = (void *)iops->key(env, it);
86                 if (IS_ERR(key)) {
87                         CERROR("%s: key failed when listing %s: rc = %d\n",
88                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
89                                (int) PTR_ERR(key));
90                         goto next;
91                 }
92                 key_sz = iops->key_size(env, it);
93                 LASSERT(key_sz > 0);
94
95                 /* filter out "." and ".." entries */
96                 if (key[0] == '.') {
97                         if (key_sz == 1)
98                                 goto next;
99                         if (key_sz == 2 && key[1] == '.')
100                                 goto next;
101                 }
102
103                 de = mgs_direntry_alloc(key_sz + 1);
104                 if (de == NULL) {
105                         rc = -ENOMEM;
106                         break;
107                 }
108
109                 memcpy(de->mde_name, key, key_sz);
110                 de->mde_name[key_sz] = 0;
111
112                 list_add(&de->mde_list, log_list);
113
114 next:
115                 rc = iops->next(env, it);
116         } while (rc == 0);
117         rc = 0;
118
119         iops->put(env, it);
120
121 fini:
122         iops->fini(env, it);
123         if (rc)
124                 CERROR("%s: key failed when listing %s: rc = %d\n",
125                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
126         RETURN(rc);
127 }
128
129 /******************** DB functions *********************/
130
131 static inline int name_create(char **newname, char *prefix, char *suffix)
132 {
133         LASSERT(newname);
134         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
135         if (!*newname)
136                 return -ENOMEM;
137         sprintf(*newname, "%s%s", prefix, suffix);
138         return 0;
139 }
140
141 static inline void name_destroy(char **name)
142 {
143         if (*name)
144                 OBD_FREE(*name, strlen(*name) + 1);
145         *name = NULL;
146 }
147
148 struct mgs_fsdb_handler_data
149 {
150         struct fs_db   *fsdb;
151         __u32           ver;
152 };
153
154 /* from the (client) config log, figure out:
155         1. which ost's/mdt's are configured (by index)
156         2. what the last config step is
157         3. COMPAT_18 osc name
158 */
159 /* It might be better to have a separate db file, instead of parsing the info
160    out of the client log.  This is slow and potentially error-prone. */
161 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
162                             struct llog_rec_hdr *rec, void *data)
163 {
164         struct mgs_fsdb_handler_data *d = data;
165         struct fs_db *fsdb = d->fsdb;
166         int cfg_len = rec->lrh_len;
167         char *cfg_buf = (char*) (rec + 1);
168         struct lustre_cfg *lcfg;
169         __u32 index;
170         int rc = 0;
171         ENTRY;
172
173         if (rec->lrh_type != OBD_CFG_REC) {
174                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
175                 RETURN(-EINVAL);
176         }
177
178         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
179         if (rc) {
180                 CERROR("Insane cfg\n");
181                 RETURN(rc);
182         }
183
184         lcfg = (struct lustre_cfg *)cfg_buf;
185
186         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
187                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
188
189         /* Figure out ost indicies */
190         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
191         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
192             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
193                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
194                                        NULL, 10);
195                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
196                        lustre_cfg_string(lcfg, 1), index,
197                        lustre_cfg_string(lcfg, 2));
198                 set_bit(index, fsdb->fsdb_ost_index_map);
199         }
200
201         /* Figure out mdt indicies */
202         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
203         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
204             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
205                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
206                                        &index, NULL);
207                 if (rc != LDD_F_SV_TYPE_MDT) {
208                         CWARN("Unparsable MDC name %s, assuming index 0\n",
209                               lustre_cfg_string(lcfg, 0));
210                         index = 0;
211                 }
212                 rc = 0;
213                 CDEBUG(D_MGS, "MDT index is %u\n", index);
214                 set_bit(index, fsdb->fsdb_mdt_index_map);
215                 fsdb->fsdb_mdt_count ++;
216         }
217
218         /**
219          * figure out the old config. fsdb_gen = 0 means old log
220          * It is obsoleted and not supported anymore
221          */
222         if (fsdb->fsdb_gen == 0) {
223                 CERROR("Old config format is not supported\n");
224                 RETURN(-EINVAL);
225         }
226
227         /*
228          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
229          */
230         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
231             lcfg->lcfg_command == LCFG_ATTACH &&
232             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
233                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
234                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
235                         CWARN("MDT using 1.8 OSC name scheme\n");
236                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
237                 }
238         }
239
240         if (lcfg->lcfg_command == LCFG_MARKER) {
241                 struct cfg_marker *marker;
242                 marker = lustre_cfg_buf(lcfg, 1);
243
244                 d->ver = marker->cm_vers;
245
246                 /* Keep track of the latest marker step */
247                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
248         }
249
250         RETURN(rc);
251 }
252
253 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
254 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
255                                   struct mgs_device *mgs,
256                                   struct fs_db *fsdb)
257 {
258         char                            *logname;
259         struct llog_handle              *loghandle;
260         struct llog_ctxt                *ctxt;
261         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
262         int rc;
263
264         ENTRY;
265
266         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
267         LASSERT(ctxt != NULL);
268         rc = name_create(&logname, fsdb->fsdb_name, "-client");
269         if (rc)
270                 GOTO(out_put, rc);
271         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
272         if (rc)
273                 GOTO(out_pop, rc);
274
275         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
276         if (rc)
277                 GOTO(out_close, rc);
278
279         if (llog_get_size(loghandle) <= 1)
280                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
281
282         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
283         CDEBUG(D_INFO, "get_db = %d\n", rc);
284 out_close:
285         llog_close(env, loghandle);
286 out_pop:
287         name_destroy(&logname);
288 out_put:
289         llog_ctxt_put(ctxt);
290
291         RETURN(rc);
292 }
293
294 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
295 {
296         struct mgs_tgt_srpc_conf *tgtconf;
297
298         /* free target-specific rules */
299         while (fsdb->fsdb_srpc_tgt) {
300                 tgtconf = fsdb->fsdb_srpc_tgt;
301                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
302
303                 LASSERT(tgtconf->mtsc_tgt);
304
305                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
306                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
307                 OBD_FREE_PTR(tgtconf);
308         }
309
310         /* free general rules */
311         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
312 }
313
314 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
315 {
316         struct fs_db *fsdb;
317         cfs_list_t *tmp;
318
319         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
320                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
321                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
322                         return fsdb;
323         }
324         return NULL;
325 }
326
327 /* caller must hold the mgs->mgs_fs_db_lock */
328 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
329                                   struct mgs_device *mgs, char *fsname)
330 {
331         struct fs_db *fsdb;
332         int rc;
333         ENTRY;
334
335         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
336                 CERROR("fsname %s is too long\n", fsname);
337                 RETURN(NULL);
338         }
339
340         OBD_ALLOC_PTR(fsdb);
341         if (!fsdb)
342                 RETURN(NULL);
343
344         strcpy(fsdb->fsdb_name, fsname);
345         mutex_init(&fsdb->fsdb_mutex);
346         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
347         fsdb->fsdb_gen = 1;
348
349         if (strcmp(fsname, MGSSELF_NAME) == 0) {
350                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
351         } else {
352                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
353                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
354                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
355                         CERROR("No memory for index maps\n");
356                         GOTO(err, rc = -ENOMEM);
357                 }
358
359                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
360                 if (rc)
361                         GOTO(err, rc);
362                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
363                 if (rc)
364                         GOTO(err, rc);
365
366                 /* initialise data for NID table */
367                 mgs_ir_init_fs(env, mgs, fsdb);
368
369                 lproc_mgs_add_live(mgs, fsdb);
370         }
371
372         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
373
374         RETURN(fsdb);
375 err:
376         if (fsdb->fsdb_ost_index_map)
377                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
378         if (fsdb->fsdb_mdt_index_map)
379                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
380         name_destroy(&fsdb->fsdb_clilov);
381         name_destroy(&fsdb->fsdb_clilmv);
382         OBD_FREE_PTR(fsdb);
383         RETURN(NULL);
384 }
385
386 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
387 {
388         /* wait for anyone with the sem */
389         mutex_lock(&fsdb->fsdb_mutex);
390         lproc_mgs_del_live(mgs, fsdb);
391         cfs_list_del(&fsdb->fsdb_list);
392
393         /* deinitialize fsr */
394         mgs_ir_fini_fs(mgs, fsdb);
395
396         if (fsdb->fsdb_ost_index_map)
397                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
398         if (fsdb->fsdb_mdt_index_map)
399                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
400         name_destroy(&fsdb->fsdb_clilov);
401         name_destroy(&fsdb->fsdb_clilmv);
402         mgs_free_fsdb_srpc(fsdb);
403         mutex_unlock(&fsdb->fsdb_mutex);
404         OBD_FREE_PTR(fsdb);
405 }
406
407 int mgs_init_fsdb_list(struct mgs_device *mgs)
408 {
409         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
410         return 0;
411 }
412
413 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
414 {
415         struct fs_db *fsdb;
416         cfs_list_t *tmp, *tmp2;
417         mutex_lock(&mgs->mgs_mutex);
418         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
419                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
420                 mgs_free_fsdb(mgs, fsdb);
421         }
422         mutex_unlock(&mgs->mgs_mutex);
423         return 0;
424 }
425
426 int mgs_find_or_make_fsdb(const struct lu_env *env,
427                           struct mgs_device *mgs, char *name,
428                           struct fs_db **dbh)
429 {
430         struct fs_db *fsdb;
431         int rc = 0;
432
433         ENTRY;
434         mutex_lock(&mgs->mgs_mutex);
435         fsdb = mgs_find_fsdb(mgs, name);
436         if (fsdb) {
437                 mutex_unlock(&mgs->mgs_mutex);
438                 *dbh = fsdb;
439                 RETURN(0);
440         }
441
442         CDEBUG(D_MGS, "Creating new db\n");
443         fsdb = mgs_new_fsdb(env, mgs, name);
444         /* lock fsdb_mutex until the db is loaded from llogs */
445         if (fsdb)
446                 mutex_lock(&fsdb->fsdb_mutex);
447         mutex_unlock(&mgs->mgs_mutex);
448         if (!fsdb)
449                 RETURN(-ENOMEM);
450
451         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
452                 /* populate the db from the client llog */
453                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
454                 if (rc) {
455                         CERROR("Can't get db from client log %d\n", rc);
456                         GOTO(out_free, rc);
457                 }
458         }
459
460         /* populate srpc rules from params llog */
461         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
462         if (rc) {
463                 CERROR("Can't get db from params log %d\n", rc);
464                 GOTO(out_free, rc);
465         }
466
467         mutex_unlock(&fsdb->fsdb_mutex);
468         *dbh = fsdb;
469
470         RETURN(0);
471
472 out_free:
473         mutex_unlock(&fsdb->fsdb_mutex);
474         mgs_free_fsdb(mgs, fsdb);
475         return rc;
476 }
477
478 /* 1 = index in use
479    0 = index unused
480    -1= empty client log */
481 int mgs_check_index(const struct lu_env *env,
482                     struct mgs_device *mgs,
483                     struct mgs_target_info *mti)
484 {
485         struct fs_db *fsdb;
486         void *imap;
487         int rc = 0;
488         ENTRY;
489
490         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
491
492         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
493         if (rc) {
494                 CERROR("Can't get db for %s\n", mti->mti_fsname);
495                 RETURN(rc);
496         }
497
498         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
499                 RETURN(-1);
500
501         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
502                 imap = fsdb->fsdb_ost_index_map;
503         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
504                 imap = fsdb->fsdb_mdt_index_map;
505         else
506                 RETURN(-EINVAL);
507
508         if (test_bit(mti->mti_stripe_index, imap))
509                 RETURN(1);
510         RETURN(0);
511 }
512
513 static __inline__ int next_index(void *index_map, int map_len)
514 {
515         int i;
516         for (i = 0; i < map_len * 8; i++)
517                 if (!test_bit(i, index_map)) {
518                          return i;
519                  }
520         CERROR("max index %d exceeded.\n", i);
521         return -1;
522 }
523
524 /* Return codes:
525         0  newly marked as in use
526         <0 err
527         +EALREADY for update of an old index */
528 static int mgs_set_index(const struct lu_env *env,
529                          struct mgs_device *mgs,
530                          struct mgs_target_info *mti)
531 {
532         struct fs_db *fsdb;
533         void *imap;
534         int rc = 0;
535         ENTRY;
536
537         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
538         if (rc) {
539                 CERROR("Can't get db for %s\n", mti->mti_fsname);
540                 RETURN(rc);
541         }
542
543         mutex_lock(&fsdb->fsdb_mutex);
544         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
545                 imap = fsdb->fsdb_ost_index_map;
546         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
547                 imap = fsdb->fsdb_mdt_index_map;
548         } else {
549                 GOTO(out_up, rc = -EINVAL);
550         }
551
552         if (mti->mti_flags & LDD_F_NEED_INDEX) {
553                 rc = next_index(imap, INDEX_MAP_SIZE);
554                 if (rc == -1)
555                         GOTO(out_up, rc = -ERANGE);
556                 mti->mti_stripe_index = rc;
557                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
558                         fsdb->fsdb_mdt_count ++;
559         }
560
561         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
562                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
563                                    "but the max index is %d.\n",
564                                    mti->mti_svname, mti->mti_stripe_index,
565                                    INDEX_MAP_SIZE * 8);
566                 GOTO(out_up, rc = -ERANGE);
567         }
568
569         if (test_bit(mti->mti_stripe_index, imap)) {
570                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
571                     !(mti->mti_flags & LDD_F_WRITECONF)) {
572                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
573                                            "%d, but that index is already in "
574                                            "use. Use --writeconf to force\n",
575                                            mti->mti_svname,
576                                            mti->mti_stripe_index);
577                         GOTO(out_up, rc = -EADDRINUSE);
578                 } else {
579                         CDEBUG(D_MGS, "Server %s updating index %d\n",
580                                mti->mti_svname, mti->mti_stripe_index);
581                         GOTO(out_up, rc = EALREADY);
582                 }
583         }
584
585         set_bit(mti->mti_stripe_index, imap);
586         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
587         mutex_unlock(&fsdb->fsdb_mutex);
588         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
589                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
590
591         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
592                mti->mti_stripe_index);
593
594         RETURN(0);
595 out_up:
596         mutex_unlock(&fsdb->fsdb_mutex);
597         return rc;
598 }
599
600 struct mgs_modify_lookup {
601         struct cfg_marker mml_marker;
602         int               mml_modified;
603 };
604
605 static int mgs_modify_handler(const struct lu_env *env,
606                               struct llog_handle *llh,
607                               struct llog_rec_hdr *rec, void *data)
608 {
609         struct mgs_modify_lookup *mml = data;
610         struct cfg_marker *marker;
611         struct lustre_cfg *lcfg = REC_DATA(rec);
612         int cfg_len = REC_DATA_LEN(rec);
613         int rc;
614         ENTRY;
615
616         if (rec->lrh_type != OBD_CFG_REC) {
617                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
618                 RETURN(-EINVAL);
619         }
620
621         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
622         if (rc) {
623                 CERROR("Insane cfg\n");
624                 RETURN(rc);
625         }
626
627         /* We only care about markers */
628         if (lcfg->lcfg_command != LCFG_MARKER)
629                 RETURN(0);
630
631         marker = lustre_cfg_buf(lcfg, 1);
632         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
633             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
634             !(marker->cm_flags & CM_SKIP)) {
635                 /* Found a non-skipped marker match */
636                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
637                        rec->lrh_index, marker->cm_step,
638                        marker->cm_flags, mml->mml_marker.cm_flags,
639                        marker->cm_tgtname, marker->cm_comment);
640                 /* Overwrite the old marker llog entry */
641                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
642                 marker->cm_flags |= mml->mml_marker.cm_flags;
643                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
644                 rc = llog_write(env, llh, rec, rec->lrh_index);
645                 if (!rc)
646                          mml->mml_modified++;
647         }
648
649         RETURN(rc);
650 }
651
652 /**
653  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
654  * Return code:
655  * 0 - modified successfully,
656  * 1 - no modification was done
657  * negative - error
658  */
659 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
660                       struct fs_db *fsdb, struct mgs_target_info *mti,
661                       char *logname, char *devname, char *comment, int flags)
662 {
663         struct llog_handle *loghandle;
664         struct llog_ctxt *ctxt;
665         struct mgs_modify_lookup *mml;
666         int rc;
667
668         ENTRY;
669
670         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
671         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
672                flags);
673
674         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
675         LASSERT(ctxt != NULL);
676         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
677         if (rc < 0) {
678                 if (rc == -ENOENT)
679                         rc = 0;
680                 GOTO(out_pop, rc);
681         }
682
683         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
684         if (rc)
685                 GOTO(out_close, rc);
686
687         if (llog_get_size(loghandle) <= 1)
688                 GOTO(out_close, rc = 0);
689
690         OBD_ALLOC_PTR(mml);
691         if (!mml)
692                 GOTO(out_close, rc = -ENOMEM);
693         if (strlcpy(mml->mml_marker.cm_comment, comment,
694                     sizeof(mml->mml_marker.cm_comment)) >=
695             sizeof(mml->mml_marker.cm_comment))
696                 GOTO(out_free, rc = -E2BIG);
697         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
698                     sizeof(mml->mml_marker.cm_tgtname)) >=
699             sizeof(mml->mml_marker.cm_tgtname))
700                 GOTO(out_free, rc = -E2BIG);
701         /* Modify mostly means cancel */
702         mml->mml_marker.cm_flags = flags;
703         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
704         mml->mml_modified = 0;
705         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
706                           NULL);
707         if (!rc && !mml->mml_modified)
708                 rc = 1;
709
710 out_free:
711         OBD_FREE_PTR(mml);
712
713 out_close:
714         llog_close(env, loghandle);
715 out_pop:
716         if (rc < 0)
717                 CERROR("%s: modify %s/%s failed: rc = %d\n",
718                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
719         llog_ctxt_put(ctxt);
720         RETURN(rc);
721 }
722
723 /** This structure is passed to mgs_replace_handler */
724 struct mgs_replace_uuid_lookup {
725         /* Nids are replaced for this target device */
726         struct mgs_target_info target;
727         /* Temporary modified llog */
728         struct llog_handle *temp_llh;
729         /* Flag is set if in target block*/
730         int in_target_device;
731         /* Nids already added. Just skip (multiple nids) */
732         int device_nids_added;
733         /* Flag is set if this block should not be copied */
734         int skip_it;
735 };
736
737 /**
738  * Check: a) if block should be skipped
739  * b) is it target block
740  *
741  * \param[in] lcfg
742  * \param[in] mrul
743  *
744  * \retval 0 should not to be skipped
745  * \retval 1 should to be skipped
746  */
747 static int check_markers(struct lustre_cfg *lcfg,
748                          struct mgs_replace_uuid_lookup *mrul)
749 {
750          struct cfg_marker *marker;
751
752         /* Track markers. Find given device */
753         if (lcfg->lcfg_command == LCFG_MARKER) {
754                 marker = lustre_cfg_buf(lcfg, 1);
755                 /* Clean llog from records marked as CM_EXCLUDE.
756                    CM_SKIP records are used for "active" command
757                    and can be restored if needed */
758                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
759                     (CM_EXCLUDE | CM_START)) {
760                         mrul->skip_it = 1;
761                         return 1;
762                 }
763
764                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
765                     (CM_EXCLUDE | CM_END)) {
766                         mrul->skip_it = 0;
767                         return 1;
768                 }
769
770                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
771                         LASSERT(!(marker->cm_flags & CM_START) ||
772                                 !(marker->cm_flags & CM_END));
773                         if (marker->cm_flags & CM_START) {
774                                 mrul->in_target_device = 1;
775                                 mrul->device_nids_added = 0;
776                         } else if (marker->cm_flags & CM_END)
777                                 mrul->in_target_device = 0;
778                 }
779         }
780
781         return 0;
782 }
783
784 static int record_base(const struct lu_env *env, struct llog_handle *llh,
785                      char *cfgname, lnet_nid_t nid, int cmd,
786                      char *s1, char *s2, char *s3, char *s4)
787 {
788         struct mgs_thread_info  *mgi = mgs_env_info(env);
789         struct llog_cfg_rec     *lcr;
790         int rc;
791
792         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
793                cmd, s1, s2, s3, s4);
794
795         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
796         if (s1)
797                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
798         if (s2)
799                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
800         if (s3)
801                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
802         if (s4)
803                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
804
805         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
806         if (lcr == NULL)
807                 return -ENOMEM;
808
809         lcr->lcr_cfg.lcfg_nid = nid;
810         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
811
812         lustre_cfg_rec_free(lcr);
813
814         if (rc < 0)
815                 CDEBUG(D_MGS,
816                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
817                        cfgname, cmd, s1, s2, s3, s4, rc);
818         return rc;
819 }
820
821 static inline int record_add_uuid(const struct lu_env *env,
822                                   struct llog_handle *llh,
823                                   uint64_t nid, char *uuid)
824 {
825         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
826 }
827
828 static inline int record_add_conn(const struct lu_env *env,
829                                   struct llog_handle *llh,
830                                   char *devname, char *uuid)
831 {
832         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
833 }
834
835 static inline int record_attach(const struct lu_env *env,
836                                 struct llog_handle *llh, char *devname,
837                                 char *type, char *uuid)
838 {
839         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
840 }
841
842 static inline int record_setup(const struct lu_env *env,
843                                struct llog_handle *llh, char *devname,
844                                char *s1, char *s2, char *s3, char *s4)
845 {
846         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
847 }
848
849 /**
850  * \retval <0 record processing error
851  * \retval n record is processed. No need copy original one.
852  * \retval 0 record is not processed.
853  */
854 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
855                            struct mgs_replace_uuid_lookup *mrul)
856 {
857         int nids_added = 0;
858         lnet_nid_t nid;
859         char *ptr;
860         int rc;
861
862         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
863                 /* LCFG_ADD_UUID command found. Let's skip original command
864                    and add passed nids */
865                 ptr = mrul->target.mti_params;
866                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
867                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
868                                "device %s\n", libcfs_nid2str(nid),
869                                 mrul->target.mti_params,
870                                 mrul->target.mti_svname);
871                         rc = record_add_uuid(env,
872                                              mrul->temp_llh, nid,
873                                              mrul->target.mti_params);
874                         if (!rc)
875                                 nids_added++;
876                 }
877
878                 if (nids_added == 0) {
879                         CERROR("No new nids were added, nid %s with uuid %s, "
880                                "device %s\n", libcfs_nid2str(nid),
881                                mrul->target.mti_params,
882                                mrul->target.mti_svname);
883                         RETURN(-ENXIO);
884                 } else {
885                         mrul->device_nids_added = 1;
886                 }
887
888                 return nids_added;
889         }
890
891         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
892                 /* LCFG_SETUP command found. UUID should be changed */
893                 rc = record_setup(env,
894                                   mrul->temp_llh,
895                                   /* devname the same */
896                                   lustre_cfg_string(lcfg, 0),
897                                   /* s1 is not changed */
898                                   lustre_cfg_string(lcfg, 1),
899                                   /* new uuid should be
900                                   the full nidlist */
901                                   mrul->target.mti_params,
902                                   /* s3 is not changed */
903                                   lustre_cfg_string(lcfg, 3),
904                                   /* s4 is not changed */
905                                   lustre_cfg_string(lcfg, 4));
906                 return rc ? rc : 1;
907         }
908
909         /* Another commands in target device block */
910         return 0;
911 }
912
913 /**
914  * Handler that called for every record in llog.
915  * Records are processed in order they placed in llog.
916  *
917  * \param[in] llh       log to be processed
918  * \param[in] rec       current record
919  * \param[in] data      mgs_replace_uuid_lookup structure
920  *
921  * \retval 0    success
922  */
923 static int mgs_replace_handler(const struct lu_env *env,
924                                struct llog_handle *llh,
925                                struct llog_rec_hdr *rec,
926                                void *data)
927 {
928         struct mgs_replace_uuid_lookup *mrul;
929         struct lustre_cfg *lcfg = REC_DATA(rec);
930         int cfg_len = REC_DATA_LEN(rec);
931         int rc;
932         ENTRY;
933
934         mrul = (struct mgs_replace_uuid_lookup *)data;
935
936         if (rec->lrh_type != OBD_CFG_REC) {
937                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
938                        rec->lrh_type, lcfg->lcfg_command,
939                        lustre_cfg_string(lcfg, 0),
940                        lustre_cfg_string(lcfg, 1));
941                 RETURN(-EINVAL);
942         }
943
944         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
945         if (rc) {
946                 /* Do not copy any invalidated records */
947                 GOTO(skip_out, rc = 0);
948         }
949
950         rc = check_markers(lcfg, mrul);
951         if (rc || mrul->skip_it)
952                 GOTO(skip_out, rc = 0);
953
954         /* Write to new log all commands outside target device block */
955         if (!mrul->in_target_device)
956                 GOTO(copy_out, rc = 0);
957
958         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
959            (failover nids) for this target, assuming that if then
960            primary is changing then so is the failover */
961         if (mrul->device_nids_added &&
962             (lcfg->lcfg_command == LCFG_ADD_UUID ||
963              lcfg->lcfg_command == LCFG_ADD_CONN))
964                 GOTO(skip_out, rc = 0);
965
966         rc = process_command(env, lcfg, mrul);
967         if (rc < 0)
968                 RETURN(rc);
969
970         if (rc)
971                 RETURN(0);
972 copy_out:
973         /* Record is placed in temporary llog as is */
974         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
975
976         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
977                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
978                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
979         RETURN(rc);
980
981 skip_out:
982         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
983                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
984                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
985         RETURN(rc);
986 }
987
988 static int mgs_log_is_empty(const struct lu_env *env,
989                             struct mgs_device *mgs, char *name)
990 {
991         struct llog_ctxt        *ctxt;
992         int                      rc;
993
994         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
995         LASSERT(ctxt != NULL);
996
997         rc = llog_is_empty(env, ctxt, name);
998         llog_ctxt_put(ctxt);
999         return rc;
1000 }
1001
1002 static int mgs_replace_nids_log(const struct lu_env *env,
1003                                 struct obd_device *mgs, struct fs_db *fsdb,
1004                                 char *logname, char *devname, char *nids)
1005 {
1006         struct llog_handle *orig_llh, *backup_llh;
1007         struct llog_ctxt *ctxt;
1008         struct mgs_replace_uuid_lookup *mrul;
1009         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1010         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1011         char *backup;
1012         int rc, rc2;
1013         ENTRY;
1014
1015         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1016
1017         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1018         LASSERT(ctxt != NULL);
1019
1020         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1021                 /* Log is empty. Nothing to replace */
1022                 GOTO(out_put, rc = 0);
1023         }
1024
1025         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1026         if (backup == NULL)
1027                 GOTO(out_put, rc = -ENOMEM);
1028
1029         sprintf(backup, "%s.bak", logname);
1030
1031         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1032         if (rc == 0) {
1033                 /* Now erase original log file. Connections are not allowed.
1034                    Backup is already saved */
1035                 rc = llog_erase(env, ctxt, NULL, logname);
1036                 if (rc < 0)
1037                         GOTO(out_free, rc);
1038         } else if (rc != -ENOENT) {
1039                 CERROR("%s: can't make backup for %s: rc = %d\n",
1040                        mgs->obd_name, logname, rc);
1041                 GOTO(out_free,rc);
1042         }
1043
1044         /* open local log */
1045         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1046         if (rc)
1047                 GOTO(out_restore, rc);
1048
1049         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1050         if (rc)
1051                 GOTO(out_closel, rc);
1052
1053         /* open backup llog */
1054         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1055                        LLOG_OPEN_EXISTS);
1056         if (rc)
1057                 GOTO(out_closel, rc);
1058
1059         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1060         if (rc)
1061                 GOTO(out_close, rc);
1062
1063         if (llog_get_size(backup_llh) <= 1)
1064                 GOTO(out_close, rc = 0);
1065
1066         OBD_ALLOC_PTR(mrul);
1067         if (!mrul)
1068                 GOTO(out_close, rc = -ENOMEM);
1069         /* devname is only needed information to replace UUID records */
1070         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1071         /* parse nids later */
1072         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1073         /* Copy records to this temporary llog */
1074         mrul->temp_llh = orig_llh;
1075
1076         rc = llog_process(env, backup_llh, mgs_replace_handler,
1077                           (void *)mrul, NULL);
1078         OBD_FREE_PTR(mrul);
1079 out_close:
1080         rc2 = llog_close(NULL, backup_llh);
1081         if (!rc)
1082                 rc = rc2;
1083 out_closel:
1084         rc2 = llog_close(NULL, orig_llh);
1085         if (!rc)
1086                 rc = rc2;
1087
1088 out_restore:
1089         if (rc) {
1090                 CERROR("%s: llog should be restored: rc = %d\n",
1091                        mgs->obd_name, rc);
1092                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1093                                   logname);
1094                 if (rc2 < 0)
1095                         CERROR("%s: can't restore backup %s: rc = %d\n",
1096                                mgs->obd_name, logname, rc2);
1097         }
1098
1099 out_free:
1100         OBD_FREE(backup, strlen(backup) + 1);
1101
1102 out_put:
1103         llog_ctxt_put(ctxt);
1104
1105         if (rc)
1106                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1107                        mgs->obd_name, logname, rc);
1108
1109         RETURN(rc);
1110 }
1111
1112 /**
1113  * Parse device name and get file system name and/or device index
1114  *
1115  * \param[in]   devname device name (ex. lustre-MDT0000)
1116  * \param[out]  fsname  file system name(optional)
1117  * \param[out]  index   device index(optional)
1118  *
1119  * \retval 0    success
1120  */
1121 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1122 {
1123         int rc;
1124         ENTRY;
1125
1126         /* Extract fsname */
1127         if (fsname) {
1128                 rc = server_name2fsname(devname, fsname, NULL);
1129                 if (rc < 0) {
1130                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1131                                devname);
1132                         RETURN(-EINVAL);
1133                 }
1134         }
1135
1136         if (index) {
1137                 rc = server_name2index(devname, index, NULL);
1138                 if (rc < 0) {
1139                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1140                                devname);
1141                         RETURN(-EINVAL);
1142                 }
1143         }
1144
1145         RETURN(0);
1146 }
1147
1148 /* This is only called during replace_nids */
1149 static int only_mgs_is_running(struct obd_device *mgs_obd)
1150 {
1151         /* TDB: Is global variable with devices count exists? */
1152         int num_devices = get_devices_count();
1153         int num_exports = 0;
1154         struct obd_export *exp;
1155
1156         spin_lock(&mgs_obd->obd_dev_lock);
1157         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1158                 /* skip self export */
1159                 if (exp == mgs_obd->obd_self_export)
1160                         continue;
1161                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1162                         continue;
1163
1164                 ++num_exports;
1165
1166                 CERROR("%s: node %s still connected during replace_nids "
1167                        "connect_flags:%llx\n",
1168                        mgs_obd->obd_name,
1169                        libcfs_nid2str(exp->exp_nid_stats->nid),
1170                        exp_connect_flags(exp));
1171
1172         }
1173         spin_unlock(&mgs_obd->obd_dev_lock);
1174
1175         /* osd, MGS and MGC + self_export
1176            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1177         return (num_devices <= 3) && (num_exports == 0);
1178 }
1179
1180 static int name_create_mdt(char **logname, char *fsname, int i)
1181 {
1182         char mdt_index[9];
1183
1184         sprintf(mdt_index, "-MDT%04x", i);
1185         return name_create(logname, fsname, mdt_index);
1186 }
1187
1188 /**
1189  * Replace nids for \a device to \a nids values
1190  *
1191  * \param obd           MGS obd device
1192  * \param devname       nids need to be replaced for this device
1193  * (ex. lustre-OST0000)
1194  * \param nids          nids list (ex. nid1,nid2,nid3)
1195  *
1196  * \retval 0    success
1197  */
1198 int mgs_replace_nids(const struct lu_env *env,
1199                      struct mgs_device *mgs,
1200                      char *devname, char *nids)
1201 {
1202         /* Assume fsname is part of device name */
1203         char fsname[MTI_NAME_MAXLEN];
1204         int rc;
1205         __u32 index;
1206         char *logname;
1207         struct fs_db *fsdb;
1208         unsigned int i;
1209         int conn_state;
1210         struct obd_device *mgs_obd = mgs->mgs_obd;
1211         ENTRY;
1212
1213         /* We can only change NIDs if no other nodes are connected */
1214         spin_lock(&mgs_obd->obd_dev_lock);
1215         conn_state = mgs_obd->obd_no_conn;
1216         mgs_obd->obd_no_conn = 1;
1217         spin_unlock(&mgs_obd->obd_dev_lock);
1218
1219         /* We can not change nids if not only MGS is started */
1220         if (!only_mgs_is_running(mgs_obd)) {
1221                 CERROR("Only MGS is allowed to be started\n");
1222                 GOTO(out, rc = -EINPROGRESS);
1223         }
1224
1225         /* Get fsname and index*/
1226         rc = mgs_parse_devname(devname, fsname, &index);
1227         if (rc)
1228                 GOTO(out, rc);
1229
1230         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1231         if (rc) {
1232                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1233                 GOTO(out, rc);
1234         }
1235
1236         /* Process client llogs */
1237         name_create(&logname, fsname, "-client");
1238         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1239         name_destroy(&logname);
1240         if (rc) {
1241                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1242                        fsname, devname, rc);
1243                 GOTO(out, rc);
1244         }
1245
1246         /* Process MDT llogs */
1247         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1248                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1249                         continue;
1250                 name_create_mdt(&logname, fsname, i);
1251                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1252                 name_destroy(&logname);
1253                 if (rc)
1254                         GOTO(out, rc);
1255         }
1256
1257 out:
1258         spin_lock(&mgs_obd->obd_dev_lock);
1259         mgs_obd->obd_no_conn = conn_state;
1260         spin_unlock(&mgs_obd->obd_dev_lock);
1261
1262         RETURN(rc);
1263 }
1264
1265 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1266                             char *devname, struct lov_desc *desc)
1267 {
1268         struct mgs_thread_info  *mgi = mgs_env_info(env);
1269         struct llog_cfg_rec     *lcr;
1270         int rc;
1271
1272         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1273         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1274         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1275         if (lcr == NULL)
1276                 return -ENOMEM;
1277
1278         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1279         lustre_cfg_rec_free(lcr);
1280         return rc;
1281 }
1282
1283 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1284                             char *devname, struct lmv_desc *desc)
1285 {
1286         struct mgs_thread_info  *mgi = mgs_env_info(env);
1287         struct llog_cfg_rec     *lcr;
1288         int rc;
1289
1290         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1291         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1292         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1293         if (lcr == NULL)
1294                 return -ENOMEM;
1295
1296         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1297         lustre_cfg_rec_free(lcr);
1298         return rc;
1299 }
1300
1301 static inline int record_mdc_add(const struct lu_env *env,
1302                                  struct llog_handle *llh,
1303                                  char *logname, char *mdcuuid,
1304                                  char *mdtuuid, char *index,
1305                                  char *gen)
1306 {
1307         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1308                            mdtuuid,index,gen,mdcuuid);
1309 }
1310
1311 static inline int record_lov_add(const struct lu_env *env,
1312                                  struct llog_handle *llh,
1313                                  char *lov_name, char *ost_uuid,
1314                                  char *index, char *gen)
1315 {
1316         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1317                            ost_uuid, index, gen, 0);
1318 }
1319
1320 static inline int record_mount_opt(const struct lu_env *env,
1321                                    struct llog_handle *llh,
1322                                    char *profile, char *lov_name,
1323                                    char *mdc_name)
1324 {
1325         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1326                            profile,lov_name,mdc_name,0);
1327 }
1328
1329 static int record_marker(const struct lu_env *env,
1330                          struct llog_handle *llh,
1331                          struct fs_db *fsdb, __u32 flags,
1332                          char *tgtname, char *comment)
1333 {
1334         struct mgs_thread_info *mgi = mgs_env_info(env);
1335         struct llog_cfg_rec *lcr;
1336         int rc;
1337         int cplen = 0;
1338
1339         if (flags & CM_START)
1340                 fsdb->fsdb_gen++;
1341         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1342         mgi->mgi_marker.cm_flags = flags;
1343         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1344         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1345                         sizeof(mgi->mgi_marker.cm_tgtname));
1346         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1347                 return -E2BIG;
1348         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1349                         sizeof(mgi->mgi_marker.cm_comment));
1350         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1351                 return -E2BIG;
1352         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1353         mgi->mgi_marker.cm_canceltime = 0;
1354         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1355         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1356                             sizeof(mgi->mgi_marker));
1357         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1358         if (lcr == NULL)
1359                 return -ENOMEM;
1360
1361         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1362         lustre_cfg_rec_free(lcr);
1363         return rc;
1364 }
1365
1366 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1367                             struct llog_handle **llh, char *name)
1368 {
1369         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1370         struct llog_ctxt        *ctxt;
1371         int                      rc = 0;
1372         ENTRY;
1373
1374         if (*llh)
1375                 GOTO(out, rc = -EBUSY);
1376
1377         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1378         if (!ctxt)
1379                 GOTO(out, rc = -ENODEV);
1380         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1381
1382         rc = llog_open_create(env, ctxt, llh, NULL, name);
1383         if (rc)
1384                 GOTO(out_ctxt, rc);
1385         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1386         if (rc)
1387                 llog_close(env, *llh);
1388 out_ctxt:
1389         llog_ctxt_put(ctxt);
1390 out:
1391         if (rc) {
1392                 CERROR("%s: can't start log %s: rc = %d\n",
1393                        mgs->mgs_obd->obd_name, name, rc);
1394                 *llh = NULL;
1395         }
1396         RETURN(rc);
1397 }
1398
1399 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1400 {
1401         int rc;
1402
1403         rc = llog_close(env, *llh);
1404         *llh = NULL;
1405
1406         return rc;
1407 }
1408
1409 /******************** config "macros" *********************/
1410
1411 /* write an lcfg directly into a log (with markers) */
1412 static int mgs_write_log_direct(const struct lu_env *env,
1413                                 struct mgs_device *mgs, struct fs_db *fsdb,
1414                                 char *logname, struct llog_cfg_rec *lcr,
1415                                 char *devname, char *comment)
1416 {
1417         struct llog_handle *llh = NULL;
1418         int rc;
1419
1420         ENTRY;
1421
1422         rc = record_start_log(env, mgs, &llh, logname);
1423         if (rc)
1424                 RETURN(rc);
1425
1426         /* FIXME These should be a single journal transaction */
1427         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1428         if (rc)
1429                 GOTO(out_end, rc);
1430         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1431         if (rc)
1432                 GOTO(out_end, rc);
1433         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1434         if (rc)
1435                 GOTO(out_end, rc);
1436 out_end:
1437         record_end_log(env, &llh);
1438         RETURN(rc);
1439 }
1440
1441 /* write the lcfg in all logs for the given fs */
1442 int mgs_write_log_direct_all(const struct lu_env *env, struct mgs_device *mgs,
1443                              struct fs_db *fsdb, struct mgs_target_info *mti,
1444                              struct llog_cfg_rec *lcr, char *devname,
1445                              char *comment, int server_only)
1446 {
1447         struct list_head         log_list;
1448         struct mgs_direntry     *dirent, *n;
1449         char                    *fsname = mti->mti_fsname;
1450         char                    *logname;
1451         int                      rc = 0, len = strlen(fsname);
1452
1453         ENTRY;
1454         /* We need to set params for any future logs
1455          * as well.
1456          * FIXME Append this file to every new log.
1457          * Actually, we should store as params (text), not llogs,
1458          * or in a database. */
1459         rc = name_create(&logname, fsname, "-params");
1460         if (rc)
1461                 RETURN(rc);
1462         if (mgs_log_is_empty(env, mgs, logname)) {
1463                 struct llog_handle *llh = NULL;
1464
1465                 rc = record_start_log(env, mgs, &llh, logname);
1466                 if (rc == 0)
1467                         record_end_log(env, &llh);
1468         }
1469         name_destroy(&logname);
1470         if (rc)
1471                 RETURN(rc);
1472
1473         /* Find all the logs in the CONFIGS directory */
1474         rc = class_dentry_readdir(env, mgs, &log_list);
1475         if (rc)
1476                 RETURN(rc);
1477
1478         /* Could use fsdb index maps instead of directory listing */
1479         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1480                 list_del_init(&dirent->mde_list);
1481                 /* don't write to sptlrpc rule log */
1482                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1483                         goto next;
1484
1485                 /* caller wants write server logs only */
1486                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1487                         goto next;
1488
1489                 if (strncmp(fsname, dirent->mde_name, len) != 0)
1490                         goto next;
1491
1492                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1493                 /* Erase any old settings of this same parameter */
1494                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1495                                 devname, comment, CM_SKIP);
1496                 if (rc < 0)
1497                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1498                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1499                 if (lcr == NULL)
1500                         goto next;
1501                 /* Write the new one */
1502                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1503                                           lcr, devname, comment);
1504                 if (rc != 0)
1505                         CERROR("%s: writing log %s: rc = %d\n",
1506                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1507 next:
1508                 mgs_direntry_free(dirent);
1509         }
1510
1511         RETURN(rc);
1512 }
1513
1514 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1515                                     struct mgs_device *mgs,
1516                                     struct fs_db *fsdb,
1517                                     struct mgs_target_info *mti,
1518                                     int index, char *logname);
1519 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1520                                     struct mgs_device *mgs,
1521                                     struct fs_db *fsdb,
1522                                     struct mgs_target_info *mti,
1523                                     char *logname, char *suffix, char *lovname,
1524                                     enum lustre_sec_part sec_part, int flags);
1525 static int name_create_mdt_and_lov(char **logname, char **lovname,
1526                                    struct fs_db *fsdb, int i);
1527
1528 static int add_param(char *params, char *key, char *val)
1529 {
1530         char *start = params + strlen(params);
1531         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1532         int keylen = 0;
1533
1534         if (key != NULL)
1535                 keylen = strlen(key);
1536         if (start + 1 + keylen + strlen(val) >= end) {
1537                 CERROR("params are too long: %s %s%s\n",
1538                        params, key != NULL ? key : "", val);
1539                 return -EINVAL;
1540         }
1541
1542         sprintf(start, " %s%s", key != NULL ? key : "", val);
1543         return 0;
1544 }
1545
1546 /**
1547  * Walk through client config log record and convert the related records
1548  * into the target.
1549  **/
1550 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1551                                          struct llog_handle *llh,
1552                                          struct llog_rec_hdr *rec, void *data)
1553 {
1554         struct mgs_device *mgs;
1555         struct obd_device *obd;
1556         struct mgs_target_info *mti, *tmti;
1557         struct fs_db *fsdb;
1558         int cfg_len = rec->lrh_len;
1559         char *cfg_buf = (char*) (rec + 1);
1560         struct lustre_cfg *lcfg;
1561         int rc = 0;
1562         struct llog_handle *mdt_llh = NULL;
1563         static int got_an_osc_or_mdc = 0;
1564         /* 0: not found any osc/mdc;
1565            1: found osc;
1566            2: found mdc;
1567         */
1568         static int last_step = -1;
1569         int cplen = 0;
1570
1571         ENTRY;
1572
1573         mti = ((struct temp_comp*)data)->comp_mti;
1574         tmti = ((struct temp_comp*)data)->comp_tmti;
1575         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1576         obd = ((struct temp_comp *)data)->comp_obd;
1577         mgs = lu2mgs_dev(obd->obd_lu_dev);
1578         LASSERT(mgs);
1579
1580         if (rec->lrh_type != OBD_CFG_REC) {
1581                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1582                 RETURN(-EINVAL);
1583         }
1584
1585         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1586         if (rc) {
1587                 CERROR("Insane cfg\n");
1588                 RETURN(rc);
1589         }
1590
1591         lcfg = (struct lustre_cfg *)cfg_buf;
1592
1593         if (lcfg->lcfg_command == LCFG_MARKER) {
1594                 struct cfg_marker *marker;
1595                 marker = lustre_cfg_buf(lcfg, 1);
1596                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1597                     (marker->cm_flags & CM_START) &&
1598                      !(marker->cm_flags & CM_SKIP)) {
1599                         got_an_osc_or_mdc = 1;
1600                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1601                                         sizeof(tmti->mti_svname));
1602                         if (cplen >= sizeof(tmti->mti_svname))
1603                                 RETURN(-E2BIG);
1604                         rc = record_start_log(env, mgs, &mdt_llh,
1605                                               mti->mti_svname);
1606                         if (rc)
1607                                 RETURN(rc);
1608                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1609                                            mti->mti_svname, "add osc(copied)");
1610                         record_end_log(env, &mdt_llh);
1611                         last_step = marker->cm_step;
1612                         RETURN(rc);
1613                 }
1614                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1615                     (marker->cm_flags & CM_END) &&
1616                      !(marker->cm_flags & CM_SKIP)) {
1617                         LASSERT(last_step == marker->cm_step);
1618                         last_step = -1;
1619                         got_an_osc_or_mdc = 0;
1620                         memset(tmti, 0, sizeof(*tmti));
1621                         rc = record_start_log(env, mgs, &mdt_llh,
1622                                               mti->mti_svname);
1623                         if (rc)
1624                                 RETURN(rc);
1625                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1626                                            mti->mti_svname, "add osc(copied)");
1627                         record_end_log(env, &mdt_llh);
1628                         RETURN(rc);
1629                 }
1630                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1631                     (marker->cm_flags & CM_START) &&
1632                      !(marker->cm_flags & CM_SKIP)) {
1633                         got_an_osc_or_mdc = 2;
1634                         last_step = marker->cm_step;
1635                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1636                                strlen(marker->cm_tgtname));
1637
1638                         RETURN(rc);
1639                 }
1640                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1641                     (marker->cm_flags & CM_END) &&
1642                      !(marker->cm_flags & CM_SKIP)) {
1643                         LASSERT(last_step == marker->cm_step);
1644                         last_step = -1;
1645                         got_an_osc_or_mdc = 0;
1646                         memset(tmti, 0, sizeof(*tmti));
1647                         RETURN(rc);
1648                 }
1649         }
1650
1651         if (got_an_osc_or_mdc == 0 || last_step < 0)
1652                 RETURN(rc);
1653
1654         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1655                 uint64_t nodenid = lcfg->lcfg_nid;
1656
1657                 if (strlen(tmti->mti_uuid) == 0) {
1658                         /* target uuid not set, this config record is before
1659                          * LCFG_SETUP, this nid is one of target node nid.
1660                          */
1661                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1662                         tmti->mti_nid_count++;
1663                 } else {
1664                         /* failover node nid */
1665                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1666                                        libcfs_nid2str(nodenid));
1667                 }
1668
1669                 RETURN(rc);
1670         }
1671
1672         if (lcfg->lcfg_command == LCFG_SETUP) {
1673                 char *target;
1674
1675                 target = lustre_cfg_string(lcfg, 1);
1676                 memcpy(tmti->mti_uuid, target, strlen(target));
1677                 RETURN(rc);
1678         }
1679
1680         /* ignore client side sptlrpc_conf_log */
1681         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1682                 RETURN(rc);
1683
1684         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1685                 int index;
1686
1687                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1688                         RETURN (-EINVAL);
1689
1690                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1691                        strlen(mti->mti_fsname));
1692                 tmti->mti_stripe_index = index;
1693
1694                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1695                                               mti->mti_stripe_index,
1696                                               mti->mti_svname);
1697                 memset(tmti, 0, sizeof(*tmti));
1698                 RETURN(rc);
1699         }
1700
1701         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1702                 int index;
1703                 char mdt_index[9];
1704                 char *logname, *lovname;
1705
1706                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1707                                              mti->mti_stripe_index);
1708                 if (rc)
1709                         RETURN(rc);
1710                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1711
1712                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1713                         name_destroy(&logname);
1714                         name_destroy(&lovname);
1715                         RETURN(-EINVAL);
1716                 }
1717
1718                 tmti->mti_stripe_index = index;
1719                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1720                                          mdt_index, lovname,
1721                                          LUSTRE_SP_MDT, 0);
1722                 name_destroy(&logname);
1723                 name_destroy(&lovname);
1724                 RETURN(rc);
1725         }
1726         RETURN(rc);
1727 }
1728
1729 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1730 /* stealed from mgs_get_fsdb_from_llog*/
1731 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1732                                               struct mgs_device *mgs,
1733                                               char *client_name,
1734                                               struct temp_comp* comp)
1735 {
1736         struct llog_handle *loghandle;
1737         struct mgs_target_info *tmti;
1738         struct llog_ctxt *ctxt;
1739         int rc;
1740
1741         ENTRY;
1742
1743         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1744         LASSERT(ctxt != NULL);
1745
1746         OBD_ALLOC_PTR(tmti);
1747         if (tmti == NULL)
1748                 GOTO(out_ctxt, rc = -ENOMEM);
1749
1750         comp->comp_tmti = tmti;
1751         comp->comp_obd = mgs->mgs_obd;
1752
1753         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1754                        LLOG_OPEN_EXISTS);
1755         if (rc < 0) {
1756                 if (rc == -ENOENT)
1757                         rc = 0;
1758                 GOTO(out_pop, rc);
1759         }
1760
1761         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1762         if (rc)
1763                 GOTO(out_close, rc);
1764
1765         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1766                                   (void *)comp, NULL, false);
1767         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1768 out_close:
1769         llog_close(env, loghandle);
1770 out_pop:
1771         OBD_FREE_PTR(tmti);
1772 out_ctxt:
1773         llog_ctxt_put(ctxt);
1774         RETURN(rc);
1775 }
1776
1777 /* lmv is the second thing for client logs */
1778 /* copied from mgs_write_log_lov. Please refer to that.  */
1779 static int mgs_write_log_lmv(const struct lu_env *env,
1780                              struct mgs_device *mgs,
1781                              struct fs_db *fsdb,
1782                              struct mgs_target_info *mti,
1783                              char *logname, char *lmvname)
1784 {
1785         struct llog_handle *llh = NULL;
1786         struct lmv_desc *lmvdesc;
1787         char *uuid;
1788         int rc = 0;
1789         ENTRY;
1790
1791         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1792
1793         OBD_ALLOC_PTR(lmvdesc);
1794         if (lmvdesc == NULL)
1795                 RETURN(-ENOMEM);
1796         lmvdesc->ld_active_tgt_count = 0;
1797         lmvdesc->ld_tgt_count = 0;
1798         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1799         uuid = (char *)lmvdesc->ld_uuid.uuid;
1800
1801         rc = record_start_log(env, mgs, &llh, logname);
1802         if (rc)
1803                 GOTO(out_free, rc);
1804         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1805         if (rc)
1806                 GOTO(out_end, rc);
1807         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1808         if (rc)
1809                 GOTO(out_end, rc);
1810         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1811         if (rc)
1812                 GOTO(out_end, rc);
1813         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1814         if (rc)
1815                 GOTO(out_end, rc);
1816 out_end:
1817         record_end_log(env, &llh);
1818 out_free:
1819         OBD_FREE_PTR(lmvdesc);
1820         RETURN(rc);
1821 }
1822
1823 /* lov is the first thing in the mdt and client logs */
1824 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1825                              struct fs_db *fsdb, struct mgs_target_info *mti,
1826                              char *logname, char *lovname)
1827 {
1828         struct llog_handle *llh = NULL;
1829         struct lov_desc *lovdesc;
1830         char *uuid;
1831         int rc = 0;
1832         ENTRY;
1833
1834         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1835
1836         /*
1837         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1838         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1839               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1840         */
1841
1842         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1843         OBD_ALLOC_PTR(lovdesc);
1844         if (lovdesc == NULL)
1845                 RETURN(-ENOMEM);
1846         lovdesc->ld_magic = LOV_DESC_MAGIC;
1847         lovdesc->ld_tgt_count = 0;
1848         /* Defaults.  Can be changed later by lcfg config_param */
1849         lovdesc->ld_default_stripe_count = 1;
1850         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1851         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1852         lovdesc->ld_default_stripe_offset = -1;
1853         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1854         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1855         /* can these be the same? */
1856         uuid = (char *)lovdesc->ld_uuid.uuid;
1857
1858         /* This should always be the first entry in a log.
1859         rc = mgs_clear_log(obd, logname); */
1860         rc = record_start_log(env, mgs, &llh, logname);
1861         if (rc)
1862                 GOTO(out_free, rc);
1863         /* FIXME these should be a single journal transaction */
1864         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1865         if (rc)
1866                 GOTO(out_end, rc);
1867         rc = record_attach(env, llh, lovname, "lov", uuid);
1868         if (rc)
1869                 GOTO(out_end, rc);
1870         rc = record_lov_setup(env, llh, lovname, lovdesc);
1871         if (rc)
1872                 GOTO(out_end, rc);
1873         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1874         if (rc)
1875                 GOTO(out_end, rc);
1876         EXIT;
1877 out_end:
1878         record_end_log(env, &llh);
1879 out_free:
1880         OBD_FREE_PTR(lovdesc);
1881         return rc;
1882 }
1883
1884 /* add failnids to open log */
1885 static int mgs_write_log_failnids(const struct lu_env *env,
1886                                   struct mgs_target_info *mti,
1887                                   struct llog_handle *llh,
1888                                   char *cliname)
1889 {
1890         char *failnodeuuid = NULL;
1891         char *ptr = mti->mti_params;
1892         lnet_nid_t nid;
1893         int rc = 0;
1894
1895         /*
1896         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1897         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1898         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1899         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1900         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1901         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1902         */
1903
1904         /* Pull failnid info out of params string */
1905         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1906                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1907                         if (failnodeuuid == NULL) {
1908                                 /* We don't know the failover node name,
1909                                    so just use the first nid as the uuid */
1910                                 rc = name_create(&failnodeuuid,
1911                                                  libcfs_nid2str(nid), "");
1912                                 if (rc)
1913                                         return rc;
1914                         }
1915                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1916                                "client %s\n", libcfs_nid2str(nid),
1917                                failnodeuuid, cliname);
1918                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1919                 }
1920                 if (failnodeuuid) {
1921                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1922                         name_destroy(&failnodeuuid);
1923                         failnodeuuid = NULL;
1924                 }
1925         }
1926
1927         return rc;
1928 }
1929
1930 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1931                                     struct mgs_device *mgs,
1932                                     struct fs_db *fsdb,
1933                                     struct mgs_target_info *mti,
1934                                     char *logname, char *lmvname)
1935 {
1936         struct llog_handle *llh = NULL;
1937         char *mdcname = NULL;
1938         char *nodeuuid = NULL;
1939         char *mdcuuid = NULL;
1940         char *lmvuuid = NULL;
1941         char index[6];
1942         int i, rc;
1943         ENTRY;
1944
1945         if (mgs_log_is_empty(env, mgs, logname)) {
1946                 CERROR("log is empty! Logical error\n");
1947                 RETURN(-EINVAL);
1948         }
1949
1950         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1951                mti->mti_svname, logname, lmvname);
1952
1953         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1954         if (rc)
1955                 RETURN(rc);
1956         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1957         if (rc)
1958                 GOTO(out_free, rc);
1959         rc = name_create(&mdcuuid, mdcname, "_UUID");
1960         if (rc)
1961                 GOTO(out_free, rc);
1962         rc = name_create(&lmvuuid, lmvname, "_UUID");
1963         if (rc)
1964                 GOTO(out_free, rc);
1965
1966         rc = record_start_log(env, mgs, &llh, logname);
1967         if (rc)
1968                 GOTO(out_free, rc);
1969         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1970                            "add mdc");
1971         if (rc)
1972                 GOTO(out_end, rc);
1973         for (i = 0; i < mti->mti_nid_count; i++) {
1974                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1975                        libcfs_nid2str(mti->mti_nids[i]));
1976
1977                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1978                 if (rc)
1979                         GOTO(out_end, rc);
1980         }
1981
1982         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1983         if (rc)
1984                 GOTO(out_end, rc);
1985         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1986         if (rc)
1987                 GOTO(out_end, rc);
1988         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1989         if (rc)
1990                 GOTO(out_end, rc);
1991         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1992         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1993                             index, "1");
1994         if (rc)
1995                 GOTO(out_end, rc);
1996         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1997                            "add mdc");
1998         if (rc)
1999                 GOTO(out_end, rc);
2000 out_end:
2001         record_end_log(env, &llh);
2002 out_free:
2003         name_destroy(&lmvuuid);
2004         name_destroy(&mdcuuid);
2005         name_destroy(&mdcname);
2006         name_destroy(&nodeuuid);
2007         RETURN(rc);
2008 }
2009
2010 static inline int name_create_lov(char **lovname, char *mdtname,
2011                                   struct fs_db *fsdb, int index)
2012 {
2013         /* COMPAT_180 */
2014         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2015                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2016         else
2017                 return name_create(lovname, mdtname, "-mdtlov");
2018 }
2019
2020 static int name_create_mdt_and_lov(char **logname, char **lovname,
2021                                    struct fs_db *fsdb, int i)
2022 {
2023         int rc;
2024
2025         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2026         if (rc)
2027                 return rc;
2028         /* COMPAT_180 */
2029         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2030                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2031         else
2032                 rc = name_create(lovname, *logname, "-mdtlov");
2033         if (rc) {
2034                 name_destroy(logname);
2035                 *logname = NULL;
2036         }
2037         return rc;
2038 }
2039
2040 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2041                                       struct fs_db *fsdb, int i)
2042 {
2043         char suffix[16];
2044
2045         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2046                 sprintf(suffix, "-osc");
2047         else
2048                 sprintf(suffix, "-osc-MDT%04x", i);
2049         return name_create(oscname, ostname, suffix);
2050 }
2051
2052 /* add new mdc to already existent MDS */
2053 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2054                                     struct mgs_device *mgs,
2055                                     struct fs_db *fsdb,
2056                                     struct mgs_target_info *mti,
2057                                     int mdt_index, char *logname)
2058 {
2059         struct llog_handle      *llh = NULL;
2060         char    *nodeuuid = NULL;
2061         char    *ospname = NULL;
2062         char    *lovuuid = NULL;
2063         char    *mdtuuid = NULL;
2064         char    *svname = NULL;
2065         char    *mdtname = NULL;
2066         char    *lovname = NULL;
2067         char    index_str[16];
2068         int     i, rc;
2069
2070         ENTRY;
2071         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2072                 CERROR("log is empty! Logical error\n");
2073                 RETURN (-EINVAL);
2074         }
2075
2076         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2077                logname);
2078
2079         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2080         if (rc)
2081                 RETURN(rc);
2082
2083         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2084         if (rc)
2085                 GOTO(out_destory, rc);
2086
2087         rc = name_create(&svname, mdtname, "-osp");
2088         if (rc)
2089                 GOTO(out_destory, rc);
2090
2091         sprintf(index_str, "-MDT%04x", mdt_index);
2092         rc = name_create(&ospname, svname, index_str);
2093         if (rc)
2094                 GOTO(out_destory, rc);
2095
2096         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2097         if (rc)
2098                 GOTO(out_destory, rc);
2099
2100         rc = name_create(&lovuuid, lovname, "_UUID");
2101         if (rc)
2102                 GOTO(out_destory, rc);
2103
2104         rc = name_create(&mdtuuid, mdtname, "_UUID");
2105         if (rc)
2106                 GOTO(out_destory, rc);
2107
2108         rc = record_start_log(env, mgs, &llh, logname);
2109         if (rc)
2110                 GOTO(out_destory, rc);
2111
2112         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2113                            "add osp");
2114         if (rc)
2115                 GOTO(out_destory, rc);
2116
2117         for (i = 0; i < mti->mti_nid_count; i++) {
2118                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2119                        libcfs_nid2str(mti->mti_nids[i]));
2120                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2121                 if (rc)
2122                         GOTO(out_end, rc);
2123         }
2124
2125         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2126         if (rc)
2127                 GOTO(out_end, rc);
2128
2129         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2130                           NULL, NULL);
2131         if (rc)
2132                 GOTO(out_end, rc);
2133
2134         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2135         if (rc)
2136                 GOTO(out_end, rc);
2137
2138         /* Add mdc(osp) to lod */
2139         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2140                  mti->mti_stripe_index);
2141         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2142                          index_str, "1", NULL);
2143         if (rc)
2144                 GOTO(out_end, rc);
2145
2146         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2147         if (rc)
2148                 GOTO(out_end, rc);
2149
2150 out_end:
2151         record_end_log(env, &llh);
2152
2153 out_destory:
2154         name_destroy(&mdtuuid);
2155         name_destroy(&lovuuid);
2156         name_destroy(&lovname);
2157         name_destroy(&ospname);
2158         name_destroy(&svname);
2159         name_destroy(&nodeuuid);
2160         name_destroy(&mdtname);
2161         RETURN(rc);
2162 }
2163
2164 static int mgs_write_log_mdt0(const struct lu_env *env,
2165                               struct mgs_device *mgs,
2166                               struct fs_db *fsdb,
2167                               struct mgs_target_info *mti)
2168 {
2169         char *log = mti->mti_svname;
2170         struct llog_handle *llh = NULL;
2171         char *uuid, *lovname;
2172         char mdt_index[6];
2173         char *ptr = mti->mti_params;
2174         int rc = 0, failout = 0;
2175         ENTRY;
2176
2177         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2178         if (uuid == NULL)
2179                 RETURN(-ENOMEM);
2180
2181         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2182                 failout = (strncmp(ptr, "failout", 7) == 0);
2183
2184         rc = name_create(&lovname, log, "-mdtlov");
2185         if (rc)
2186                 GOTO(out_free, rc);
2187         if (mgs_log_is_empty(env, mgs, log)) {
2188                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2189                 if (rc)
2190                         GOTO(out_lod, rc);
2191         }
2192
2193         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2194
2195         rc = record_start_log(env, mgs, &llh, log);
2196         if (rc)
2197                 GOTO(out_lod, rc);
2198
2199         /* add MDT itself */
2200
2201         /* FIXME this whole fn should be a single journal transaction */
2202         sprintf(uuid, "%s_UUID", log);
2203         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2204         if (rc)
2205                 GOTO(out_lod, rc);
2206         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2207         if (rc)
2208                 GOTO(out_end, rc);
2209         rc = record_mount_opt(env, llh, log, lovname, NULL);
2210         if (rc)
2211                 GOTO(out_end, rc);
2212         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2213                         failout ? "n" : "f");
2214         if (rc)
2215                 GOTO(out_end, rc);
2216         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2217         if (rc)
2218                 GOTO(out_end, rc);
2219 out_end:
2220         record_end_log(env, &llh);
2221 out_lod:
2222         name_destroy(&lovname);
2223 out_free:
2224         OBD_FREE(uuid, sizeof(struct obd_uuid));
2225         RETURN(rc);
2226 }
2227
2228 /* envelope method for all layers log */
2229 static int mgs_write_log_mdt(const struct lu_env *env,
2230                              struct mgs_device *mgs,
2231                              struct fs_db *fsdb,
2232                              struct mgs_target_info *mti)
2233 {
2234         struct mgs_thread_info *mgi = mgs_env_info(env);
2235         struct llog_handle *llh = NULL;
2236         char *cliname;
2237         int rc, i = 0;
2238         ENTRY;
2239
2240         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2241
2242         if (mti->mti_uuid[0] == '\0') {
2243                 /* Make up our own uuid */
2244                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2245                          "%s_UUID", mti->mti_svname);
2246         }
2247
2248         /* add mdt */
2249         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2250         if (rc)
2251                 RETURN(rc);
2252         /* Append the mdt info to the client log */
2253         rc = name_create(&cliname, mti->mti_fsname, "-client");
2254         if (rc)
2255                 RETURN(rc);
2256
2257         if (mgs_log_is_empty(env, mgs, cliname)) {
2258                 /* Start client log */
2259                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2260                                        fsdb->fsdb_clilov);
2261                 if (rc)
2262                         GOTO(out_free, rc);
2263                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2264                                        fsdb->fsdb_clilmv);
2265                 if (rc)
2266                         GOTO(out_free, rc);
2267         }
2268
2269         /*
2270         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2271         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2272         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2273         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2274         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2275         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2276         */
2277
2278                 /* copy client info about lov/lmv */
2279                 mgi->mgi_comp.comp_mti = mti;
2280                 mgi->mgi_comp.comp_fsdb = fsdb;
2281
2282                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2283                                                         &mgi->mgi_comp);
2284                 if (rc)
2285                         GOTO(out_free, rc);
2286                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2287                                               fsdb->fsdb_clilmv);
2288                 if (rc)
2289                         GOTO(out_free, rc);
2290
2291                 /* add mountopts */
2292                 rc = record_start_log(env, mgs, &llh, cliname);
2293                 if (rc)
2294                         GOTO(out_free, rc);
2295
2296                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2297                                    "mount opts");
2298                 if (rc)
2299                         GOTO(out_end, rc);
2300                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2301                                       fsdb->fsdb_clilmv);
2302                 if (rc)
2303                         GOTO(out_end, rc);
2304                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2305                                    "mount opts");
2306
2307         if (rc)
2308                 GOTO(out_end, rc);
2309
2310         /* for_all_existing_mdt except current one */
2311         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2312                 if (i !=  mti->mti_stripe_index &&
2313                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2314                         char *logname;
2315
2316                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2317                         if (rc)
2318                                 GOTO(out_end, rc);
2319
2320                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2321                                                       i, logname);
2322                         name_destroy(&logname);
2323                         if (rc)
2324                                 GOTO(out_end, rc);
2325                 }
2326         }
2327 out_end:
2328         record_end_log(env, &llh);
2329 out_free:
2330         name_destroy(&cliname);
2331         RETURN(rc);
2332 }
2333
2334 /* Add the ost info to the client/mdt lov */
2335 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2336                                     struct mgs_device *mgs, struct fs_db *fsdb,
2337                                     struct mgs_target_info *mti,
2338                                     char *logname, char *suffix, char *lovname,
2339                                     enum lustre_sec_part sec_part, int flags)
2340 {
2341         struct llog_handle *llh = NULL;
2342         char *nodeuuid = NULL;
2343         char *oscname = NULL;
2344         char *oscuuid = NULL;
2345         char *lovuuid = NULL;
2346         char *svname = NULL;
2347         char index[6];
2348         int i, rc;
2349
2350         ENTRY;
2351         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2352                mti->mti_svname, logname);
2353
2354         if (mgs_log_is_empty(env, mgs, logname)) {
2355                 CERROR("log is empty! Logical error\n");
2356                 RETURN (-EINVAL);
2357         }
2358
2359         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2360         if (rc)
2361                 RETURN(rc);
2362         rc = name_create(&svname, mti->mti_svname, "-osc");
2363         if (rc)
2364                 GOTO(out_free, rc);
2365
2366         /* for the system upgraded from old 1.8, keep using the old osc naming
2367          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2368         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2369                 rc = name_create(&oscname, svname, "");
2370         else
2371                 rc = name_create(&oscname, svname, suffix);
2372         if (rc)
2373                 GOTO(out_free, rc);
2374
2375         rc = name_create(&oscuuid, oscname, "_UUID");
2376         if (rc)
2377                 GOTO(out_free, rc);
2378         rc = name_create(&lovuuid, lovname, "_UUID");
2379         if (rc)
2380                 GOTO(out_free, rc);
2381
2382
2383         /*
2384         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2385         multihomed (#4)
2386         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2387         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2388         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2389         failover (#6,7)
2390         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2391         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2392         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2393         */
2394
2395         rc = record_start_log(env, mgs, &llh, logname);
2396         if (rc)
2397                 GOTO(out_free, rc);
2398
2399         /* FIXME these should be a single journal transaction */
2400         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2401                            "add osc");
2402         if (rc)
2403                 GOTO(out_end, rc);
2404
2405         /* NB: don't change record order, because upon MDT steal OSC config
2406          * from client, it treats all nids before LCFG_SETUP as target nids
2407          * (multiple interfaces), while nids after as failover node nids.
2408          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2409          */
2410         for (i = 0; i < mti->mti_nid_count; i++) {
2411                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2412                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2413                 if (rc)
2414                         GOTO(out_end, rc);
2415         }
2416         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2417         if (rc)
2418                 GOTO(out_end, rc);
2419         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2420         if (rc)
2421                 GOTO(out_end, rc);
2422         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2423         if (rc)
2424                 GOTO(out_end, rc);
2425
2426         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2427
2428         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2429         if (rc)
2430                 GOTO(out_end, rc);
2431         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2432                            "add osc");
2433         if (rc)
2434                 GOTO(out_end, rc);
2435 out_end:
2436         record_end_log(env, &llh);
2437 out_free:
2438         name_destroy(&lovuuid);
2439         name_destroy(&oscuuid);
2440         name_destroy(&oscname);
2441         name_destroy(&svname);
2442         name_destroy(&nodeuuid);
2443         RETURN(rc);
2444 }
2445
2446 static int mgs_write_log_ost(const struct lu_env *env,
2447                              struct mgs_device *mgs, struct fs_db *fsdb,
2448                              struct mgs_target_info *mti)
2449 {
2450         struct llog_handle *llh = NULL;
2451         char *logname, *lovname;
2452         char *ptr = mti->mti_params;
2453         int rc, flags = 0, failout = 0, i;
2454         ENTRY;
2455
2456         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2457
2458         /* The ost startup log */
2459
2460         /* If the ost log already exists, that means that someone reformatted
2461            the ost and it called target_add again. */
2462         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2463                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2464                                    "exists, yet the server claims it never "
2465                                    "registered. It may have been reformatted, "
2466                                    "or the index changed. writeconf the MDT to "
2467                                    "regenerate all logs.\n", mti->mti_svname);
2468                 RETURN(-EALREADY);
2469         }
2470
2471         /*
2472         attach obdfilter ost1 ost1_UUID
2473         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2474         */
2475         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2476                 failout = (strncmp(ptr, "failout", 7) == 0);
2477         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2478         if (rc)
2479                 RETURN(rc);
2480         /* FIXME these should be a single journal transaction */
2481         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2482         if (rc)
2483                 GOTO(out_end, rc);
2484         if (*mti->mti_uuid == '\0')
2485                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2486                          "%s_UUID", mti->mti_svname);
2487         rc = record_attach(env, llh, mti->mti_svname,
2488                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2489         if (rc)
2490                 GOTO(out_end, rc);
2491         rc = record_setup(env, llh, mti->mti_svname,
2492                           "dev"/*ignored*/, "type"/*ignored*/,
2493                           failout ? "n" : "f", 0/*options*/);
2494         if (rc)
2495                 GOTO(out_end, rc);
2496         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2497         if (rc)
2498                 GOTO(out_end, rc);
2499 out_end:
2500         record_end_log(env, &llh);
2501         if (rc)
2502                 RETURN(rc);
2503         /* We also have to update the other logs where this osc is part of
2504            the lov */
2505
2506         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2507                 /* If we're upgrading, the old mdt log already has our
2508                    entry. Let's do a fake one for fun. */
2509                 /* Note that we can't add any new failnids, since we don't
2510                    know the old osc names. */
2511                 flags = CM_SKIP | CM_UPGRADE146;
2512
2513         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2514                 /* If the update flag isn't set, don't update client/mdt
2515                    logs. */
2516                 flags |= CM_SKIP;
2517                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2518                               "the MDT first to regenerate it.\n",
2519                               mti->mti_svname);
2520         }
2521
2522         /* Add ost to all MDT lov defs */
2523         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2524                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2525                         char mdt_index[9];
2526
2527                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2528                                                      i);
2529                         if (rc)
2530                                 RETURN(rc);
2531                         sprintf(mdt_index, "-MDT%04x", i);
2532                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2533                                                       logname, mdt_index,
2534                                                       lovname, LUSTRE_SP_MDT,
2535                                                       flags);
2536                         name_destroy(&logname);
2537                         name_destroy(&lovname);
2538                         if (rc)
2539                                 RETURN(rc);
2540                 }
2541         }
2542
2543         /* Append ost info to the client log */
2544         rc = name_create(&logname, mti->mti_fsname, "-client");
2545         if (rc)
2546                 RETURN(rc);
2547         if (mgs_log_is_empty(env, mgs, logname)) {
2548                 /* Start client log */
2549                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2550                                        fsdb->fsdb_clilov);
2551                 if (rc)
2552                         GOTO(out_free, rc);
2553                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2554                                        fsdb->fsdb_clilmv);
2555                 if (rc)
2556                         GOTO(out_free, rc);
2557         }
2558         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2559                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2560 out_free:
2561         name_destroy(&logname);
2562         RETURN(rc);
2563 }
2564
2565 static __inline__ int mgs_param_empty(char *ptr)
2566 {
2567         char *tmp;
2568
2569         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2570                 return 1;
2571         return 0;
2572 }
2573
2574 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2575                                           struct mgs_device *mgs,
2576                                           struct fs_db *fsdb,
2577                                           struct mgs_target_info *mti,
2578                                           char *logname, char *cliname)
2579 {
2580         int rc;
2581         struct llog_handle *llh = NULL;
2582
2583         if (mgs_param_empty(mti->mti_params)) {
2584                 /* Remove _all_ failnids */
2585                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2586                                 mti->mti_svname, "add failnid", CM_SKIP);
2587                 return rc < 0 ? rc : 0;
2588         }
2589
2590         /* Otherwise failover nids are additive */
2591         rc = record_start_log(env, mgs, &llh, logname);
2592         if (rc)
2593                 return rc;
2594                 /* FIXME this should be a single journal transaction */
2595         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2596                            "add failnid");
2597         if (rc)
2598                 goto out_end;
2599         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2600         if (rc)
2601                 goto out_end;
2602         rc = record_marker(env, llh, fsdb, CM_END,
2603                            mti->mti_svname, "add failnid");
2604 out_end:
2605         record_end_log(env, &llh);
2606         return rc;
2607 }
2608
2609
2610 /* Add additional failnids to an existing log.
2611    The mdc/osc must have been added to logs first */
2612 /* tcp nids must be in dotted-quad ascii -
2613    we can't resolve hostnames from the kernel. */
2614 static int mgs_write_log_add_failnid(const struct lu_env *env,
2615                                      struct mgs_device *mgs,
2616                                      struct fs_db *fsdb,
2617                                      struct mgs_target_info *mti)
2618 {
2619         char *logname, *cliname;
2620         int rc;
2621         ENTRY;
2622
2623         /* FIXME we currently can't erase the failnids
2624          * given when a target first registers, since they aren't part of
2625          * an "add uuid" stanza */
2626
2627         /* Verify that we know about this target */
2628         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2629                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2630                                    "yet. It must be started before failnids "
2631                                    "can be added.\n", mti->mti_svname);
2632                 RETURN(-ENOENT);
2633         }
2634
2635         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2636         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2637                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2638         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2639                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2640         } else {
2641                 RETURN(-EINVAL);
2642         }
2643         if (rc)
2644                 RETURN(rc);
2645         /* Add failover nids to the client log */
2646         rc = name_create(&logname, mti->mti_fsname, "-client");
2647         if (rc) {
2648                 name_destroy(&cliname);
2649                 RETURN(rc);
2650         }
2651         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2652         name_destroy(&logname);
2653         name_destroy(&cliname);
2654         if (rc)
2655                 RETURN(rc);
2656
2657         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2658                 /* Add OST failover nids to the MDT logs as well */
2659                 int i;
2660
2661                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2662                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2663                                 continue;
2664                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2665                         if (rc)
2666                                 RETURN(rc);
2667                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2668                                                  fsdb, i);
2669                         if (rc) {
2670                                 name_destroy(&logname);
2671                                 RETURN(rc);
2672                         }
2673                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2674                                                             mti, logname,
2675                                                             cliname);
2676                         name_destroy(&cliname);
2677                         name_destroy(&logname);
2678                         if (rc)
2679                                 RETURN(rc);
2680                 }
2681         }
2682
2683         RETURN(rc);
2684 }
2685
2686 static int mgs_wlp_lcfg(const struct lu_env *env,
2687                         struct mgs_device *mgs, struct fs_db *fsdb,
2688                         struct mgs_target_info *mti,
2689                         char *logname, struct lustre_cfg_bufs *bufs,
2690                         char *tgtname, char *ptr)
2691 {
2692         char comment[MTI_NAME_MAXLEN];
2693         char *tmp;
2694         struct llog_cfg_rec *lcr;
2695         int rc, del;
2696
2697         /* Erase any old settings of this same parameter */
2698         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2699         comment[MTI_NAME_MAXLEN - 1] = 0;
2700         /* But don't try to match the value. */
2701         tmp = strchr(comment, '=');
2702         if (tmp != NULL)
2703                 *tmp = 0;
2704         /* FIXME we should skip settings that are the same as old values */
2705         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2706         if (rc < 0)
2707                 return rc;
2708         del = mgs_param_empty(ptr);
2709
2710         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2711                       "Setting" : "Modifying", tgtname, comment, logname);
2712         if (del) {
2713                 /* mgs_modify() will return 1 if nothing had to be done */
2714                 if (rc == 1)
2715                         rc = 0;
2716                 return rc;
2717         }
2718
2719         lustre_cfg_bufs_reset(bufs, tgtname);
2720         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2721         if (mti->mti_flags & LDD_F_PARAM2)
2722                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2723
2724         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2725                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2726         if (lcr == NULL)
2727                 return -ENOMEM;
2728
2729         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2730                                   comment);
2731         lustre_cfg_rec_free(lcr);
2732         return rc;
2733 }
2734
2735 static int mgs_write_log_param2(const struct lu_env *env,
2736                                 struct mgs_device *mgs,
2737                                 struct fs_db *fsdb,
2738                                 struct mgs_target_info *mti, char *ptr)
2739 {
2740         struct lustre_cfg_bufs  bufs;
2741         int                     rc = 0;
2742         ENTRY;
2743
2744         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2745         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2746                           mti->mti_svname, ptr);
2747
2748         RETURN(rc);
2749 }
2750
2751 /* write global variable settings into log */
2752 static int mgs_write_log_sys(const struct lu_env *env,
2753                              struct mgs_device *mgs, struct fs_db *fsdb,
2754                              struct mgs_target_info *mti, char *sys, char *ptr)
2755 {
2756         struct mgs_thread_info  *mgi = mgs_env_info(env);
2757         struct lustre_cfg       *lcfg;
2758         struct llog_cfg_rec     *lcr;
2759         char *tmp, sep;
2760         int rc, cmd, convert = 1;
2761
2762         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2763                 cmd = LCFG_SET_TIMEOUT;
2764         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2765                 cmd = LCFG_SET_LDLM_TIMEOUT;
2766         /* Check for known params here so we can return error to lctl */
2767         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2768                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2769                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2770                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2771                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2772                 cmd = LCFG_PARAM;
2773         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2774                 convert = 0; /* Don't convert string value to integer */
2775                 cmd = LCFG_PARAM;
2776         } else {
2777                 return -EINVAL;
2778         }
2779
2780         if (mgs_param_empty(ptr))
2781                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2782         else
2783                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2784
2785         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2786         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2787         if (!convert && *tmp != '\0')
2788                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2789         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2790         if (lcr == NULL)
2791                 return -ENOMEM;
2792
2793         lcfg = &lcr->lcr_cfg;
2794         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2795         /* truncate the comment to the parameter name */
2796         ptr = tmp - 1;
2797         sep = *ptr;
2798         *ptr = '\0';
2799         /* modify all servers and clients */
2800         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2801                                       *tmp == '\0' ? NULL : lcr,
2802                                       mti->mti_fsname, sys, 0);
2803         if (rc == 0 && *tmp != '\0') {
2804                 switch (cmd) {
2805                 case LCFG_SET_TIMEOUT:
2806                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2807                                 class_process_config(lcfg);
2808                         break;
2809                 case LCFG_SET_LDLM_TIMEOUT:
2810                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2811                                 class_process_config(lcfg);
2812                         break;
2813                 default:
2814                         break;
2815                 }
2816         }
2817         *ptr = sep;
2818         lustre_cfg_rec_free(lcr);
2819         return rc;
2820 }
2821
2822 /* write quota settings into log */
2823 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2824                                struct fs_db *fsdb, struct mgs_target_info *mti,
2825                                char *quota, char *ptr)
2826 {
2827         struct mgs_thread_info  *mgi = mgs_env_info(env);
2828         struct llog_cfg_rec     *lcr;
2829         char                    *tmp;
2830         char                     sep;
2831         int                      rc, cmd = LCFG_PARAM;
2832
2833         /* support only 'meta' and 'data' pools so far */
2834         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2835             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2836                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2837                        "& quota.ost are)\n", ptr);
2838                 return -EINVAL;
2839         }
2840
2841         if (*tmp == '\0') {
2842                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2843         } else {
2844                 CDEBUG(D_MGS, "global '%s'\n", quota);
2845
2846                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2847                     strcmp(tmp, "none") != 0) {
2848                         CERROR("enable option(%s) isn't supported\n", tmp);
2849                         return -EINVAL;
2850                 }
2851         }
2852
2853         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2854         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2855         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2856         if (lcr == NULL)
2857                 return -ENOMEM;
2858
2859         /* truncate the comment to the parameter name */
2860         ptr = tmp - 1;
2861         sep = *ptr;
2862         *ptr = '\0';
2863
2864         /* XXX we duplicated quota enable information in all server
2865          *     config logs, it should be moved to a separate config
2866          *     log once we cleanup the config log for global param. */
2867         /* modify all servers */
2868         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2869                                       *tmp == '\0' ? NULL : lcr,
2870                                       mti->mti_fsname, quota, 1);
2871         *ptr = sep;
2872         lustre_cfg_rec_free(lcr);
2873         return rc < 0 ? rc : 0;
2874 }
2875
2876 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2877                                    struct mgs_device *mgs,
2878                                    struct fs_db *fsdb,
2879                                    struct mgs_target_info *mti,
2880                                    char *param)
2881 {
2882         struct mgs_thread_info  *mgi = mgs_env_info(env);
2883         struct llog_cfg_rec     *lcr;
2884         struct llog_handle      *llh = NULL;
2885         char                    *logname;
2886         char                    *comment, *ptr;
2887         int                      rc, len;
2888
2889         ENTRY;
2890
2891         /* get comment */
2892         ptr = strchr(param, '=');
2893         LASSERT(ptr != NULL);
2894         len = ptr - param;
2895
2896         OBD_ALLOC(comment, len + 1);
2897         if (comment == NULL)
2898                 RETURN(-ENOMEM);
2899         strncpy(comment, param, len);
2900         comment[len] = '\0';
2901
2902         /* prepare lcfg */
2903         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2904         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2905         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2906         if (lcr == NULL)
2907                 GOTO(out_comment, rc = -ENOMEM);
2908
2909         /* construct log name */
2910         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2911         if (rc < 0)
2912                 GOTO(out_lcfg, rc);
2913
2914         if (mgs_log_is_empty(env, mgs, logname)) {
2915                 rc = record_start_log(env, mgs, &llh, logname);
2916                 if (rc < 0)
2917                         GOTO(out, rc);
2918                 record_end_log(env, &llh);
2919         }
2920
2921         /* obsolete old one */
2922         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2923                         comment, CM_SKIP);
2924         if (rc < 0)
2925                 GOTO(out, rc);
2926         /* write the new one */
2927         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
2928                                   mti->mti_svname, comment);
2929         if (rc)
2930                 CERROR("%s: error writing log %s: rc = %d\n",
2931                        mgs->mgs_obd->obd_name, logname, rc);
2932 out:
2933         name_destroy(&logname);
2934 out_lcfg:
2935         lustre_cfg_rec_free(lcr);
2936 out_comment:
2937         OBD_FREE(comment, len + 1);
2938         RETURN(rc);
2939 }
2940
2941 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2942                                         char *param)
2943 {
2944         char    *ptr;
2945
2946         /* disable the adjustable udesc parameter for now, i.e. use default
2947          * setting that client always ship udesc to MDT if possible. to enable
2948          * it simply remove the following line */
2949         goto error_out;
2950
2951         ptr = strchr(param, '=');
2952         if (ptr == NULL)
2953                 goto error_out;
2954         *ptr++ = '\0';
2955
2956         if (strcmp(param, PARAM_SRPC_UDESC))
2957                 goto error_out;
2958
2959         if (strcmp(ptr, "yes") == 0) {
2960                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2961                 CWARN("Enable user descriptor shipping from client to MDT\n");
2962         } else if (strcmp(ptr, "no") == 0) {
2963                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2964                 CWARN("Disable user descriptor shipping from client to MDT\n");
2965         } else {
2966                 *(ptr - 1) = '=';
2967                 goto error_out;
2968         }
2969         return 0;
2970
2971 error_out:
2972         CERROR("Invalid param: %s\n", param);
2973         return -EINVAL;
2974 }
2975
2976 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2977                                   const char *svname,
2978                                   char *param)
2979 {
2980         struct sptlrpc_rule      rule;
2981         struct sptlrpc_rule_set *rset;
2982         int                      rc;
2983         ENTRY;
2984
2985         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2986                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2987                 RETURN(-EINVAL);
2988         }
2989
2990         if (strncmp(param, PARAM_SRPC_UDESC,
2991                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2992                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2993         }
2994
2995         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2996                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2997                 RETURN(-EINVAL);
2998         }
2999
3000         param += sizeof(PARAM_SRPC_FLVR) - 1;
3001
3002         rc = sptlrpc_parse_rule(param, &rule);
3003         if (rc)
3004                 RETURN(rc);
3005
3006         /* mgs rules implies must be mgc->mgs */
3007         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3008                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3009                      rule.sr_from != LUSTRE_SP_ANY) ||
3010                     (rule.sr_to != LUSTRE_SP_MGS &&
3011                      rule.sr_to != LUSTRE_SP_ANY))
3012                         RETURN(-EINVAL);
3013         }
3014
3015         /* preapre room for this coming rule. svcname format should be:
3016          * - fsname: general rule
3017          * - fsname-tgtname: target-specific rule
3018          */
3019         if (strchr(svname, '-')) {
3020                 struct mgs_tgt_srpc_conf *tgtconf;
3021                 int                       found = 0;
3022
3023                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3024                      tgtconf = tgtconf->mtsc_next) {
3025                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3026                                 found = 1;
3027                                 break;
3028                         }
3029                 }
3030
3031                 if (!found) {
3032                         int name_len;
3033
3034                         OBD_ALLOC_PTR(tgtconf);
3035                         if (tgtconf == NULL)
3036                                 RETURN(-ENOMEM);
3037
3038                         name_len = strlen(svname);
3039
3040                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3041                         if (tgtconf->mtsc_tgt == NULL) {
3042                                 OBD_FREE_PTR(tgtconf);
3043                                 RETURN(-ENOMEM);
3044                         }
3045                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3046
3047                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3048                         fsdb->fsdb_srpc_tgt = tgtconf;
3049                 }
3050
3051                 rset = &tgtconf->mtsc_rset;
3052         } else {
3053                 rset = &fsdb->fsdb_srpc_gen;
3054         }
3055
3056         rc = sptlrpc_rule_set_merge(rset, &rule);
3057
3058         RETURN(rc);
3059 }
3060
3061 static int mgs_srpc_set_param(const struct lu_env *env,
3062                               struct mgs_device *mgs,
3063                               struct fs_db *fsdb,
3064                               struct mgs_target_info *mti,
3065                               char *param)
3066 {
3067         char                   *copy;
3068         int                     rc, copy_size;
3069         ENTRY;
3070
3071 #ifndef HAVE_GSS
3072         RETURN(-EINVAL);
3073 #endif
3074         /* keep a copy of original param, which could be destroied
3075          * during parsing */
3076         copy_size = strlen(param) + 1;
3077         OBD_ALLOC(copy, copy_size);
3078         if (copy == NULL)
3079                 return -ENOMEM;
3080         memcpy(copy, param, copy_size);
3081
3082         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3083         if (rc)
3084                 goto out_free;
3085
3086         /* previous steps guaranteed the syntax is correct */
3087         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3088         if (rc)
3089                 goto out_free;
3090
3091         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3092                 /*
3093                  * for mgs rules, make them effective immediately.
3094                  */
3095                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3096                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3097                                                  &fsdb->fsdb_srpc_gen);
3098         }
3099
3100 out_free:
3101         OBD_FREE(copy, copy_size);
3102         RETURN(rc);
3103 }
3104
3105 struct mgs_srpc_read_data {
3106         struct fs_db   *msrd_fsdb;
3107         int             msrd_skip;
3108 };
3109
3110 static int mgs_srpc_read_handler(const struct lu_env *env,
3111                                  struct llog_handle *llh,
3112                                  struct llog_rec_hdr *rec, void *data)
3113 {
3114         struct mgs_srpc_read_data *msrd = data;
3115         struct cfg_marker         *marker;
3116         struct lustre_cfg         *lcfg = REC_DATA(rec);
3117         char                      *svname, *param;
3118         int                        cfg_len, rc;
3119         ENTRY;
3120
3121         if (rec->lrh_type != OBD_CFG_REC) {
3122                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3123                 RETURN(-EINVAL);
3124         }
3125
3126         cfg_len = REC_DATA_LEN(rec);
3127
3128         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3129         if (rc) {
3130                 CERROR("Insane cfg\n");
3131                 RETURN(rc);
3132         }
3133
3134         if (lcfg->lcfg_command == LCFG_MARKER) {
3135                 marker = lustre_cfg_buf(lcfg, 1);
3136
3137                 if (marker->cm_flags & CM_START &&
3138                     marker->cm_flags & CM_SKIP)
3139                         msrd->msrd_skip = 1;
3140                 if (marker->cm_flags & CM_END)
3141                         msrd->msrd_skip = 0;
3142
3143                 RETURN(0);
3144         }
3145
3146         if (msrd->msrd_skip)
3147                 RETURN(0);
3148
3149         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3150                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3151                 RETURN(0);
3152         }
3153
3154         svname = lustre_cfg_string(lcfg, 0);
3155         if (svname == NULL) {
3156                 CERROR("svname is empty\n");
3157                 RETURN(0);
3158         }
3159
3160         param = lustre_cfg_string(lcfg, 1);
3161         if (param == NULL) {
3162                 CERROR("param is empty\n");
3163                 RETURN(0);
3164         }
3165
3166         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3167         if (rc)
3168                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3169
3170         RETURN(0);
3171 }
3172
3173 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3174                                 struct mgs_device *mgs,
3175                                 struct fs_db *fsdb)
3176 {
3177         struct llog_handle        *llh = NULL;
3178         struct llog_ctxt          *ctxt;
3179         char                      *logname;
3180         struct mgs_srpc_read_data  msrd;
3181         int                        rc;
3182         ENTRY;
3183
3184         /* construct log name */
3185         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3186         if (rc)
3187                 RETURN(rc);
3188
3189         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3190         LASSERT(ctxt != NULL);
3191
3192         if (mgs_log_is_empty(env, mgs, logname))
3193                 GOTO(out, rc = 0);
3194
3195         rc = llog_open(env, ctxt, &llh, NULL, logname,
3196                        LLOG_OPEN_EXISTS);
3197         if (rc < 0) {
3198                 if (rc == -ENOENT)
3199                         rc = 0;
3200                 GOTO(out, rc);
3201         }
3202
3203         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3204         if (rc)
3205                 GOTO(out_close, rc);
3206
3207         if (llog_get_size(llh) <= 1)
3208                 GOTO(out_close, rc = 0);
3209
3210         msrd.msrd_fsdb = fsdb;
3211         msrd.msrd_skip = 0;
3212
3213         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3214                           NULL);
3215
3216 out_close:
3217         llog_close(env, llh);
3218 out:
3219         llog_ctxt_put(ctxt);
3220         name_destroy(&logname);
3221
3222         if (rc)
3223                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3224         RETURN(rc);
3225 }
3226
3227 /* Permanent settings of all parameters by writing into the appropriate
3228  * configuration logs.
3229  * A parameter with null value ("<param>='\0'") means to erase it out of
3230  * the logs.
3231  */
3232 static int mgs_write_log_param(const struct lu_env *env,
3233                                struct mgs_device *mgs, struct fs_db *fsdb,
3234                                struct mgs_target_info *mti, char *ptr)
3235 {
3236         struct mgs_thread_info *mgi = mgs_env_info(env);
3237         char *logname;
3238         char *tmp;
3239         int rc = 0, rc2 = 0;
3240         ENTRY;
3241
3242         /* For various parameter settings, we have to figure out which logs
3243            care about them (e.g. both mdt and client for lov settings) */
3244         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3245
3246         /* The params are stored in MOUNT_DATA_FILE and modified via
3247            tunefs.lustre, or set using lctl conf_param */
3248
3249         /* Processed in lustre_start_mgc */
3250         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3251                 GOTO(end, rc);
3252
3253         /* Processed in ost/mdt */
3254         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3255                 GOTO(end, rc);
3256
3257         /* Processed in mgs_write_log_ost */
3258         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3259                 if (mti->mti_flags & LDD_F_PARAM) {
3260                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3261                                            "changed with tunefs.lustre"
3262                                            "and --writeconf\n", ptr);
3263                         rc = -EPERM;
3264                 }
3265                 GOTO(end, rc);
3266         }
3267
3268         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3269                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3270                 GOTO(end, rc);
3271         }
3272
3273         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3274                 /* Add a failover nidlist */
3275                 rc = 0;
3276                 /* We already processed failovers params for new
3277                    targets in mgs_write_log_target */
3278                 if (mti->mti_flags & LDD_F_PARAM) {
3279                         CDEBUG(D_MGS, "Adding failnode\n");
3280                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3281                 }
3282                 GOTO(end, rc);
3283         }
3284
3285         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3286                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3287                 GOTO(end, rc);
3288         }
3289
3290         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3291                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3292                 GOTO(end, rc);
3293         }
3294
3295         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3296                 /* active=0 means off, anything else means on */
3297                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3298                 int i;
3299
3300                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3301                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3302                                            "be (de)activated.\n",
3303                                            mti->mti_svname);
3304                         GOTO(end, rc = -EINVAL);
3305                 }
3306                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3307                               flag ? "de": "re", mti->mti_svname);
3308                 /* Modify clilov */
3309                 rc = name_create(&logname, mti->mti_fsname, "-client");
3310                 if (rc)
3311                         GOTO(end, rc);
3312                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3313                                 mti->mti_svname, "add osc", flag);
3314                 name_destroy(&logname);
3315                 if (rc)
3316                         goto active_err;
3317                 /* Modify mdtlov */
3318                 /* Add to all MDT logs for CMD */
3319                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3320                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3321                                 continue;
3322                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3323                         if (rc)
3324                                 GOTO(end, rc);
3325                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3326                                         mti->mti_svname, "add osc", flag);
3327                         name_destroy(&logname);
3328                         if (rc)
3329                                 goto active_err;
3330                 }
3331         active_err:
3332                 if (rc) {
3333                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3334                                            "log (%d). No permanent "
3335                                            "changes were made to the "
3336                                            "config log.\n",
3337                                            mti->mti_svname, rc);
3338                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3339                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3340                                                    " because the log"
3341                                                    "is in the old 1.4"
3342                                                    "style. Consider "
3343                                                    " --writeconf to "
3344                                                    "update the logs.\n");
3345                         GOTO(end, rc);
3346                 }
3347                 /* Fall through to osc proc for deactivating live OSC
3348                    on running MDT / clients. */
3349         }
3350         /* Below here, let obd's XXX_process_config methods handle it */
3351
3352         /* All lov. in proc */
3353         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3354                 char *mdtlovname;
3355
3356                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3357                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3358                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3359                                            "set on the MDT, not %s. "
3360                                            "Ignoring.\n",
3361                                            mti->mti_svname);
3362                         GOTO(end, rc = 0);
3363                 }
3364
3365                 /* Modify mdtlov */
3366                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3367                         GOTO(end, rc = -ENODEV);
3368
3369                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3370                                              mti->mti_stripe_index);
3371                 if (rc)
3372                         GOTO(end, rc);
3373                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3374                                   &mgi->mgi_bufs, mdtlovname, ptr);
3375                 name_destroy(&logname);
3376                 name_destroy(&mdtlovname);
3377                 if (rc)
3378                         GOTO(end, rc);
3379
3380                 /* Modify clilov */
3381                 rc = name_create(&logname, mti->mti_fsname, "-client");
3382                 if (rc)
3383                         GOTO(end, rc);
3384                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3385                                   fsdb->fsdb_clilov, ptr);
3386                 name_destroy(&logname);
3387                 GOTO(end, rc);
3388         }
3389
3390         /* All osc., mdc., llite. params in proc */
3391         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3392             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3393             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3394                 char *cname;
3395
3396                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3397                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3398                                            " cannot be modified. Consider"
3399                                            " updating the configuration with"
3400                                            " --writeconf\n",
3401                                            mti->mti_svname);
3402                         GOTO(end, rc = -EINVAL);
3403                 }
3404                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3405                         rc = name_create(&cname, mti->mti_fsname, "-client");
3406                         /* Add the client type to match the obdname in
3407                            class_config_llog_handler */
3408                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3409                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3410                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3411                         rc = name_create(&cname, mti->mti_svname, "-osc");
3412                 } else {
3413                         GOTO(end, rc = -EINVAL);
3414                 }
3415                 if (rc)
3416                         GOTO(end, rc);
3417
3418                 /* Forbid direct update of llite root squash parameters.
3419                  * These parameters are indirectly set via the MDT settings.
3420                  * See (LU-1778) */
3421                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
3422                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3423                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3424                         LCONSOLE_ERROR("%s: root squash parameters can only "
3425                                 "be updated through MDT component\n",
3426                                 mti->mti_fsname);
3427                         name_destroy(&cname);
3428                         GOTO(end, rc = -EINVAL);
3429                 }
3430
3431                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3432
3433                 /* Modify client */
3434                 rc = name_create(&logname, mti->mti_fsname, "-client");
3435                 if (rc) {
3436                         name_destroy(&cname);
3437                         GOTO(end, rc);
3438                 }
3439                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3440                                   cname, ptr);
3441
3442                 /* osc params affect the MDT as well */
3443                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3444                         int i;
3445
3446                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3447                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3448                                         continue;
3449                                 name_destroy(&cname);
3450                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3451                                                          fsdb, i);
3452                                 name_destroy(&logname);
3453                                 if (rc)
3454                                         break;
3455                                 rc = name_create_mdt(&logname,
3456                                                      mti->mti_fsname, i);
3457                                 if (rc)
3458                                         break;
3459                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3460                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3461                                                           mti, logname,
3462                                                           &mgi->mgi_bufs,
3463                                                           cname, ptr);
3464                                         if (rc)
3465                                                 break;
3466                                 }
3467                         }
3468                 }
3469                 name_destroy(&logname);
3470                 name_destroy(&cname);
3471                 GOTO(end, rc);
3472         }
3473
3474         /* All mdt. params in proc */
3475         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
3476                 int i;
3477                 __u32 idx;
3478
3479                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3480                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3481                             MTI_NAME_MAXLEN) == 0)
3482                         /* device is unspecified completely? */
3483                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3484                 else
3485                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3486                 if (rc < 0)
3487                         goto active_err;
3488                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3489                         goto active_err;
3490                 if (rc & LDD_F_SV_ALL) {
3491                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3492                                 if (!test_bit(i,
3493                                                   fsdb->fsdb_mdt_index_map))
3494                                         continue;
3495                                 rc = name_create_mdt(&logname,
3496                                                 mti->mti_fsname, i);
3497                                 if (rc)
3498                                         goto active_err;
3499                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3500                                                   logname, &mgi->mgi_bufs,
3501                                                   logname, ptr);
3502                                 name_destroy(&logname);
3503                                 if (rc)
3504                                         goto active_err;
3505                         }
3506                 } else {
3507                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
3508                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
3509                                 LCONSOLE_ERROR("%s: root squash parameters "
3510                                         "cannot be applied to a single MDT\n",
3511                                         mti->mti_fsname);
3512                                 GOTO(end, rc = -EINVAL);
3513                         }
3514                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3515                                           mti->mti_svname, &mgi->mgi_bufs,
3516                                           mti->mti_svname, ptr);
3517                         if (rc)
3518                                 goto active_err;
3519                 }
3520
3521                 /* root squash settings are also applied to llite
3522                  * config log (see LU-1778) */
3523                 if (rc == 0 &&
3524                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3525                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3526                         char *cname;
3527                         char *ptr2;
3528
3529                         rc = name_create(&cname, mti->mti_fsname, "-client");
3530                         if (rc)
3531                                 GOTO(end, rc);
3532                         rc = name_create(&logname, mti->mti_fsname, "-client");
3533                         if (rc) {
3534                                 name_destroy(&cname);
3535                                 GOTO(end, rc);
3536                         }
3537                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
3538                         if (rc) {
3539                                 name_destroy(&cname);
3540                                 name_destroy(&logname);
3541                                 GOTO(end, rc);
3542                         }
3543                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3544                                           &mgi->mgi_bufs, cname, ptr2);
3545                         name_destroy(&ptr2);
3546                         name_destroy(&logname);
3547                         name_destroy(&cname);
3548                 }
3549                 GOTO(end, rc);
3550         }
3551
3552         /* All mdd., ost. and osd. params in proc */
3553         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3554             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3555             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3556                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3557                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3558                         GOTO(end, rc = -ENODEV);
3559
3560                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3561                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3562                 GOTO(end, rc);
3563         }
3564
3565         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3566         rc2 = -ENOSYS;
3567
3568 end:
3569         if (rc)
3570                 CERROR("err %d on param '%s'\n", rc, ptr);
3571
3572         RETURN(rc ?: rc2);
3573 }
3574
3575 /* Not implementing automatic failover nid addition at this time. */
3576 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3577                       struct mgs_target_info *mti)
3578 {
3579 #if 0
3580         struct fs_db *fsdb;
3581         int rc;
3582         ENTRY;
3583
3584         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3585         if (rc)
3586                 RETURN(rc);
3587
3588         if (mgs_log_is_empty(obd, mti->mti_svname))
3589                 /* should never happen */
3590                 RETURN(-ENOENT);
3591
3592         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3593
3594         /* FIXME We can just check mti->params to see if we're already in
3595            the failover list.  Modify mti->params for rewriting back at
3596            server_register_target(). */
3597
3598         mutex_lock(&fsdb->fsdb_mutex);
3599         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3600         mutex_unlock(&fsdb->fsdb_mutex);
3601         char    *buf, *params;
3602         int      rc = -EINVAL;
3603
3604         RETURN(rc);
3605 #endif
3606         return 0;
3607 }
3608
3609 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
3610                          struct mgs_target_info *mti, struct fs_db *fsdb)
3611 {
3612         char    *buf, *params;
3613         int      rc = -EINVAL;
3614
3615         ENTRY;
3616
3617         /* set/check the new target index */
3618         rc = mgs_set_index(env, mgs, mti);
3619         if (rc < 0)
3620                 RETURN(rc);
3621
3622         if (rc == EALREADY) {
3623                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3624                               mti->mti_stripe_index, mti->mti_svname);
3625                 /* We would like to mark old log sections as invalid
3626                    and add new log sections in the client and mdt logs.
3627                    But if we add new sections, then live clients will
3628                    get repeat setup instructions for already running
3629                    osc's. So don't update the client/mdt logs. */
3630                 mti->mti_flags &= ~LDD_F_UPDATE;
3631                 rc = 0;
3632         }
3633
3634         mutex_lock(&fsdb->fsdb_mutex);
3635
3636         if (mti->mti_flags &
3637             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3638                 /* Generate a log from scratch */
3639                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3640                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3641                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3642                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3643                 } else {
3644                         CERROR("Unknown target type %#x, can't create log for "
3645                                "%s\n", mti->mti_flags, mti->mti_svname);
3646                 }
3647                 if (rc) {
3648                         CERROR("Can't write logs for %s (%d)\n",
3649                                mti->mti_svname, rc);
3650                         GOTO(out_up, rc);
3651                 }
3652         } else {
3653                 /* Just update the params from tunefs in mgs_write_log_params */
3654                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3655                 mti->mti_flags |= LDD_F_PARAM;
3656         }
3657
3658         /* allocate temporary buffer, where class_get_next_param will
3659            make copy of a current  parameter */
3660         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3661         if (buf == NULL)
3662                 GOTO(out_up, rc = -ENOMEM);
3663         params = mti->mti_params;
3664         while (params != NULL) {
3665                 rc = class_get_next_param(&params, buf);
3666                 if (rc) {
3667                         if (rc == 1)
3668                                 /* there is no next parameter, that is
3669                                    not an error */
3670                                 rc = 0;
3671                         break;
3672                 }
3673                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3674                        params, buf);
3675                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3676                 if (rc)
3677                         break;
3678         }
3679
3680         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3681
3682 out_up:
3683         mutex_unlock(&fsdb->fsdb_mutex);
3684         RETURN(rc);
3685 }
3686
3687 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3688 {
3689         struct llog_ctxt        *ctxt;
3690         int                      rc = 0;
3691
3692         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3693         if (ctxt == NULL) {
3694                 CERROR("%s: MGS config context doesn't exist\n",
3695                        mgs->mgs_obd->obd_name);
3696                 rc = -ENODEV;
3697         } else {
3698                 rc = llog_erase(env, ctxt, NULL, name);
3699                 /* llog may not exist */
3700                 if (rc == -ENOENT)
3701                         rc = 0;
3702                 llog_ctxt_put(ctxt);
3703         }
3704
3705         if (rc)
3706                 CERROR("%s: failed to clear log %s: %d\n",
3707                        mgs->mgs_obd->obd_name, name, rc);
3708
3709         return rc;
3710 }
3711
3712 /* erase all logs for the given fs */
3713 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3714 {
3715         struct fs_db *fsdb;
3716         struct list_head log_list;
3717         struct mgs_direntry *dirent, *n;
3718         int rc, len = strlen(fsname);
3719         char *suffix;
3720         ENTRY;
3721
3722         /* Find all the logs in the CONFIGS directory */
3723         rc = class_dentry_readdir(env, mgs, &log_list);
3724         if (rc)
3725                 RETURN(rc);
3726
3727         mutex_lock(&mgs->mgs_mutex);
3728
3729         /* Delete the fs db */
3730         fsdb = mgs_find_fsdb(mgs, fsname);
3731         if (fsdb)
3732                 mgs_free_fsdb(mgs, fsdb);
3733
3734         mutex_unlock(&mgs->mgs_mutex);
3735
3736         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3737                 list_del_init(&dirent->mde_list);
3738                 suffix = strrchr(dirent->mde_name, '-');
3739                 if (suffix != NULL) {
3740                         if ((len == suffix - dirent->mde_name) &&
3741                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
3742                                 CDEBUG(D_MGS, "Removing log %s\n",
3743                                        dirent->mde_name);
3744                                 mgs_erase_log(env, mgs, dirent->mde_name);
3745                         }
3746                 }
3747                 mgs_direntry_free(dirent);
3748         }
3749
3750         RETURN(rc);
3751 }
3752
3753 /* list all logs for the given fs */
3754 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
3755                   struct obd_ioctl_data *data)
3756 {
3757         struct list_head         log_list;
3758         struct mgs_direntry     *dirent, *n;
3759         char                    *out, *suffix;
3760         int                      l, remains, rc;
3761
3762         ENTRY;
3763
3764         /* Find all the logs in the CONFIGS directory */
3765         rc = class_dentry_readdir(env, mgs, &log_list);
3766         if (rc)
3767                 RETURN(rc);
3768
3769         out = data->ioc_bulk;
3770         remains = data->ioc_inllen1;
3771         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3772                 list_del_init(&dirent->mde_list);
3773                 suffix = strrchr(dirent->mde_name, '-');
3774                 if (suffix != NULL) {
3775                         l = snprintf(out, remains, "config log: $%s\n",
3776                                      dirent->mde_name);
3777                         out += l;
3778                         remains -= l;
3779                 }
3780                 mgs_direntry_free(dirent);
3781                 if (remains <= 0)
3782                         break;
3783         }
3784         RETURN(rc);
3785 }
3786
3787 /* from llog_swab */
3788 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3789 {
3790         int i;
3791         ENTRY;
3792
3793         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3794         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3795
3796         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3797         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3798         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3799         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3800
3801         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3802         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3803                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3804                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3805                                i, lcfg->lcfg_buflens[i],
3806                                lustre_cfg_string(lcfg, i));
3807                 }
3808         EXIT;
3809 }
3810
3811 /* Setup params fsdb and log
3812  */
3813 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3814                           struct fs_db *fsdb)
3815 {
3816         struct llog_handle      *params_llh = NULL;
3817         int                     rc;
3818         ENTRY;
3819
3820         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
3821         if (fsdb != NULL) {
3822                 mutex_lock(&fsdb->fsdb_mutex);
3823                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
3824                 if (rc == 0)
3825                         rc = record_end_log(env, &params_llh);
3826                 mutex_unlock(&fsdb->fsdb_mutex);
3827         }
3828
3829         RETURN(rc);
3830 }
3831
3832 /* Cleanup params fsdb and log
3833  */
3834 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
3835 {
3836         return mgs_erase_logs(env, mgs, PARAMS_FILENAME);
3837 }
3838
3839 /* Set a permanent (config log) param for a target or fs
3840  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3841  *             buf1 contains the single parameter
3842  */
3843 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3844                  struct lustre_cfg *lcfg, char *fsname)
3845 {
3846         struct fs_db *fsdb;
3847         struct mgs_target_info *mti;
3848         char *devname, *param;
3849         char *ptr;
3850         const char *tmp;
3851         __u32 index;
3852         int rc = 0;
3853         ENTRY;
3854
3855         print_lustre_cfg(lcfg);
3856
3857         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3858         devname = lustre_cfg_string(lcfg, 0);
3859         param = lustre_cfg_string(lcfg, 1);
3860         if (!devname) {
3861                 /* Assume device name embedded in param:
3862                    lustre-OST0000.osc.max_dirty_mb=32 */
3863                 ptr = strchr(param, '.');
3864                 if (ptr) {
3865                         devname = param;
3866                         *ptr = 0;
3867                         param = ptr + 1;
3868                 }
3869         }
3870         if (!devname) {
3871                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3872                 RETURN(-ENOSYS);
3873         }
3874
3875         rc = mgs_parse_devname(devname, fsname, NULL);
3876         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3877                 /* param related to llite isn't allowed to set by OST or MDT */
3878                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3879                                        sizeof(PARAM_LLITE) - 1) == 0)
3880                         RETURN(-EINVAL);
3881         } else {
3882                 /* assume devname is the fsname */
3883                 memset(fsname, 0, MTI_NAME_MAXLEN);
3884                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3885                 fsname[MTI_NAME_MAXLEN - 1] = 0;
3886         }
3887         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3888
3889         rc = mgs_find_or_make_fsdb(env, mgs,
3890                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
3891                                    PARAMS_FILENAME : fsname, &fsdb);
3892         if (rc)
3893                 RETURN(rc);
3894
3895         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
3896             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3897             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3898                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3899                        "is '%s'\n", fsname, devname);
3900                 mgs_free_fsdb(mgs, fsdb);
3901                 RETURN(-EINVAL);
3902         }
3903
3904         /* Create a fake mti to hold everything */
3905         OBD_ALLOC_PTR(mti);
3906         if (!mti)
3907                 GOTO(out, rc = -ENOMEM);
3908         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3909             >= sizeof(mti->mti_fsname))
3910                 GOTO(out, rc = -E2BIG);
3911         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3912             >= sizeof(mti->mti_svname))
3913                 GOTO(out, rc = -E2BIG);
3914         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3915             >= sizeof(mti->mti_params))
3916                 GOTO(out, rc = -E2BIG);
3917         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3918         if (rc < 0)
3919                 /* Not a valid server; may be only fsname */
3920                 rc = 0;
3921         else
3922                 /* Strip -osc or -mdc suffix from svname */
3923                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3924                                      mti->mti_svname))
3925                         GOTO(out, rc = -EINVAL);
3926         /*
3927          * Revoke lock so everyone updates.  Should be alright if
3928          * someone was already reading while we were updating the logs,
3929          * so we don't really need to hold the lock while we're
3930          * writing (above).
3931          */
3932         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
3933                 mti->mti_flags = rc | LDD_F_PARAM2;
3934                 mutex_lock(&fsdb->fsdb_mutex);
3935                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
3936                 mutex_unlock(&fsdb->fsdb_mutex);
3937                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
3938         } else {
3939                 mti->mti_flags = rc | LDD_F_PARAM;
3940                 mutex_lock(&fsdb->fsdb_mutex);
3941                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3942                 mutex_unlock(&fsdb->fsdb_mutex);
3943                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3944         }
3945
3946 out:
3947         OBD_FREE_PTR(mti);
3948         RETURN(rc);
3949 }
3950
3951 static int mgs_write_log_pool(const struct lu_env *env,
3952                               struct mgs_device *mgs, char *logname,
3953                               struct fs_db *fsdb, char *tgtname,
3954                               enum lcfg_command_type cmd,
3955                               char *fsname, char *poolname,
3956                               char *ostname, char *comment)
3957 {
3958         struct llog_handle *llh = NULL;
3959         int rc;
3960
3961         rc = record_start_log(env, mgs, &llh, logname);
3962         if (rc)
3963                 return rc;
3964         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
3965         if (rc)
3966                 goto out;
3967         rc = record_base(env, llh, tgtname, 0, cmd,
3968                          fsname, poolname, ostname, 0);
3969         if (rc)
3970                 goto out;
3971         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
3972 out:
3973         record_end_log(env, &llh);
3974         return rc;
3975 }
3976
3977 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
3978                     enum lcfg_command_type cmd, const char *nodemap_name,
3979                     const char *param)
3980 {
3981         lnet_nid_t      nid[2];
3982         __u32           idmap[2];
3983         bool            bool_switch;
3984         __u32           int_id;
3985         int             rc = 0;
3986         ENTRY;
3987
3988         switch (cmd) {
3989         case LCFG_NODEMAP_ADD:
3990                 rc = nodemap_add(nodemap_name);
3991                 break;
3992         case LCFG_NODEMAP_DEL:
3993                 rc = nodemap_del(nodemap_name);
3994                 break;
3995         case LCFG_NODEMAP_ADD_RANGE:
3996                 rc = nodemap_parse_range(param, nid);
3997                 if (rc != 0)
3998                         break;
3999                 rc = nodemap_add_range(nodemap_name, nid);
4000                 break;
4001         case LCFG_NODEMAP_DEL_RANGE:
4002                 rc = nodemap_parse_range(param, nid);
4003                 if (rc != 0)
4004                         break;
4005                 rc = nodemap_del_range(nodemap_name, nid);
4006                 break;
4007         case LCFG_NODEMAP_ADMIN:
4008                 bool_switch = simple_strtoul(param, NULL, 10);
4009                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
4010                 break;
4011         case LCFG_NODEMAP_TRUSTED:
4012                 bool_switch = simple_strtoul(param, NULL, 10);
4013                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
4014                 break;
4015         case LCFG_NODEMAP_SQUASH_UID:
4016                 int_id = simple_strtoul(param, NULL, 10);
4017                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
4018                 break;
4019         case LCFG_NODEMAP_SQUASH_GID:
4020                 int_id = simple_strtoul(param, NULL, 10);
4021                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
4022                 break;
4023         case LCFG_NODEMAP_ADD_UIDMAP:
4024         case LCFG_NODEMAP_ADD_GIDMAP:
4025                 rc = nodemap_parse_idmap(param, idmap);
4026                 if (rc != 0)
4027                         break;
4028                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
4029                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
4030                                                idmap);
4031                 else
4032                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
4033                                                idmap);
4034                 break;
4035         case LCFG_NODEMAP_DEL_UIDMAP:
4036         case LCFG_NODEMAP_DEL_GIDMAP:
4037                 rc = nodemap_parse_idmap(param, idmap);
4038                 if (rc != 0)
4039                         break;
4040                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
4041                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
4042                                                idmap);
4043                 else
4044                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
4045                                                idmap);
4046                 break;
4047         default:
4048                 rc = -EINVAL;
4049         }
4050
4051         RETURN(rc);
4052 }
4053
4054 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
4055                  enum lcfg_command_type cmd, char *fsname,
4056                  char *poolname, char *ostname)
4057 {
4058         struct fs_db *fsdb;
4059         char *lovname;
4060         char *logname;
4061         char *label = NULL, *canceled_label = NULL;
4062         int label_sz;
4063         struct mgs_target_info *mti = NULL;
4064         int rc, i;
4065         ENTRY;
4066
4067         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
4068         if (rc) {
4069                 CERROR("Can't get db for %s\n", fsname);
4070                 RETURN(rc);
4071         }
4072         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4073                 CERROR("%s is not defined\n", fsname);
4074                 mgs_free_fsdb(mgs, fsdb);
4075                 RETURN(-EINVAL);
4076         }
4077
4078         label_sz = 10 + strlen(fsname) + strlen(poolname);
4079
4080         /* check if ostname match fsname */
4081         if (ostname != NULL) {
4082                 char *ptr;
4083
4084                 ptr = strrchr(ostname, '-');
4085                 if ((ptr == NULL) ||
4086                     (strncmp(fsname, ostname, ptr-ostname) != 0))
4087                         RETURN(-EINVAL);
4088                 label_sz += strlen(ostname);
4089         }
4090
4091         OBD_ALLOC(label, label_sz);
4092         if (label == NULL)
4093                 RETURN(-ENOMEM);
4094
4095         switch(cmd) {
4096         case LCFG_POOL_NEW:
4097                 sprintf(label,
4098                         "new %s.%s", fsname, poolname);
4099                 break;
4100         case LCFG_POOL_ADD:
4101                 sprintf(label,
4102                         "add %s.%s.%s", fsname, poolname, ostname);
4103                 break;
4104         case LCFG_POOL_REM:
4105                 OBD_ALLOC(canceled_label, label_sz);
4106                 if (canceled_label == NULL)
4107                         GOTO(out_label, rc = -ENOMEM);
4108                 sprintf(label,
4109                         "rem %s.%s.%s", fsname, poolname, ostname);
4110                 sprintf(canceled_label,
4111                         "add %s.%s.%s", fsname, poolname, ostname);
4112                 break;
4113         case LCFG_POOL_DEL:
4114                 OBD_ALLOC(canceled_label, label_sz);
4115                 if (canceled_label == NULL)
4116                         GOTO(out_label, rc = -ENOMEM);
4117                 sprintf(label,
4118                         "del %s.%s", fsname, poolname);
4119                 sprintf(canceled_label,
4120                         "new %s.%s", fsname, poolname);
4121                 break;
4122         default:
4123                 break;
4124         }
4125
4126         if (canceled_label != NULL) {
4127                 OBD_ALLOC_PTR(mti);
4128                 if (mti == NULL)
4129                         GOTO(out_cancel, rc = -ENOMEM);
4130         }
4131
4132         mutex_lock(&fsdb->fsdb_mutex);
4133         /* write pool def to all MDT logs */
4134         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4135                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
4136                         rc = name_create_mdt_and_lov(&logname, &lovname,
4137                                                      fsdb, i);
4138                         if (rc) {
4139                                 mutex_unlock(&fsdb->fsdb_mutex);
4140                                 GOTO(out_mti, rc);
4141                         }
4142                         if (canceled_label != NULL) {
4143                                 strcpy(mti->mti_svname, "lov pool");
4144                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4145                                                 lovname, canceled_label,
4146                                                 CM_SKIP);
4147                         }
4148
4149                         if (rc >= 0)
4150                                 rc = mgs_write_log_pool(env, mgs, logname,
4151                                                         fsdb, lovname, cmd,
4152                                                         fsname, poolname,
4153                                                         ostname, label);
4154                         name_destroy(&logname);
4155                         name_destroy(&lovname);
4156                         if (rc) {
4157                                 mutex_unlock(&fsdb->fsdb_mutex);
4158                                 GOTO(out_mti, rc);
4159                         }
4160                 }
4161         }
4162
4163         rc = name_create(&logname, fsname, "-client");
4164         if (rc) {
4165                 mutex_unlock(&fsdb->fsdb_mutex);
4166                 GOTO(out_mti, rc);
4167         }
4168         if (canceled_label != NULL) {
4169                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4170                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4171                 if (rc < 0) {
4172                         mutex_unlock(&fsdb->fsdb_mutex);
4173                         name_destroy(&logname);
4174                         GOTO(out_mti, rc);
4175                 }
4176         }
4177
4178         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4179                                 cmd, fsname, poolname, ostname, label);
4180         mutex_unlock(&fsdb->fsdb_mutex);
4181         name_destroy(&logname);
4182         /* request for update */
4183         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4184
4185         EXIT;
4186 out_mti:
4187         if (mti != NULL)
4188                 OBD_FREE_PTR(mti);
4189 out_cancel:
4190         if (canceled_label != NULL)
4191                 OBD_FREE(canceled_label, label_sz);
4192 out_label:
4193         OBD_FREE(label, label_sz);
4194         return rc;
4195 }