Whamcloud - gitweb
LU-8454 llite: normal user can't set FS default stripe
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_ioctl.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 /* Find all logs in CONFIG directory and link then into list */
59 int class_dentry_readdir(const struct lu_env *env,
60                          struct mgs_device *mgs, struct list_head *log_list)
61 {
62         struct dt_object    *dir = mgs->mgs_configs_dir;
63         const struct dt_it_ops *iops;
64         struct dt_it        *it;
65         struct mgs_direntry *de;
66         char                *key;
67         int                  rc, key_sz;
68
69         INIT_LIST_HEAD(log_list);
70
71         LASSERT(dir);
72         LASSERT(dir->do_index_ops);
73
74         iops = &dir->do_index_ops->dio_it;
75         it = iops->init(env, dir, LUDA_64BITHASH);
76         if (IS_ERR(it))
77                 RETURN(PTR_ERR(it));
78
79         rc = iops->load(env, it, 0);
80         if (rc <= 0)
81                 GOTO(fini, rc = 0);
82
83         /* main cycle */
84         do {
85                 key = (void *)iops->key(env, it);
86                 if (IS_ERR(key)) {
87                         CERROR("%s: key failed when listing %s: rc = %d\n",
88                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
89                                (int) PTR_ERR(key));
90                         goto next;
91                 }
92                 key_sz = iops->key_size(env, it);
93                 LASSERT(key_sz > 0);
94
95                 /* filter out "." and ".." entries */
96                 if (key[0] == '.') {
97                         if (key_sz == 1)
98                                 goto next;
99                         if (key_sz == 2 && key[1] == '.')
100                                 goto next;
101                 }
102
103                 de = mgs_direntry_alloc(key_sz + 1);
104                 if (de == NULL) {
105                         rc = -ENOMEM;
106                         break;
107                 }
108
109                 memcpy(de->mde_name, key, key_sz);
110                 de->mde_name[key_sz] = 0;
111
112                 list_add(&de->mde_list, log_list);
113
114 next:
115                 rc = iops->next(env, it);
116         } while (rc == 0);
117         if (rc > 0)
118                 rc = 0;
119
120         iops->put(env, it);
121
122 fini:
123         iops->fini(env, it);
124         if (rc)
125                 CERROR("%s: key failed when listing %s: rc = %d\n",
126                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
127         RETURN(rc);
128 }
129
130 /******************** DB functions *********************/
131
132 static inline int name_create(char **newname, char *prefix, char *suffix)
133 {
134         LASSERT(newname);
135         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
136         if (!*newname)
137                 return -ENOMEM;
138         sprintf(*newname, "%s%s", prefix, suffix);
139         return 0;
140 }
141
142 static inline void name_destroy(char **name)
143 {
144         if (*name)
145                 OBD_FREE(*name, strlen(*name) + 1);
146         *name = NULL;
147 }
148
149 struct mgs_fsdb_handler_data
150 {
151         struct fs_db   *fsdb;
152         __u32           ver;
153 };
154
155 /* from the (client) config log, figure out:
156         1. which ost's/mdt's are configured (by index)
157         2. what the last config step is
158         3. COMPAT_18 osc name
159 */
160 /* It might be better to have a separate db file, instead of parsing the info
161    out of the client log.  This is slow and potentially error-prone. */
162 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
163                             struct llog_rec_hdr *rec, void *data)
164 {
165         struct mgs_fsdb_handler_data *d = data;
166         struct fs_db *fsdb = d->fsdb;
167         int cfg_len = rec->lrh_len;
168         char *cfg_buf = (char*) (rec + 1);
169         struct lustre_cfg *lcfg;
170         __u32 index;
171         int rc = 0;
172         ENTRY;
173
174         if (rec->lrh_type != OBD_CFG_REC) {
175                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
176                 RETURN(-EINVAL);
177         }
178
179         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
180         if (rc) {
181                 CERROR("Insane cfg\n");
182                 RETURN(rc);
183         }
184
185         lcfg = (struct lustre_cfg *)cfg_buf;
186
187         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
188                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
189
190         /* Figure out ost indicies */
191         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
192         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
193             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
194                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
195                                        NULL, 10);
196                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
197                        lustre_cfg_string(lcfg, 1), index,
198                        lustre_cfg_string(lcfg, 2));
199                 set_bit(index, fsdb->fsdb_ost_index_map);
200         }
201
202         /* Figure out mdt indicies */
203         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
204         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
205             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
206                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
207                                        &index, NULL);
208                 if (rc != LDD_F_SV_TYPE_MDT) {
209                         CWARN("Unparsable MDC name %s, assuming index 0\n",
210                               lustre_cfg_string(lcfg, 0));
211                         index = 0;
212                 }
213                 rc = 0;
214                 CDEBUG(D_MGS, "MDT index is %u\n", index);
215                 set_bit(index, fsdb->fsdb_mdt_index_map);
216                 fsdb->fsdb_mdt_count ++;
217         }
218
219         /**
220          * figure out the old config. fsdb_gen = 0 means old log
221          * It is obsoleted and not supported anymore
222          */
223         if (fsdb->fsdb_gen == 0) {
224                 CERROR("Old config format is not supported\n");
225                 RETURN(-EINVAL);
226         }
227
228         /*
229          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
230          */
231         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
232             lcfg->lcfg_command == LCFG_ATTACH &&
233             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
234                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
235                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
236                         CWARN("MDT using 1.8 OSC name scheme\n");
237                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
238                 }
239         }
240
241         if (lcfg->lcfg_command == LCFG_MARKER) {
242                 struct cfg_marker *marker;
243                 marker = lustre_cfg_buf(lcfg, 1);
244
245                 d->ver = marker->cm_vers;
246
247                 /* Keep track of the latest marker step */
248                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
249         }
250
251         RETURN(rc);
252 }
253
254 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
255 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
256                                   struct mgs_device *mgs,
257                                   struct fs_db *fsdb)
258 {
259         char                            *logname;
260         struct llog_handle              *loghandle;
261         struct llog_ctxt                *ctxt;
262         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
263         int rc;
264
265         ENTRY;
266
267         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
268         LASSERT(ctxt != NULL);
269         rc = name_create(&logname, fsdb->fsdb_name, "-client");
270         if (rc)
271                 GOTO(out_put, rc);
272         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
273         if (rc)
274                 GOTO(out_pop, rc);
275
276         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
277         if (rc)
278                 GOTO(out_close, rc);
279
280         if (llog_get_size(loghandle) <= 1)
281                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
282
283         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
284         CDEBUG(D_INFO, "get_db = %d\n", rc);
285 out_close:
286         llog_close(env, loghandle);
287 out_pop:
288         name_destroy(&logname);
289 out_put:
290         llog_ctxt_put(ctxt);
291
292         RETURN(rc);
293 }
294
295 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
296 {
297         struct mgs_tgt_srpc_conf *tgtconf;
298
299         /* free target-specific rules */
300         while (fsdb->fsdb_srpc_tgt) {
301                 tgtconf = fsdb->fsdb_srpc_tgt;
302                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
303
304                 LASSERT(tgtconf->mtsc_tgt);
305
306                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
307                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
308                 OBD_FREE_PTR(tgtconf);
309         }
310
311         /* free general rules */
312         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
313 }
314
315 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
316 {
317         struct fs_db *fsdb;
318         struct list_head *tmp;
319
320         list_for_each(tmp, &mgs->mgs_fs_db_list) {
321                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
322                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
323                         return fsdb;
324         }
325         return NULL;
326 }
327
328 /* caller must hold the mgs->mgs_fs_db_lock */
329 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
330                                   struct mgs_device *mgs, char *fsname)
331 {
332         struct fs_db *fsdb;
333         int rc;
334         ENTRY;
335
336         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
337                 CERROR("fsname %s is too long\n", fsname);
338                 RETURN(ERR_PTR(-EINVAL));
339         }
340
341         OBD_ALLOC_PTR(fsdb);
342         if (!fsdb)
343                 RETURN(ERR_PTR(-ENOMEM));
344
345         strcpy(fsdb->fsdb_name, fsname);
346         mutex_init(&fsdb->fsdb_mutex);
347         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
348         fsdb->fsdb_gen = 1;
349
350         if (strcmp(fsname, MGSSELF_NAME) == 0) {
351                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
352                 fsdb->fsdb_mgs = mgs;
353         } else {
354                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
355                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
356                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
357                         CERROR("No memory for index maps\n");
358                         GOTO(err, rc = -ENOMEM);
359                 }
360
361                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
362                 if (rc)
363                         GOTO(err, rc);
364                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
365                 if (rc)
366                         GOTO(err, rc);
367
368                 /* initialise data for NID table */
369                 mgs_ir_init_fs(env, mgs, fsdb);
370
371                 lproc_mgs_add_live(mgs, fsdb);
372         }
373
374         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
375
376         RETURN(fsdb);
377 err:
378         if (fsdb->fsdb_ost_index_map)
379                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
380         if (fsdb->fsdb_mdt_index_map)
381                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
382         name_destroy(&fsdb->fsdb_clilov);
383         name_destroy(&fsdb->fsdb_clilmv);
384         OBD_FREE_PTR(fsdb);
385         RETURN(ERR_PTR(rc));
386 }
387
388 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
389 {
390         /* wait for anyone with the sem */
391         mutex_lock(&fsdb->fsdb_mutex);
392         lproc_mgs_del_live(mgs, fsdb);
393         list_del(&fsdb->fsdb_list);
394
395         /* deinitialize fsr */
396         mgs_ir_fini_fs(mgs, fsdb);
397
398         if (fsdb->fsdb_ost_index_map)
399                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
400         if (fsdb->fsdb_mdt_index_map)
401                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
402         name_destroy(&fsdb->fsdb_clilov);
403         name_destroy(&fsdb->fsdb_clilmv);
404         mgs_free_fsdb_srpc(fsdb);
405         mutex_unlock(&fsdb->fsdb_mutex);
406         OBD_FREE_PTR(fsdb);
407 }
408
409 int mgs_init_fsdb_list(struct mgs_device *mgs)
410 {
411         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
412         return 0;
413 }
414
415 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
416 {
417         struct fs_db *fsdb;
418         struct list_head *tmp, *tmp2;
419
420         mutex_lock(&mgs->mgs_mutex);
421         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
422                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
423                 mgs_free_fsdb(mgs, fsdb);
424         }
425         mutex_unlock(&mgs->mgs_mutex);
426         return 0;
427 }
428
429 int mgs_find_or_make_fsdb(const struct lu_env *env,
430                           struct mgs_device *mgs, char *name,
431                           struct fs_db **dbh)
432 {
433         struct fs_db *fsdb;
434         int rc = 0;
435
436         ENTRY;
437         mutex_lock(&mgs->mgs_mutex);
438         fsdb = mgs_find_fsdb(mgs, name);
439         if (fsdb) {
440                 mutex_unlock(&mgs->mgs_mutex);
441                 *dbh = fsdb;
442                 RETURN(0);
443         }
444
445         CDEBUG(D_MGS, "Creating new db\n");
446         fsdb = mgs_new_fsdb(env, mgs, name);
447         /* lock fsdb_mutex until the db is loaded from llogs */
448         if (!IS_ERR(fsdb))
449                 mutex_lock(&fsdb->fsdb_mutex);
450         mutex_unlock(&mgs->mgs_mutex);
451         if (IS_ERR(fsdb))
452                 RETURN(PTR_ERR(fsdb));
453
454         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
455                 /* populate the db from the client llog */
456                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
457                 if (rc) {
458                         CERROR("Can't get db from client log %d\n", rc);
459                         GOTO(out_free, rc);
460                 }
461         }
462
463         /* populate srpc rules from params llog */
464         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
465         if (rc) {
466                 CERROR("Can't get db from params log %d\n", rc);
467                 GOTO(out_free, rc);
468         }
469
470         mutex_unlock(&fsdb->fsdb_mutex);
471         *dbh = fsdb;
472
473         RETURN(0);
474
475 out_free:
476         mutex_unlock(&fsdb->fsdb_mutex);
477         mgs_free_fsdb(mgs, fsdb);
478         return rc;
479 }
480
481 /* 1 = index in use
482    0 = index unused
483    -1= empty client log */
484 int mgs_check_index(const struct lu_env *env,
485                     struct mgs_device *mgs,
486                     struct mgs_target_info *mti)
487 {
488         struct fs_db *fsdb;
489         void *imap;
490         int rc = 0;
491         ENTRY;
492
493         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
494
495         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
496         if (rc) {
497                 CERROR("Can't get db for %s\n", mti->mti_fsname);
498                 RETURN(rc);
499         }
500
501         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
502                 RETURN(-1);
503
504         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
505                 imap = fsdb->fsdb_ost_index_map;
506         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
507                 imap = fsdb->fsdb_mdt_index_map;
508         else
509                 RETURN(-EINVAL);
510
511         if (test_bit(mti->mti_stripe_index, imap))
512                 RETURN(1);
513         RETURN(0);
514 }
515
516 static __inline__ int next_index(void *index_map, int map_len)
517 {
518         int i;
519         for (i = 0; i < map_len * 8; i++)
520                 if (!test_bit(i, index_map)) {
521                          return i;
522                  }
523         CERROR("max index %d exceeded.\n", i);
524         return -1;
525 }
526
527 /* Return codes:
528         0  newly marked as in use
529         <0 err
530         +EALREADY for update of an old index */
531 static int mgs_set_index(const struct lu_env *env,
532                          struct mgs_device *mgs,
533                          struct mgs_target_info *mti)
534 {
535         struct fs_db *fsdb;
536         void *imap;
537         int rc = 0;
538         ENTRY;
539
540         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
541         if (rc) {
542                 CERROR("Can't get db for %s\n", mti->mti_fsname);
543                 RETURN(rc);
544         }
545
546         mutex_lock(&fsdb->fsdb_mutex);
547         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
548                 imap = fsdb->fsdb_ost_index_map;
549         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
550                 imap = fsdb->fsdb_mdt_index_map;
551         } else {
552                 GOTO(out_up, rc = -EINVAL);
553         }
554
555         if (mti->mti_flags & LDD_F_NEED_INDEX) {
556                 rc = next_index(imap, INDEX_MAP_SIZE);
557                 if (rc == -1)
558                         GOTO(out_up, rc = -ERANGE);
559                 mti->mti_stripe_index = rc;
560                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
561                         fsdb->fsdb_mdt_count ++;
562         }
563
564         /* the last index(0xffff) is reserved for default value. */
565         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
566                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
567                                    "but index must be less than %u.\n",
568                                    mti->mti_svname, mti->mti_stripe_index,
569                                    INDEX_MAP_SIZE * 8 - 1);
570                 GOTO(out_up, rc = -ERANGE);
571         }
572
573         if (test_bit(mti->mti_stripe_index, imap)) {
574                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
575                     !(mti->mti_flags & LDD_F_WRITECONF)) {
576                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
577                                            "%d, but that index is already in "
578                                            "use. Use --writeconf to force\n",
579                                            mti->mti_svname,
580                                            mti->mti_stripe_index);
581                         GOTO(out_up, rc = -EADDRINUSE);
582                 } else {
583                         CDEBUG(D_MGS, "Server %s updating index %d\n",
584                                mti->mti_svname, mti->mti_stripe_index);
585                         GOTO(out_up, rc = EALREADY);
586                 }
587         }
588
589         set_bit(mti->mti_stripe_index, imap);
590         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
591         mutex_unlock(&fsdb->fsdb_mutex);
592         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
593                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
594
595         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
596                mti->mti_stripe_index);
597
598         RETURN(0);
599 out_up:
600         mutex_unlock(&fsdb->fsdb_mutex);
601         return rc;
602 }
603
604 struct mgs_modify_lookup {
605         struct cfg_marker mml_marker;
606         int               mml_modified;
607 };
608
609 static int mgs_modify_handler(const struct lu_env *env,
610                               struct llog_handle *llh,
611                               struct llog_rec_hdr *rec, void *data)
612 {
613         struct mgs_modify_lookup *mml = data;
614         struct cfg_marker *marker;
615         struct lustre_cfg *lcfg = REC_DATA(rec);
616         int cfg_len = REC_DATA_LEN(rec);
617         int rc;
618         ENTRY;
619
620         if (rec->lrh_type != OBD_CFG_REC) {
621                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
622                 RETURN(-EINVAL);
623         }
624
625         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
626         if (rc) {
627                 CERROR("Insane cfg\n");
628                 RETURN(rc);
629         }
630
631         /* We only care about markers */
632         if (lcfg->lcfg_command != LCFG_MARKER)
633                 RETURN(0);
634
635         marker = lustre_cfg_buf(lcfg, 1);
636         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
637             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
638             !(marker->cm_flags & CM_SKIP)) {
639                 /* Found a non-skipped marker match */
640                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
641                        rec->lrh_index, marker->cm_step,
642                        marker->cm_flags, mml->mml_marker.cm_flags,
643                        marker->cm_tgtname, marker->cm_comment);
644                 /* Overwrite the old marker llog entry */
645                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
646                 marker->cm_flags |= mml->mml_marker.cm_flags;
647                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
648                 rc = llog_write(env, llh, rec, rec->lrh_index);
649                 if (!rc)
650                          mml->mml_modified++;
651         }
652
653         RETURN(rc);
654 }
655
656 /**
657  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
658  * Return code:
659  * 0 - modified successfully,
660  * 1 - no modification was done
661  * negative - error
662  */
663 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
664                       struct fs_db *fsdb, struct mgs_target_info *mti,
665                       char *logname, char *devname, char *comment, int flags)
666 {
667         struct llog_handle *loghandle;
668         struct llog_ctxt *ctxt;
669         struct mgs_modify_lookup *mml;
670         int rc;
671
672         ENTRY;
673
674         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
675         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
676                flags);
677
678         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
679         LASSERT(ctxt != NULL);
680         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
681         if (rc < 0) {
682                 if (rc == -ENOENT)
683                         rc = 0;
684                 GOTO(out_pop, rc);
685         }
686
687         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
688         if (rc)
689                 GOTO(out_close, rc);
690
691         if (llog_get_size(loghandle) <= 1)
692                 GOTO(out_close, rc = 0);
693
694         OBD_ALLOC_PTR(mml);
695         if (!mml)
696                 GOTO(out_close, rc = -ENOMEM);
697         if (strlcpy(mml->mml_marker.cm_comment, comment,
698                     sizeof(mml->mml_marker.cm_comment)) >=
699             sizeof(mml->mml_marker.cm_comment))
700                 GOTO(out_free, rc = -E2BIG);
701         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
702                     sizeof(mml->mml_marker.cm_tgtname)) >=
703             sizeof(mml->mml_marker.cm_tgtname))
704                 GOTO(out_free, rc = -E2BIG);
705         /* Modify mostly means cancel */
706         mml->mml_marker.cm_flags = flags;
707         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
708         mml->mml_modified = 0;
709         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
710                           NULL);
711         if (!rc && !mml->mml_modified)
712                 rc = 1;
713
714 out_free:
715         OBD_FREE_PTR(mml);
716
717 out_close:
718         llog_close(env, loghandle);
719 out_pop:
720         if (rc < 0)
721                 CERROR("%s: modify %s/%s failed: rc = %d\n",
722                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
723         llog_ctxt_put(ctxt);
724         RETURN(rc);
725 }
726
727 /** This structure is passed to mgs_replace_handler */
728 struct mgs_replace_uuid_lookup {
729         /* Nids are replaced for this target device */
730         struct mgs_target_info target;
731         /* Temporary modified llog */
732         struct llog_handle *temp_llh;
733         /* Flag is set if in target block*/
734         int in_target_device;
735         /* Nids already added. Just skip (multiple nids) */
736         int device_nids_added;
737         /* Flag is set if this block should not be copied */
738         int skip_it;
739 };
740
741 /**
742  * Check: a) if block should be skipped
743  * b) is it target block
744  *
745  * \param[in] lcfg
746  * \param[in] mrul
747  *
748  * \retval 0 should not to be skipped
749  * \retval 1 should to be skipped
750  */
751 static int check_markers(struct lustre_cfg *lcfg,
752                          struct mgs_replace_uuid_lookup *mrul)
753 {
754          struct cfg_marker *marker;
755
756         /* Track markers. Find given device */
757         if (lcfg->lcfg_command == LCFG_MARKER) {
758                 marker = lustre_cfg_buf(lcfg, 1);
759                 /* Clean llog from records marked as CM_EXCLUDE.
760                    CM_SKIP records are used for "active" command
761                    and can be restored if needed */
762                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
763                     (CM_EXCLUDE | CM_START)) {
764                         mrul->skip_it = 1;
765                         return 1;
766                 }
767
768                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
769                     (CM_EXCLUDE | CM_END)) {
770                         mrul->skip_it = 0;
771                         return 1;
772                 }
773
774                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
775                         LASSERT(!(marker->cm_flags & CM_START) ||
776                                 !(marker->cm_flags & CM_END));
777                         if (marker->cm_flags & CM_START) {
778                                 mrul->in_target_device = 1;
779                                 mrul->device_nids_added = 0;
780                         } else if (marker->cm_flags & CM_END)
781                                 mrul->in_target_device = 0;
782                 }
783         }
784
785         return 0;
786 }
787
788 static int record_base(const struct lu_env *env, struct llog_handle *llh,
789                      char *cfgname, lnet_nid_t nid, int cmd,
790                      char *s1, char *s2, char *s3, char *s4)
791 {
792         struct mgs_thread_info  *mgi = mgs_env_info(env);
793         struct llog_cfg_rec     *lcr;
794         int rc;
795
796         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
797                cmd, s1, s2, s3, s4);
798
799         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
800         if (s1)
801                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
802         if (s2)
803                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
804         if (s3)
805                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
806         if (s4)
807                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
808
809         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
810         if (lcr == NULL)
811                 return -ENOMEM;
812
813         lcr->lcr_cfg.lcfg_nid = nid;
814         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
815
816         lustre_cfg_rec_free(lcr);
817
818         if (rc < 0)
819                 CDEBUG(D_MGS,
820                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
821                        cfgname, cmd, s1, s2, s3, s4, rc);
822         return rc;
823 }
824
825 static inline int record_add_uuid(const struct lu_env *env,
826                                   struct llog_handle *llh,
827                                   uint64_t nid, char *uuid)
828 {
829         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
830                            NULL, NULL, NULL);
831 }
832
833 static inline int record_add_conn(const struct lu_env *env,
834                                   struct llog_handle *llh,
835                                   char *devname, char *uuid)
836 {
837         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
838                            NULL, NULL, NULL);
839 }
840
841 static inline int record_attach(const struct lu_env *env,
842                                 struct llog_handle *llh, char *devname,
843                                 char *type, char *uuid)
844 {
845         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
846                            NULL, NULL);
847 }
848
849 static inline int record_setup(const struct lu_env *env,
850                                struct llog_handle *llh, char *devname,
851                                char *s1, char *s2, char *s3, char *s4)
852 {
853         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
854 }
855
856 /**
857  * \retval <0 record processing error
858  * \retval n record is processed. No need copy original one.
859  * \retval 0 record is not processed.
860  */
861 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
862                            struct mgs_replace_uuid_lookup *mrul)
863 {
864         int nids_added = 0;
865         lnet_nid_t nid;
866         char *ptr;
867         int rc;
868
869         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
870                 /* LCFG_ADD_UUID command found. Let's skip original command
871                    and add passed nids */
872                 ptr = mrul->target.mti_params;
873                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
874                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
875                                "device %s\n", libcfs_nid2str(nid),
876                                 mrul->target.mti_params,
877                                 mrul->target.mti_svname);
878                         rc = record_add_uuid(env,
879                                              mrul->temp_llh, nid,
880                                              mrul->target.mti_params);
881                         if (!rc)
882                                 nids_added++;
883                 }
884
885                 if (nids_added == 0) {
886                         CERROR("No new nids were added, nid %s with uuid %s, "
887                                "device %s\n", libcfs_nid2str(nid),
888                                mrul->target.mti_params,
889                                mrul->target.mti_svname);
890                         RETURN(-ENXIO);
891                 } else {
892                         mrul->device_nids_added = 1;
893                 }
894
895                 return nids_added;
896         }
897
898         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
899                 /* LCFG_SETUP command found. UUID should be changed */
900                 rc = record_setup(env,
901                                   mrul->temp_llh,
902                                   /* devname the same */
903                                   lustre_cfg_string(lcfg, 0),
904                                   /* s1 is not changed */
905                                   lustre_cfg_string(lcfg, 1),
906                                   /* new uuid should be
907                                   the full nidlist */
908                                   mrul->target.mti_params,
909                                   /* s3 is not changed */
910                                   lustre_cfg_string(lcfg, 3),
911                                   /* s4 is not changed */
912                                   lustre_cfg_string(lcfg, 4));
913                 return rc ? rc : 1;
914         }
915
916         /* Another commands in target device block */
917         return 0;
918 }
919
920 /**
921  * Handler that called for every record in llog.
922  * Records are processed in order they placed in llog.
923  *
924  * \param[in] llh       log to be processed
925  * \param[in] rec       current record
926  * \param[in] data      mgs_replace_uuid_lookup structure
927  *
928  * \retval 0    success
929  */
930 static int mgs_replace_handler(const struct lu_env *env,
931                                struct llog_handle *llh,
932                                struct llog_rec_hdr *rec,
933                                void *data)
934 {
935         struct mgs_replace_uuid_lookup *mrul;
936         struct lustre_cfg *lcfg = REC_DATA(rec);
937         int cfg_len = REC_DATA_LEN(rec);
938         int rc;
939         ENTRY;
940
941         mrul = (struct mgs_replace_uuid_lookup *)data;
942
943         if (rec->lrh_type != OBD_CFG_REC) {
944                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
945                        rec->lrh_type, lcfg->lcfg_command,
946                        lustre_cfg_string(lcfg, 0),
947                        lustre_cfg_string(lcfg, 1));
948                 RETURN(-EINVAL);
949         }
950
951         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
952         if (rc) {
953                 /* Do not copy any invalidated records */
954                 GOTO(skip_out, rc = 0);
955         }
956
957         rc = check_markers(lcfg, mrul);
958         if (rc || mrul->skip_it)
959                 GOTO(skip_out, rc = 0);
960
961         /* Write to new log all commands outside target device block */
962         if (!mrul->in_target_device)
963                 GOTO(copy_out, rc = 0);
964
965         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
966            (failover nids) for this target, assuming that if then
967            primary is changing then so is the failover */
968         if (mrul->device_nids_added &&
969             (lcfg->lcfg_command == LCFG_ADD_UUID ||
970              lcfg->lcfg_command == LCFG_ADD_CONN))
971                 GOTO(skip_out, rc = 0);
972
973         rc = process_command(env, lcfg, mrul);
974         if (rc < 0)
975                 RETURN(rc);
976
977         if (rc)
978                 RETURN(0);
979 copy_out:
980         /* Record is placed in temporary llog as is */
981         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
982
983         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
984                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
985                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
986         RETURN(rc);
987
988 skip_out:
989         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
990                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
991                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
992         RETURN(rc);
993 }
994
995 static int mgs_log_is_empty(const struct lu_env *env,
996                             struct mgs_device *mgs, char *name)
997 {
998         struct llog_ctxt        *ctxt;
999         int                      rc;
1000
1001         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1002         LASSERT(ctxt != NULL);
1003
1004         rc = llog_is_empty(env, ctxt, name);
1005         llog_ctxt_put(ctxt);
1006         return rc;
1007 }
1008
1009 static int mgs_replace_nids_log(const struct lu_env *env,
1010                                 struct obd_device *mgs, struct fs_db *fsdb,
1011                                 char *logname, char *devname, char *nids)
1012 {
1013         struct llog_handle *orig_llh, *backup_llh;
1014         struct llog_ctxt *ctxt;
1015         struct mgs_replace_uuid_lookup *mrul;
1016         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1017         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1018         char *backup;
1019         int rc, rc2;
1020         ENTRY;
1021
1022         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1023
1024         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1025         LASSERT(ctxt != NULL);
1026
1027         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1028                 /* Log is empty. Nothing to replace */
1029                 GOTO(out_put, rc = 0);
1030         }
1031
1032         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1033         if (backup == NULL)
1034                 GOTO(out_put, rc = -ENOMEM);
1035
1036         sprintf(backup, "%s.bak", logname);
1037
1038         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1039         if (rc == 0) {
1040                 /* Now erase original log file. Connections are not allowed.
1041                    Backup is already saved */
1042                 rc = llog_erase(env, ctxt, NULL, logname);
1043                 if (rc < 0)
1044                         GOTO(out_free, rc);
1045         } else if (rc != -ENOENT) {
1046                 CERROR("%s: can't make backup for %s: rc = %d\n",
1047                        mgs->obd_name, logname, rc);
1048                 GOTO(out_free,rc);
1049         }
1050
1051         /* open local log */
1052         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1053         if (rc)
1054                 GOTO(out_restore, rc);
1055
1056         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1057         if (rc)
1058                 GOTO(out_closel, rc);
1059
1060         /* open backup llog */
1061         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1062                        LLOG_OPEN_EXISTS);
1063         if (rc)
1064                 GOTO(out_closel, rc);
1065
1066         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1067         if (rc)
1068                 GOTO(out_close, rc);
1069
1070         if (llog_get_size(backup_llh) <= 1)
1071                 GOTO(out_close, rc = 0);
1072
1073         OBD_ALLOC_PTR(mrul);
1074         if (!mrul)
1075                 GOTO(out_close, rc = -ENOMEM);
1076         /* devname is only needed information to replace UUID records */
1077         strlcpy(mrul->target.mti_svname, devname,
1078                 sizeof(mrul->target.mti_svname));
1079         /* parse nids later */
1080         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1081         /* Copy records to this temporary llog */
1082         mrul->temp_llh = orig_llh;
1083
1084         rc = llog_process(env, backup_llh, mgs_replace_handler,
1085                           (void *)mrul, NULL);
1086         OBD_FREE_PTR(mrul);
1087 out_close:
1088         rc2 = llog_close(NULL, backup_llh);
1089         if (!rc)
1090                 rc = rc2;
1091 out_closel:
1092         rc2 = llog_close(NULL, orig_llh);
1093         if (!rc)
1094                 rc = rc2;
1095
1096 out_restore:
1097         if (rc) {
1098                 CERROR("%s: llog should be restored: rc = %d\n",
1099                        mgs->obd_name, rc);
1100                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1101                                   logname);
1102                 if (rc2 < 0)
1103                         CERROR("%s: can't restore backup %s: rc = %d\n",
1104                                mgs->obd_name, logname, rc2);
1105         }
1106
1107 out_free:
1108         OBD_FREE(backup, strlen(backup) + 1);
1109
1110 out_put:
1111         llog_ctxt_put(ctxt);
1112
1113         if (rc)
1114                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1115                        mgs->obd_name, logname, rc);
1116
1117         RETURN(rc);
1118 }
1119
1120 /**
1121  * Parse device name and get file system name and/or device index
1122  *
1123  * \param[in]   devname device name (ex. lustre-MDT0000)
1124  * \param[out]  fsname  file system name(optional)
1125  * \param[out]  index   device index(optional)
1126  *
1127  * \retval 0    success
1128  */
1129 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1130 {
1131         int rc;
1132         ENTRY;
1133
1134         /* Extract fsname */
1135         if (fsname) {
1136                 rc = server_name2fsname(devname, fsname, NULL);
1137                 if (rc < 0) {
1138                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1139                                devname);
1140                         RETURN(-EINVAL);
1141                 }
1142         }
1143
1144         if (index) {
1145                 rc = server_name2index(devname, index, NULL);
1146                 if (rc < 0) {
1147                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1148                                devname);
1149                         RETURN(-EINVAL);
1150                 }
1151         }
1152
1153         RETURN(0);
1154 }
1155
1156 /* This is only called during replace_nids */
1157 static int only_mgs_is_running(struct obd_device *mgs_obd)
1158 {
1159         /* TDB: Is global variable with devices count exists? */
1160         int num_devices = get_devices_count();
1161         int num_exports = 0;
1162         struct obd_export *exp;
1163
1164         spin_lock(&mgs_obd->obd_dev_lock);
1165         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1166                 /* skip self export */
1167                 if (exp == mgs_obd->obd_self_export)
1168                         continue;
1169                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1170                         continue;
1171
1172                 ++num_exports;
1173
1174                 CERROR("%s: node %s still connected during replace_nids "
1175                        "connect_flags:%llx\n",
1176                        mgs_obd->obd_name,
1177                        libcfs_nid2str(exp->exp_nid_stats->nid),
1178                        exp_connect_flags(exp));
1179
1180         }
1181         spin_unlock(&mgs_obd->obd_dev_lock);
1182
1183         /* osd, MGS and MGC + self_export
1184            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1185         return (num_devices <= 3) && (num_exports == 0);
1186 }
1187
1188 static int name_create_mdt(char **logname, char *fsname, int i)
1189 {
1190         char mdt_index[9];
1191
1192         sprintf(mdt_index, "-MDT%04x", i);
1193         return name_create(logname, fsname, mdt_index);
1194 }
1195
1196 /**
1197  * Replace nids for \a device to \a nids values
1198  *
1199  * \param obd           MGS obd device
1200  * \param devname       nids need to be replaced for this device
1201  * (ex. lustre-OST0000)
1202  * \param nids          nids list (ex. nid1,nid2,nid3)
1203  *
1204  * \retval 0    success
1205  */
1206 int mgs_replace_nids(const struct lu_env *env,
1207                      struct mgs_device *mgs,
1208                      char *devname, char *nids)
1209 {
1210         /* Assume fsname is part of device name */
1211         char fsname[MTI_NAME_MAXLEN];
1212         int rc;
1213         __u32 index;
1214         char *logname;
1215         struct fs_db *fsdb;
1216         unsigned int i;
1217         int conn_state;
1218         struct obd_device *mgs_obd = mgs->mgs_obd;
1219         ENTRY;
1220
1221         /* We can only change NIDs if no other nodes are connected */
1222         spin_lock(&mgs_obd->obd_dev_lock);
1223         conn_state = mgs_obd->obd_no_conn;
1224         mgs_obd->obd_no_conn = 1;
1225         spin_unlock(&mgs_obd->obd_dev_lock);
1226
1227         /* We can not change nids if not only MGS is started */
1228         if (!only_mgs_is_running(mgs_obd)) {
1229                 CERROR("Only MGS is allowed to be started\n");
1230                 GOTO(out, rc = -EINPROGRESS);
1231         }
1232
1233         /* Get fsname and index*/
1234         rc = mgs_parse_devname(devname, fsname, &index);
1235         if (rc)
1236                 GOTO(out, rc);
1237
1238         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1239         if (rc) {
1240                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1241                 GOTO(out, rc);
1242         }
1243
1244         /* Process client llogs */
1245         name_create(&logname, fsname, "-client");
1246         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1247         name_destroy(&logname);
1248         if (rc) {
1249                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1250                        fsname, devname, rc);
1251                 GOTO(out, rc);
1252         }
1253
1254         /* Process MDT llogs */
1255         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1256                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1257                         continue;
1258                 name_create_mdt(&logname, fsname, i);
1259                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1260                 name_destroy(&logname);
1261                 if (rc)
1262                         GOTO(out, rc);
1263         }
1264
1265 out:
1266         spin_lock(&mgs_obd->obd_dev_lock);
1267         mgs_obd->obd_no_conn = conn_state;
1268         spin_unlock(&mgs_obd->obd_dev_lock);
1269
1270         RETURN(rc);
1271 }
1272
1273 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1274                             char *devname, struct lov_desc *desc)
1275 {
1276         struct mgs_thread_info  *mgi = mgs_env_info(env);
1277         struct llog_cfg_rec     *lcr;
1278         int rc;
1279
1280         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1281         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1282         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1283         if (lcr == NULL)
1284                 return -ENOMEM;
1285
1286         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1287         lustre_cfg_rec_free(lcr);
1288         return rc;
1289 }
1290
1291 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1292                             char *devname, struct lmv_desc *desc)
1293 {
1294         struct mgs_thread_info  *mgi = mgs_env_info(env);
1295         struct llog_cfg_rec     *lcr;
1296         int rc;
1297
1298         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1299         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1300         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1301         if (lcr == NULL)
1302                 return -ENOMEM;
1303
1304         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1305         lustre_cfg_rec_free(lcr);
1306         return rc;
1307 }
1308
1309 static inline int record_mdc_add(const struct lu_env *env,
1310                                  struct llog_handle *llh,
1311                                  char *logname, char *mdcuuid,
1312                                  char *mdtuuid, char *index,
1313                                  char *gen)
1314 {
1315         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1316                            mdtuuid,index,gen,mdcuuid);
1317 }
1318
1319 static inline int record_lov_add(const struct lu_env *env,
1320                                  struct llog_handle *llh,
1321                                  char *lov_name, char *ost_uuid,
1322                                  char *index, char *gen)
1323 {
1324         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1325                            ost_uuid, index, gen, NULL);
1326 }
1327
1328 static inline int record_mount_opt(const struct lu_env *env,
1329                                    struct llog_handle *llh,
1330                                    char *profile, char *lov_name,
1331                                    char *mdc_name)
1332 {
1333         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1334                            profile, lov_name, mdc_name, NULL);
1335 }
1336
1337 static int record_marker(const struct lu_env *env,
1338                          struct llog_handle *llh,
1339                          struct fs_db *fsdb, __u32 flags,
1340                          char *tgtname, char *comment)
1341 {
1342         struct mgs_thread_info *mgi = mgs_env_info(env);
1343         struct llog_cfg_rec *lcr;
1344         int rc;
1345         int cplen = 0;
1346
1347         if (flags & CM_START)
1348                 fsdb->fsdb_gen++;
1349         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1350         mgi->mgi_marker.cm_flags = flags;
1351         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1352         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1353                         sizeof(mgi->mgi_marker.cm_tgtname));
1354         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1355                 return -E2BIG;
1356         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1357                         sizeof(mgi->mgi_marker.cm_comment));
1358         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1359                 return -E2BIG;
1360         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1361         mgi->mgi_marker.cm_canceltime = 0;
1362         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1363         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1364                             sizeof(mgi->mgi_marker));
1365         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1366         if (lcr == NULL)
1367                 return -ENOMEM;
1368
1369         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1370         lustre_cfg_rec_free(lcr);
1371         return rc;
1372 }
1373
1374 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1375                             struct llog_handle **llh, char *name)
1376 {
1377         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1378         struct llog_ctxt        *ctxt;
1379         int                      rc = 0;
1380         ENTRY;
1381
1382         if (*llh)
1383                 GOTO(out, rc = -EBUSY);
1384
1385         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1386         if (!ctxt)
1387                 GOTO(out, rc = -ENODEV);
1388         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1389
1390         rc = llog_open_create(env, ctxt, llh, NULL, name);
1391         if (rc)
1392                 GOTO(out_ctxt, rc);
1393         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1394         if (rc)
1395                 llog_close(env, *llh);
1396 out_ctxt:
1397         llog_ctxt_put(ctxt);
1398 out:
1399         if (rc) {
1400                 CERROR("%s: can't start log %s: rc = %d\n",
1401                        mgs->mgs_obd->obd_name, name, rc);
1402                 *llh = NULL;
1403         }
1404         RETURN(rc);
1405 }
1406
1407 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1408 {
1409         int rc;
1410
1411         rc = llog_close(env, *llh);
1412         *llh = NULL;
1413
1414         return rc;
1415 }
1416
1417 /******************** config "macros" *********************/
1418
1419 /* write an lcfg directly into a log (with markers) */
1420 static int mgs_write_log_direct(const struct lu_env *env,
1421                                 struct mgs_device *mgs, struct fs_db *fsdb,
1422                                 char *logname, struct llog_cfg_rec *lcr,
1423                                 char *devname, char *comment)
1424 {
1425         struct llog_handle *llh = NULL;
1426         int rc;
1427
1428         ENTRY;
1429
1430         rc = record_start_log(env, mgs, &llh, logname);
1431         if (rc)
1432                 RETURN(rc);
1433
1434         /* FIXME These should be a single journal transaction */
1435         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1436         if (rc)
1437                 GOTO(out_end, rc);
1438         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1439         if (rc)
1440                 GOTO(out_end, rc);
1441         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1442         if (rc)
1443                 GOTO(out_end, rc);
1444 out_end:
1445         record_end_log(env, &llh);
1446         RETURN(rc);
1447 }
1448
1449 /* write the lcfg in all logs for the given fs */
1450 static int mgs_write_log_direct_all(const struct lu_env *env,
1451                                     struct mgs_device *mgs,
1452                                     struct fs_db *fsdb,
1453                                     struct mgs_target_info *mti,
1454                                     struct llog_cfg_rec *lcr, char *devname,
1455                                     char *comment, int server_only)
1456 {
1457         struct list_head         log_list;
1458         struct mgs_direntry     *dirent, *n;
1459         char                    *fsname = mti->mti_fsname;
1460         int                      rc = 0, len = strlen(fsname);
1461
1462         ENTRY;
1463         /* Find all the logs in the CONFIGS directory */
1464         rc = class_dentry_readdir(env, mgs, &log_list);
1465         if (rc)
1466                 RETURN(rc);
1467
1468         /* Could use fsdb index maps instead of directory listing */
1469         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1470                 list_del_init(&dirent->mde_list);
1471                 /* don't write to sptlrpc rule log */
1472                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1473                         goto next;
1474
1475                 /* caller wants write server logs only */
1476                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1477                         goto next;
1478
1479                 if (strlen(dirent->mde_name) <= len ||
1480                     strncmp(fsname, dirent->mde_name, len) != 0 ||
1481                     dirent->mde_name[len] != '-')
1482                         goto next;
1483
1484                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1485                 /* Erase any old settings of this same parameter */
1486                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1487                                 devname, comment, CM_SKIP);
1488                 if (rc < 0)
1489                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1490                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1491                 if (lcr == NULL)
1492                         goto next;
1493                 /* Write the new one */
1494                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1495                                           lcr, devname, comment);
1496                 if (rc != 0)
1497                         CERROR("%s: writing log %s: rc = %d\n",
1498                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1499 next:
1500                 mgs_direntry_free(dirent);
1501         }
1502
1503         RETURN(rc);
1504 }
1505
1506 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1507                                     struct mgs_device *mgs,
1508                                     struct fs_db *fsdb,
1509                                     struct mgs_target_info *mti,
1510                                     int index, char *logname);
1511 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1512                                     struct mgs_device *mgs,
1513                                     struct fs_db *fsdb,
1514                                     struct mgs_target_info *mti,
1515                                     char *logname, char *suffix, char *lovname,
1516                                     enum lustre_sec_part sec_part, int flags);
1517 static int name_create_mdt_and_lov(char **logname, char **lovname,
1518                                    struct fs_db *fsdb, int i);
1519
1520 static int add_param(char *params, char *key, char *val)
1521 {
1522         char *start = params + strlen(params);
1523         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1524         int keylen = 0;
1525
1526         if (key != NULL)
1527                 keylen = strlen(key);
1528         if (start + 1 + keylen + strlen(val) >= end) {
1529                 CERROR("params are too long: %s %s%s\n",
1530                        params, key != NULL ? key : "", val);
1531                 return -EINVAL;
1532         }
1533
1534         sprintf(start, " %s%s", key != NULL ? key : "", val);
1535         return 0;
1536 }
1537
1538 /**
1539  * Walk through client config log record and convert the related records
1540  * into the target.
1541  **/
1542 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1543                                          struct llog_handle *llh,
1544                                          struct llog_rec_hdr *rec, void *data)
1545 {
1546         struct mgs_device *mgs;
1547         struct obd_device *obd;
1548         struct mgs_target_info *mti, *tmti;
1549         struct fs_db *fsdb;
1550         int cfg_len = rec->lrh_len;
1551         char *cfg_buf = (char*) (rec + 1);
1552         struct lustre_cfg *lcfg;
1553         int rc = 0;
1554         struct llog_handle *mdt_llh = NULL;
1555         static int got_an_osc_or_mdc = 0;
1556         /* 0: not found any osc/mdc;
1557            1: found osc;
1558            2: found mdc;
1559         */
1560         static int last_step = -1;
1561         int cplen = 0;
1562
1563         ENTRY;
1564
1565         mti = ((struct temp_comp*)data)->comp_mti;
1566         tmti = ((struct temp_comp*)data)->comp_tmti;
1567         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1568         obd = ((struct temp_comp *)data)->comp_obd;
1569         mgs = lu2mgs_dev(obd->obd_lu_dev);
1570         LASSERT(mgs);
1571
1572         if (rec->lrh_type != OBD_CFG_REC) {
1573                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1574                 RETURN(-EINVAL);
1575         }
1576
1577         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1578         if (rc) {
1579                 CERROR("Insane cfg\n");
1580                 RETURN(rc);
1581         }
1582
1583         lcfg = (struct lustre_cfg *)cfg_buf;
1584
1585         if (lcfg->lcfg_command == LCFG_MARKER) {
1586                 struct cfg_marker *marker;
1587                 marker = lustre_cfg_buf(lcfg, 1);
1588                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1589                     (marker->cm_flags & CM_START) &&
1590                      !(marker->cm_flags & CM_SKIP)) {
1591                         got_an_osc_or_mdc = 1;
1592                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1593                                         sizeof(tmti->mti_svname));
1594                         if (cplen >= sizeof(tmti->mti_svname))
1595                                 RETURN(-E2BIG);
1596                         rc = record_start_log(env, mgs, &mdt_llh,
1597                                               mti->mti_svname);
1598                         if (rc)
1599                                 RETURN(rc);
1600                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1601                                            mti->mti_svname, "add osc(copied)");
1602                         record_end_log(env, &mdt_llh);
1603                         last_step = marker->cm_step;
1604                         RETURN(rc);
1605                 }
1606                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1607                     (marker->cm_flags & CM_END) &&
1608                      !(marker->cm_flags & CM_SKIP)) {
1609                         LASSERT(last_step == marker->cm_step);
1610                         last_step = -1;
1611                         got_an_osc_or_mdc = 0;
1612                         memset(tmti, 0, sizeof(*tmti));
1613                         rc = record_start_log(env, mgs, &mdt_llh,
1614                                               mti->mti_svname);
1615                         if (rc)
1616                                 RETURN(rc);
1617                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1618                                            mti->mti_svname, "add osc(copied)");
1619                         record_end_log(env, &mdt_llh);
1620                         RETURN(rc);
1621                 }
1622                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1623                     (marker->cm_flags & CM_START) &&
1624                      !(marker->cm_flags & CM_SKIP)) {
1625                         got_an_osc_or_mdc = 2;
1626                         last_step = marker->cm_step;
1627                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1628                                strlen(marker->cm_tgtname));
1629
1630                         RETURN(rc);
1631                 }
1632                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1633                     (marker->cm_flags & CM_END) &&
1634                      !(marker->cm_flags & CM_SKIP)) {
1635                         LASSERT(last_step == marker->cm_step);
1636                         last_step = -1;
1637                         got_an_osc_or_mdc = 0;
1638                         memset(tmti, 0, sizeof(*tmti));
1639                         RETURN(rc);
1640                 }
1641         }
1642
1643         if (got_an_osc_or_mdc == 0 || last_step < 0)
1644                 RETURN(rc);
1645
1646         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1647                 __u64 nodenid = lcfg->lcfg_nid;
1648
1649                 if (strlen(tmti->mti_uuid) == 0) {
1650                         /* target uuid not set, this config record is before
1651                          * LCFG_SETUP, this nid is one of target node nid.
1652                          */
1653                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1654                         tmti->mti_nid_count++;
1655                 } else {
1656                         char nidstr[LNET_NIDSTR_SIZE];
1657
1658                         /* failover node nid */
1659                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
1660                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1661                                         nidstr);
1662                 }
1663
1664                 RETURN(rc);
1665         }
1666
1667         if (lcfg->lcfg_command == LCFG_SETUP) {
1668                 char *target;
1669
1670                 target = lustre_cfg_string(lcfg, 1);
1671                 memcpy(tmti->mti_uuid, target, strlen(target));
1672                 RETURN(rc);
1673         }
1674
1675         /* ignore client side sptlrpc_conf_log */
1676         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1677                 RETURN(rc);
1678
1679         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1680                 int index;
1681
1682                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1683                         RETURN (-EINVAL);
1684
1685                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1686                        strlen(mti->mti_fsname));
1687                 tmti->mti_stripe_index = index;
1688
1689                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1690                                               mti->mti_stripe_index,
1691                                               mti->mti_svname);
1692                 memset(tmti, 0, sizeof(*tmti));
1693                 RETURN(rc);
1694         }
1695
1696         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1697                 int index;
1698                 char mdt_index[9];
1699                 char *logname, *lovname;
1700
1701                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1702                                              mti->mti_stripe_index);
1703                 if (rc)
1704                         RETURN(rc);
1705                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1706
1707                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1708                         name_destroy(&logname);
1709                         name_destroy(&lovname);
1710                         RETURN(-EINVAL);
1711                 }
1712
1713                 tmti->mti_stripe_index = index;
1714                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1715                                          mdt_index, lovname,
1716                                          LUSTRE_SP_MDT, 0);
1717                 name_destroy(&logname);
1718                 name_destroy(&lovname);
1719                 RETURN(rc);
1720         }
1721         RETURN(rc);
1722 }
1723
1724 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1725 /* stealed from mgs_get_fsdb_from_llog*/
1726 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1727                                               struct mgs_device *mgs,
1728                                               char *client_name,
1729                                               struct temp_comp* comp)
1730 {
1731         struct llog_handle *loghandle;
1732         struct mgs_target_info *tmti;
1733         struct llog_ctxt *ctxt;
1734         int rc;
1735
1736         ENTRY;
1737
1738         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1739         LASSERT(ctxt != NULL);
1740
1741         OBD_ALLOC_PTR(tmti);
1742         if (tmti == NULL)
1743                 GOTO(out_ctxt, rc = -ENOMEM);
1744
1745         comp->comp_tmti = tmti;
1746         comp->comp_obd = mgs->mgs_obd;
1747
1748         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1749                        LLOG_OPEN_EXISTS);
1750         if (rc < 0) {
1751                 if (rc == -ENOENT)
1752                         rc = 0;
1753                 GOTO(out_pop, rc);
1754         }
1755
1756         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1757         if (rc)
1758                 GOTO(out_close, rc);
1759
1760         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1761                                   (void *)comp, NULL, false);
1762         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1763 out_close:
1764         llog_close(env, loghandle);
1765 out_pop:
1766         OBD_FREE_PTR(tmti);
1767 out_ctxt:
1768         llog_ctxt_put(ctxt);
1769         RETURN(rc);
1770 }
1771
1772 /* lmv is the second thing for client logs */
1773 /* copied from mgs_write_log_lov. Please refer to that.  */
1774 static int mgs_write_log_lmv(const struct lu_env *env,
1775                              struct mgs_device *mgs,
1776                              struct fs_db *fsdb,
1777                              struct mgs_target_info *mti,
1778                              char *logname, char *lmvname)
1779 {
1780         struct llog_handle *llh = NULL;
1781         struct lmv_desc *lmvdesc;
1782         char *uuid;
1783         int rc = 0;
1784         ENTRY;
1785
1786         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1787
1788         OBD_ALLOC_PTR(lmvdesc);
1789         if (lmvdesc == NULL)
1790                 RETURN(-ENOMEM);
1791         lmvdesc->ld_active_tgt_count = 0;
1792         lmvdesc->ld_tgt_count = 0;
1793         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1794         uuid = (char *)lmvdesc->ld_uuid.uuid;
1795
1796         rc = record_start_log(env, mgs, &llh, logname);
1797         if (rc)
1798                 GOTO(out_free, rc);
1799         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1800         if (rc)
1801                 GOTO(out_end, rc);
1802         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1803         if (rc)
1804                 GOTO(out_end, rc);
1805         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1806         if (rc)
1807                 GOTO(out_end, rc);
1808         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1809         if (rc)
1810                 GOTO(out_end, rc);
1811 out_end:
1812         record_end_log(env, &llh);
1813 out_free:
1814         OBD_FREE_PTR(lmvdesc);
1815         RETURN(rc);
1816 }
1817
1818 /* lov is the first thing in the mdt and client logs */
1819 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1820                              struct fs_db *fsdb, struct mgs_target_info *mti,
1821                              char *logname, char *lovname)
1822 {
1823         struct llog_handle *llh = NULL;
1824         struct lov_desc *lovdesc;
1825         char *uuid;
1826         int rc = 0;
1827         ENTRY;
1828
1829         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1830
1831         /*
1832         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1833         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1834               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1835         */
1836
1837         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1838         OBD_ALLOC_PTR(lovdesc);
1839         if (lovdesc == NULL)
1840                 RETURN(-ENOMEM);
1841         lovdesc->ld_magic = LOV_DESC_MAGIC;
1842         lovdesc->ld_tgt_count = 0;
1843         /* Defaults.  Can be changed later by lcfg config_param */
1844         lovdesc->ld_default_stripe_count = 1;
1845         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1846         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1847         lovdesc->ld_default_stripe_offset = -1;
1848         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1849         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1850         /* can these be the same? */
1851         uuid = (char *)lovdesc->ld_uuid.uuid;
1852
1853         /* This should always be the first entry in a log.
1854         rc = mgs_clear_log(obd, logname); */
1855         rc = record_start_log(env, mgs, &llh, logname);
1856         if (rc)
1857                 GOTO(out_free, rc);
1858         /* FIXME these should be a single journal transaction */
1859         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1860         if (rc)
1861                 GOTO(out_end, rc);
1862         rc = record_attach(env, llh, lovname, "lov", uuid);
1863         if (rc)
1864                 GOTO(out_end, rc);
1865         rc = record_lov_setup(env, llh, lovname, lovdesc);
1866         if (rc)
1867                 GOTO(out_end, rc);
1868         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1869         if (rc)
1870                 GOTO(out_end, rc);
1871         EXIT;
1872 out_end:
1873         record_end_log(env, &llh);
1874 out_free:
1875         OBD_FREE_PTR(lovdesc);
1876         return rc;
1877 }
1878
1879 /* add failnids to open log */
1880 static int mgs_write_log_failnids(const struct lu_env *env,
1881                                   struct mgs_target_info *mti,
1882                                   struct llog_handle *llh,
1883                                   char *cliname)
1884 {
1885         char *failnodeuuid = NULL;
1886         char *ptr = mti->mti_params;
1887         lnet_nid_t nid;
1888         int rc = 0;
1889
1890         /*
1891         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1892         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1893         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1894         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1895         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1896         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1897         */
1898
1899         /*
1900          * Pull failnid info out of params string, which may contain something
1901          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
1902          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
1903          * etc.  However, convert_hostnames() should have caught those.
1904          */
1905         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1906                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1907                         char nidstr[LNET_NIDSTR_SIZE];
1908
1909                         if (failnodeuuid == NULL) {
1910                                 /* We don't know the failover node name,
1911                                  * so just use the first nid as the uuid */
1912                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
1913                                 rc = name_create(&failnodeuuid, nidstr, "");
1914                                 if (rc != 0)
1915                                         return rc;
1916                         }
1917                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1918                                 "client %s\n",
1919                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
1920                                 failnodeuuid, cliname);
1921                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1922                         /*
1923                          * If *ptr is ':', we have added all NIDs for
1924                          * failnodeuuid.
1925                          */
1926                         if (*ptr == ':') {
1927                                 rc = record_add_conn(env, llh, cliname,
1928                                                      failnodeuuid);
1929                                 name_destroy(&failnodeuuid);
1930                                 failnodeuuid = NULL;
1931                         }
1932                 }
1933                 if (failnodeuuid) {
1934                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1935                         name_destroy(&failnodeuuid);
1936                         failnodeuuid = NULL;
1937                 }
1938         }
1939
1940         return rc;
1941 }
1942
1943 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1944                                     struct mgs_device *mgs,
1945                                     struct fs_db *fsdb,
1946                                     struct mgs_target_info *mti,
1947                                     char *logname, char *lmvname)
1948 {
1949         struct llog_handle *llh = NULL;
1950         char *mdcname = NULL;
1951         char *nodeuuid = NULL;
1952         char *mdcuuid = NULL;
1953         char *lmvuuid = NULL;
1954         char index[6];
1955         char nidstr[LNET_NIDSTR_SIZE];
1956         int i, rc;
1957         ENTRY;
1958
1959         if (mgs_log_is_empty(env, mgs, logname)) {
1960                 CERROR("log is empty! Logical error\n");
1961                 RETURN(-EINVAL);
1962         }
1963
1964         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1965                mti->mti_svname, logname, lmvname);
1966
1967         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
1968         rc = name_create(&nodeuuid, nidstr, "");
1969         if (rc)
1970                 RETURN(rc);
1971         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1972         if (rc)
1973                 GOTO(out_free, rc);
1974         rc = name_create(&mdcuuid, mdcname, "_UUID");
1975         if (rc)
1976                 GOTO(out_free, rc);
1977         rc = name_create(&lmvuuid, lmvname, "_UUID");
1978         if (rc)
1979                 GOTO(out_free, rc);
1980
1981         rc = record_start_log(env, mgs, &llh, logname);
1982         if (rc)
1983                 GOTO(out_free, rc);
1984         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1985                            "add mdc");
1986         if (rc)
1987                 GOTO(out_end, rc);
1988         for (i = 0; i < mti->mti_nid_count; i++) {
1989                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1990                         libcfs_nid2str_r(mti->mti_nids[i],
1991                                          nidstr, sizeof(nidstr)));
1992
1993                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1994                 if (rc)
1995                         GOTO(out_end, rc);
1996         }
1997
1998         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1999         if (rc)
2000                 GOTO(out_end, rc);
2001         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2002                           NULL, NULL);
2003         if (rc)
2004                 GOTO(out_end, rc);
2005         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2006         if (rc)
2007                 GOTO(out_end, rc);
2008         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2009         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2010                             index, "1");
2011         if (rc)
2012                 GOTO(out_end, rc);
2013         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2014                            "add mdc");
2015         if (rc)
2016                 GOTO(out_end, rc);
2017 out_end:
2018         record_end_log(env, &llh);
2019 out_free:
2020         name_destroy(&lmvuuid);
2021         name_destroy(&mdcuuid);
2022         name_destroy(&mdcname);
2023         name_destroy(&nodeuuid);
2024         RETURN(rc);
2025 }
2026
2027 static inline int name_create_lov(char **lovname, char *mdtname,
2028                                   struct fs_db *fsdb, int index)
2029 {
2030         /* COMPAT_180 */
2031         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2032                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2033         else
2034                 return name_create(lovname, mdtname, "-mdtlov");
2035 }
2036
2037 static int name_create_mdt_and_lov(char **logname, char **lovname,
2038                                    struct fs_db *fsdb, int i)
2039 {
2040         int rc;
2041
2042         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2043         if (rc)
2044                 return rc;
2045         /* COMPAT_180 */
2046         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2047                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2048         else
2049                 rc = name_create(lovname, *logname, "-mdtlov");
2050         if (rc) {
2051                 name_destroy(logname);
2052                 *logname = NULL;
2053         }
2054         return rc;
2055 }
2056
2057 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2058                                       struct fs_db *fsdb, int i)
2059 {
2060         char suffix[16];
2061
2062         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2063                 sprintf(suffix, "-osc");
2064         else
2065                 sprintf(suffix, "-osc-MDT%04x", i);
2066         return name_create(oscname, ostname, suffix);
2067 }
2068
2069 /* add new mdc to already existent MDS */
2070 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2071                                     struct mgs_device *mgs,
2072                                     struct fs_db *fsdb,
2073                                     struct mgs_target_info *mti,
2074                                     int mdt_index, char *logname)
2075 {
2076         struct llog_handle      *llh = NULL;
2077         char    *nodeuuid = NULL;
2078         char    *ospname = NULL;
2079         char    *lovuuid = NULL;
2080         char    *mdtuuid = NULL;
2081         char    *svname = NULL;
2082         char    *mdtname = NULL;
2083         char    *lovname = NULL;
2084         char    index_str[16];
2085         char    nidstr[LNET_NIDSTR_SIZE];
2086         int     i, rc;
2087
2088         ENTRY;
2089         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2090                 CERROR("log is empty! Logical error\n");
2091                 RETURN (-EINVAL);
2092         }
2093
2094         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2095                logname);
2096
2097         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2098         if (rc)
2099                 RETURN(rc);
2100
2101         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2102         rc = name_create(&nodeuuid, nidstr, "");
2103         if (rc)
2104                 GOTO(out_destory, rc);
2105
2106         rc = name_create(&svname, mdtname, "-osp");
2107         if (rc)
2108                 GOTO(out_destory, rc);
2109
2110         sprintf(index_str, "-MDT%04x", mdt_index);
2111         rc = name_create(&ospname, svname, index_str);
2112         if (rc)
2113                 GOTO(out_destory, rc);
2114
2115         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2116         if (rc)
2117                 GOTO(out_destory, rc);
2118
2119         rc = name_create(&lovuuid, lovname, "_UUID");
2120         if (rc)
2121                 GOTO(out_destory, rc);
2122
2123         rc = name_create(&mdtuuid, mdtname, "_UUID");
2124         if (rc)
2125                 GOTO(out_destory, rc);
2126
2127         rc = record_start_log(env, mgs, &llh, logname);
2128         if (rc)
2129                 GOTO(out_destory, rc);
2130
2131         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2132                            "add osp");
2133         if (rc)
2134                 GOTO(out_destory, rc);
2135
2136         for (i = 0; i < mti->mti_nid_count; i++) {
2137                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2138                         libcfs_nid2str_r(mti->mti_nids[i],
2139                                          nidstr, sizeof(nidstr)));
2140                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2141                 if (rc)
2142                         GOTO(out_end, rc);
2143         }
2144
2145         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2146         if (rc)
2147                 GOTO(out_end, rc);
2148
2149         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2150                           NULL, NULL);
2151         if (rc)
2152                 GOTO(out_end, rc);
2153
2154         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2155         if (rc)
2156                 GOTO(out_end, rc);
2157
2158         /* Add mdc(osp) to lod */
2159         snprintf(index_str, sizeof(index_str), "%d", mti->mti_stripe_index);
2160         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2161                          index_str, "1", NULL);
2162         if (rc)
2163                 GOTO(out_end, rc);
2164
2165         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2166         if (rc)
2167                 GOTO(out_end, rc);
2168
2169 out_end:
2170         record_end_log(env, &llh);
2171
2172 out_destory:
2173         name_destroy(&mdtuuid);
2174         name_destroy(&lovuuid);
2175         name_destroy(&lovname);
2176         name_destroy(&ospname);
2177         name_destroy(&svname);
2178         name_destroy(&nodeuuid);
2179         name_destroy(&mdtname);
2180         RETURN(rc);
2181 }
2182
2183 static int mgs_write_log_mdt0(const struct lu_env *env,
2184                               struct mgs_device *mgs,
2185                               struct fs_db *fsdb,
2186                               struct mgs_target_info *mti)
2187 {
2188         char *log = mti->mti_svname;
2189         struct llog_handle *llh = NULL;
2190         char *uuid, *lovname;
2191         char mdt_index[6];
2192         char *ptr = mti->mti_params;
2193         int rc = 0, failout = 0;
2194         ENTRY;
2195
2196         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2197         if (uuid == NULL)
2198                 RETURN(-ENOMEM);
2199
2200         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2201                 failout = (strncmp(ptr, "failout", 7) == 0);
2202
2203         rc = name_create(&lovname, log, "-mdtlov");
2204         if (rc)
2205                 GOTO(out_free, rc);
2206         if (mgs_log_is_empty(env, mgs, log)) {
2207                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2208                 if (rc)
2209                         GOTO(out_lod, rc);
2210         }
2211
2212         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2213
2214         rc = record_start_log(env, mgs, &llh, log);
2215         if (rc)
2216                 GOTO(out_lod, rc);
2217
2218         /* add MDT itself */
2219
2220         /* FIXME this whole fn should be a single journal transaction */
2221         sprintf(uuid, "%s_UUID", log);
2222         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2223         if (rc)
2224                 GOTO(out_lod, rc);
2225         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2226         if (rc)
2227                 GOTO(out_end, rc);
2228         rc = record_mount_opt(env, llh, log, lovname, NULL);
2229         if (rc)
2230                 GOTO(out_end, rc);
2231         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2232                         failout ? "n" : "f");
2233         if (rc)
2234                 GOTO(out_end, rc);
2235         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2236         if (rc)
2237                 GOTO(out_end, rc);
2238 out_end:
2239         record_end_log(env, &llh);
2240 out_lod:
2241         name_destroy(&lovname);
2242 out_free:
2243         OBD_FREE(uuid, sizeof(struct obd_uuid));
2244         RETURN(rc);
2245 }
2246
2247 /* envelope method for all layers log */
2248 static int mgs_write_log_mdt(const struct lu_env *env,
2249                              struct mgs_device *mgs,
2250                              struct fs_db *fsdb,
2251                              struct mgs_target_info *mti)
2252 {
2253         struct mgs_thread_info *mgi = mgs_env_info(env);
2254         struct llog_handle *llh = NULL;
2255         char *cliname;
2256         int rc, i = 0;
2257         ENTRY;
2258
2259         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2260
2261         if (mti->mti_uuid[0] == '\0') {
2262                 /* Make up our own uuid */
2263                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2264                          "%s_UUID", mti->mti_svname);
2265         }
2266
2267         /* add mdt */
2268         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2269         if (rc)
2270                 RETURN(rc);
2271         /* Append the mdt info to the client log */
2272         rc = name_create(&cliname, mti->mti_fsname, "-client");
2273         if (rc)
2274                 RETURN(rc);
2275
2276         if (mgs_log_is_empty(env, mgs, cliname)) {
2277                 /* Start client log */
2278                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2279                                        fsdb->fsdb_clilov);
2280                 if (rc)
2281                         GOTO(out_free, rc);
2282                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2283                                        fsdb->fsdb_clilmv);
2284                 if (rc)
2285                         GOTO(out_free, rc);
2286         }
2287
2288         /*
2289         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2290         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2291         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2292         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2293         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2294         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2295         */
2296
2297         /* copy client info about lov/lmv */
2298         mgi->mgi_comp.comp_mti = mti;
2299         mgi->mgi_comp.comp_fsdb = fsdb;
2300
2301         rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2302                                                 &mgi->mgi_comp);
2303         if (rc)
2304                 GOTO(out_free, rc);
2305         rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2306                                       fsdb->fsdb_clilmv);
2307         if (rc)
2308                 GOTO(out_free, rc);
2309
2310         /* add mountopts */
2311         rc = record_start_log(env, mgs, &llh, cliname);
2312         if (rc)
2313                 GOTO(out_free, rc);
2314
2315         rc = record_marker(env, llh, fsdb, CM_START, cliname,
2316                            "mount opts");
2317         if (rc)
2318                 GOTO(out_end, rc);
2319         rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2320                               fsdb->fsdb_clilmv);
2321         if (rc)
2322                 GOTO(out_end, rc);
2323         rc = record_marker(env, llh, fsdb, CM_END, cliname,
2324                            "mount opts");
2325
2326         if (rc)
2327                 GOTO(out_end, rc);
2328
2329         /* for_all_existing_mdt except current one */
2330         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2331                 if (i !=  mti->mti_stripe_index &&
2332                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2333                         char *logname;
2334
2335                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2336                         if (rc)
2337                                 GOTO(out_end, rc);
2338
2339                         /* NB: If the log for the MDT is empty, it means
2340                          * the MDT is only added to the index
2341                          * map, and not being process yet, i.e. this
2342                          * is an unregistered MDT, see mgs_write_log_target().
2343                          * so we should skip it. Otherwise
2344                          *
2345                          * 1. MGS get register request for MDT1 and MDT2.
2346                          *
2347                          * 2. Then both MDT1 and MDT2 are added into
2348                          * fsdb_mdt_index_map. (see mgs_set_index()).
2349                          *
2350                          * 3. Then MDT1 get the lock of fsdb_mutex, then
2351                          * generate the config log, here, it will regard MDT2
2352                          * as an existent MDT, and generate "add osp" for
2353                          * lustre-MDT0001-osp-MDT0002. Note: at the moment
2354                          * MDT0002 config log is still empty, so it will
2355                          * add "add osp" even before "lov setup", which
2356                          * will definitly cause trouble.
2357                          *
2358                          * 4. MDT1 registeration finished, fsdb_mutex is
2359                          * released, then MDT2 get in, then in above
2360                          * mgs_steal_llog_for_mdt_from_client(), it will
2361                          * add another osp log for lustre-MDT0001-osp-MDT0002,
2362                          * which will cause another trouble.*/
2363                         if (!mgs_log_is_empty(env, mgs, logname))
2364                                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb,
2365                                                               mti, i, logname);
2366
2367                         name_destroy(&logname);
2368                         if (rc)
2369                                 GOTO(out_end, rc);
2370                 }
2371         }
2372 out_end:
2373         record_end_log(env, &llh);
2374 out_free:
2375         name_destroy(&cliname);
2376         RETURN(rc);
2377 }
2378
2379 /* Add the ost info to the client/mdt lov */
2380 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2381                                     struct mgs_device *mgs, struct fs_db *fsdb,
2382                                     struct mgs_target_info *mti,
2383                                     char *logname, char *suffix, char *lovname,
2384                                     enum lustre_sec_part sec_part, int flags)
2385 {
2386         struct llog_handle *llh = NULL;
2387         char *nodeuuid = NULL;
2388         char *oscname = NULL;
2389         char *oscuuid = NULL;
2390         char *lovuuid = NULL;
2391         char *svname = NULL;
2392         char index[6];
2393         char nidstr[LNET_NIDSTR_SIZE];
2394         int i, rc;
2395         ENTRY;
2396
2397         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2398                 mti->mti_svname, logname);
2399
2400         if (mgs_log_is_empty(env, mgs, logname)) {
2401                 CERROR("log is empty! Logical error\n");
2402                 RETURN(-EINVAL);
2403         }
2404
2405         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2406         rc = name_create(&nodeuuid, nidstr, "");
2407         if (rc)
2408                 RETURN(rc);
2409         rc = name_create(&svname, mti->mti_svname, "-osc");
2410         if (rc)
2411                 GOTO(out_free, rc);
2412
2413         /* for the system upgraded from old 1.8, keep using the old osc naming
2414          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2415         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2416                 rc = name_create(&oscname, svname, "");
2417         else
2418                 rc = name_create(&oscname, svname, suffix);
2419         if (rc)
2420                 GOTO(out_free, rc);
2421
2422         rc = name_create(&oscuuid, oscname, "_UUID");
2423         if (rc)
2424                 GOTO(out_free, rc);
2425         rc = name_create(&lovuuid, lovname, "_UUID");
2426         if (rc)
2427                 GOTO(out_free, rc);
2428
2429
2430         /*
2431         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2432         multihomed (#4)
2433         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2434         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2435         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2436         failover (#6,7)
2437         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2438         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2439         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2440         */
2441
2442         rc = record_start_log(env, mgs, &llh, logname);
2443         if (rc)
2444                 GOTO(out_free, rc);
2445
2446         /* FIXME these should be a single journal transaction */
2447         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2448                            "add osc");
2449         if (rc)
2450                 GOTO(out_end, rc);
2451
2452         /* NB: don't change record order, because upon MDT steal OSC config
2453          * from client, it treats all nids before LCFG_SETUP as target nids
2454          * (multiple interfaces), while nids after as failover node nids.
2455          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2456          */
2457         for (i = 0; i < mti->mti_nid_count; i++) {
2458                 CDEBUG(D_MGS, "add nid %s\n",
2459                         libcfs_nid2str_r(mti->mti_nids[i],
2460                                          nidstr, sizeof(nidstr)));
2461                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2462                 if (rc)
2463                         GOTO(out_end, rc);
2464         }
2465         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2466         if (rc)
2467                 GOTO(out_end, rc);
2468         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
2469                           NULL, NULL);
2470         if (rc)
2471                 GOTO(out_end, rc);
2472         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2473         if (rc)
2474                 GOTO(out_end, rc);
2475
2476         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2477
2478         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2479         if (rc)
2480                 GOTO(out_end, rc);
2481         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2482                            "add osc");
2483         if (rc)
2484                 GOTO(out_end, rc);
2485 out_end:
2486         record_end_log(env, &llh);
2487 out_free:
2488         name_destroy(&lovuuid);
2489         name_destroy(&oscuuid);
2490         name_destroy(&oscname);
2491         name_destroy(&svname);
2492         name_destroy(&nodeuuid);
2493         RETURN(rc);
2494 }
2495
2496 static int mgs_write_log_ost(const struct lu_env *env,
2497                              struct mgs_device *mgs, struct fs_db *fsdb,
2498                              struct mgs_target_info *mti)
2499 {
2500         struct llog_handle *llh = NULL;
2501         char *logname, *lovname;
2502         char *ptr = mti->mti_params;
2503         int rc, flags = 0, failout = 0, i;
2504         ENTRY;
2505
2506         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2507
2508         /* The ost startup log */
2509
2510         /* If the ost log already exists, that means that someone reformatted
2511            the ost and it called target_add again. */
2512         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2513                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2514                                    "exists, yet the server claims it never "
2515                                    "registered. It may have been reformatted, "
2516                                    "or the index changed. writeconf the MDT to "
2517                                    "regenerate all logs.\n", mti->mti_svname);
2518                 RETURN(-EALREADY);
2519         }
2520
2521         /*
2522         attach obdfilter ost1 ost1_UUID
2523         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2524         */
2525         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2526                 failout = (strncmp(ptr, "failout", 7) == 0);
2527         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2528         if (rc)
2529                 RETURN(rc);
2530         /* FIXME these should be a single journal transaction */
2531         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2532         if (rc)
2533                 GOTO(out_end, rc);
2534         if (*mti->mti_uuid == '\0')
2535                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2536                          "%s_UUID", mti->mti_svname);
2537         rc = record_attach(env, llh, mti->mti_svname,
2538                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2539         if (rc)
2540                 GOTO(out_end, rc);
2541         rc = record_setup(env, llh, mti->mti_svname,
2542                           "dev"/*ignored*/, "type"/*ignored*/,
2543                           failout ? "n" : "f", NULL/*options*/);
2544         if (rc)
2545                 GOTO(out_end, rc);
2546         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2547         if (rc)
2548                 GOTO(out_end, rc);
2549 out_end:
2550         record_end_log(env, &llh);
2551         if (rc)
2552                 RETURN(rc);
2553         /* We also have to update the other logs where this osc is part of
2554            the lov */
2555
2556         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2557                 /* If we're upgrading, the old mdt log already has our
2558                    entry. Let's do a fake one for fun. */
2559                 /* Note that we can't add any new failnids, since we don't
2560                    know the old osc names. */
2561                 flags = CM_SKIP | CM_UPGRADE146;
2562
2563         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2564                 /* If the update flag isn't set, don't update client/mdt
2565                    logs. */
2566                 flags |= CM_SKIP;
2567                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2568                               "the MDT first to regenerate it.\n",
2569                               mti->mti_svname);
2570         }
2571
2572         /* Add ost to all MDT lov defs */
2573         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2574                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2575                         char mdt_index[9];
2576
2577                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2578                                                      i);
2579                         if (rc)
2580                                 RETURN(rc);
2581                         sprintf(mdt_index, "-MDT%04x", i);
2582                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2583                                                       logname, mdt_index,
2584                                                       lovname, LUSTRE_SP_MDT,
2585                                                       flags);
2586                         name_destroy(&logname);
2587                         name_destroy(&lovname);
2588                         if (rc)
2589                                 RETURN(rc);
2590                 }
2591         }
2592
2593         /* Append ost info to the client log */
2594         rc = name_create(&logname, mti->mti_fsname, "-client");
2595         if (rc)
2596                 RETURN(rc);
2597         if (mgs_log_is_empty(env, mgs, logname)) {
2598                 /* Start client log */
2599                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2600                                        fsdb->fsdb_clilov);
2601                 if (rc)
2602                         GOTO(out_free, rc);
2603                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2604                                        fsdb->fsdb_clilmv);
2605                 if (rc)
2606                         GOTO(out_free, rc);
2607         }
2608         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2609                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
2610 out_free:
2611         name_destroy(&logname);
2612         RETURN(rc);
2613 }
2614
2615 static __inline__ int mgs_param_empty(char *ptr)
2616 {
2617         char *tmp;
2618
2619         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2620                 return 1;
2621         return 0;
2622 }
2623
2624 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2625                                           struct mgs_device *mgs,
2626                                           struct fs_db *fsdb,
2627                                           struct mgs_target_info *mti,
2628                                           char *logname, char *cliname)
2629 {
2630         int rc;
2631         struct llog_handle *llh = NULL;
2632
2633         if (mgs_param_empty(mti->mti_params)) {
2634                 /* Remove _all_ failnids */
2635                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2636                                 mti->mti_svname, "add failnid", CM_SKIP);
2637                 return rc < 0 ? rc : 0;
2638         }
2639
2640         /* Otherwise failover nids are additive */
2641         rc = record_start_log(env, mgs, &llh, logname);
2642         if (rc)
2643                 return rc;
2644                 /* FIXME this should be a single journal transaction */
2645         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2646                            "add failnid");
2647         if (rc)
2648                 goto out_end;
2649         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2650         if (rc)
2651                 goto out_end;
2652         rc = record_marker(env, llh, fsdb, CM_END,
2653                            mti->mti_svname, "add failnid");
2654 out_end:
2655         record_end_log(env, &llh);
2656         return rc;
2657 }
2658
2659
2660 /* Add additional failnids to an existing log.
2661    The mdc/osc must have been added to logs first */
2662 /* tcp nids must be in dotted-quad ascii -
2663    we can't resolve hostnames from the kernel. */
2664 static int mgs_write_log_add_failnid(const struct lu_env *env,
2665                                      struct mgs_device *mgs,
2666                                      struct fs_db *fsdb,
2667                                      struct mgs_target_info *mti)
2668 {
2669         char *logname, *cliname;
2670         int rc;
2671         ENTRY;
2672
2673         /* FIXME we currently can't erase the failnids
2674          * given when a target first registers, since they aren't part of
2675          * an "add uuid" stanza */
2676
2677         /* Verify that we know about this target */
2678         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2679                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2680                                    "yet. It must be started before failnids "
2681                                    "can be added.\n", mti->mti_svname);
2682                 RETURN(-ENOENT);
2683         }
2684
2685         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2686         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2687                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2688         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2689                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2690         } else {
2691                 RETURN(-EINVAL);
2692         }
2693         if (rc)
2694                 RETURN(rc);
2695         /* Add failover nids to the client log */
2696         rc = name_create(&logname, mti->mti_fsname, "-client");
2697         if (rc) {
2698                 name_destroy(&cliname);
2699                 RETURN(rc);
2700         }
2701         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2702         name_destroy(&logname);
2703         name_destroy(&cliname);
2704         if (rc)
2705                 RETURN(rc);
2706
2707         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2708                 /* Add OST failover nids to the MDT logs as well */
2709                 int i;
2710
2711                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2712                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2713                                 continue;
2714                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2715                         if (rc)
2716                                 RETURN(rc);
2717                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2718                                                  fsdb, i);
2719                         if (rc) {
2720                                 name_destroy(&logname);
2721                                 RETURN(rc);
2722                         }
2723                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2724                                                             mti, logname,
2725                                                             cliname);
2726                         name_destroy(&cliname);
2727                         name_destroy(&logname);
2728                         if (rc)
2729                                 RETURN(rc);
2730                 }
2731         }
2732
2733         RETURN(rc);
2734 }
2735
2736 static int mgs_wlp_lcfg(const struct lu_env *env,
2737                         struct mgs_device *mgs, struct fs_db *fsdb,
2738                         struct mgs_target_info *mti,
2739                         char *logname, struct lustre_cfg_bufs *bufs,
2740                         char *tgtname, char *ptr)
2741 {
2742         char comment[MTI_NAME_MAXLEN];
2743         char *tmp;
2744         struct llog_cfg_rec *lcr;
2745         int rc, del;
2746
2747         /* Erase any old settings of this same parameter */
2748         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2749         comment[MTI_NAME_MAXLEN - 1] = 0;
2750         /* But don't try to match the value. */
2751         tmp = strchr(comment, '=');
2752         if (tmp != NULL)
2753                 *tmp = 0;
2754         /* FIXME we should skip settings that are the same as old values */
2755         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2756         if (rc < 0)
2757                 return rc;
2758         del = mgs_param_empty(ptr);
2759
2760         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2761                       "Setting" : "Modifying", tgtname, comment, logname);
2762         if (del) {
2763                 /* mgs_modify() will return 1 if nothing had to be done */
2764                 if (rc == 1)
2765                         rc = 0;
2766                 return rc;
2767         }
2768
2769         lustre_cfg_bufs_reset(bufs, tgtname);
2770         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2771         if (mti->mti_flags & LDD_F_PARAM2)
2772                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2773
2774         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2775                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2776         if (lcr == NULL)
2777                 return -ENOMEM;
2778
2779         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2780                                   comment);
2781         lustre_cfg_rec_free(lcr);
2782         return rc;
2783 }
2784
2785 static int mgs_write_log_param2(const struct lu_env *env,
2786                                 struct mgs_device *mgs,
2787                                 struct fs_db *fsdb,
2788                                 struct mgs_target_info *mti, char *ptr)
2789 {
2790         struct lustre_cfg_bufs  bufs;
2791         int                     rc = 0;
2792         ENTRY;
2793
2794         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2795         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2796                           mti->mti_svname, ptr);
2797
2798         RETURN(rc);
2799 }
2800
2801 /* write global variable settings into log */
2802 static int mgs_write_log_sys(const struct lu_env *env,
2803                              struct mgs_device *mgs, struct fs_db *fsdb,
2804                              struct mgs_target_info *mti, char *sys, char *ptr)
2805 {
2806         struct mgs_thread_info  *mgi = mgs_env_info(env);
2807         struct lustre_cfg       *lcfg;
2808         struct llog_cfg_rec     *lcr;
2809         char *tmp, sep;
2810         int rc, cmd, convert = 1;
2811
2812         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2813                 cmd = LCFG_SET_TIMEOUT;
2814         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2815                 cmd = LCFG_SET_LDLM_TIMEOUT;
2816         /* Check for known params here so we can return error to lctl */
2817         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2818                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2819                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2820                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2821                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2822                 cmd = LCFG_PARAM;
2823         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2824                 convert = 0; /* Don't convert string value to integer */
2825                 cmd = LCFG_PARAM;
2826         } else {
2827                 return -EINVAL;
2828         }
2829
2830         if (mgs_param_empty(ptr))
2831                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2832         else
2833                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2834
2835         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2836         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2837         if (!convert && *tmp != '\0')
2838                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2839         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2840         if (lcr == NULL)
2841                 return -ENOMEM;
2842
2843         lcfg = &lcr->lcr_cfg;
2844         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2845         /* truncate the comment to the parameter name */
2846         ptr = tmp - 1;
2847         sep = *ptr;
2848         *ptr = '\0';
2849         /* modify all servers and clients */
2850         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2851                                       *tmp == '\0' ? NULL : lcr,
2852                                       mti->mti_fsname, sys, 0);
2853         if (rc == 0 && *tmp != '\0') {
2854                 switch (cmd) {
2855                 case LCFG_SET_TIMEOUT:
2856                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2857                                 class_process_config(lcfg);
2858                         break;
2859                 case LCFG_SET_LDLM_TIMEOUT:
2860                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2861                                 class_process_config(lcfg);
2862                         break;
2863                 default:
2864                         break;
2865                 }
2866         }
2867         *ptr = sep;
2868         lustre_cfg_rec_free(lcr);
2869         return rc;
2870 }
2871
2872 /* write quota settings into log */
2873 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2874                                struct fs_db *fsdb, struct mgs_target_info *mti,
2875                                char *quota, char *ptr)
2876 {
2877         struct mgs_thread_info  *mgi = mgs_env_info(env);
2878         struct llog_cfg_rec     *lcr;
2879         char                    *tmp;
2880         char                     sep;
2881         int                      rc, cmd = LCFG_PARAM;
2882
2883         /* support only 'meta' and 'data' pools so far */
2884         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2885             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2886                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2887                        "& quota.ost are)\n", ptr);
2888                 return -EINVAL;
2889         }
2890
2891         if (*tmp == '\0') {
2892                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2893         } else {
2894                 CDEBUG(D_MGS, "global '%s'\n", quota);
2895
2896                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2897                     strcmp(tmp, "none") != 0) {
2898                         CERROR("enable option(%s) isn't supported\n", tmp);
2899                         return -EINVAL;
2900                 }
2901         }
2902
2903         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2904         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2905         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2906         if (lcr == NULL)
2907                 return -ENOMEM;
2908
2909         /* truncate the comment to the parameter name */
2910         ptr = tmp - 1;
2911         sep = *ptr;
2912         *ptr = '\0';
2913
2914         /* XXX we duplicated quota enable information in all server
2915          *     config logs, it should be moved to a separate config
2916          *     log once we cleanup the config log for global param. */
2917         /* modify all servers */
2918         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2919                                       *tmp == '\0' ? NULL : lcr,
2920                                       mti->mti_fsname, quota, 1);
2921         *ptr = sep;
2922         lustre_cfg_rec_free(lcr);
2923         return rc < 0 ? rc : 0;
2924 }
2925
2926 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2927                                    struct mgs_device *mgs,
2928                                    struct fs_db *fsdb,
2929                                    struct mgs_target_info *mti,
2930                                    char *param)
2931 {
2932         struct mgs_thread_info  *mgi = mgs_env_info(env);
2933         struct llog_cfg_rec     *lcr;
2934         struct llog_handle      *llh = NULL;
2935         char                    *logname;
2936         char                    *comment, *ptr;
2937         int                      rc, len;
2938
2939         ENTRY;
2940
2941         /* get comment */
2942         ptr = strchr(param, '=');
2943         LASSERT(ptr != NULL);
2944         len = ptr - param;
2945
2946         OBD_ALLOC(comment, len + 1);
2947         if (comment == NULL)
2948                 RETURN(-ENOMEM);
2949         strncpy(comment, param, len);
2950         comment[len] = '\0';
2951
2952         /* prepare lcfg */
2953         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2954         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2955         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2956         if (lcr == NULL)
2957                 GOTO(out_comment, rc = -ENOMEM);
2958
2959         /* construct log name */
2960         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2961         if (rc < 0)
2962                 GOTO(out_lcfg, rc);
2963
2964         if (mgs_log_is_empty(env, mgs, logname)) {
2965                 rc = record_start_log(env, mgs, &llh, logname);
2966                 if (rc < 0)
2967                         GOTO(out, rc);
2968                 record_end_log(env, &llh);
2969         }
2970
2971         /* obsolete old one */
2972         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2973                         comment, CM_SKIP);
2974         if (rc < 0)
2975                 GOTO(out, rc);
2976         /* write the new one */
2977         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
2978                                   mti->mti_svname, comment);
2979         if (rc)
2980                 CERROR("%s: error writing log %s: rc = %d\n",
2981                        mgs->mgs_obd->obd_name, logname, rc);
2982 out:
2983         name_destroy(&logname);
2984 out_lcfg:
2985         lustre_cfg_rec_free(lcr);
2986 out_comment:
2987         OBD_FREE(comment, len + 1);
2988         RETURN(rc);
2989 }
2990
2991 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2992                                         char *param)
2993 {
2994         char    *ptr;
2995
2996         /* disable the adjustable udesc parameter for now, i.e. use default
2997          * setting that client always ship udesc to MDT if possible. to enable
2998          * it simply remove the following line */
2999         goto error_out;
3000
3001         ptr = strchr(param, '=');
3002         if (ptr == NULL)
3003                 goto error_out;
3004         *ptr++ = '\0';
3005
3006         if (strcmp(param, PARAM_SRPC_UDESC))
3007                 goto error_out;
3008
3009         if (strcmp(ptr, "yes") == 0) {
3010                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3011                 CWARN("Enable user descriptor shipping from client to MDT\n");
3012         } else if (strcmp(ptr, "no") == 0) {
3013                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3014                 CWARN("Disable user descriptor shipping from client to MDT\n");
3015         } else {
3016                 *(ptr - 1) = '=';
3017                 goto error_out;
3018         }
3019         return 0;
3020
3021 error_out:
3022         CERROR("Invalid param: %s\n", param);
3023         return -EINVAL;
3024 }
3025
3026 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3027                                   const char *svname,
3028                                   char *param)
3029 {
3030         struct sptlrpc_rule      rule;
3031         struct sptlrpc_rule_set *rset;
3032         int                      rc;
3033         ENTRY;
3034
3035         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3036                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3037                 RETURN(-EINVAL);
3038         }
3039
3040         if (strncmp(param, PARAM_SRPC_UDESC,
3041                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3042                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3043         }
3044
3045         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3046                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3047                 RETURN(-EINVAL);
3048         }
3049
3050         param += sizeof(PARAM_SRPC_FLVR) - 1;
3051
3052         rc = sptlrpc_parse_rule(param, &rule);
3053         if (rc)
3054                 RETURN(rc);
3055
3056         /* mgs rules implies must be mgc->mgs */
3057         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3058                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3059                      rule.sr_from != LUSTRE_SP_ANY) ||
3060                     (rule.sr_to != LUSTRE_SP_MGS &&
3061                      rule.sr_to != LUSTRE_SP_ANY))
3062                         RETURN(-EINVAL);
3063         }
3064
3065         /* preapre room for this coming rule. svcname format should be:
3066          * - fsname: general rule
3067          * - fsname-tgtname: target-specific rule
3068          */
3069         if (strchr(svname, '-')) {
3070                 struct mgs_tgt_srpc_conf *tgtconf;
3071                 int                       found = 0;
3072
3073                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3074                      tgtconf = tgtconf->mtsc_next) {
3075                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3076                                 found = 1;
3077                                 break;
3078                         }
3079                 }
3080
3081                 if (!found) {
3082                         int name_len;
3083
3084                         OBD_ALLOC_PTR(tgtconf);
3085                         if (tgtconf == NULL)
3086                                 RETURN(-ENOMEM);
3087
3088                         name_len = strlen(svname);
3089
3090                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3091                         if (tgtconf->mtsc_tgt == NULL) {
3092                                 OBD_FREE_PTR(tgtconf);
3093                                 RETURN(-ENOMEM);
3094                         }
3095                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3096
3097                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3098                         fsdb->fsdb_srpc_tgt = tgtconf;
3099                 }
3100
3101                 rset = &tgtconf->mtsc_rset;
3102         } else if (strcmp(svname, MGSSELF_NAME) == 0) {
3103                 /* put _mgs related srpc rule directly in mgs ruleset */
3104                 rset = &fsdb->fsdb_mgs->mgs_lut.lut_sptlrpc_rset;
3105         } else {
3106                 rset = &fsdb->fsdb_srpc_gen;
3107         }
3108
3109         rc = sptlrpc_rule_set_merge(rset, &rule);
3110
3111         RETURN(rc);
3112 }
3113
3114 static int mgs_srpc_set_param(const struct lu_env *env,
3115                               struct mgs_device *mgs,
3116                               struct fs_db *fsdb,
3117                               struct mgs_target_info *mti,
3118                               char *param)
3119 {
3120         char                   *copy;
3121         int                     rc, copy_size;
3122         ENTRY;
3123
3124 #ifndef HAVE_GSS
3125         RETURN(-EINVAL);
3126 #endif
3127         /* keep a copy of original param, which could be destroied
3128          * during parsing */
3129         copy_size = strlen(param) + 1;
3130         OBD_ALLOC(copy, copy_size);
3131         if (copy == NULL)
3132                 return -ENOMEM;
3133         memcpy(copy, param, copy_size);
3134
3135         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3136         if (rc)
3137                 goto out_free;
3138
3139         /* previous steps guaranteed the syntax is correct */
3140         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3141         if (rc)
3142                 goto out_free;
3143
3144         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3145                 /*
3146                  * for mgs rules, make them effective immediately.
3147                  */
3148                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3149                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3150                                                  &fsdb->fsdb_srpc_gen);
3151         }
3152
3153 out_free:
3154         OBD_FREE(copy, copy_size);
3155         RETURN(rc);
3156 }
3157
3158 struct mgs_srpc_read_data {
3159         struct fs_db   *msrd_fsdb;
3160         int             msrd_skip;
3161 };
3162
3163 static int mgs_srpc_read_handler(const struct lu_env *env,
3164                                  struct llog_handle *llh,
3165                                  struct llog_rec_hdr *rec, void *data)
3166 {
3167         struct mgs_srpc_read_data *msrd = data;
3168         struct cfg_marker         *marker;
3169         struct lustre_cfg         *lcfg = REC_DATA(rec);
3170         char                      *svname, *param;
3171         int                        cfg_len, rc;
3172         ENTRY;
3173
3174         if (rec->lrh_type != OBD_CFG_REC) {
3175                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3176                 RETURN(-EINVAL);
3177         }
3178
3179         cfg_len = REC_DATA_LEN(rec);
3180
3181         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3182         if (rc) {
3183                 CERROR("Insane cfg\n");
3184                 RETURN(rc);
3185         }
3186
3187         if (lcfg->lcfg_command == LCFG_MARKER) {
3188                 marker = lustre_cfg_buf(lcfg, 1);
3189
3190                 if (marker->cm_flags & CM_START &&
3191                     marker->cm_flags & CM_SKIP)
3192                         msrd->msrd_skip = 1;
3193                 if (marker->cm_flags & CM_END)
3194                         msrd->msrd_skip = 0;
3195
3196                 RETURN(0);
3197         }
3198
3199         if (msrd->msrd_skip)
3200                 RETURN(0);
3201
3202         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3203                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3204                 RETURN(0);
3205         }
3206
3207         svname = lustre_cfg_string(lcfg, 0);
3208         if (svname == NULL) {
3209                 CERROR("svname is empty\n");
3210                 RETURN(0);
3211         }
3212
3213         param = lustre_cfg_string(lcfg, 1);
3214         if (param == NULL) {
3215                 CERROR("param is empty\n");
3216                 RETURN(0);
3217         }
3218
3219         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3220         if (rc)
3221                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3222
3223         RETURN(0);
3224 }
3225
3226 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3227                                 struct mgs_device *mgs,
3228                                 struct fs_db *fsdb)
3229 {
3230         struct llog_handle        *llh = NULL;
3231         struct llog_ctxt          *ctxt;
3232         char                      *logname;
3233         struct mgs_srpc_read_data  msrd;
3234         int                        rc;
3235         ENTRY;
3236
3237         /* construct log name */
3238         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3239         if (rc)
3240                 RETURN(rc);
3241
3242         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3243         LASSERT(ctxt != NULL);
3244
3245         if (mgs_log_is_empty(env, mgs, logname))
3246                 GOTO(out, rc = 0);
3247
3248         rc = llog_open(env, ctxt, &llh, NULL, logname,
3249                        LLOG_OPEN_EXISTS);
3250         if (rc < 0) {
3251                 if (rc == -ENOENT)
3252                         rc = 0;
3253                 GOTO(out, rc);
3254         }
3255
3256         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3257         if (rc)
3258                 GOTO(out_close, rc);
3259
3260         if (llog_get_size(llh) <= 1)
3261                 GOTO(out_close, rc = 0);
3262
3263         msrd.msrd_fsdb = fsdb;
3264         msrd.msrd_skip = 0;
3265
3266         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3267                           NULL);
3268
3269 out_close:
3270         llog_close(env, llh);
3271 out:
3272         llog_ctxt_put(ctxt);
3273         name_destroy(&logname);
3274
3275         if (rc)
3276                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3277         RETURN(rc);
3278 }
3279
3280 /* Permanent settings of all parameters by writing into the appropriate
3281  * configuration logs.
3282  * A parameter with null value ("<param>='\0'") means to erase it out of
3283  * the logs.
3284  */
3285 static int mgs_write_log_param(const struct lu_env *env,
3286                                struct mgs_device *mgs, struct fs_db *fsdb,
3287                                struct mgs_target_info *mti, char *ptr)
3288 {
3289         struct mgs_thread_info *mgi = mgs_env_info(env);
3290         char *logname;
3291         char *tmp;
3292         int rc = 0, rc2 = 0;
3293         ENTRY;
3294
3295         /* For various parameter settings, we have to figure out which logs
3296            care about them (e.g. both mdt and client for lov settings) */
3297         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3298
3299         /* The params are stored in MOUNT_DATA_FILE and modified via
3300            tunefs.lustre, or set using lctl conf_param */
3301
3302         /* Processed in lustre_start_mgc */
3303         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3304                 GOTO(end, rc);
3305
3306         /* Processed in ost/mdt */
3307         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3308                 GOTO(end, rc);
3309
3310         /* Processed in mgs_write_log_ost */
3311         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3312                 if (mti->mti_flags & LDD_F_PARAM) {
3313                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3314                                            "changed with tunefs.lustre"
3315                                            "and --writeconf\n", ptr);
3316                         rc = -EPERM;
3317                 }
3318                 GOTO(end, rc);
3319         }
3320
3321         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3322                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3323                 GOTO(end, rc);
3324         }
3325
3326         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3327                 /* Add a failover nidlist */
3328                 rc = 0;
3329                 /* We already processed failovers params for new
3330                    targets in mgs_write_log_target */
3331                 if (mti->mti_flags & LDD_F_PARAM) {
3332                         CDEBUG(D_MGS, "Adding failnode\n");
3333                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3334                 }
3335                 GOTO(end, rc);
3336         }
3337
3338         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3339                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3340                 GOTO(end, rc);
3341         }
3342
3343         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3344                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3345                 GOTO(end, rc);
3346         }
3347
3348         if (class_match_param(ptr, PARAM_OSC PARAM_ACTIVE, &tmp) == 0 ||
3349             class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0) {
3350                 /* active=0 means off, anything else means on */
3351                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3352                 bool deactive_osc = memcmp(ptr, PARAM_OSC PARAM_ACTIVE,
3353                                           strlen(PARAM_OSC PARAM_ACTIVE)) == 0;
3354                 int i;
3355
3356                 if (!deactive_osc) {
3357                         __u32   index;
3358
3359                         rc = server_name2index(mti->mti_svname, &index, NULL);
3360                         if (rc < 0)
3361                                 GOTO(end, rc);
3362
3363                         if (index == 0) {
3364                                 LCONSOLE_ERROR_MSG(0x144, "%s: MDC0 can not be"
3365                                                    " (de)activated.\n",
3366                                                    mti->mti_svname);
3367                                 GOTO(end, rc = -EINVAL);
3368                         }
3369                 }
3370
3371                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3372                               flag ? "de" : "re", mti->mti_svname);
3373                 /* Modify clilov */
3374                 rc = name_create(&logname, mti->mti_fsname, "-client");
3375                 if (rc < 0)
3376                         GOTO(end, rc);
3377                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3378                                 mti->mti_svname,
3379                                 deactive_osc ? "add osc" : "add mdc", flag);
3380                 name_destroy(&logname);
3381                 if (rc < 0)
3382                         goto active_err;
3383
3384                 /* Modify mdtlov */
3385                 /* Add to all MDT logs for DNE */
3386                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3387                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3388                                 continue;
3389                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3390                         if (rc < 0)
3391                                 GOTO(end, rc);
3392                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3393                                         mti->mti_svname,
3394                                         deactive_osc ? "add osc" : "add osp",
3395                                         flag);
3396                         name_destroy(&logname);
3397                         if (rc < 0)
3398                                 goto active_err;
3399                 }
3400 active_err:
3401                 if (rc < 0) {
3402                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3403                                            "log (%d). No permanent "
3404                                            "changes were made to the "
3405                                            "config log.\n",
3406                                            mti->mti_svname, rc);
3407                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3408                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3409                                                    " because the log"
3410                                                    "is in the old 1.4"
3411                                                    "style. Consider "
3412                                                    " --writeconf to "
3413                                                    "update the logs.\n");
3414                         GOTO(end, rc);
3415                 }
3416                 /* Fall through to osc/mdc proc for deactivating live
3417                    OSC/OSP on running MDT / clients. */
3418         }
3419         /* Below here, let obd's XXX_process_config methods handle it */
3420
3421         /* All lov. in proc */
3422         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3423                 char *mdtlovname;
3424
3425                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3426                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3427                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3428                                            "set on the MDT, not %s. "
3429                                            "Ignoring.\n",
3430                                            mti->mti_svname);
3431                         GOTO(end, rc = 0);
3432                 }
3433
3434                 /* Modify mdtlov */
3435                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3436                         GOTO(end, rc = -ENODEV);
3437
3438                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3439                                              mti->mti_stripe_index);
3440                 if (rc)
3441                         GOTO(end, rc);
3442                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3443                                   &mgi->mgi_bufs, mdtlovname, ptr);
3444                 name_destroy(&logname);
3445                 name_destroy(&mdtlovname);
3446                 if (rc)
3447                         GOTO(end, rc);
3448
3449                 /* Modify clilov */
3450                 rc = name_create(&logname, mti->mti_fsname, "-client");
3451                 if (rc)
3452                         GOTO(end, rc);
3453                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3454                                   fsdb->fsdb_clilov, ptr);
3455                 name_destroy(&logname);
3456                 GOTO(end, rc);
3457         }
3458
3459         /* All osc., mdc., llite. params in proc */
3460         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3461             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3462             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3463                 char *cname;
3464
3465                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3466                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3467                                            " cannot be modified. Consider"
3468                                            " updating the configuration with"
3469                                            " --writeconf\n",
3470                                            mti->mti_svname);
3471                         GOTO(end, rc = -EINVAL);
3472                 }
3473                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3474                         rc = name_create(&cname, mti->mti_fsname, "-client");
3475                         /* Add the client type to match the obdname in
3476                            class_config_llog_handler */
3477                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3478                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3479                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3480                         rc = name_create(&cname, mti->mti_svname, "-osc");
3481                 } else {
3482                         GOTO(end, rc = -EINVAL);
3483                 }
3484                 if (rc)
3485                         GOTO(end, rc);
3486
3487                 /* Forbid direct update of llite root squash parameters.
3488                  * These parameters are indirectly set via the MDT settings.
3489                  * See (LU-1778) */
3490                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
3491                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3492                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3493                         LCONSOLE_ERROR("%s: root squash parameters can only "
3494                                 "be updated through MDT component\n",
3495                                 mti->mti_fsname);
3496                         name_destroy(&cname);
3497                         GOTO(end, rc = -EINVAL);
3498                 }
3499
3500                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3501
3502                 /* Modify client */
3503                 rc = name_create(&logname, mti->mti_fsname, "-client");
3504                 if (rc) {
3505                         name_destroy(&cname);
3506                         GOTO(end, rc);
3507                 }
3508                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3509                                   cname, ptr);
3510
3511                 /* osc params affect the MDT as well */
3512                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3513                         int i;
3514
3515                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3516                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3517                                         continue;
3518                                 name_destroy(&cname);
3519                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3520                                                          fsdb, i);
3521                                 name_destroy(&logname);
3522                                 if (rc)
3523                                         break;
3524                                 rc = name_create_mdt(&logname,
3525                                                      mti->mti_fsname, i);
3526                                 if (rc)
3527                                         break;
3528                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3529                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3530                                                           mti, logname,
3531                                                           &mgi->mgi_bufs,
3532                                                           cname, ptr);
3533                                         if (rc)
3534                                                 break;
3535                                 }
3536                         }
3537                 }
3538
3539                 /* For mdc activate/deactivate, it affects OSP on MDT as well */
3540                 if (class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0 &&
3541                     rc == 0) {
3542                         char suffix[16];
3543                         char *lodname = NULL;
3544                         char *param_str = NULL;
3545                         int i;
3546                         int index;
3547
3548                         /* replace mdc with osp */
3549                         memcpy(ptr, PARAM_OSP, strlen(PARAM_OSP));
3550                         rc = server_name2index(mti->mti_svname, &index, NULL);
3551                         if (rc < 0) {
3552                                 memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
3553                                 GOTO(end, rc);
3554                         }
3555
3556                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3557                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3558                                         continue;
3559
3560                                 if (i == index)
3561                                         continue;
3562
3563                                 name_destroy(&logname);
3564                                 rc = name_create_mdt(&logname, mti->mti_fsname,
3565                                                      i);
3566                                 if (rc < 0)
3567                                         break;
3568
3569                                 if (mgs_log_is_empty(env, mgs, logname))
3570                                         continue;
3571
3572                                 snprintf(suffix, sizeof(suffix), "-osp-MDT%04x",
3573                                          i);
3574                                 name_destroy(&cname);
3575                                 rc = name_create(&cname, mti->mti_svname,
3576                                                  suffix);
3577                                 if (rc < 0)
3578                                         break;
3579
3580                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3581                                                   &mgi->mgi_bufs, cname, ptr);
3582                                 if (rc < 0)
3583                                         break;
3584
3585                                 /* Add configuration log for noitfying LOD
3586                                  * to active/deactive the OSP. */
3587                                 name_destroy(&param_str);
3588                                 rc = name_create(&param_str, cname,
3589                                                  (*tmp == '0') ?  ".active=0" :
3590                                                  ".active=1");
3591                                 if (rc < 0)
3592                                         break;
3593
3594                                 name_destroy(&lodname);
3595                                 rc = name_create(&lodname, logname, "-mdtlov");
3596                                 if (rc < 0)
3597                                         break;
3598
3599                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3600                                                   &mgi->mgi_bufs, lodname,
3601                                                   param_str);
3602                                 if (rc < 0)
3603                                         break;
3604                         }
3605                         memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
3606                         name_destroy(&lodname);
3607                         name_destroy(&param_str);
3608                 }
3609
3610                 name_destroy(&logname);
3611                 name_destroy(&cname);
3612                 GOTO(end, rc);
3613         }
3614
3615         /* All mdt. params in proc */
3616         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
3617                 int i;
3618                 __u32 idx;
3619
3620                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3621                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3622                             MTI_NAME_MAXLEN) == 0)
3623                         /* device is unspecified completely? */
3624                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3625                 else
3626                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3627                 if (rc < 0)
3628                         goto active_err;
3629                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3630                         goto active_err;
3631                 if (rc & LDD_F_SV_ALL) {
3632                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3633                                 if (!test_bit(i,
3634                                                   fsdb->fsdb_mdt_index_map))
3635                                         continue;
3636                                 rc = name_create_mdt(&logname,
3637                                                 mti->mti_fsname, i);
3638                                 if (rc)
3639                                         goto active_err;
3640                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3641                                                   logname, &mgi->mgi_bufs,
3642                                                   logname, ptr);
3643                                 name_destroy(&logname);
3644                                 if (rc)
3645                                         goto active_err;
3646                         }
3647                 } else {
3648                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
3649                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
3650                                 LCONSOLE_ERROR("%s: root squash parameters "
3651                                         "cannot be applied to a single MDT\n",
3652                                         mti->mti_fsname);
3653                                 GOTO(end, rc = -EINVAL);
3654                         }
3655                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3656                                           mti->mti_svname, &mgi->mgi_bufs,
3657                                           mti->mti_svname, ptr);
3658                         if (rc)
3659                                 goto active_err;
3660                 }
3661
3662                 /* root squash settings are also applied to llite
3663                  * config log (see LU-1778) */
3664                 if (rc == 0 &&
3665                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3666                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3667                         char *cname;
3668                         char *ptr2;
3669
3670                         rc = name_create(&cname, mti->mti_fsname, "-client");
3671                         if (rc)
3672                                 GOTO(end, rc);
3673                         rc = name_create(&logname, mti->mti_fsname, "-client");
3674                         if (rc) {
3675                                 name_destroy(&cname);
3676                                 GOTO(end, rc);
3677                         }
3678                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
3679                         if (rc) {
3680                                 name_destroy(&cname);
3681                                 name_destroy(&logname);
3682                                 GOTO(end, rc);
3683                         }
3684                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3685                                           &mgi->mgi_bufs, cname, ptr2);
3686                         name_destroy(&ptr2);
3687                         name_destroy(&logname);
3688                         name_destroy(&cname);
3689                 }
3690                 GOTO(end, rc);
3691         }
3692
3693         /* All mdd., ost. and osd. params in proc */
3694         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3695             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3696             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3697                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3698                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3699                         GOTO(end, rc = -ENODEV);
3700
3701                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3702                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3703                 GOTO(end, rc);
3704         }
3705
3706         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3707         rc2 = -ENOSYS;
3708
3709 end:
3710         if (rc)
3711                 CERROR("err %d on param '%s'\n", rc, ptr);
3712
3713         RETURN(rc ?: rc2);
3714 }
3715
3716 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
3717                          struct mgs_target_info *mti, struct fs_db *fsdb)
3718 {
3719         char    *buf, *params;
3720         int      rc = -EINVAL;
3721
3722         ENTRY;
3723
3724         /* set/check the new target index */
3725         rc = mgs_set_index(env, mgs, mti);
3726         if (rc < 0)
3727                 RETURN(rc);
3728
3729         if (rc == EALREADY) {
3730                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3731                               mti->mti_stripe_index, mti->mti_svname);
3732                 /* We would like to mark old log sections as invalid
3733                    and add new log sections in the client and mdt logs.
3734                    But if we add new sections, then live clients will
3735                    get repeat setup instructions for already running
3736                    osc's. So don't update the client/mdt logs. */
3737                 mti->mti_flags &= ~LDD_F_UPDATE;
3738                 rc = 0;
3739         }
3740
3741         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_WRITE_TARGET_DELAY, cfs_fail_val > 0 ?
3742                          cfs_fail_val : 10);
3743
3744         mutex_lock(&fsdb->fsdb_mutex);
3745
3746         if (mti->mti_flags &
3747             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3748                 /* Generate a log from scratch */
3749                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3750                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3751                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3752                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3753                 } else {
3754                         CERROR("Unknown target type %#x, can't create log for "
3755                                "%s\n", mti->mti_flags, mti->mti_svname);
3756                 }
3757                 if (rc) {
3758                         CERROR("Can't write logs for %s (%d)\n",
3759                                mti->mti_svname, rc);
3760                         GOTO(out_up, rc);
3761                 }
3762         } else {
3763                 /* Just update the params from tunefs in mgs_write_log_params */
3764                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3765                 mti->mti_flags |= LDD_F_PARAM;
3766         }
3767
3768         /* allocate temporary buffer, where class_get_next_param will
3769            make copy of a current  parameter */
3770         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3771         if (buf == NULL)
3772                 GOTO(out_up, rc = -ENOMEM);
3773         params = mti->mti_params;
3774         while (params != NULL) {
3775                 rc = class_get_next_param(&params, buf);
3776                 if (rc) {
3777                         if (rc == 1)
3778                                 /* there is no next parameter, that is
3779                                    not an error */
3780                                 rc = 0;
3781                         break;
3782                 }
3783                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3784                        params, buf);
3785                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3786                 if (rc)
3787                         break;
3788         }
3789
3790         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3791
3792 out_up:
3793         mutex_unlock(&fsdb->fsdb_mutex);
3794         RETURN(rc);
3795 }
3796
3797 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3798 {
3799         struct llog_ctxt        *ctxt;
3800         int                      rc = 0;
3801
3802         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3803         if (ctxt == NULL) {
3804                 CERROR("%s: MGS config context doesn't exist\n",
3805                        mgs->mgs_obd->obd_name);
3806                 rc = -ENODEV;
3807         } else {
3808                 rc = llog_erase(env, ctxt, NULL, name);
3809                 /* llog may not exist */
3810                 if (rc == -ENOENT)
3811                         rc = 0;
3812                 llog_ctxt_put(ctxt);
3813         }
3814
3815         if (rc)
3816                 CERROR("%s: failed to clear log %s: %d\n",
3817                        mgs->mgs_obd->obd_name, name, rc);
3818
3819         return rc;
3820 }
3821
3822 /* erase all logs for the given fs */
3823 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3824 {
3825         struct fs_db *fsdb;
3826         struct list_head log_list;
3827         struct mgs_direntry *dirent, *n;
3828         int rc, len = strlen(fsname);
3829         char *suffix;
3830         ENTRY;
3831
3832         /* Find all the logs in the CONFIGS directory */
3833         rc = class_dentry_readdir(env, mgs, &log_list);
3834         if (rc)
3835                 RETURN(rc);
3836
3837         mutex_lock(&mgs->mgs_mutex);
3838
3839         /* Delete the fs db */
3840         fsdb = mgs_find_fsdb(mgs, fsname);
3841         if (fsdb)
3842                 mgs_free_fsdb(mgs, fsdb);
3843
3844         mutex_unlock(&mgs->mgs_mutex);
3845
3846         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3847                 list_del_init(&dirent->mde_list);
3848                 suffix = strrchr(dirent->mde_name, '-');
3849                 if (suffix != NULL) {
3850                         if ((len == suffix - dirent->mde_name) &&
3851                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
3852                                 CDEBUG(D_MGS, "Removing log %s\n",
3853                                        dirent->mde_name);
3854                                 mgs_erase_log(env, mgs, dirent->mde_name);
3855                         }
3856                 }
3857                 mgs_direntry_free(dirent);
3858         }
3859
3860         RETURN(rc);
3861 }
3862
3863 /* list all logs for the given fs */
3864 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
3865                   struct obd_ioctl_data *data)
3866 {
3867         struct list_head         log_list;
3868         struct mgs_direntry     *dirent, *n;
3869         char                    *out, *suffix;
3870         int                      l, remains, rc;
3871
3872         ENTRY;
3873
3874         /* Find all the logs in the CONFIGS directory */
3875         rc = class_dentry_readdir(env, mgs, &log_list);
3876         if (rc)
3877                 RETURN(rc);
3878
3879         out = data->ioc_bulk;
3880         remains = data->ioc_inllen1;
3881         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3882                 list_del_init(&dirent->mde_list);
3883                 suffix = strrchr(dirent->mde_name, '-');
3884                 if (suffix != NULL) {
3885                         l = snprintf(out, remains, "config log: $%s\n",
3886                                      dirent->mde_name);
3887                         out += l;
3888                         remains -= l;
3889                 }
3890                 mgs_direntry_free(dirent);
3891                 if (remains <= 0)
3892                         break;
3893         }
3894         RETURN(rc);
3895 }
3896
3897 /* from llog_swab */
3898 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3899 {
3900         int i;
3901         ENTRY;
3902
3903         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3904         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3905
3906         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3907         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3908         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3909         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3910
3911         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3912         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3913                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3914                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3915                                i, lcfg->lcfg_buflens[i],
3916                                lustre_cfg_string(lcfg, i));
3917                 }
3918         EXIT;
3919 }
3920
3921 /* Setup _mgs fsdb and log
3922  */
3923 int mgs__mgs_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3924                           struct fs_db *fsdb)
3925 {
3926         int                     rc;
3927         ENTRY;
3928
3929         rc = mgs_find_or_make_fsdb(env, mgs, MGSSELF_NAME, &fsdb);
3930
3931         RETURN(rc);
3932 }
3933
3934 /* Setup params fsdb and log
3935  */
3936 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3937                           struct fs_db *fsdb)
3938 {
3939         struct llog_handle      *params_llh = NULL;
3940         int                     rc;
3941         ENTRY;
3942
3943         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
3944         if (fsdb != NULL) {
3945                 mutex_lock(&fsdb->fsdb_mutex);
3946                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
3947                 if (rc == 0)
3948                         rc = record_end_log(env, &params_llh);
3949                 mutex_unlock(&fsdb->fsdb_mutex);
3950         }
3951
3952         RETURN(rc);
3953 }
3954
3955 /* Cleanup params fsdb and log
3956  */
3957 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
3958 {
3959         return mgs_erase_logs(env, mgs, PARAMS_FILENAME);
3960 }
3961
3962 /* Set a permanent (config log) param for a target or fs
3963  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3964  *             buf1 contains the single parameter
3965  */
3966 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3967                  struct lustre_cfg *lcfg, char *fsname)
3968 {
3969         struct fs_db *fsdb;
3970         struct mgs_target_info *mti;
3971         char *devname, *param;
3972         char *ptr;
3973         const char *tmp;
3974         __u32 index;
3975         int rc = 0;
3976         ENTRY;
3977
3978         print_lustre_cfg(lcfg);
3979
3980         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3981         devname = lustre_cfg_string(lcfg, 0);
3982         param = lustre_cfg_string(lcfg, 1);
3983         if (!devname) {
3984                 /* Assume device name embedded in param:
3985                    lustre-OST0000.osc.max_dirty_mb=32 */
3986                 ptr = strchr(param, '.');
3987                 if (ptr) {
3988                         devname = param;
3989                         *ptr = 0;
3990                         param = ptr + 1;
3991                 }
3992         }
3993         if (!devname) {
3994                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3995                 RETURN(-ENOSYS);
3996         }
3997
3998         rc = mgs_parse_devname(devname, fsname, NULL);
3999         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
4000                 /* param related to llite isn't allowed to set by OST or MDT */
4001                 if (rc == 0 && strncmp(param, PARAM_LLITE,
4002                                        sizeof(PARAM_LLITE) - 1) == 0)
4003                         RETURN(-EINVAL);
4004         } else {
4005                 /* assume devname is the fsname */
4006                 strlcpy(fsname, devname, MTI_NAME_MAXLEN);
4007         }
4008         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
4009
4010         rc = mgs_find_or_make_fsdb(env, mgs,
4011                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
4012                                    PARAMS_FILENAME : fsname, &fsdb);
4013         if (rc)
4014                 RETURN(rc);
4015
4016         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
4017             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
4018             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4019                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
4020                        "is '%s'\n", fsname, devname);
4021                 mgs_free_fsdb(mgs, fsdb);
4022                 RETURN(-EINVAL);
4023         }
4024
4025         /* Create a fake mti to hold everything */
4026         OBD_ALLOC_PTR(mti);
4027         if (!mti)
4028                 GOTO(out, rc = -ENOMEM);
4029         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
4030             >= sizeof(mti->mti_fsname))
4031                 GOTO(out, rc = -E2BIG);
4032         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
4033             >= sizeof(mti->mti_svname))
4034                 GOTO(out, rc = -E2BIG);
4035         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
4036             >= sizeof(mti->mti_params))
4037                 GOTO(out, rc = -E2BIG);
4038         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
4039         if (rc < 0)
4040                 /* Not a valid server; may be only fsname */
4041                 rc = 0;
4042         else
4043                 /* Strip -osc or -mdc suffix from svname */
4044                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
4045                                      mti->mti_svname))
4046                         GOTO(out, rc = -EINVAL);
4047         /*
4048          * Revoke lock so everyone updates.  Should be alright if
4049          * someone was already reading while we were updating the logs,
4050          * so we don't really need to hold the lock while we're
4051          * writing (above).
4052          */
4053         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
4054                 mti->mti_flags = rc | LDD_F_PARAM2;
4055                 mutex_lock(&fsdb->fsdb_mutex);
4056                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
4057                 mutex_unlock(&fsdb->fsdb_mutex);
4058                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
4059         } else {
4060                 mti->mti_flags = rc | LDD_F_PARAM;
4061                 mutex_lock(&fsdb->fsdb_mutex);
4062                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
4063                 mutex_unlock(&fsdb->fsdb_mutex);
4064                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4065         }
4066
4067 out:
4068         OBD_FREE_PTR(mti);
4069         RETURN(rc);
4070 }
4071
4072 static int mgs_write_log_pool(const struct lu_env *env,
4073                               struct mgs_device *mgs, char *logname,
4074                               struct fs_db *fsdb, char *tgtname,
4075                               enum lcfg_command_type cmd,
4076                               char *fsname, char *poolname,
4077                               char *ostname, char *comment)
4078 {
4079         struct llog_handle *llh = NULL;
4080         int rc;
4081
4082         rc = record_start_log(env, mgs, &llh, logname);
4083         if (rc)
4084                 return rc;
4085         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
4086         if (rc)
4087                 goto out;
4088         rc = record_base(env, llh, tgtname, 0, cmd,
4089                          fsname, poolname, ostname, NULL);
4090         if (rc)
4091                 goto out;
4092         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
4093 out:
4094         record_end_log(env, &llh);
4095         return rc;
4096 }
4097
4098 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
4099                     enum lcfg_command_type cmd, const char *nodemap_name,
4100                     char *param)
4101 {
4102         lnet_nid_t      nid[2];
4103         __u32           idmap[2];
4104         bool            bool_switch;
4105         __u32           int_id;
4106         int             rc = 0;
4107         ENTRY;
4108
4109         switch (cmd) {
4110         case LCFG_NODEMAP_ADD:
4111                 rc = nodemap_add(nodemap_name);
4112                 break;
4113         case LCFG_NODEMAP_DEL:
4114                 rc = nodemap_del(nodemap_name);
4115                 break;
4116         case LCFG_NODEMAP_ADD_RANGE:
4117                 rc = nodemap_parse_range(param, nid);
4118                 if (rc != 0)
4119                         break;
4120                 rc = nodemap_add_range(nodemap_name, nid);
4121                 break;
4122         case LCFG_NODEMAP_DEL_RANGE:
4123                 rc = nodemap_parse_range(param, nid);
4124                 if (rc != 0)
4125                         break;
4126                 rc = nodemap_del_range(nodemap_name, nid);
4127                 break;
4128         case LCFG_NODEMAP_ADMIN:
4129                 bool_switch = simple_strtoul(param, NULL, 10);
4130                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
4131                 break;
4132         case LCFG_NODEMAP_DENY_UNKNOWN:
4133                 bool_switch = simple_strtoul(param, NULL, 10);
4134                 rc = nodemap_set_deny_unknown(nodemap_name, bool_switch);
4135                 break;
4136         case LCFG_NODEMAP_TRUSTED:
4137                 bool_switch = simple_strtoul(param, NULL, 10);
4138                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
4139                 break;
4140         case LCFG_NODEMAP_SQUASH_UID:
4141                 int_id = simple_strtoul(param, NULL, 10);
4142                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
4143                 break;
4144         case LCFG_NODEMAP_SQUASH_GID:
4145                 int_id = simple_strtoul(param, NULL, 10);
4146                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
4147                 break;
4148         case LCFG_NODEMAP_ADD_UIDMAP:
4149         case LCFG_NODEMAP_ADD_GIDMAP:
4150                 rc = nodemap_parse_idmap(param, idmap);
4151                 if (rc != 0)
4152                         break;
4153                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
4154                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
4155                                                idmap);
4156                 else
4157                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
4158                                                idmap);
4159                 break;
4160         case LCFG_NODEMAP_DEL_UIDMAP:
4161         case LCFG_NODEMAP_DEL_GIDMAP:
4162                 rc = nodemap_parse_idmap(param, idmap);
4163                 if (rc != 0)
4164                         break;
4165                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
4166                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
4167                                                idmap);
4168                 else
4169                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
4170                                                idmap);
4171                 break;
4172         case LCFG_NODEMAP_SET_FILESET:
4173                 rc = nodemap_set_fileset(nodemap_name, param);
4174                 break;
4175         default:
4176                 rc = -EINVAL;
4177         }
4178
4179         RETURN(rc);
4180 }
4181
4182 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
4183                  enum lcfg_command_type cmd, char *fsname,
4184                  char *poolname, char *ostname)
4185 {
4186         struct fs_db *fsdb;
4187         char *lovname;
4188         char *logname;
4189         char *label = NULL, *canceled_label = NULL;
4190         int label_sz;
4191         struct mgs_target_info *mti = NULL;
4192         int rc, i;
4193         ENTRY;
4194
4195         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
4196         if (rc) {
4197                 CERROR("Can't get db for %s\n", fsname);
4198                 RETURN(rc);
4199         }
4200         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4201                 CERROR("%s is not defined\n", fsname);
4202                 mgs_free_fsdb(mgs, fsdb);
4203                 RETURN(-EINVAL);
4204         }
4205
4206         label_sz = 10 + strlen(fsname) + strlen(poolname);
4207
4208         /* check if ostname match fsname */
4209         if (ostname != NULL) {
4210                 char *ptr;
4211
4212                 ptr = strrchr(ostname, '-');
4213                 if ((ptr == NULL) ||
4214                     (strncmp(fsname, ostname, ptr-ostname) != 0))
4215                         RETURN(-EINVAL);
4216                 label_sz += strlen(ostname);
4217         }
4218
4219         OBD_ALLOC(label, label_sz);
4220         if (label == NULL)
4221                 RETURN(-ENOMEM);
4222
4223         switch(cmd) {
4224         case LCFG_POOL_NEW:
4225                 sprintf(label,
4226                         "new %s.%s", fsname, poolname);
4227                 break;
4228         case LCFG_POOL_ADD:
4229                 sprintf(label,
4230                         "add %s.%s.%s", fsname, poolname, ostname);
4231                 break;
4232         case LCFG_POOL_REM:
4233                 OBD_ALLOC(canceled_label, label_sz);
4234                 if (canceled_label == NULL)
4235                         GOTO(out_label, rc = -ENOMEM);
4236                 sprintf(label,
4237                         "rem %s.%s.%s", fsname, poolname, ostname);
4238                 sprintf(canceled_label,
4239                         "add %s.%s.%s", fsname, poolname, ostname);
4240                 break;
4241         case LCFG_POOL_DEL:
4242                 OBD_ALLOC(canceled_label, label_sz);
4243                 if (canceled_label == NULL)
4244                         GOTO(out_label, rc = -ENOMEM);
4245                 sprintf(label,
4246                         "del %s.%s", fsname, poolname);
4247                 sprintf(canceled_label,
4248                         "new %s.%s", fsname, poolname);
4249                 break;
4250         default:
4251                 break;
4252         }
4253
4254         if (canceled_label != NULL) {
4255                 OBD_ALLOC_PTR(mti);
4256                 if (mti == NULL)
4257                         GOTO(out_cancel, rc = -ENOMEM);
4258         }
4259
4260         mutex_lock(&fsdb->fsdb_mutex);
4261         /* write pool def to all MDT logs */
4262         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4263                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
4264                         rc = name_create_mdt_and_lov(&logname, &lovname,
4265                                                      fsdb, i);
4266                         if (rc) {
4267                                 mutex_unlock(&fsdb->fsdb_mutex);
4268                                 GOTO(out_mti, rc);
4269                         }
4270                         if (canceled_label != NULL) {
4271                                 strcpy(mti->mti_svname, "lov pool");
4272                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4273                                                 lovname, canceled_label,
4274                                                 CM_SKIP);
4275                         }
4276
4277                         if (rc >= 0)
4278                                 rc = mgs_write_log_pool(env, mgs, logname,
4279                                                         fsdb, lovname, cmd,
4280                                                         fsname, poolname,
4281                                                         ostname, label);
4282                         name_destroy(&logname);
4283                         name_destroy(&lovname);
4284                         if (rc) {
4285                                 mutex_unlock(&fsdb->fsdb_mutex);
4286                                 GOTO(out_mti, rc);
4287                         }
4288                 }
4289         }
4290
4291         rc = name_create(&logname, fsname, "-client");
4292         if (rc) {
4293                 mutex_unlock(&fsdb->fsdb_mutex);
4294                 GOTO(out_mti, rc);
4295         }
4296         if (canceled_label != NULL) {
4297                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4298                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4299                 if (rc < 0) {
4300                         mutex_unlock(&fsdb->fsdb_mutex);
4301                         name_destroy(&logname);
4302                         GOTO(out_mti, rc);
4303                 }
4304         }
4305
4306         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4307                                 cmd, fsname, poolname, ostname, label);
4308         mutex_unlock(&fsdb->fsdb_mutex);
4309         name_destroy(&logname);
4310         /* request for update */
4311         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4312
4313         EXIT;
4314 out_mti:
4315         if (mti != NULL)
4316                 OBD_FREE_PTR(mti);
4317 out_cancel:
4318         if (canceled_label != NULL)
4319                 OBD_FREE(canceled_label, label_sz);
4320 out_label:
4321         OBD_FREE(label, label_sz);
4322         return rc;
4323 }