Whamcloud - gitweb
LU-8900 mgs: use reference count for fs_db
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mgs/mgs_llog.c
33  *
34  * Lustre Management Server (mgs) config llog creation
35  *
36  * Author: Nathan Rutman <nathan@clusterfs.com>
37  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
38  * Author: Mikhail Pershin <tappro@whamcloud.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_MGS
42 #define D_MGS D_CONFIG
43
44 #include <obd.h>
45 #include <lustre_ioctl.h>
46 #include <lustre_param.h>
47 #include <lustre_sec.h>
48 #include <lustre_quota.h>
49
50 #include "mgs_internal.h"
51
52 /********************** Class functions ********************/
53
54 /* Find all logs in CONFIG directory and link then into list */
55 int class_dentry_readdir(const struct lu_env *env,
56                          struct mgs_device *mgs, struct list_head *log_list)
57 {
58         struct dt_object    *dir = mgs->mgs_configs_dir;
59         const struct dt_it_ops *iops;
60         struct dt_it        *it;
61         struct mgs_direntry *de;
62         char                *key;
63         int                  rc, key_sz;
64
65         INIT_LIST_HEAD(log_list);
66
67         LASSERT(dir);
68         LASSERT(dir->do_index_ops);
69
70         iops = &dir->do_index_ops->dio_it;
71         it = iops->init(env, dir, LUDA_64BITHASH);
72         if (IS_ERR(it))
73                 RETURN(PTR_ERR(it));
74
75         rc = iops->load(env, it, 0);
76         if (rc <= 0)
77                 GOTO(fini, rc = 0);
78
79         /* main cycle */
80         do {
81                 key = (void *)iops->key(env, it);
82                 if (IS_ERR(key)) {
83                         CERROR("%s: key failed when listing %s: rc = %d\n",
84                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
85                                (int) PTR_ERR(key));
86                         goto next;
87                 }
88                 key_sz = iops->key_size(env, it);
89                 LASSERT(key_sz > 0);
90
91                 /* filter out "." and ".." entries */
92                 if (key[0] == '.') {
93                         if (key_sz == 1)
94                                 goto next;
95                         if (key_sz == 2 && key[1] == '.')
96                                 goto next;
97                 }
98
99                 /* filter out ".bak" files */
100                 /* sizeof(".bak") - 1 == 3 */
101                 if (key_sz >= 3 &&
102                     !memcmp(".bak", key + key_sz - 3, 3)) {
103                         CDEBUG(D_MGS, "Skipping backup file %.*s\n",
104                                key_sz, key);
105                         goto next;
106                 }
107
108                 de = mgs_direntry_alloc(key_sz + 1);
109                 if (de == NULL) {
110                         rc = -ENOMEM;
111                         break;
112                 }
113
114                 memcpy(de->mde_name, key, key_sz);
115                 de->mde_name[key_sz] = 0;
116
117                 list_add(&de->mde_list, log_list);
118
119 next:
120                 rc = iops->next(env, it);
121         } while (rc == 0);
122         if (rc > 0)
123                 rc = 0;
124
125         iops->put(env, it);
126
127 fini:
128         iops->fini(env, it);
129         if (rc)
130                 CERROR("%s: key failed when listing %s: rc = %d\n",
131                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
132         RETURN(rc);
133 }
134
135 /******************** DB functions *********************/
136
137 static inline int name_create(char **newname, char *prefix, char *suffix)
138 {
139         LASSERT(newname);
140         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
141         if (!*newname)
142                 return -ENOMEM;
143         sprintf(*newname, "%s%s", prefix, suffix);
144         return 0;
145 }
146
147 static inline void name_destroy(char **name)
148 {
149         if (*name)
150                 OBD_FREE(*name, strlen(*name) + 1);
151         *name = NULL;
152 }
153
154 struct mgs_fsdb_handler_data
155 {
156         struct fs_db   *fsdb;
157         __u32           ver;
158 };
159
160 /* from the (client) config log, figure out:
161         1. which ost's/mdt's are configured (by index)
162         2. what the last config step is
163         3. COMPAT_18 osc name
164 */
165 /* It might be better to have a separate db file, instead of parsing the info
166    out of the client log.  This is slow and potentially error-prone. */
167 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
168                             struct llog_rec_hdr *rec, void *data)
169 {
170         struct mgs_fsdb_handler_data *d = data;
171         struct fs_db *fsdb = d->fsdb;
172         int cfg_len = rec->lrh_len;
173         char *cfg_buf = (char*) (rec + 1);
174         struct lustre_cfg *lcfg;
175         __u32 index;
176         int rc = 0;
177         ENTRY;
178
179         if (rec->lrh_type != OBD_CFG_REC) {
180                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
181                 RETURN(-EINVAL);
182         }
183
184         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
185         if (rc) {
186                 CERROR("Insane cfg\n");
187                 RETURN(rc);
188         }
189
190         lcfg = (struct lustre_cfg *)cfg_buf;
191
192         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
193                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
194
195         /* Figure out ost indicies */
196         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
197         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
198             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
199                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
200                                        NULL, 10);
201                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
202                        lustre_cfg_string(lcfg, 1), index,
203                        lustre_cfg_string(lcfg, 2));
204                 set_bit(index, fsdb->fsdb_ost_index_map);
205         }
206
207         /* Figure out mdt indicies */
208         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
209         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
210             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
211                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
212                                        &index, NULL);
213                 if (rc != LDD_F_SV_TYPE_MDT) {
214                         CWARN("Unparsable MDC name %s, assuming index 0\n",
215                               lustre_cfg_string(lcfg, 0));
216                         index = 0;
217                 }
218                 rc = 0;
219                 CDEBUG(D_MGS, "MDT index is %u\n", index);
220                 set_bit(index, fsdb->fsdb_mdt_index_map);
221                 fsdb->fsdb_mdt_count ++;
222         }
223
224         /**
225          * figure out the old config. fsdb_gen = 0 means old log
226          * It is obsoleted and not supported anymore
227          */
228         if (fsdb->fsdb_gen == 0) {
229                 CERROR("Old config format is not supported\n");
230                 RETURN(-EINVAL);
231         }
232
233         /*
234          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
235          */
236         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
237             lcfg->lcfg_command == LCFG_ATTACH &&
238             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
239                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
240                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
241                         CWARN("MDT using 1.8 OSC name scheme\n");
242                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
243                 }
244         }
245
246         if (lcfg->lcfg_command == LCFG_MARKER) {
247                 struct cfg_marker *marker;
248                 marker = lustre_cfg_buf(lcfg, 1);
249
250                 d->ver = marker->cm_vers;
251
252                 /* Keep track of the latest marker step */
253                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
254         }
255
256         RETURN(rc);
257 }
258
259 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
260 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
261                                   struct mgs_device *mgs,
262                                   struct fs_db *fsdb)
263 {
264         char *logname;
265         struct llog_handle *loghandle;
266         struct llog_ctxt *ctxt;
267         struct mgs_fsdb_handler_data d = {
268                 .fsdb = fsdb,
269         };
270         int rc;
271
272         ENTRY;
273
274         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
275         LASSERT(ctxt != NULL);
276         rc = name_create(&logname, fsdb->fsdb_name, "-client");
277         if (rc)
278                 GOTO(out_put, rc);
279         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
280         if (rc)
281                 GOTO(out_pop, rc);
282
283         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
284         if (rc)
285                 GOTO(out_close, rc);
286
287         if (llog_get_size(loghandle) <= 1)
288                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
289
290         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
291         CDEBUG(D_INFO, "get_db = %d\n", rc);
292 out_close:
293         llog_close(env, loghandle);
294 out_pop:
295         name_destroy(&logname);
296 out_put:
297         llog_ctxt_put(ctxt);
298
299         RETURN(rc);
300 }
301
302 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
303 {
304         struct mgs_tgt_srpc_conf *tgtconf;
305
306         /* free target-specific rules */
307         while (fsdb->fsdb_srpc_tgt) {
308                 tgtconf = fsdb->fsdb_srpc_tgt;
309                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
310
311                 LASSERT(tgtconf->mtsc_tgt);
312
313                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
314                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
315                 OBD_FREE_PTR(tgtconf);
316         }
317
318         /* free general rules */
319         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
320 }
321
322 static void mgs_unlink_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
323 {
324         mutex_lock(&mgs->mgs_mutex);
325         if (likely(!list_empty(&fsdb->fsdb_list))) {
326                 LASSERTF(atomic_read(&fsdb->fsdb_ref) >= 2,
327                          "Invalid ref %d on %s\n",
328                          atomic_read(&fsdb->fsdb_ref),
329                          fsdb->fsdb_name);
330
331                 list_del_init(&fsdb->fsdb_list);
332                 /* Drop the reference on the list.*/
333                 mgs_put_fsdb(mgs, fsdb);
334         }
335         mutex_unlock(&mgs->mgs_mutex);
336 }
337
338 /* The caller must hold mgs->mgs_mutex. */
339 static inline struct fs_db *
340 mgs_find_fsdb_noref(struct mgs_device *mgs, const char *fsname)
341 {
342         struct fs_db *fsdb;
343         struct list_head *tmp;
344
345         list_for_each(tmp, &mgs->mgs_fs_db_list) {
346                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
347                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
348                         return fsdb;
349         }
350
351         return NULL;
352 }
353
354 /* The caller must hold mgs->mgs_mutex. */
355 static void mgs_remove_fsdb_by_name(struct mgs_device *mgs, const char *name)
356 {
357         struct fs_db *fsdb;
358
359         fsdb = mgs_find_fsdb_noref(mgs, name);
360         if (fsdb) {
361                 list_del_init(&fsdb->fsdb_list);
362                 /* Drop the reference on the list.*/
363                 mgs_put_fsdb(mgs, fsdb);
364         }
365 }
366
367 /* The caller must hold mgs->mgs_mutex. */
368 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, const char *fsname)
369 {
370         struct fs_db *fsdb;
371
372         fsdb = mgs_find_fsdb_noref(mgs, fsname);
373         if (fsdb)
374                 atomic_inc(&fsdb->fsdb_ref);
375
376         return fsdb;
377 }
378
379 /* The caller must hold mgs->mgs_mutex. */
380 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
381                                   struct mgs_device *mgs, char *fsname)
382 {
383         struct fs_db *fsdb;
384         int rc;
385         ENTRY;
386
387         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
388                 CERROR("fsname %s is too long\n", fsname);
389
390                 RETURN(ERR_PTR(-EINVAL));
391         }
392
393         OBD_ALLOC_PTR(fsdb);
394         if (!fsdb)
395                 RETURN(ERR_PTR(-ENOMEM));
396
397         strncpy(fsdb->fsdb_name, fsname, sizeof(fsdb->fsdb_name));
398         mutex_init(&fsdb->fsdb_mutex);
399         INIT_LIST_HEAD(&fsdb->fsdb_list);
400         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
401         fsdb->fsdb_gen = 1;
402         INIT_LIST_HEAD(&fsdb->fsdb_clients);
403         atomic_set(&fsdb->fsdb_notify_phase, 0);
404         init_waitqueue_head(&fsdb->fsdb_notify_waitq);
405         init_completion(&fsdb->fsdb_notify_comp);
406
407         if (strcmp(fsname, MGSSELF_NAME) == 0) {
408                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
409                 fsdb->fsdb_mgs = mgs;
410         } else {
411                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
412                 if (!fsdb->fsdb_mdt_index_map) {
413                         CERROR("No memory for MDT index maps\n");
414
415                         GOTO(err, rc = -ENOMEM);
416                 }
417
418                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
419                 if (!fsdb->fsdb_ost_index_map) {
420                         CERROR("No memory for OST index maps\n");
421
422                         GOTO(err, rc = -ENOMEM);
423                 }
424
425                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
426                 if (rc)
427                         GOTO(err, rc);
428
429                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
430                 if (rc)
431                         GOTO(err, rc);
432
433                 /* initialise data for NID table */
434                 mgs_ir_init_fs(env, mgs, fsdb);
435                 lproc_mgs_add_live(mgs, fsdb);
436         }
437
438         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
439                 /* populate the db from the client llog */
440                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
441                 if (rc) {
442                         CERROR("Can't get db from client log %d\n", rc);
443
444                         GOTO(err, rc);
445                 }
446         }
447
448         /* populate srpc rules from params llog */
449         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
450         if (rc) {
451                 CERROR("Can't get db from params log %d\n", rc);
452
453                 GOTO(err, rc);
454         }
455
456         /* One ref is for the fsdb on the list.
457          * The other ref is for the caller. */
458         atomic_set(&fsdb->fsdb_ref, 2);
459         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
460
461         RETURN(fsdb);
462
463 err:
464         atomic_set(&fsdb->fsdb_ref, 1);
465         mgs_put_fsdb(mgs, fsdb);
466
467         RETURN(ERR_PTR(rc));
468 }
469
470 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
471 {
472         LASSERT(list_empty(&fsdb->fsdb_list));
473
474         lproc_mgs_del_live(mgs, fsdb);
475
476         /* deinitialize fsr */
477         mgs_ir_fini_fs(mgs, fsdb);
478
479         if (fsdb->fsdb_ost_index_map)
480                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
481         if (fsdb->fsdb_mdt_index_map)
482                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
483         name_destroy(&fsdb->fsdb_clilov);
484         name_destroy(&fsdb->fsdb_clilmv);
485         mgs_free_fsdb_srpc(fsdb);
486         OBD_FREE_PTR(fsdb);
487 }
488
489 void mgs_put_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
490 {
491         if (atomic_dec_and_test(&fsdb->fsdb_ref))
492                 mgs_free_fsdb(mgs, fsdb);
493 }
494
495 int mgs_init_fsdb_list(struct mgs_device *mgs)
496 {
497         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
498         return 0;
499 }
500
501 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
502 {
503         struct fs_db *fsdb;
504         struct list_head *tmp, *tmp2;
505
506         mutex_lock(&mgs->mgs_mutex);
507         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
508                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
509                 list_del_init(&fsdb->fsdb_list);
510                 mgs_put_fsdb(mgs, fsdb);
511         }
512         mutex_unlock(&mgs->mgs_mutex);
513         return 0;
514 }
515
516 int mgs_find_or_make_fsdb(const struct lu_env *env, struct mgs_device *mgs,
517                           char *name, struct fs_db **dbh)
518 {
519         struct fs_db *fsdb;
520         int rc = 0;
521         ENTRY;
522
523         mutex_lock(&mgs->mgs_mutex);
524         fsdb = mgs_find_fsdb(mgs, name);
525         if (!fsdb) {
526                 fsdb = mgs_new_fsdb(env, mgs, name);
527                 if (IS_ERR(fsdb))
528                         rc = PTR_ERR(fsdb);
529
530                 CDEBUG(D_MGS, "Created new db: rc = %d\n", rc);
531         }
532         mutex_unlock(&mgs->mgs_mutex);
533
534         if (!rc)
535                 *dbh = fsdb;
536
537         RETURN(rc);
538 }
539
540 /* 1 = index in use
541    0 = index unused
542    -1= empty client log */
543 int mgs_check_index(const struct lu_env *env,
544                     struct mgs_device *mgs,
545                     struct mgs_target_info *mti)
546 {
547         struct fs_db *fsdb;
548         void *imap;
549         int rc = 0;
550         ENTRY;
551
552         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
553
554         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
555         if (rc) {
556                 CERROR("Can't get db for %s\n", mti->mti_fsname);
557                 RETURN(rc);
558         }
559
560         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
561                 GOTO(out, rc = -1);
562
563         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
564                 imap = fsdb->fsdb_ost_index_map;
565         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
566                 imap = fsdb->fsdb_mdt_index_map;
567         else
568                 GOTO(out, rc = -EINVAL);
569
570         if (test_bit(mti->mti_stripe_index, imap))
571                 GOTO(out, rc = 1);
572
573         GOTO(out, rc = 0);
574
575 out:
576         mgs_put_fsdb(mgs, fsdb);
577         return rc;
578 }
579
580 static __inline__ int next_index(void *index_map, int map_len)
581 {
582         int i;
583         for (i = 0; i < map_len * 8; i++)
584                 if (!test_bit(i, index_map)) {
585                          return i;
586                  }
587         CERROR("max index %d exceeded.\n", i);
588         return -1;
589 }
590
591 /* Return codes:
592         0  newly marked as in use
593         <0 err
594         +EALREADY for update of an old index */
595 static int mgs_set_index(const struct lu_env *env,
596                          struct mgs_device *mgs,
597                          struct mgs_target_info *mti)
598 {
599         struct fs_db *fsdb;
600         void *imap;
601         int rc = 0;
602         ENTRY;
603
604         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
605         if (rc) {
606                 CERROR("Can't get db for %s\n", mti->mti_fsname);
607                 RETURN(rc);
608         }
609
610         mutex_lock(&fsdb->fsdb_mutex);
611         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
612                 imap = fsdb->fsdb_ost_index_map;
613         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
614                 imap = fsdb->fsdb_mdt_index_map;
615         } else {
616                 GOTO(out_up, rc = -EINVAL);
617         }
618
619         if (mti->mti_flags & LDD_F_NEED_INDEX) {
620                 rc = next_index(imap, INDEX_MAP_SIZE);
621                 if (rc == -1)
622                         GOTO(out_up, rc = -ERANGE);
623                 mti->mti_stripe_index = rc;
624                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
625                         fsdb->fsdb_mdt_count ++;
626         }
627
628         /* the last index(0xffff) is reserved for default value. */
629         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
630                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
631                                    "but index must be less than %u.\n",
632                                    mti->mti_svname, mti->mti_stripe_index,
633                                    INDEX_MAP_SIZE * 8 - 1);
634                 GOTO(out_up, rc = -ERANGE);
635         }
636
637         if (test_bit(mti->mti_stripe_index, imap)) {
638                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
639                     !(mti->mti_flags & LDD_F_WRITECONF)) {
640                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
641                                            "%d, but that index is already in "
642                                            "use. Use --writeconf to force\n",
643                                            mti->mti_svname,
644                                            mti->mti_stripe_index);
645                         GOTO(out_up, rc = -EADDRINUSE);
646                 } else {
647                         CDEBUG(D_MGS, "Server %s updating index %d\n",
648                                mti->mti_svname, mti->mti_stripe_index);
649                         GOTO(out_up, rc = EALREADY);
650                 }
651         }
652
653         set_bit(mti->mti_stripe_index, imap);
654         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
655         if (server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
656                              mti->mti_stripe_index, mti->mti_fsname,
657                              mti->mti_svname)) {
658                 CERROR("unknown server type %#x\n", mti->mti_flags);
659                 GOTO(out_up, rc = -EINVAL);
660         }
661
662         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
663                mti->mti_stripe_index);
664
665         GOTO(out_up, rc = 0);
666
667 out_up:
668         mutex_unlock(&fsdb->fsdb_mutex);
669         mgs_put_fsdb(mgs, fsdb);
670         return rc;
671 }
672
673 struct mgs_modify_lookup {
674         struct cfg_marker mml_marker;
675         int               mml_modified;
676 };
677
678 static int mgs_check_record_match(const struct lu_env *env,
679                                 struct llog_handle *llh,
680                                 struct llog_rec_hdr *rec, void *data)
681 {
682         struct cfg_marker *mc_marker = data;
683         struct cfg_marker *marker;
684         struct lustre_cfg *lcfg = REC_DATA(rec);
685         int cfg_len = REC_DATA_LEN(rec);
686         int rc;
687         ENTRY;
688
689
690         if (rec->lrh_type != OBD_CFG_REC) {
691                 CDEBUG(D_ERROR, "Unhandled lrh_type: %#x\n", rec->lrh_type);
692                 RETURN(-EINVAL);
693         }
694
695         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
696         if (rc) {
697                 CDEBUG(D_ERROR, "Insane cfg\n");
698                 RETURN(rc);
699         }
700
701         /* We only care about markers */
702         if (lcfg->lcfg_command != LCFG_MARKER)
703                 RETURN(0);
704
705         marker = lustre_cfg_buf(lcfg, 1);
706
707         if (marker->cm_flags & CM_SKIP)
708                 RETURN(0);
709
710         if ((strcmp(mc_marker->cm_comment, marker->cm_comment) == 0) &&
711                 (strcmp(mc_marker->cm_tgtname, marker->cm_tgtname) == 0)) {
712                 /* Found a non-skipped marker match */
713                 CDEBUG(D_MGS, "Matched rec %u marker %d flag %x %s %s\n",
714                         rec->lrh_index, marker->cm_step,
715                         marker->cm_flags, marker->cm_tgtname,
716                         marker->cm_comment);
717                 rc = LLOG_PROC_BREAK;
718         }
719
720         RETURN(rc);
721 }
722
723 /**
724  * Check an existing config log record with matching comment and device
725  * Return code:
726  * 0 - checked successfully,
727  * LLOG_PROC_BREAK - record matches
728  * negative - error
729  */
730 static int mgs_check_marker(const struct lu_env *env, struct mgs_device *mgs,
731                 struct fs_db *fsdb, struct mgs_target_info *mti,
732                 char *logname, char *devname, char *comment)
733 {
734         struct llog_handle *loghandle;
735         struct llog_ctxt *ctxt;
736         struct cfg_marker *mc_marker;
737         int rc;
738
739         ENTRY;
740
741         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
742         CDEBUG(D_MGS, "mgs check %s/%s/%s\n", logname, devname, comment);
743
744         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
745         LASSERT(ctxt != NULL);
746         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
747         if (rc < 0) {
748                 if (rc == -ENOENT)
749                         rc = 0;
750                 GOTO(out_pop, rc);
751         }
752
753         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
754         if (rc)
755                 GOTO(out_close, rc);
756
757         if (llog_get_size(loghandle) <= 1)
758                 GOTO(out_close, rc = 0);
759
760         OBD_ALLOC_PTR(mc_marker);
761         if (!mc_marker)
762                 GOTO(out_close, rc = -ENOMEM);
763         if (strlcpy(mc_marker->cm_comment, comment,
764                 sizeof(mc_marker->cm_comment)) >=
765                 sizeof(mc_marker->cm_comment))
766                 GOTO(out_free, rc = -E2BIG);
767         if (strlcpy(mc_marker->cm_tgtname, devname,
768                 sizeof(mc_marker->cm_tgtname)) >=
769                 sizeof(mc_marker->cm_tgtname))
770                 GOTO(out_free, rc = -E2BIG);
771
772         rc = llog_process(env, loghandle, mgs_check_record_match,
773                         (void *)mc_marker, NULL);
774
775 out_free:
776         OBD_FREE_PTR(mc_marker);
777
778 out_close:
779         llog_close(env, loghandle);
780 out_pop:
781         if (rc && rc != LLOG_PROC_BREAK)
782                 CDEBUG(D_ERROR, "%s: mgs check %s/%s failed: rc = %d\n",
783                         mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
784         llog_ctxt_put(ctxt);
785         RETURN(rc);
786 }
787
788 static int mgs_modify_handler(const struct lu_env *env,
789                               struct llog_handle *llh,
790                               struct llog_rec_hdr *rec, void *data)
791 {
792         struct mgs_modify_lookup *mml = data;
793         struct cfg_marker *marker;
794         struct lustre_cfg *lcfg = REC_DATA(rec);
795         int cfg_len = REC_DATA_LEN(rec);
796         int rc;
797         ENTRY;
798
799         if (rec->lrh_type != OBD_CFG_REC) {
800                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
801                 RETURN(-EINVAL);
802         }
803
804         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
805         if (rc) {
806                 CERROR("Insane cfg\n");
807                 RETURN(rc);
808         }
809
810         /* We only care about markers */
811         if (lcfg->lcfg_command != LCFG_MARKER)
812                 RETURN(0);
813
814         marker = lustre_cfg_buf(lcfg, 1);
815         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
816             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
817             !(marker->cm_flags & CM_SKIP)) {
818                 /* Found a non-skipped marker match */
819                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
820                        rec->lrh_index, marker->cm_step,
821                        marker->cm_flags, mml->mml_marker.cm_flags,
822                        marker->cm_tgtname, marker->cm_comment);
823                 /* Overwrite the old marker llog entry */
824                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
825                 marker->cm_flags |= mml->mml_marker.cm_flags;
826                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
827                 rc = llog_write(env, llh, rec, rec->lrh_index);
828                 if (!rc)
829                          mml->mml_modified++;
830         }
831
832         RETURN(rc);
833 }
834
835 /**
836  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
837  * Return code:
838  * 0 - modified successfully,
839  * 1 - no modification was done
840  * negative - error
841  */
842 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
843                       struct fs_db *fsdb, struct mgs_target_info *mti,
844                       char *logname, char *devname, char *comment, int flags)
845 {
846         struct llog_handle *loghandle;
847         struct llog_ctxt *ctxt;
848         struct mgs_modify_lookup *mml;
849         int rc;
850
851         ENTRY;
852
853         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
854         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
855                flags);
856
857         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
858         LASSERT(ctxt != NULL);
859         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
860         if (rc < 0) {
861                 if (rc == -ENOENT)
862                         rc = 0;
863                 GOTO(out_pop, rc);
864         }
865
866         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
867         if (rc)
868                 GOTO(out_close, rc);
869
870         if (llog_get_size(loghandle) <= 1)
871                 GOTO(out_close, rc = 0);
872
873         OBD_ALLOC_PTR(mml);
874         if (!mml)
875                 GOTO(out_close, rc = -ENOMEM);
876         if (strlcpy(mml->mml_marker.cm_comment, comment,
877                     sizeof(mml->mml_marker.cm_comment)) >=
878             sizeof(mml->mml_marker.cm_comment))
879                 GOTO(out_free, rc = -E2BIG);
880         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
881                     sizeof(mml->mml_marker.cm_tgtname)) >=
882             sizeof(mml->mml_marker.cm_tgtname))
883                 GOTO(out_free, rc = -E2BIG);
884         /* Modify mostly means cancel */
885         mml->mml_marker.cm_flags = flags;
886         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
887         mml->mml_modified = 0;
888         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
889                           NULL);
890         if (!rc && !mml->mml_modified)
891                 rc = 1;
892
893 out_free:
894         OBD_FREE_PTR(mml);
895
896 out_close:
897         llog_close(env, loghandle);
898 out_pop:
899         if (rc < 0)
900                 CERROR("%s: modify %s/%s failed: rc = %d\n",
901                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
902         llog_ctxt_put(ctxt);
903         RETURN(rc);
904 }
905
906 /** This structure is passed to mgs_replace_handler */
907 struct mgs_replace_uuid_lookup {
908         /* Nids are replaced for this target device */
909         struct mgs_target_info target;
910         /* Temporary modified llog */
911         struct llog_handle *temp_llh;
912         /* Flag is set if in target block*/
913         int in_target_device;
914         /* Nids already added. Just skip (multiple nids) */
915         int device_nids_added;
916         /* Flag is set if this block should not be copied */
917         int skip_it;
918 };
919
920 /**
921  * Check: a) if block should be skipped
922  * b) is it target block
923  *
924  * \param[in] lcfg
925  * \param[in] mrul
926  *
927  * \retval 0 should not to be skipped
928  * \retval 1 should to be skipped
929  */
930 static int check_markers(struct lustre_cfg *lcfg,
931                          struct mgs_replace_uuid_lookup *mrul)
932 {
933          struct cfg_marker *marker;
934
935         /* Track markers. Find given device */
936         if (lcfg->lcfg_command == LCFG_MARKER) {
937                 marker = lustre_cfg_buf(lcfg, 1);
938                 /* Clean llog from records marked as CM_EXCLUDE.
939                    CM_SKIP records are used for "active" command
940                    and can be restored if needed */
941                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
942                     (CM_EXCLUDE | CM_START)) {
943                         mrul->skip_it = 1;
944                         return 1;
945                 }
946
947                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
948                     (CM_EXCLUDE | CM_END)) {
949                         mrul->skip_it = 0;
950                         return 1;
951                 }
952
953                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
954                         LASSERT(!(marker->cm_flags & CM_START) ||
955                                 !(marker->cm_flags & CM_END));
956                         if (marker->cm_flags & CM_START) {
957                                 mrul->in_target_device = 1;
958                                 mrul->device_nids_added = 0;
959                         } else if (marker->cm_flags & CM_END)
960                                 mrul->in_target_device = 0;
961                 }
962         }
963
964         return 0;
965 }
966
967 static int record_base(const struct lu_env *env, struct llog_handle *llh,
968                      char *cfgname, lnet_nid_t nid, int cmd,
969                      char *s1, char *s2, char *s3, char *s4)
970 {
971         struct mgs_thread_info  *mgi = mgs_env_info(env);
972         struct llog_cfg_rec     *lcr;
973         int rc;
974
975         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
976                cmd, s1, s2, s3, s4);
977
978         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
979         if (s1)
980                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
981         if (s2)
982                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
983         if (s3)
984                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
985         if (s4)
986                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
987
988         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
989         if (lcr == NULL)
990                 return -ENOMEM;
991
992         lcr->lcr_cfg.lcfg_nid = nid;
993         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
994
995         lustre_cfg_rec_free(lcr);
996
997         if (rc < 0)
998                 CDEBUG(D_MGS,
999                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
1000                        cfgname, cmd, s1, s2, s3, s4, rc);
1001         return rc;
1002 }
1003
1004 static inline int record_add_uuid(const struct lu_env *env,
1005                                   struct llog_handle *llh,
1006                                   uint64_t nid, char *uuid)
1007 {
1008         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
1009                            NULL, NULL, NULL);
1010 }
1011
1012 static inline int record_add_conn(const struct lu_env *env,
1013                                   struct llog_handle *llh,
1014                                   char *devname, char *uuid)
1015 {
1016         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
1017                            NULL, NULL, NULL);
1018 }
1019
1020 static inline int record_attach(const struct lu_env *env,
1021                                 struct llog_handle *llh, char *devname,
1022                                 char *type, char *uuid)
1023 {
1024         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
1025                            NULL, NULL);
1026 }
1027
1028 static inline int record_setup(const struct lu_env *env,
1029                                struct llog_handle *llh, char *devname,
1030                                char *s1, char *s2, char *s3, char *s4)
1031 {
1032         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
1033 }
1034
1035 /**
1036  * \retval <0 record processing error
1037  * \retval n record is processed. No need copy original one.
1038  * \retval 0 record is not processed.
1039  */
1040 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
1041                            struct mgs_replace_uuid_lookup *mrul)
1042 {
1043         int nids_added = 0;
1044         lnet_nid_t nid;
1045         char *ptr;
1046         int rc;
1047
1048         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1049                 /* LCFG_ADD_UUID command found. Let's skip original command
1050                    and add passed nids */
1051                 ptr = mrul->target.mti_params;
1052                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1053                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
1054                                "device %s\n", libcfs_nid2str(nid),
1055                                 mrul->target.mti_params,
1056                                 mrul->target.mti_svname);
1057                         rc = record_add_uuid(env,
1058                                              mrul->temp_llh, nid,
1059                                              mrul->target.mti_params);
1060                         if (!rc)
1061                                 nids_added++;
1062                 }
1063
1064                 if (nids_added == 0) {
1065                         CERROR("No new nids were added, nid %s with uuid %s, "
1066                                "device %s\n", libcfs_nid2str(nid),
1067                                mrul->target.mti_params,
1068                                mrul->target.mti_svname);
1069                         RETURN(-ENXIO);
1070                 } else {
1071                         mrul->device_nids_added = 1;
1072                 }
1073
1074                 return nids_added;
1075         }
1076
1077         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
1078                 /* LCFG_SETUP command found. UUID should be changed */
1079                 rc = record_setup(env,
1080                                   mrul->temp_llh,
1081                                   /* devname the same */
1082                                   lustre_cfg_string(lcfg, 0),
1083                                   /* s1 is not changed */
1084                                   lustre_cfg_string(lcfg, 1),
1085                                   /* new uuid should be
1086                                   the full nidlist */
1087                                   mrul->target.mti_params,
1088                                   /* s3 is not changed */
1089                                   lustre_cfg_string(lcfg, 3),
1090                                   /* s4 is not changed */
1091                                   lustre_cfg_string(lcfg, 4));
1092                 return rc ? rc : 1;
1093         }
1094
1095         /* Another commands in target device block */
1096         return 0;
1097 }
1098
1099 /**
1100  * Handler that called for every record in llog.
1101  * Records are processed in order they placed in llog.
1102  *
1103  * \param[in] llh       log to be processed
1104  * \param[in] rec       current record
1105  * \param[in] data      mgs_replace_uuid_lookup structure
1106  *
1107  * \retval 0    success
1108  */
1109 static int mgs_replace_handler(const struct lu_env *env,
1110                                struct llog_handle *llh,
1111                                struct llog_rec_hdr *rec,
1112                                void *data)
1113 {
1114         struct mgs_replace_uuid_lookup *mrul;
1115         struct lustre_cfg *lcfg = REC_DATA(rec);
1116         int cfg_len = REC_DATA_LEN(rec);
1117         int rc;
1118         ENTRY;
1119
1120         mrul = (struct mgs_replace_uuid_lookup *)data;
1121
1122         if (rec->lrh_type != OBD_CFG_REC) {
1123                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
1124                        rec->lrh_type, lcfg->lcfg_command,
1125                        lustre_cfg_string(lcfg, 0),
1126                        lustre_cfg_string(lcfg, 1));
1127                 RETURN(-EINVAL);
1128         }
1129
1130         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1131         if (rc) {
1132                 /* Do not copy any invalidated records */
1133                 GOTO(skip_out, rc = 0);
1134         }
1135
1136         rc = check_markers(lcfg, mrul);
1137         if (rc || mrul->skip_it)
1138                 GOTO(skip_out, rc = 0);
1139
1140         /* Write to new log all commands outside target device block */
1141         if (!mrul->in_target_device)
1142                 GOTO(copy_out, rc = 0);
1143
1144         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
1145            (failover nids) for this target, assuming that if then
1146            primary is changing then so is the failover */
1147         if (mrul->device_nids_added &&
1148             (lcfg->lcfg_command == LCFG_ADD_UUID ||
1149              lcfg->lcfg_command == LCFG_ADD_CONN))
1150                 GOTO(skip_out, rc = 0);
1151
1152         rc = process_command(env, lcfg, mrul);
1153         if (rc < 0)
1154                 RETURN(rc);
1155
1156         if (rc)
1157                 RETURN(0);
1158 copy_out:
1159         /* Record is placed in temporary llog as is */
1160         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
1161
1162         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1163                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1164                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1165         RETURN(rc);
1166
1167 skip_out:
1168         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1169                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1170                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1171         RETURN(rc);
1172 }
1173
1174 static int mgs_log_is_empty(const struct lu_env *env,
1175                             struct mgs_device *mgs, char *name)
1176 {
1177         struct llog_ctxt        *ctxt;
1178         int                      rc;
1179
1180         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1181         LASSERT(ctxt != NULL);
1182
1183         rc = llog_is_empty(env, ctxt, name);
1184         llog_ctxt_put(ctxt);
1185         return rc;
1186 }
1187
1188 static int mgs_replace_nids_log(const struct lu_env *env,
1189                                 struct obd_device *mgs, struct fs_db *fsdb,
1190                                 char *logname, char *devname, char *nids)
1191 {
1192         struct llog_handle *orig_llh, *backup_llh;
1193         struct llog_ctxt *ctxt;
1194         struct mgs_replace_uuid_lookup *mrul;
1195         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1196         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1197         char *backup;
1198         int rc, rc2;
1199         ENTRY;
1200
1201         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1202
1203         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1204         LASSERT(ctxt != NULL);
1205
1206         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1207                 /* Log is empty. Nothing to replace */
1208                 GOTO(out_put, rc = 0);
1209         }
1210
1211         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1212         if (backup == NULL)
1213                 GOTO(out_put, rc = -ENOMEM);
1214
1215         sprintf(backup, "%s.bak", logname);
1216
1217         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1218         if (rc == 0) {
1219                 /* Now erase original log file. Connections are not allowed.
1220                    Backup is already saved */
1221                 rc = llog_erase(env, ctxt, NULL, logname);
1222                 if (rc < 0)
1223                         GOTO(out_free, rc);
1224         } else if (rc != -ENOENT) {
1225                 CERROR("%s: can't make backup for %s: rc = %d\n",
1226                        mgs->obd_name, logname, rc);
1227                 GOTO(out_free,rc);
1228         }
1229
1230         /* open local log */
1231         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1232         if (rc)
1233                 GOTO(out_restore, rc);
1234
1235         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1236         if (rc)
1237                 GOTO(out_closel, rc);
1238
1239         /* open backup llog */
1240         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1241                        LLOG_OPEN_EXISTS);
1242         if (rc)
1243                 GOTO(out_closel, rc);
1244
1245         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1246         if (rc)
1247                 GOTO(out_close, rc);
1248
1249         if (llog_get_size(backup_llh) <= 1)
1250                 GOTO(out_close, rc = 0);
1251
1252         OBD_ALLOC_PTR(mrul);
1253         if (!mrul)
1254                 GOTO(out_close, rc = -ENOMEM);
1255         /* devname is only needed information to replace UUID records */
1256         strlcpy(mrul->target.mti_svname, devname,
1257                 sizeof(mrul->target.mti_svname));
1258         /* parse nids later */
1259         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1260         /* Copy records to this temporary llog */
1261         mrul->temp_llh = orig_llh;
1262
1263         rc = llog_process(env, backup_llh, mgs_replace_handler,
1264                           (void *)mrul, NULL);
1265         OBD_FREE_PTR(mrul);
1266 out_close:
1267         rc2 = llog_close(NULL, backup_llh);
1268         if (!rc)
1269                 rc = rc2;
1270 out_closel:
1271         rc2 = llog_close(NULL, orig_llh);
1272         if (!rc)
1273                 rc = rc2;
1274
1275 out_restore:
1276         if (rc) {
1277                 CERROR("%s: llog should be restored: rc = %d\n",
1278                        mgs->obd_name, rc);
1279                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1280                                   logname);
1281                 if (rc2 < 0)
1282                         CERROR("%s: can't restore backup %s: rc = %d\n",
1283                                mgs->obd_name, logname, rc2);
1284         }
1285
1286 out_free:
1287         OBD_FREE(backup, strlen(backup) + 1);
1288
1289 out_put:
1290         llog_ctxt_put(ctxt);
1291
1292         if (rc)
1293                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1294                        mgs->obd_name, logname, rc);
1295
1296         RETURN(rc);
1297 }
1298
1299 /**
1300  * Parse device name and get file system name and/or device index
1301  *
1302  * \param[in]   devname device name (ex. lustre-MDT0000)
1303  * \param[out]  fsname  file system name(optional)
1304  * \param[out]  index   device index(optional)
1305  *
1306  * \retval 0    success
1307  */
1308 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1309 {
1310         int rc;
1311         ENTRY;
1312
1313         /* Extract fsname */
1314         if (fsname) {
1315                 rc = server_name2fsname(devname, fsname, NULL);
1316                 if (rc < 0) {
1317                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1318                                devname);
1319                         RETURN(-EINVAL);
1320                 }
1321         }
1322
1323         if (index) {
1324                 rc = server_name2index(devname, index, NULL);
1325                 if (rc < 0) {
1326                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1327                                devname);
1328                         RETURN(-EINVAL);
1329                 }
1330         }
1331
1332         RETURN(0);
1333 }
1334
1335 /* This is only called during replace_nids */
1336 static int only_mgs_is_running(struct obd_device *mgs_obd)
1337 {
1338         /* TDB: Is global variable with devices count exists? */
1339         int num_devices = get_devices_count();
1340         int num_exports = 0;
1341         struct obd_export *exp;
1342
1343         spin_lock(&mgs_obd->obd_dev_lock);
1344         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1345                 /* skip self export */
1346                 if (exp == mgs_obd->obd_self_export)
1347                         continue;
1348                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1349                         continue;
1350
1351                 ++num_exports;
1352
1353                 CERROR("%s: node %s still connected during replace_nids "
1354                        "connect_flags:%llx\n",
1355                        mgs_obd->obd_name,
1356                        libcfs_nid2str(exp->exp_nid_stats->nid),
1357                        exp_connect_flags(exp));
1358
1359         }
1360         spin_unlock(&mgs_obd->obd_dev_lock);
1361
1362         /* osd, MGS and MGC + self_export
1363            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1364         return (num_devices <= 3) && (num_exports == 0);
1365 }
1366
1367 static int name_create_mdt(char **logname, char *fsname, int i)
1368 {
1369         char mdt_index[9];
1370
1371         sprintf(mdt_index, "-MDT%04x", i);
1372         return name_create(logname, fsname, mdt_index);
1373 }
1374
1375 /**
1376  * Replace nids for \a device to \a nids values
1377  *
1378  * \param obd           MGS obd device
1379  * \param devname       nids need to be replaced for this device
1380  * (ex. lustre-OST0000)
1381  * \param nids          nids list (ex. nid1,nid2,nid3)
1382  *
1383  * \retval 0    success
1384  */
1385 int mgs_replace_nids(const struct lu_env *env,
1386                      struct mgs_device *mgs,
1387                      char *devname, char *nids)
1388 {
1389         /* Assume fsname is part of device name */
1390         char fsname[MTI_NAME_MAXLEN];
1391         int rc;
1392         __u32 index;
1393         char *logname;
1394         struct fs_db *fsdb = NULL;
1395         unsigned int i;
1396         int conn_state;
1397         struct obd_device *mgs_obd = mgs->mgs_obd;
1398         ENTRY;
1399
1400         /* We can only change NIDs if no other nodes are connected */
1401         spin_lock(&mgs_obd->obd_dev_lock);
1402         conn_state = mgs_obd->obd_no_conn;
1403         mgs_obd->obd_no_conn = 1;
1404         spin_unlock(&mgs_obd->obd_dev_lock);
1405
1406         /* We can not change nids if not only MGS is started */
1407         if (!only_mgs_is_running(mgs_obd)) {
1408                 CERROR("Only MGS is allowed to be started\n");
1409                 GOTO(out, rc = -EINPROGRESS);
1410         }
1411
1412         /* Get fsname and index*/
1413         rc = mgs_parse_devname(devname, fsname, &index);
1414         if (rc)
1415                 GOTO(out, rc);
1416
1417         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1418         if (rc) {
1419                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1420                 GOTO(out, rc);
1421         }
1422
1423         /* Process client llogs */
1424         name_create(&logname, fsname, "-client");
1425         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1426         name_destroy(&logname);
1427         if (rc) {
1428                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1429                        fsname, devname, rc);
1430                 GOTO(out, rc);
1431         }
1432
1433         /* Process MDT llogs */
1434         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1435                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1436                         continue;
1437                 name_create_mdt(&logname, fsname, i);
1438                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1439                 name_destroy(&logname);
1440                 if (rc)
1441                         GOTO(out, rc);
1442         }
1443
1444 out:
1445         spin_lock(&mgs_obd->obd_dev_lock);
1446         mgs_obd->obd_no_conn = conn_state;
1447         spin_unlock(&mgs_obd->obd_dev_lock);
1448
1449         if (fsdb)
1450                 mgs_put_fsdb(mgs, fsdb);
1451
1452         RETURN(rc);
1453 }
1454
1455 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1456                             char *devname, struct lov_desc *desc)
1457 {
1458         struct mgs_thread_info  *mgi = mgs_env_info(env);
1459         struct llog_cfg_rec     *lcr;
1460         int rc;
1461
1462         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1463         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1464         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1465         if (lcr == NULL)
1466                 return -ENOMEM;
1467
1468         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1469         lustre_cfg_rec_free(lcr);
1470         return rc;
1471 }
1472
1473 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1474                             char *devname, struct lmv_desc *desc)
1475 {
1476         struct mgs_thread_info  *mgi = mgs_env_info(env);
1477         struct llog_cfg_rec     *lcr;
1478         int rc;
1479
1480         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1481         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1482         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1483         if (lcr == NULL)
1484                 return -ENOMEM;
1485
1486         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1487         lustre_cfg_rec_free(lcr);
1488         return rc;
1489 }
1490
1491 static inline int record_mdc_add(const struct lu_env *env,
1492                                  struct llog_handle *llh,
1493                                  char *logname, char *mdcuuid,
1494                                  char *mdtuuid, char *index,
1495                                  char *gen)
1496 {
1497         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1498                            mdtuuid,index,gen,mdcuuid);
1499 }
1500
1501 static inline int record_lov_add(const struct lu_env *env,
1502                                  struct llog_handle *llh,
1503                                  char *lov_name, char *ost_uuid,
1504                                  char *index, char *gen)
1505 {
1506         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1507                            ost_uuid, index, gen, NULL);
1508 }
1509
1510 static inline int record_mount_opt(const struct lu_env *env,
1511                                    struct llog_handle *llh,
1512                                    char *profile, char *lov_name,
1513                                    char *mdc_name)
1514 {
1515         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1516                            profile, lov_name, mdc_name, NULL);
1517 }
1518
1519 static int record_marker(const struct lu_env *env,
1520                          struct llog_handle *llh,
1521                          struct fs_db *fsdb, __u32 flags,
1522                          char *tgtname, char *comment)
1523 {
1524         struct mgs_thread_info *mgi = mgs_env_info(env);
1525         struct llog_cfg_rec *lcr;
1526         int rc;
1527         int cplen = 0;
1528
1529         if (flags & CM_START)
1530                 fsdb->fsdb_gen++;
1531         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1532         mgi->mgi_marker.cm_flags = flags;
1533         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1534         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1535                         sizeof(mgi->mgi_marker.cm_tgtname));
1536         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1537                 return -E2BIG;
1538         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1539                         sizeof(mgi->mgi_marker.cm_comment));
1540         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1541                 return -E2BIG;
1542         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1543         mgi->mgi_marker.cm_canceltime = 0;
1544         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1545         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1546                             sizeof(mgi->mgi_marker));
1547         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1548         if (lcr == NULL)
1549                 return -ENOMEM;
1550
1551         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1552         lustre_cfg_rec_free(lcr);
1553         return rc;
1554 }
1555
1556 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1557                             struct llog_handle **llh, char *name)
1558 {
1559         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1560         struct llog_ctxt        *ctxt;
1561         int                      rc = 0;
1562         ENTRY;
1563
1564         if (*llh)
1565                 GOTO(out, rc = -EBUSY);
1566
1567         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1568         if (!ctxt)
1569                 GOTO(out, rc = -ENODEV);
1570         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1571
1572         rc = llog_open_create(env, ctxt, llh, NULL, name);
1573         if (rc)
1574                 GOTO(out_ctxt, rc);
1575         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1576         if (rc)
1577                 llog_close(env, *llh);
1578 out_ctxt:
1579         llog_ctxt_put(ctxt);
1580 out:
1581         if (rc) {
1582                 CERROR("%s: can't start log %s: rc = %d\n",
1583                        mgs->mgs_obd->obd_name, name, rc);
1584                 *llh = NULL;
1585         }
1586         RETURN(rc);
1587 }
1588
1589 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1590 {
1591         int rc;
1592
1593         rc = llog_close(env, *llh);
1594         *llh = NULL;
1595
1596         return rc;
1597 }
1598
1599 /******************** config "macros" *********************/
1600
1601 /* write an lcfg directly into a log (with markers) */
1602 static int mgs_write_log_direct(const struct lu_env *env,
1603                                 struct mgs_device *mgs, struct fs_db *fsdb,
1604                                 char *logname, struct llog_cfg_rec *lcr,
1605                                 char *devname, char *comment)
1606 {
1607         struct llog_handle *llh = NULL;
1608         int rc;
1609
1610         ENTRY;
1611
1612         rc = record_start_log(env, mgs, &llh, logname);
1613         if (rc)
1614                 RETURN(rc);
1615
1616         /* FIXME These should be a single journal transaction */
1617         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1618         if (rc)
1619                 GOTO(out_end, rc);
1620         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1621         if (rc)
1622                 GOTO(out_end, rc);
1623         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1624         if (rc)
1625                 GOTO(out_end, rc);
1626 out_end:
1627         record_end_log(env, &llh);
1628         RETURN(rc);
1629 }
1630
1631 /* write the lcfg in all logs for the given fs */
1632 static int mgs_write_log_direct_all(const struct lu_env *env,
1633                                     struct mgs_device *mgs,
1634                                     struct fs_db *fsdb,
1635                                     struct mgs_target_info *mti,
1636                                     struct llog_cfg_rec *lcr, char *devname,
1637                                     char *comment, int server_only)
1638 {
1639         struct list_head         log_list;
1640         struct mgs_direntry     *dirent, *n;
1641         char                    *fsname = mti->mti_fsname;
1642         int                      rc = 0, len = strlen(fsname);
1643
1644         ENTRY;
1645         /* Find all the logs in the CONFIGS directory */
1646         rc = class_dentry_readdir(env, mgs, &log_list);
1647         if (rc)
1648                 RETURN(rc);
1649
1650         /* Could use fsdb index maps instead of directory listing */
1651         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1652                 list_del_init(&dirent->mde_list);
1653                 /* don't write to sptlrpc rule log */
1654                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1655                         goto next;
1656
1657                 /* caller wants write server logs only */
1658                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1659                         goto next;
1660
1661                 if (strlen(dirent->mde_name) <= len ||
1662                     strncmp(fsname, dirent->mde_name, len) != 0 ||
1663                     dirent->mde_name[len] != '-')
1664                         goto next;
1665
1666                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1667                 /* Erase any old settings of this same parameter */
1668                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1669                                 devname, comment, CM_SKIP);
1670                 if (rc < 0)
1671                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1672                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1673                 if (lcr == NULL)
1674                         goto next;
1675                 /* Write the new one */
1676                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1677                                           lcr, devname, comment);
1678                 if (rc != 0)
1679                         CERROR("%s: writing log %s: rc = %d\n",
1680                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1681 next:
1682                 mgs_direntry_free(dirent);
1683         }
1684
1685         RETURN(rc);
1686 }
1687
1688 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1689                                     struct mgs_device *mgs,
1690                                     struct fs_db *fsdb,
1691                                     struct mgs_target_info *mti,
1692                                     int index, char *logname);
1693 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1694                                     struct mgs_device *mgs,
1695                                     struct fs_db *fsdb,
1696                                     struct mgs_target_info *mti,
1697                                     char *logname, char *suffix, char *lovname,
1698                                     enum lustre_sec_part sec_part, int flags);
1699 static int name_create_mdt_and_lov(char **logname, char **lovname,
1700                                    struct fs_db *fsdb, int i);
1701
1702 static int add_param(char *params, char *key, char *val)
1703 {
1704         char *start = params + strlen(params);
1705         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1706         int keylen = 0;
1707
1708         if (key != NULL)
1709                 keylen = strlen(key);
1710         if (start + 1 + keylen + strlen(val) >= end) {
1711                 CERROR("params are too long: %s %s%s\n",
1712                        params, key != NULL ? key : "", val);
1713                 return -EINVAL;
1714         }
1715
1716         sprintf(start, " %s%s", key != NULL ? key : "", val);
1717         return 0;
1718 }
1719
1720 /**
1721  * Walk through client config log record and convert the related records
1722  * into the target.
1723  **/
1724 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1725                                          struct llog_handle *llh,
1726                                          struct llog_rec_hdr *rec, void *data)
1727 {
1728         struct mgs_device *mgs;
1729         struct obd_device *obd;
1730         struct mgs_target_info *mti, *tmti;
1731         struct fs_db *fsdb;
1732         int cfg_len = rec->lrh_len;
1733         char *cfg_buf = (char*) (rec + 1);
1734         struct lustre_cfg *lcfg;
1735         int rc = 0;
1736         struct llog_handle *mdt_llh = NULL;
1737         static int got_an_osc_or_mdc = 0;
1738         /* 0: not found any osc/mdc;
1739            1: found osc;
1740            2: found mdc;
1741         */
1742         static int last_step = -1;
1743         int cplen = 0;
1744
1745         ENTRY;
1746
1747         mti = ((struct temp_comp*)data)->comp_mti;
1748         tmti = ((struct temp_comp*)data)->comp_tmti;
1749         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1750         obd = ((struct temp_comp *)data)->comp_obd;
1751         mgs = lu2mgs_dev(obd->obd_lu_dev);
1752         LASSERT(mgs);
1753
1754         if (rec->lrh_type != OBD_CFG_REC) {
1755                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1756                 RETURN(-EINVAL);
1757         }
1758
1759         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1760         if (rc) {
1761                 CERROR("Insane cfg\n");
1762                 RETURN(rc);
1763         }
1764
1765         lcfg = (struct lustre_cfg *)cfg_buf;
1766
1767         if (lcfg->lcfg_command == LCFG_MARKER) {
1768                 struct cfg_marker *marker;
1769                 marker = lustre_cfg_buf(lcfg, 1);
1770                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1771                     (marker->cm_flags & CM_START) &&
1772                      !(marker->cm_flags & CM_SKIP)) {
1773                         got_an_osc_or_mdc = 1;
1774                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1775                                         sizeof(tmti->mti_svname));
1776                         if (cplen >= sizeof(tmti->mti_svname))
1777                                 RETURN(-E2BIG);
1778                         rc = record_start_log(env, mgs, &mdt_llh,
1779                                               mti->mti_svname);
1780                         if (rc)
1781                                 RETURN(rc);
1782                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1783                                            mti->mti_svname, "add osc(copied)");
1784                         record_end_log(env, &mdt_llh);
1785                         last_step = marker->cm_step;
1786                         RETURN(rc);
1787                 }
1788                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1789                     (marker->cm_flags & CM_END) &&
1790                      !(marker->cm_flags & CM_SKIP)) {
1791                         LASSERT(last_step == marker->cm_step);
1792                         last_step = -1;
1793                         got_an_osc_or_mdc = 0;
1794                         memset(tmti, 0, sizeof(*tmti));
1795                         rc = record_start_log(env, mgs, &mdt_llh,
1796                                               mti->mti_svname);
1797                         if (rc)
1798                                 RETURN(rc);
1799                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1800                                            mti->mti_svname, "add osc(copied)");
1801                         record_end_log(env, &mdt_llh);
1802                         RETURN(rc);
1803                 }
1804                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1805                     (marker->cm_flags & CM_START) &&
1806                      !(marker->cm_flags & CM_SKIP)) {
1807                         got_an_osc_or_mdc = 2;
1808                         last_step = marker->cm_step;
1809                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1810                                strlen(marker->cm_tgtname));
1811
1812                         RETURN(rc);
1813                 }
1814                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1815                     (marker->cm_flags & CM_END) &&
1816                      !(marker->cm_flags & CM_SKIP)) {
1817                         LASSERT(last_step == marker->cm_step);
1818                         last_step = -1;
1819                         got_an_osc_or_mdc = 0;
1820                         memset(tmti, 0, sizeof(*tmti));
1821                         RETURN(rc);
1822                 }
1823         }
1824
1825         if (got_an_osc_or_mdc == 0 || last_step < 0)
1826                 RETURN(rc);
1827
1828         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1829                 __u64 nodenid = lcfg->lcfg_nid;
1830
1831                 if (strlen(tmti->mti_uuid) == 0) {
1832                         /* target uuid not set, this config record is before
1833                          * LCFG_SETUP, this nid is one of target node nid.
1834                          */
1835                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1836                         tmti->mti_nid_count++;
1837                 } else {
1838                         char nidstr[LNET_NIDSTR_SIZE];
1839
1840                         /* failover node nid */
1841                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
1842                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1843                                         nidstr);
1844                 }
1845
1846                 RETURN(rc);
1847         }
1848
1849         if (lcfg->lcfg_command == LCFG_SETUP) {
1850                 char *target;
1851
1852                 target = lustre_cfg_string(lcfg, 1);
1853                 memcpy(tmti->mti_uuid, target, strlen(target));
1854                 RETURN(rc);
1855         }
1856
1857         /* ignore client side sptlrpc_conf_log */
1858         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1859                 RETURN(rc);
1860
1861         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1862                 int index;
1863
1864                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1865                         RETURN (-EINVAL);
1866
1867                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1868                        strlen(mti->mti_fsname));
1869                 tmti->mti_stripe_index = index;
1870
1871                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1872                                               mti->mti_stripe_index,
1873                                               mti->mti_svname);
1874                 memset(tmti, 0, sizeof(*tmti));
1875                 RETURN(rc);
1876         }
1877
1878         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1879                 int index;
1880                 char mdt_index[9];
1881                 char *logname, *lovname;
1882
1883                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1884                                              mti->mti_stripe_index);
1885                 if (rc)
1886                         RETURN(rc);
1887                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1888
1889                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1890                         name_destroy(&logname);
1891                         name_destroy(&lovname);
1892                         RETURN(-EINVAL);
1893                 }
1894
1895                 tmti->mti_stripe_index = index;
1896                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1897                                          mdt_index, lovname,
1898                                          LUSTRE_SP_MDT, 0);
1899                 name_destroy(&logname);
1900                 name_destroy(&lovname);
1901                 RETURN(rc);
1902         }
1903         RETURN(rc);
1904 }
1905
1906 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1907 /* stealed from mgs_get_fsdb_from_llog*/
1908 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1909                                               struct mgs_device *mgs,
1910                                               char *client_name,
1911                                               struct temp_comp* comp)
1912 {
1913         struct llog_handle *loghandle;
1914         struct mgs_target_info *tmti;
1915         struct llog_ctxt *ctxt;
1916         int rc;
1917
1918         ENTRY;
1919
1920         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1921         LASSERT(ctxt != NULL);
1922
1923         OBD_ALLOC_PTR(tmti);
1924         if (tmti == NULL)
1925                 GOTO(out_ctxt, rc = -ENOMEM);
1926
1927         comp->comp_tmti = tmti;
1928         comp->comp_obd = mgs->mgs_obd;
1929
1930         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1931                        LLOG_OPEN_EXISTS);
1932         if (rc < 0) {
1933                 if (rc == -ENOENT)
1934                         rc = 0;
1935                 GOTO(out_pop, rc);
1936         }
1937
1938         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1939         if (rc)
1940                 GOTO(out_close, rc);
1941
1942         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1943                                   (void *)comp, NULL, false);
1944         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1945 out_close:
1946         llog_close(env, loghandle);
1947 out_pop:
1948         OBD_FREE_PTR(tmti);
1949 out_ctxt:
1950         llog_ctxt_put(ctxt);
1951         RETURN(rc);
1952 }
1953
1954 /* lmv is the second thing for client logs */
1955 /* copied from mgs_write_log_lov. Please refer to that.  */
1956 static int mgs_write_log_lmv(const struct lu_env *env,
1957                              struct mgs_device *mgs,
1958                              struct fs_db *fsdb,
1959                              struct mgs_target_info *mti,
1960                              char *logname, char *lmvname)
1961 {
1962         struct llog_handle *llh = NULL;
1963         struct lmv_desc *lmvdesc;
1964         char *uuid;
1965         int rc = 0;
1966         ENTRY;
1967
1968         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1969
1970         OBD_ALLOC_PTR(lmvdesc);
1971         if (lmvdesc == NULL)
1972                 RETURN(-ENOMEM);
1973         lmvdesc->ld_active_tgt_count = 0;
1974         lmvdesc->ld_tgt_count = 0;
1975         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1976         uuid = (char *)lmvdesc->ld_uuid.uuid;
1977
1978         rc = record_start_log(env, mgs, &llh, logname);
1979         if (rc)
1980                 GOTO(out_free, rc);
1981         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1982         if (rc)
1983                 GOTO(out_end, rc);
1984         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1985         if (rc)
1986                 GOTO(out_end, rc);
1987         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1988         if (rc)
1989                 GOTO(out_end, rc);
1990         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1991         if (rc)
1992                 GOTO(out_end, rc);
1993 out_end:
1994         record_end_log(env, &llh);
1995 out_free:
1996         OBD_FREE_PTR(lmvdesc);
1997         RETURN(rc);
1998 }
1999
2000 /* lov is the first thing in the mdt and client logs */
2001 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
2002                              struct fs_db *fsdb, struct mgs_target_info *mti,
2003                              char *logname, char *lovname)
2004 {
2005         struct llog_handle *llh = NULL;
2006         struct lov_desc *lovdesc;
2007         char *uuid;
2008         int rc = 0;
2009         ENTRY;
2010
2011         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
2012
2013         /*
2014         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
2015         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
2016               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
2017         */
2018
2019         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
2020         OBD_ALLOC_PTR(lovdesc);
2021         if (lovdesc == NULL)
2022                 RETURN(-ENOMEM);
2023         lovdesc->ld_magic = LOV_DESC_MAGIC;
2024         lovdesc->ld_tgt_count = 0;
2025         /* Defaults.  Can be changed later by lcfg config_param */
2026         lovdesc->ld_default_stripe_count = 1;
2027         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
2028         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
2029         lovdesc->ld_default_stripe_offset = -1;
2030         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
2031         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
2032         /* can these be the same? */
2033         uuid = (char *)lovdesc->ld_uuid.uuid;
2034
2035         /* This should always be the first entry in a log.
2036         rc = mgs_clear_log(obd, logname); */
2037         rc = record_start_log(env, mgs, &llh, logname);
2038         if (rc)
2039                 GOTO(out_free, rc);
2040         /* FIXME these should be a single journal transaction */
2041         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
2042         if (rc)
2043                 GOTO(out_end, rc);
2044         rc = record_attach(env, llh, lovname, "lov", uuid);
2045         if (rc)
2046                 GOTO(out_end, rc);
2047         rc = record_lov_setup(env, llh, lovname, lovdesc);
2048         if (rc)
2049                 GOTO(out_end, rc);
2050         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
2051         if (rc)
2052                 GOTO(out_end, rc);
2053         EXIT;
2054 out_end:
2055         record_end_log(env, &llh);
2056 out_free:
2057         OBD_FREE_PTR(lovdesc);
2058         return rc;
2059 }
2060
2061 /* add failnids to open log */
2062 static int mgs_write_log_failnids(const struct lu_env *env,
2063                                   struct mgs_target_info *mti,
2064                                   struct llog_handle *llh,
2065                                   char *cliname)
2066 {
2067         char *failnodeuuid = NULL;
2068         char *ptr = mti->mti_params;
2069         lnet_nid_t nid;
2070         int rc = 0;
2071
2072         /*
2073         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
2074         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
2075         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
2076         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
2077         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
2078         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
2079         */
2080
2081         /*
2082          * Pull failnid info out of params string, which may contain something
2083          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
2084          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
2085          * etc.  However, convert_hostnames() should have caught those.
2086          */
2087         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
2088                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
2089                         char nidstr[LNET_NIDSTR_SIZE];
2090
2091                         if (failnodeuuid == NULL) {
2092                                 /* We don't know the failover node name,
2093                                  * so just use the first nid as the uuid */
2094                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
2095                                 rc = name_create(&failnodeuuid, nidstr, "");
2096                                 if (rc != 0)
2097                                         return rc;
2098                         }
2099                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
2100                                 "client %s\n",
2101                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
2102                                 failnodeuuid, cliname);
2103                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
2104                         /*
2105                          * If *ptr is ':', we have added all NIDs for
2106                          * failnodeuuid.
2107                          */
2108                         if (*ptr == ':') {
2109                                 rc = record_add_conn(env, llh, cliname,
2110                                                      failnodeuuid);
2111                                 name_destroy(&failnodeuuid);
2112                                 failnodeuuid = NULL;
2113                         }
2114                 }
2115                 if (failnodeuuid) {
2116                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
2117                         name_destroy(&failnodeuuid);
2118                         failnodeuuid = NULL;
2119                 }
2120         }
2121
2122         return rc;
2123 }
2124
2125 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
2126                                     struct mgs_device *mgs,
2127                                     struct fs_db *fsdb,
2128                                     struct mgs_target_info *mti,
2129                                     char *logname, char *lmvname)
2130 {
2131         struct llog_handle *llh = NULL;
2132         char *mdcname = NULL;
2133         char *nodeuuid = NULL;
2134         char *mdcuuid = NULL;
2135         char *lmvuuid = NULL;
2136         char index[6];
2137         char nidstr[LNET_NIDSTR_SIZE];
2138         int i, rc;
2139         ENTRY;
2140
2141         if (mgs_log_is_empty(env, mgs, logname)) {
2142                 CERROR("log is empty! Logical error\n");
2143                 RETURN(-EINVAL);
2144         }
2145
2146         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
2147                mti->mti_svname, logname, lmvname);
2148
2149         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2150         rc = name_create(&nodeuuid, nidstr, "");
2151         if (rc)
2152                 RETURN(rc);
2153         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
2154         if (rc)
2155                 GOTO(out_free, rc);
2156         rc = name_create(&mdcuuid, mdcname, "_UUID");
2157         if (rc)
2158                 GOTO(out_free, rc);
2159         rc = name_create(&lmvuuid, lmvname, "_UUID");
2160         if (rc)
2161                 GOTO(out_free, rc);
2162
2163         rc = record_start_log(env, mgs, &llh, logname);
2164         if (rc)
2165                 GOTO(out_free, rc);
2166         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2167                            "add mdc");
2168         if (rc)
2169                 GOTO(out_end, rc);
2170         for (i = 0; i < mti->mti_nid_count; i++) {
2171                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2172                         libcfs_nid2str_r(mti->mti_nids[i],
2173                                          nidstr, sizeof(nidstr)));
2174
2175                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2176                 if (rc)
2177                         GOTO(out_end, rc);
2178         }
2179
2180         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2181         if (rc)
2182                 GOTO(out_end, rc);
2183         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2184                           NULL, NULL);
2185         if (rc)
2186                 GOTO(out_end, rc);
2187         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2188         if (rc)
2189                 GOTO(out_end, rc);
2190         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2191         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2192                             index, "1");
2193         if (rc)
2194                 GOTO(out_end, rc);
2195         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2196                            "add mdc");
2197         if (rc)
2198                 GOTO(out_end, rc);
2199 out_end:
2200         record_end_log(env, &llh);
2201 out_free:
2202         name_destroy(&lmvuuid);
2203         name_destroy(&mdcuuid);
2204         name_destroy(&mdcname);
2205         name_destroy(&nodeuuid);
2206         RETURN(rc);
2207 }
2208
2209 static inline int name_create_lov(char **lovname, char *mdtname,
2210                                   struct fs_db *fsdb, int index)
2211 {
2212         /* COMPAT_180 */
2213         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2214                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2215         else
2216                 return name_create(lovname, mdtname, "-mdtlov");
2217 }
2218
2219 static int name_create_mdt_and_lov(char **logname, char **lovname,
2220                                    struct fs_db *fsdb, int i)
2221 {
2222         int rc;
2223
2224         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2225         if (rc)
2226                 return rc;
2227         /* COMPAT_180 */
2228         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2229                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2230         else
2231                 rc = name_create(lovname, *logname, "-mdtlov");
2232         if (rc) {
2233                 name_destroy(logname);
2234                 *logname = NULL;
2235         }
2236         return rc;
2237 }
2238
2239 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2240                                       struct fs_db *fsdb, int i)
2241 {
2242         char suffix[16];
2243
2244         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2245                 sprintf(suffix, "-osc");
2246         else
2247                 sprintf(suffix, "-osc-MDT%04x", i);
2248         return name_create(oscname, ostname, suffix);
2249 }
2250
2251 /* add new mdc to already existent MDS */
2252 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2253                                     struct mgs_device *mgs,
2254                                     struct fs_db *fsdb,
2255                                     struct mgs_target_info *mti,
2256                                     int mdt_index, char *logname)
2257 {
2258         struct llog_handle      *llh = NULL;
2259         char    *nodeuuid = NULL;
2260         char    *ospname = NULL;
2261         char    *lovuuid = NULL;
2262         char    *mdtuuid = NULL;
2263         char    *svname = NULL;
2264         char    *mdtname = NULL;
2265         char    *lovname = NULL;
2266         char    index_str[16];
2267         char    nidstr[LNET_NIDSTR_SIZE];
2268         int     i, rc;
2269
2270         ENTRY;
2271         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2272                 CERROR("log is empty! Logical error\n");
2273                 RETURN (-EINVAL);
2274         }
2275
2276         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2277                logname);
2278
2279         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2280         if (rc)
2281                 RETURN(rc);
2282
2283         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2284         rc = name_create(&nodeuuid, nidstr, "");
2285         if (rc)
2286                 GOTO(out_destory, rc);
2287
2288         rc = name_create(&svname, mdtname, "-osp");
2289         if (rc)
2290                 GOTO(out_destory, rc);
2291
2292         sprintf(index_str, "-MDT%04x", mdt_index);
2293         rc = name_create(&ospname, svname, index_str);
2294         if (rc)
2295                 GOTO(out_destory, rc);
2296
2297         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2298         if (rc)
2299                 GOTO(out_destory, rc);
2300
2301         rc = name_create(&lovuuid, lovname, "_UUID");
2302         if (rc)
2303                 GOTO(out_destory, rc);
2304
2305         rc = name_create(&mdtuuid, mdtname, "_UUID");
2306         if (rc)
2307                 GOTO(out_destory, rc);
2308
2309         rc = record_start_log(env, mgs, &llh, logname);
2310         if (rc)
2311                 GOTO(out_destory, rc);
2312
2313         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2314                            "add osp");
2315         if (rc)
2316                 GOTO(out_destory, rc);
2317
2318         for (i = 0; i < mti->mti_nid_count; i++) {
2319                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2320                         libcfs_nid2str_r(mti->mti_nids[i],
2321                                          nidstr, sizeof(nidstr)));
2322                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2323                 if (rc)
2324                         GOTO(out_end, rc);
2325         }
2326
2327         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2328         if (rc)
2329                 GOTO(out_end, rc);
2330
2331         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2332                           NULL, NULL);
2333         if (rc)
2334                 GOTO(out_end, rc);
2335
2336         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2337         if (rc)
2338                 GOTO(out_end, rc);
2339
2340         /* Add mdc(osp) to lod */
2341         snprintf(index_str, sizeof(index_str), "%d", mti->mti_stripe_index);
2342         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2343                          index_str, "1", NULL);
2344         if (rc)
2345                 GOTO(out_end, rc);
2346
2347         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2348         if (rc)
2349                 GOTO(out_end, rc);
2350
2351 out_end:
2352         record_end_log(env, &llh);
2353
2354 out_destory:
2355         name_destroy(&mdtuuid);
2356         name_destroy(&lovuuid);
2357         name_destroy(&lovname);
2358         name_destroy(&ospname);
2359         name_destroy(&svname);
2360         name_destroy(&nodeuuid);
2361         name_destroy(&mdtname);
2362         RETURN(rc);
2363 }
2364
2365 static int mgs_write_log_mdt0(const struct lu_env *env,
2366                               struct mgs_device *mgs,
2367                               struct fs_db *fsdb,
2368                               struct mgs_target_info *mti)
2369 {
2370         char *log = mti->mti_svname;
2371         struct llog_handle *llh = NULL;
2372         char *uuid, *lovname;
2373         char mdt_index[6];
2374         char *ptr = mti->mti_params;
2375         int rc = 0, failout = 0;
2376         ENTRY;
2377
2378         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2379         if (uuid == NULL)
2380                 RETURN(-ENOMEM);
2381
2382         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2383                 failout = (strncmp(ptr, "failout", 7) == 0);
2384
2385         rc = name_create(&lovname, log, "-mdtlov");
2386         if (rc)
2387                 GOTO(out_free, rc);
2388         if (mgs_log_is_empty(env, mgs, log)) {
2389                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2390                 if (rc)
2391                         GOTO(out_lod, rc);
2392         }
2393
2394         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2395
2396         rc = record_start_log(env, mgs, &llh, log);
2397         if (rc)
2398                 GOTO(out_lod, rc);
2399
2400         /* add MDT itself */
2401
2402         /* FIXME this whole fn should be a single journal transaction */
2403         sprintf(uuid, "%s_UUID", log);
2404         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2405         if (rc)
2406                 GOTO(out_lod, rc);
2407         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2408         if (rc)
2409                 GOTO(out_end, rc);
2410         rc = record_mount_opt(env, llh, log, lovname, NULL);
2411         if (rc)
2412                 GOTO(out_end, rc);
2413         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2414                         failout ? "n" : "f");
2415         if (rc)
2416                 GOTO(out_end, rc);
2417         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2418         if (rc)
2419                 GOTO(out_end, rc);
2420 out_end:
2421         record_end_log(env, &llh);
2422 out_lod:
2423         name_destroy(&lovname);
2424 out_free:
2425         OBD_FREE(uuid, sizeof(struct obd_uuid));
2426         RETURN(rc);
2427 }
2428
2429 /* envelope method for all layers log */
2430 static int mgs_write_log_mdt(const struct lu_env *env,
2431                              struct mgs_device *mgs,
2432                              struct fs_db *fsdb,
2433                              struct mgs_target_info *mti)
2434 {
2435         struct mgs_thread_info *mgi = mgs_env_info(env);
2436         struct llog_handle *llh = NULL;
2437         char *cliname;
2438         int rc, i = 0;
2439         ENTRY;
2440
2441         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2442
2443         if (mti->mti_uuid[0] == '\0') {
2444                 /* Make up our own uuid */
2445                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2446                          "%s_UUID", mti->mti_svname);
2447         }
2448
2449         /* add mdt */
2450         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2451         if (rc)
2452                 RETURN(rc);
2453         /* Append the mdt info to the client log */
2454         rc = name_create(&cliname, mti->mti_fsname, "-client");
2455         if (rc)
2456                 RETURN(rc);
2457
2458         if (mgs_log_is_empty(env, mgs, cliname)) {
2459                 /* Start client log */
2460                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2461                                        fsdb->fsdb_clilov);
2462                 if (rc)
2463                         GOTO(out_free, rc);
2464                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2465                                        fsdb->fsdb_clilmv);
2466                 if (rc)
2467                         GOTO(out_free, rc);
2468         }
2469
2470         /*
2471         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2472         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2473         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2474         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2475         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2476         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2477         */
2478
2479         /* copy client info about lov/lmv */
2480         mgi->mgi_comp.comp_mti = mti;
2481         mgi->mgi_comp.comp_fsdb = fsdb;
2482
2483         rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2484                                                 &mgi->mgi_comp);
2485         if (rc)
2486                 GOTO(out_free, rc);
2487         rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2488                                       fsdb->fsdb_clilmv);
2489         if (rc)
2490                 GOTO(out_free, rc);
2491
2492         /* add mountopts */
2493         rc = record_start_log(env, mgs, &llh, cliname);
2494         if (rc)
2495                 GOTO(out_free, rc);
2496
2497         rc = record_marker(env, llh, fsdb, CM_START, cliname,
2498                            "mount opts");
2499         if (rc)
2500                 GOTO(out_end, rc);
2501         rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2502                               fsdb->fsdb_clilmv);
2503         if (rc)
2504                 GOTO(out_end, rc);
2505         rc = record_marker(env, llh, fsdb, CM_END, cliname,
2506                            "mount opts");
2507
2508         if (rc)
2509                 GOTO(out_end, rc);
2510
2511         /* for_all_existing_mdt except current one */
2512         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2513                 if (i !=  mti->mti_stripe_index &&
2514                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2515                         char *logname;
2516
2517                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2518                         if (rc)
2519                                 GOTO(out_end, rc);
2520
2521                         /* NB: If the log for the MDT is empty, it means
2522                          * the MDT is only added to the index
2523                          * map, and not being process yet, i.e. this
2524                          * is an unregistered MDT, see mgs_write_log_target().
2525                          * so we should skip it. Otherwise
2526                          *
2527                          * 1. MGS get register request for MDT1 and MDT2.
2528                          *
2529                          * 2. Then both MDT1 and MDT2 are added into
2530                          * fsdb_mdt_index_map. (see mgs_set_index()).
2531                          *
2532                          * 3. Then MDT1 get the lock of fsdb_mutex, then
2533                          * generate the config log, here, it will regard MDT2
2534                          * as an existent MDT, and generate "add osp" for
2535                          * lustre-MDT0001-osp-MDT0002. Note: at the moment
2536                          * MDT0002 config log is still empty, so it will
2537                          * add "add osp" even before "lov setup", which
2538                          * will definitly cause trouble.
2539                          *
2540                          * 4. MDT1 registeration finished, fsdb_mutex is
2541                          * released, then MDT2 get in, then in above
2542                          * mgs_steal_llog_for_mdt_from_client(), it will
2543                          * add another osp log for lustre-MDT0001-osp-MDT0002,
2544                          * which will cause another trouble.*/
2545                         if (!mgs_log_is_empty(env, mgs, logname))
2546                                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb,
2547                                                               mti, i, logname);
2548
2549                         name_destroy(&logname);
2550                         if (rc)
2551                                 GOTO(out_end, rc);
2552                 }
2553         }
2554 out_end:
2555         record_end_log(env, &llh);
2556 out_free:
2557         name_destroy(&cliname);
2558         RETURN(rc);
2559 }
2560
2561 /* Add the ost info to the client/mdt lov */
2562 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2563                                     struct mgs_device *mgs, struct fs_db *fsdb,
2564                                     struct mgs_target_info *mti,
2565                                     char *logname, char *suffix, char *lovname,
2566                                     enum lustre_sec_part sec_part, int flags)
2567 {
2568         struct llog_handle *llh = NULL;
2569         char *nodeuuid = NULL;
2570         char *oscname = NULL;
2571         char *oscuuid = NULL;
2572         char *lovuuid = NULL;
2573         char *svname = NULL;
2574         char index[6];
2575         char nidstr[LNET_NIDSTR_SIZE];
2576         int i, rc;
2577         ENTRY;
2578
2579         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2580                 mti->mti_svname, logname);
2581
2582         if (mgs_log_is_empty(env, mgs, logname)) {
2583                 CERROR("log is empty! Logical error\n");
2584                 RETURN(-EINVAL);
2585         }
2586
2587         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2588         rc = name_create(&nodeuuid, nidstr, "");
2589         if (rc)
2590                 RETURN(rc);
2591         rc = name_create(&svname, mti->mti_svname, "-osc");
2592         if (rc)
2593                 GOTO(out_free, rc);
2594
2595         /* for the system upgraded from old 1.8, keep using the old osc naming
2596          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2597         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2598                 rc = name_create(&oscname, svname, "");
2599         else
2600                 rc = name_create(&oscname, svname, suffix);
2601         if (rc)
2602                 GOTO(out_free, rc);
2603
2604         rc = name_create(&oscuuid, oscname, "_UUID");
2605         if (rc)
2606                 GOTO(out_free, rc);
2607         rc = name_create(&lovuuid, lovname, "_UUID");
2608         if (rc)
2609                 GOTO(out_free, rc);
2610
2611
2612         /*
2613         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2614         multihomed (#4)
2615         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2616         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2617         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2618         failover (#6,7)
2619         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2620         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2621         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2622         */
2623
2624         rc = record_start_log(env, mgs, &llh, logname);
2625         if (rc)
2626                 GOTO(out_free, rc);
2627
2628         /* FIXME these should be a single journal transaction */
2629         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2630                            "add osc");
2631         if (rc)
2632                 GOTO(out_end, rc);
2633
2634         /* NB: don't change record order, because upon MDT steal OSC config
2635          * from client, it treats all nids before LCFG_SETUP as target nids
2636          * (multiple interfaces), while nids after as failover node nids.
2637          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2638          */
2639         for (i = 0; i < mti->mti_nid_count; i++) {
2640                 CDEBUG(D_MGS, "add nid %s\n",
2641                         libcfs_nid2str_r(mti->mti_nids[i],
2642                                          nidstr, sizeof(nidstr)));
2643                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2644                 if (rc)
2645                         GOTO(out_end, rc);
2646         }
2647         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2648         if (rc)
2649                 GOTO(out_end, rc);
2650         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
2651                           NULL, NULL);
2652         if (rc)
2653                 GOTO(out_end, rc);
2654         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2655         if (rc)
2656                 GOTO(out_end, rc);
2657
2658         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2659
2660         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2661         if (rc)
2662                 GOTO(out_end, rc);
2663         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2664                            "add osc");
2665         if (rc)
2666                 GOTO(out_end, rc);
2667 out_end:
2668         record_end_log(env, &llh);
2669 out_free:
2670         name_destroy(&lovuuid);
2671         name_destroy(&oscuuid);
2672         name_destroy(&oscname);
2673         name_destroy(&svname);
2674         name_destroy(&nodeuuid);
2675         RETURN(rc);
2676 }
2677
2678 static int mgs_write_log_ost(const struct lu_env *env,
2679                              struct mgs_device *mgs, struct fs_db *fsdb,
2680                              struct mgs_target_info *mti)
2681 {
2682         struct llog_handle *llh = NULL;
2683         char *logname, *lovname;
2684         char *ptr = mti->mti_params;
2685         int rc, flags = 0, failout = 0, i;
2686         ENTRY;
2687
2688         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2689
2690         /* The ost startup log */
2691
2692         /* If the ost log already exists, that means that someone reformatted
2693            the ost and it called target_add again. */
2694         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2695                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2696                                    "exists, yet the server claims it never "
2697                                    "registered. It may have been reformatted, "
2698                                    "or the index changed. writeconf the MDT to "
2699                                    "regenerate all logs.\n", mti->mti_svname);
2700                 RETURN(-EALREADY);
2701         }
2702
2703         /*
2704         attach obdfilter ost1 ost1_UUID
2705         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2706         */
2707         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2708                 failout = (strncmp(ptr, "failout", 7) == 0);
2709         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2710         if (rc)
2711                 RETURN(rc);
2712         /* FIXME these should be a single journal transaction */
2713         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2714         if (rc)
2715                 GOTO(out_end, rc);
2716         if (*mti->mti_uuid == '\0')
2717                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2718                          "%s_UUID", mti->mti_svname);
2719         rc = record_attach(env, llh, mti->mti_svname,
2720                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2721         if (rc)
2722                 GOTO(out_end, rc);
2723         rc = record_setup(env, llh, mti->mti_svname,
2724                           "dev"/*ignored*/, "type"/*ignored*/,
2725                           failout ? "n" : "f", NULL/*options*/);
2726         if (rc)
2727                 GOTO(out_end, rc);
2728         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2729         if (rc)
2730                 GOTO(out_end, rc);
2731 out_end:
2732         record_end_log(env, &llh);
2733         if (rc)
2734                 RETURN(rc);
2735         /* We also have to update the other logs where this osc is part of
2736            the lov */
2737
2738         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2739                 /* If we're upgrading, the old mdt log already has our
2740                    entry. Let's do a fake one for fun. */
2741                 /* Note that we can't add any new failnids, since we don't
2742                    know the old osc names. */
2743                 flags = CM_SKIP | CM_UPGRADE146;
2744
2745         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2746                 /* If the update flag isn't set, don't update client/mdt
2747                    logs. */
2748                 flags |= CM_SKIP;
2749                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2750                               "the MDT first to regenerate it.\n",
2751                               mti->mti_svname);
2752         }
2753
2754         /* Add ost to all MDT lov defs */
2755         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2756                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2757                         char mdt_index[9];
2758
2759                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2760                                                      i);
2761                         if (rc)
2762                                 RETURN(rc);
2763                         sprintf(mdt_index, "-MDT%04x", i);
2764                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2765                                                       logname, mdt_index,
2766                                                       lovname, LUSTRE_SP_MDT,
2767                                                       flags);
2768                         name_destroy(&logname);
2769                         name_destroy(&lovname);
2770                         if (rc)
2771                                 RETURN(rc);
2772                 }
2773         }
2774
2775         /* Append ost info to the client log */
2776         rc = name_create(&logname, mti->mti_fsname, "-client");
2777         if (rc)
2778                 RETURN(rc);
2779         if (mgs_log_is_empty(env, mgs, logname)) {
2780                 /* Start client log */
2781                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2782                                        fsdb->fsdb_clilov);
2783                 if (rc)
2784                         GOTO(out_free, rc);
2785                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2786                                        fsdb->fsdb_clilmv);
2787                 if (rc)
2788                         GOTO(out_free, rc);
2789         }
2790         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2791                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
2792 out_free:
2793         name_destroy(&logname);
2794         RETURN(rc);
2795 }
2796
2797 static __inline__ int mgs_param_empty(char *ptr)
2798 {
2799         char *tmp;
2800
2801         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2802                 return 1;
2803         return 0;
2804 }
2805
2806 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2807                                           struct mgs_device *mgs,
2808                                           struct fs_db *fsdb,
2809                                           struct mgs_target_info *mti,
2810                                           char *logname, char *cliname)
2811 {
2812         int rc;
2813         struct llog_handle *llh = NULL;
2814
2815         if (mgs_param_empty(mti->mti_params)) {
2816                 /* Remove _all_ failnids */
2817                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2818                                 mti->mti_svname, "add failnid", CM_SKIP);
2819                 return rc < 0 ? rc : 0;
2820         }
2821
2822         /* Otherwise failover nids are additive */
2823         rc = record_start_log(env, mgs, &llh, logname);
2824         if (rc)
2825                 return rc;
2826                 /* FIXME this should be a single journal transaction */
2827         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2828                            "add failnid");
2829         if (rc)
2830                 goto out_end;
2831         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2832         if (rc)
2833                 goto out_end;
2834         rc = record_marker(env, llh, fsdb, CM_END,
2835                            mti->mti_svname, "add failnid");
2836 out_end:
2837         record_end_log(env, &llh);
2838         return rc;
2839 }
2840
2841
2842 /* Add additional failnids to an existing log.
2843    The mdc/osc must have been added to logs first */
2844 /* tcp nids must be in dotted-quad ascii -
2845    we can't resolve hostnames from the kernel. */
2846 static int mgs_write_log_add_failnid(const struct lu_env *env,
2847                                      struct mgs_device *mgs,
2848                                      struct fs_db *fsdb,
2849                                      struct mgs_target_info *mti)
2850 {
2851         char *logname, *cliname;
2852         int rc;
2853         ENTRY;
2854
2855         /* FIXME we currently can't erase the failnids
2856          * given when a target first registers, since they aren't part of
2857          * an "add uuid" stanza */
2858
2859         /* Verify that we know about this target */
2860         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2861                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2862                                    "yet. It must be started before failnids "
2863                                    "can be added.\n", mti->mti_svname);
2864                 RETURN(-ENOENT);
2865         }
2866
2867         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2868         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2869                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2870         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2871                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2872         } else {
2873                 RETURN(-EINVAL);
2874         }
2875         if (rc)
2876                 RETURN(rc);
2877         /* Add failover nids to the client log */
2878         rc = name_create(&logname, mti->mti_fsname, "-client");
2879         if (rc) {
2880                 name_destroy(&cliname);
2881                 RETURN(rc);
2882         }
2883         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2884         name_destroy(&logname);
2885         name_destroy(&cliname);
2886         if (rc)
2887                 RETURN(rc);
2888
2889         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2890                 /* Add OST failover nids to the MDT logs as well */
2891                 int i;
2892
2893                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2894                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2895                                 continue;
2896                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2897                         if (rc)
2898                                 RETURN(rc);
2899                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2900                                                  fsdb, i);
2901                         if (rc) {
2902                                 name_destroy(&logname);
2903                                 RETURN(rc);
2904                         }
2905                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2906                                                             mti, logname,
2907                                                             cliname);
2908                         name_destroy(&cliname);
2909                         name_destroy(&logname);
2910                         if (rc)
2911                                 RETURN(rc);
2912                 }
2913         }
2914
2915         RETURN(rc);
2916 }
2917
2918 static int mgs_wlp_lcfg(const struct lu_env *env,
2919                         struct mgs_device *mgs, struct fs_db *fsdb,
2920                         struct mgs_target_info *mti,
2921                         char *logname, struct lustre_cfg_bufs *bufs,
2922                         char *tgtname, char *ptr)
2923 {
2924         char comment[MTI_NAME_MAXLEN];
2925         char *tmp;
2926         struct llog_cfg_rec *lcr;
2927         int rc, del;
2928
2929         /* Erase any old settings of this same parameter */
2930         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2931         comment[MTI_NAME_MAXLEN - 1] = 0;
2932         /* But don't try to match the value. */
2933         tmp = strchr(comment, '=');
2934         if (tmp != NULL)
2935                 *tmp = 0;
2936         /* FIXME we should skip settings that are the same as old values */
2937         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2938         if (rc < 0)
2939                 return rc;
2940         del = mgs_param_empty(ptr);
2941
2942         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2943                       "Setting" : "Modifying", tgtname, comment, logname);
2944         if (del) {
2945                 /* mgs_modify() will return 1 if nothing had to be done */
2946                 if (rc == 1)
2947                         rc = 0;
2948                 return rc;
2949         }
2950
2951         lustre_cfg_bufs_reset(bufs, tgtname);
2952         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2953         if (mti->mti_flags & LDD_F_PARAM2)
2954                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2955
2956         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2957                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2958         if (lcr == NULL)
2959                 return -ENOMEM;
2960
2961         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2962                                   comment);
2963         lustre_cfg_rec_free(lcr);
2964         return rc;
2965 }
2966
2967 static int mgs_write_log_param2(const struct lu_env *env,
2968                                 struct mgs_device *mgs,
2969                                 struct fs_db *fsdb,
2970                                 struct mgs_target_info *mti, char *ptr)
2971 {
2972         struct lustre_cfg_bufs  bufs;
2973         int                     rc = 0;
2974         ENTRY;
2975
2976         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2977         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2978                           mti->mti_svname, ptr);
2979
2980         RETURN(rc);
2981 }
2982
2983 /* write global variable settings into log */
2984 static int mgs_write_log_sys(const struct lu_env *env,
2985                              struct mgs_device *mgs, struct fs_db *fsdb,
2986                              struct mgs_target_info *mti, char *sys, char *ptr)
2987 {
2988         struct mgs_thread_info  *mgi = mgs_env_info(env);
2989         struct lustre_cfg       *lcfg;
2990         struct llog_cfg_rec     *lcr;
2991         char *tmp, sep;
2992         int rc, cmd, convert = 1;
2993
2994         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2995                 cmd = LCFG_SET_TIMEOUT;
2996         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2997                 cmd = LCFG_SET_LDLM_TIMEOUT;
2998         /* Check for known params here so we can return error to lctl */
2999         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
3000                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
3001                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
3002                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
3003                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
3004                 cmd = LCFG_PARAM;
3005         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
3006                 convert = 0; /* Don't convert string value to integer */
3007                 cmd = LCFG_PARAM;
3008         } else {
3009                 return -EINVAL;
3010         }
3011
3012         if (mgs_param_empty(ptr))
3013                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
3014         else
3015                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
3016
3017         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
3018         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
3019         if (!convert && *tmp != '\0')
3020                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
3021         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3022         if (lcr == NULL)
3023                 return -ENOMEM;
3024
3025         lcfg = &lcr->lcr_cfg;
3026         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
3027         /* truncate the comment to the parameter name */
3028         ptr = tmp - 1;
3029         sep = *ptr;
3030         *ptr = '\0';
3031         /* modify all servers and clients */
3032         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3033                                       *tmp == '\0' ? NULL : lcr,
3034                                       mti->mti_fsname, sys, 0);
3035         if (rc == 0 && *tmp != '\0') {
3036                 switch (cmd) {
3037                 case LCFG_SET_TIMEOUT:
3038                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
3039                                 class_process_config(lcfg);
3040                         break;
3041                 case LCFG_SET_LDLM_TIMEOUT:
3042                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
3043                                 class_process_config(lcfg);
3044                         break;
3045                 default:
3046                         break;
3047                 }
3048         }
3049         *ptr = sep;
3050         lustre_cfg_rec_free(lcr);
3051         return rc;
3052 }
3053
3054 /* write quota settings into log */
3055 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
3056                                struct fs_db *fsdb, struct mgs_target_info *mti,
3057                                char *quota, char *ptr)
3058 {
3059         struct mgs_thread_info  *mgi = mgs_env_info(env);
3060         struct llog_cfg_rec     *lcr;
3061         char                    *tmp;
3062         char                     sep;
3063         int                      rc, cmd = LCFG_PARAM;
3064
3065         /* support only 'meta' and 'data' pools so far */
3066         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
3067             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
3068                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
3069                        "& quota.ost are)\n", ptr);
3070                 return -EINVAL;
3071         }
3072
3073         if (*tmp == '\0') {
3074                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
3075         } else {
3076                 CDEBUG(D_MGS, "global '%s'\n", quota);
3077
3078                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
3079                     strcmp(tmp, "none") != 0) {
3080                         CERROR("enable option(%s) isn't supported\n", tmp);
3081                         return -EINVAL;
3082                 }
3083         }
3084
3085         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
3086         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
3087         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3088         if (lcr == NULL)
3089                 return -ENOMEM;
3090
3091         /* truncate the comment to the parameter name */
3092         ptr = tmp - 1;
3093         sep = *ptr;
3094         *ptr = '\0';
3095
3096         /* XXX we duplicated quota enable information in all server
3097          *     config logs, it should be moved to a separate config
3098          *     log once we cleanup the config log for global param. */
3099         /* modify all servers */
3100         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3101                                       *tmp == '\0' ? NULL : lcr,
3102                                       mti->mti_fsname, quota, 1);
3103         *ptr = sep;
3104         lustre_cfg_rec_free(lcr);
3105         return rc < 0 ? rc : 0;
3106 }
3107
3108 static int mgs_srpc_set_param_disk(const struct lu_env *env,
3109                                    struct mgs_device *mgs,
3110                                    struct fs_db *fsdb,
3111                                    struct mgs_target_info *mti,
3112                                    char *param)
3113 {
3114         struct mgs_thread_info  *mgi = mgs_env_info(env);
3115         struct llog_cfg_rec     *lcr;
3116         struct llog_handle      *llh = NULL;
3117         char                    *logname;
3118         char                    *comment, *ptr;
3119         int                      rc, len;
3120
3121         ENTRY;
3122
3123         /* get comment */
3124         ptr = strchr(param, '=');
3125         LASSERT(ptr != NULL);
3126         len = ptr - param;
3127
3128         OBD_ALLOC(comment, len + 1);
3129         if (comment == NULL)
3130                 RETURN(-ENOMEM);
3131         strncpy(comment, param, len);
3132         comment[len] = '\0';
3133
3134         /* prepare lcfg */
3135         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
3136         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
3137         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
3138         if (lcr == NULL)
3139                 GOTO(out_comment, rc = -ENOMEM);
3140
3141         /* construct log name */
3142         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
3143         if (rc < 0)
3144                 GOTO(out_lcfg, rc);
3145
3146         if (mgs_log_is_empty(env, mgs, logname)) {
3147                 rc = record_start_log(env, mgs, &llh, logname);
3148                 if (rc < 0)
3149                         GOTO(out, rc);
3150                 record_end_log(env, &llh);
3151         }
3152
3153         /* obsolete old one */
3154         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
3155                         comment, CM_SKIP);
3156         if (rc < 0)
3157                 GOTO(out, rc);
3158         /* write the new one */
3159         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
3160                                   mti->mti_svname, comment);
3161         if (rc)
3162                 CERROR("%s: error writing log %s: rc = %d\n",
3163                        mgs->mgs_obd->obd_name, logname, rc);
3164 out:
3165         name_destroy(&logname);
3166 out_lcfg:
3167         lustre_cfg_rec_free(lcr);
3168 out_comment:
3169         OBD_FREE(comment, len + 1);
3170         RETURN(rc);
3171 }
3172
3173 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
3174                                         char *param)
3175 {
3176         char    *ptr;
3177
3178         /* disable the adjustable udesc parameter for now, i.e. use default
3179          * setting that client always ship udesc to MDT if possible. to enable
3180          * it simply remove the following line */
3181         goto error_out;
3182
3183         ptr = strchr(param, '=');
3184         if (ptr == NULL)
3185                 goto error_out;
3186         *ptr++ = '\0';
3187
3188         if (strcmp(param, PARAM_SRPC_UDESC))
3189                 goto error_out;
3190
3191         if (strcmp(ptr, "yes") == 0) {
3192                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3193                 CWARN("Enable user descriptor shipping from client to MDT\n");
3194         } else if (strcmp(ptr, "no") == 0) {
3195                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3196                 CWARN("Disable user descriptor shipping from client to MDT\n");
3197         } else {
3198                 *(ptr - 1) = '=';
3199                 goto error_out;
3200         }
3201         return 0;
3202
3203 error_out:
3204         CERROR("Invalid param: %s\n", param);
3205         return -EINVAL;
3206 }
3207
3208 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3209                                   const char *svname,
3210                                   char *param)
3211 {
3212         struct sptlrpc_rule      rule;
3213         struct sptlrpc_rule_set *rset;
3214         int                      rc;
3215         ENTRY;
3216
3217         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3218                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3219                 RETURN(-EINVAL);
3220         }
3221
3222         if (strncmp(param, PARAM_SRPC_UDESC,
3223                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3224                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3225         }
3226
3227         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3228                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3229                 RETURN(-EINVAL);
3230         }
3231
3232         param += sizeof(PARAM_SRPC_FLVR) - 1;
3233
3234         rc = sptlrpc_parse_rule(param, &rule);
3235         if (rc)
3236                 RETURN(rc);
3237
3238         /* mgs rules implies must be mgc->mgs */
3239         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3240                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3241                      rule.sr_from != LUSTRE_SP_ANY) ||
3242                     (rule.sr_to != LUSTRE_SP_MGS &&
3243                      rule.sr_to != LUSTRE_SP_ANY))
3244                         RETURN(-EINVAL);
3245         }
3246
3247         /* preapre room for this coming rule. svcname format should be:
3248          * - fsname: general rule
3249          * - fsname-tgtname: target-specific rule
3250          */
3251         if (strchr(svname, '-')) {
3252                 struct mgs_tgt_srpc_conf *tgtconf;
3253                 int                       found = 0;
3254
3255                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3256                      tgtconf = tgtconf->mtsc_next) {
3257                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3258                                 found = 1;
3259                                 break;
3260                         }
3261                 }
3262
3263                 if (!found) {
3264                         int name_len;
3265
3266                         OBD_ALLOC_PTR(tgtconf);
3267                         if (tgtconf == NULL)
3268                                 RETURN(-ENOMEM);
3269
3270                         name_len = strlen(svname);
3271
3272                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3273                         if (tgtconf->mtsc_tgt == NULL) {
3274                                 OBD_FREE_PTR(tgtconf);
3275                                 RETURN(-ENOMEM);
3276                         }
3277                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3278
3279                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3280                         fsdb->fsdb_srpc_tgt = tgtconf;
3281                 }
3282
3283                 rset = &tgtconf->mtsc_rset;
3284         } else if (strcmp(svname, MGSSELF_NAME) == 0) {
3285                 /* put _mgs related srpc rule directly in mgs ruleset */
3286                 rset = &fsdb->fsdb_mgs->mgs_lut.lut_sptlrpc_rset;
3287         } else {
3288                 rset = &fsdb->fsdb_srpc_gen;
3289         }
3290
3291         rc = sptlrpc_rule_set_merge(rset, &rule);
3292
3293         RETURN(rc);
3294 }
3295
3296 static int mgs_srpc_set_param(const struct lu_env *env,
3297                               struct mgs_device *mgs,
3298                               struct fs_db *fsdb,
3299                               struct mgs_target_info *mti,
3300                               char *param)
3301 {
3302         char                   *copy;
3303         int                     rc, copy_size;
3304         ENTRY;
3305
3306 #ifndef HAVE_GSS
3307         RETURN(-EINVAL);
3308 #endif
3309         /* keep a copy of original param, which could be destroied
3310          * during parsing */
3311         copy_size = strlen(param) + 1;
3312         OBD_ALLOC(copy, copy_size);
3313         if (copy == NULL)
3314                 return -ENOMEM;
3315         memcpy(copy, param, copy_size);
3316
3317         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3318         if (rc)
3319                 goto out_free;
3320
3321         /* previous steps guaranteed the syntax is correct */
3322         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3323         if (rc)
3324                 goto out_free;
3325
3326         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3327                 /*
3328                  * for mgs rules, make them effective immediately.
3329                  */
3330                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3331                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3332                                                  &fsdb->fsdb_srpc_gen);
3333         }
3334
3335 out_free:
3336         OBD_FREE(copy, copy_size);
3337         RETURN(rc);
3338 }
3339
3340 struct mgs_srpc_read_data {
3341         struct fs_db   *msrd_fsdb;
3342         int             msrd_skip;
3343 };
3344
3345 static int mgs_srpc_read_handler(const struct lu_env *env,
3346                                  struct llog_handle *llh,
3347                                  struct llog_rec_hdr *rec, void *data)
3348 {
3349         struct mgs_srpc_read_data *msrd = data;
3350         struct cfg_marker         *marker;
3351         struct lustre_cfg         *lcfg = REC_DATA(rec);
3352         char                      *svname, *param;
3353         int                        cfg_len, rc;
3354         ENTRY;
3355
3356         if (rec->lrh_type != OBD_CFG_REC) {
3357                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3358                 RETURN(-EINVAL);
3359         }
3360
3361         cfg_len = REC_DATA_LEN(rec);
3362
3363         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3364         if (rc) {
3365                 CERROR("Insane cfg\n");
3366                 RETURN(rc);
3367         }
3368
3369         if (lcfg->lcfg_command == LCFG_MARKER) {
3370                 marker = lustre_cfg_buf(lcfg, 1);
3371
3372                 if (marker->cm_flags & CM_START &&
3373                     marker->cm_flags & CM_SKIP)
3374                         msrd->msrd_skip = 1;
3375                 if (marker->cm_flags & CM_END)
3376                         msrd->msrd_skip = 0;
3377
3378                 RETURN(0);
3379         }
3380
3381         if (msrd->msrd_skip)
3382                 RETURN(0);
3383
3384         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3385                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3386                 RETURN(0);
3387         }
3388
3389         svname = lustre_cfg_string(lcfg, 0);
3390         if (svname == NULL) {
3391                 CERROR("svname is empty\n");
3392                 RETURN(0);
3393         }
3394
3395         param = lustre_cfg_string(lcfg, 1);
3396         if (param == NULL) {
3397                 CERROR("param is empty\n");
3398                 RETURN(0);
3399         }
3400
3401         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3402         if (rc)
3403                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3404
3405         RETURN(0);
3406 }
3407
3408 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3409                                 struct mgs_device *mgs,
3410                                 struct fs_db *fsdb)
3411 {
3412         struct llog_handle        *llh = NULL;
3413         struct llog_ctxt          *ctxt;
3414         char                      *logname;
3415         struct mgs_srpc_read_data  msrd;
3416         int                        rc;
3417         ENTRY;
3418
3419         /* construct log name */
3420         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3421         if (rc)
3422                 RETURN(rc);
3423
3424         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3425         LASSERT(ctxt != NULL);
3426
3427         if (mgs_log_is_empty(env, mgs, logname))
3428                 GOTO(out, rc = 0);
3429
3430         rc = llog_open(env, ctxt, &llh, NULL, logname,
3431                        LLOG_OPEN_EXISTS);
3432         if (rc < 0) {
3433                 if (rc == -ENOENT)
3434                         rc = 0;
3435                 GOTO(out, rc);
3436         }
3437
3438         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3439         if (rc)
3440                 GOTO(out_close, rc);
3441
3442         if (llog_get_size(llh) <= 1)
3443                 GOTO(out_close, rc = 0);
3444
3445         msrd.msrd_fsdb = fsdb;
3446         msrd.msrd_skip = 0;
3447
3448         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3449                           NULL);
3450
3451 out_close:
3452         llog_close(env, llh);
3453 out:
3454         llog_ctxt_put(ctxt);
3455         name_destroy(&logname);
3456
3457         if (rc)
3458                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3459         RETURN(rc);
3460 }
3461
3462 /* Permanent settings of all parameters by writing into the appropriate
3463  * configuration logs.
3464  * A parameter with null value ("<param>='\0'") means to erase it out of
3465  * the logs.
3466  */
3467 static int mgs_write_log_param(const struct lu_env *env,
3468                                struct mgs_device *mgs, struct fs_db *fsdb,
3469                                struct mgs_target_info *mti, char *ptr)
3470 {
3471         struct mgs_thread_info *mgi = mgs_env_info(env);
3472         char *logname;
3473         char *tmp;
3474         int rc = 0;
3475         ENTRY;
3476
3477         /* For various parameter settings, we have to figure out which logs
3478            care about them (e.g. both mdt and client for lov settings) */
3479         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3480
3481         /* The params are stored in MOUNT_DATA_FILE and modified via
3482            tunefs.lustre, or set using lctl conf_param */
3483
3484         /* Processed in lustre_start_mgc */
3485         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3486                 GOTO(end, rc);
3487
3488         /* Processed in ost/mdt */
3489         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3490                 GOTO(end, rc);
3491
3492         /* Processed in mgs_write_log_ost */
3493         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3494                 if (mti->mti_flags & LDD_F_PARAM) {
3495                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3496                                            "changed with tunefs.lustre"
3497                                            "and --writeconf\n", ptr);
3498                         rc = -EPERM;
3499                 }
3500                 GOTO(end, rc);
3501         }
3502
3503         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3504                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3505                 GOTO(end, rc);
3506         }
3507
3508         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3509                 /* Add a failover nidlist */
3510                 rc = 0;
3511                 /* We already processed failovers params for new
3512                    targets in mgs_write_log_target */
3513                 if (mti->mti_flags & LDD_F_PARAM) {
3514                         CDEBUG(D_MGS, "Adding failnode\n");
3515                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3516                 }
3517                 GOTO(end, rc);
3518         }
3519
3520         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3521                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3522                 GOTO(end, rc);
3523         }
3524
3525         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3526                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3527                 GOTO(end, rc);
3528         }
3529
3530         if (class_match_param(ptr, PARAM_OSC PARAM_ACTIVE, &tmp) == 0 ||
3531             class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0) {
3532                 /* active=0 means off, anything else means on */
3533                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3534                 bool deactive_osc = memcmp(ptr, PARAM_OSC PARAM_ACTIVE,
3535                                           strlen(PARAM_OSC PARAM_ACTIVE)) == 0;
3536                 int i;
3537
3538                 if (!deactive_osc) {
3539                         __u32   index;
3540
3541                         rc = server_name2index(mti->mti_svname, &index, NULL);
3542                         if (rc < 0)
3543                                 GOTO(end, rc);
3544
3545                         if (index == 0) {
3546                                 LCONSOLE_ERROR_MSG(0x144, "%s: MDC0 can not be"
3547                                                    " (de)activated.\n",
3548                                                    mti->mti_svname);
3549                                 GOTO(end, rc = -EINVAL);
3550                         }
3551                 }
3552
3553                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3554                               flag ? "de" : "re", mti->mti_svname);
3555                 /* Modify clilov */
3556                 rc = name_create(&logname, mti->mti_fsname, "-client");
3557                 if (rc < 0)
3558                         GOTO(end, rc);
3559                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3560                                 mti->mti_svname,
3561                                 deactive_osc ? "add osc" : "add mdc", flag);
3562                 name_destroy(&logname);
3563                 if (rc < 0)
3564                         goto active_err;
3565
3566                 /* Modify mdtlov */
3567                 /* Add to all MDT logs for DNE */
3568                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3569                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3570                                 continue;
3571                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3572                         if (rc < 0)
3573                                 GOTO(end, rc);
3574                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3575                                         mti->mti_svname,
3576                                         deactive_osc ? "add osc" : "add osp",
3577                                         flag);
3578                         name_destroy(&logname);
3579                         if (rc < 0)
3580                                 goto active_err;
3581                 }
3582 active_err:
3583                 if (rc < 0) {
3584                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3585                                            "log (%d). No permanent "
3586                                            "changes were made to the "
3587                                            "config log.\n",
3588                                            mti->mti_svname, rc);
3589                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3590                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3591                                                    " because the log"
3592                                                    "is in the old 1.4"
3593                                                    "style. Consider "
3594                                                    " --writeconf to "
3595                                                    "update the logs.\n");
3596                         GOTO(end, rc);
3597                 }
3598                 /* Fall through to osc/mdc proc for deactivating live
3599                    OSC/OSP on running MDT / clients. */
3600         }
3601         /* Below here, let obd's XXX_process_config methods handle it */
3602
3603         /* All lov. in proc */
3604         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3605                 char *mdtlovname;
3606
3607                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3608                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3609                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3610                                            "set on the MDT, not %s. "
3611                                            "Ignoring.\n",
3612                                            mti->mti_svname);
3613                         GOTO(end, rc = 0);
3614                 }
3615
3616                 /* Modify mdtlov */
3617                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3618                         GOTO(end, rc = -ENODEV);
3619
3620                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3621                                              mti->mti_stripe_index);
3622                 if (rc)
3623                         GOTO(end, rc);
3624                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3625                                   &mgi->mgi_bufs, mdtlovname, ptr);
3626                 name_destroy(&logname);
3627                 name_destroy(&mdtlovname);
3628                 if (rc)
3629                         GOTO(end, rc);
3630
3631                 /* Modify clilov */
3632                 rc = name_create(&logname, mti->mti_fsname, "-client");
3633                 if (rc)
3634                         GOTO(end, rc);
3635                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3636                                   fsdb->fsdb_clilov, ptr);
3637                 name_destroy(&logname);
3638                 GOTO(end, rc);
3639         }
3640
3641         /* All osc., mdc., llite. params in proc */
3642         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3643             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3644             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3645                 char *cname;
3646
3647                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3648                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3649                                            " cannot be modified. Consider"
3650                                            " updating the configuration with"
3651                                            " --writeconf\n",
3652                                            mti->mti_svname);
3653                         GOTO(end, rc = -EINVAL);
3654                 }
3655                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3656                         rc = name_create(&cname, mti->mti_fsname, "-client");
3657                         /* Add the client type to match the obdname in
3658                            class_config_llog_handler */
3659                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3660                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3661                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3662                         rc = name_create(&cname, mti->mti_svname, "-osc");
3663                 } else {
3664                         GOTO(end, rc = -EINVAL);
3665                 }
3666                 if (rc)
3667                         GOTO(end, rc);
3668
3669                 /* Forbid direct update of llite root squash parameters.
3670                  * These parameters are indirectly set via the MDT settings.
3671                  * See (LU-1778) */
3672                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
3673                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3674                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3675                         LCONSOLE_ERROR("%s: root squash parameters can only "
3676                                 "be updated through MDT component\n",
3677                                 mti->mti_fsname);
3678                         name_destroy(&cname);
3679                         GOTO(end, rc = -EINVAL);
3680                 }
3681
3682                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3683
3684                 /* Modify client */
3685                 rc = name_create(&logname, mti->mti_fsname, "-client");
3686                 if (rc) {
3687                         name_destroy(&cname);
3688                         GOTO(end, rc);
3689                 }
3690                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3691                                   cname, ptr);
3692
3693                 /* osc params affect the MDT as well */
3694                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3695                         int i;
3696
3697                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3698                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3699                                         continue;
3700                                 name_destroy(&cname);
3701                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3702                                                          fsdb, i);
3703                                 name_destroy(&logname);
3704                                 if (rc)
3705                                         break;
3706                                 rc = name_create_mdt(&logname,
3707                                                      mti->mti_fsname, i);
3708                                 if (rc)
3709                                         break;
3710                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3711                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3712                                                           mti, logname,
3713                                                           &mgi->mgi_bufs,
3714                                                           cname, ptr);
3715                                         if (rc)
3716                                                 break;
3717                                 }
3718                         }
3719                 }
3720
3721                 /* For mdc activate/deactivate, it affects OSP on MDT as well */
3722                 if (class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0 &&
3723                     rc == 0) {
3724                         char suffix[16];
3725                         char *lodname = NULL;
3726                         char *param_str = NULL;
3727                         int i;
3728                         int index;
3729
3730                         /* replace mdc with osp */
3731                         memcpy(ptr, PARAM_OSP, strlen(PARAM_OSP));
3732                         rc = server_name2index(mti->mti_svname, &index, NULL);
3733                         if (rc < 0) {
3734                                 memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
3735                                 GOTO(end, rc);
3736                         }
3737
3738                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3739                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3740                                         continue;
3741
3742                                 if (i == index)
3743                                         continue;
3744
3745                                 name_destroy(&logname);
3746                                 rc = name_create_mdt(&logname, mti->mti_fsname,
3747                                                      i);
3748                                 if (rc < 0)
3749                                         break;
3750
3751                                 if (mgs_log_is_empty(env, mgs, logname))
3752                                         continue;
3753
3754                                 snprintf(suffix, sizeof(suffix), "-osp-MDT%04x",
3755                                          i);
3756                                 name_destroy(&cname);
3757                                 rc = name_create(&cname, mti->mti_svname,
3758                                                  suffix);
3759                                 if (rc < 0)
3760                                         break;
3761
3762                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3763                                                   &mgi->mgi_bufs, cname, ptr);
3764                                 if (rc < 0)
3765                                         break;
3766
3767                                 /* Add configuration log for noitfying LOD
3768                                  * to active/deactive the OSP. */
3769                                 name_destroy(&param_str);
3770                                 rc = name_create(&param_str, cname,
3771                                                  (*tmp == '0') ?  ".active=0" :
3772                                                  ".active=1");
3773                                 if (rc < 0)
3774                                         break;
3775
3776                                 name_destroy(&lodname);
3777                                 rc = name_create(&lodname, logname, "-mdtlov");
3778                                 if (rc < 0)
3779                                         break;
3780
3781                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3782                                                   &mgi->mgi_bufs, lodname,
3783                                                   param_str);
3784                                 if (rc < 0)
3785                                         break;
3786                         }
3787                         memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
3788                         name_destroy(&lodname);
3789                         name_destroy(&param_str);
3790                 }
3791
3792                 name_destroy(&logname);
3793                 name_destroy(&cname);
3794                 GOTO(end, rc);
3795         }
3796
3797         /* All mdt. params in proc */
3798         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
3799                 int i;
3800                 __u32 idx;
3801
3802                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3803                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3804                             MTI_NAME_MAXLEN) == 0)
3805                         /* device is unspecified completely? */
3806                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3807                 else
3808                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3809                 if (rc < 0)
3810                         goto active_err;
3811                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3812                         goto active_err;
3813                 if (rc & LDD_F_SV_ALL) {
3814                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3815                                 if (!test_bit(i,
3816                                                   fsdb->fsdb_mdt_index_map))
3817                                         continue;
3818                                 rc = name_create_mdt(&logname,
3819                                                 mti->mti_fsname, i);
3820                                 if (rc)
3821                                         goto active_err;
3822                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3823                                                   logname, &mgi->mgi_bufs,
3824                                                   logname, ptr);
3825                                 name_destroy(&logname);
3826                                 if (rc)
3827                                         goto active_err;
3828                         }
3829                 } else {
3830                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
3831                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
3832                                 LCONSOLE_ERROR("%s: root squash parameters "
3833                                         "cannot be applied to a single MDT\n",
3834                                         mti->mti_fsname);
3835                                 GOTO(end, rc = -EINVAL);
3836                         }
3837                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3838                                           mti->mti_svname, &mgi->mgi_bufs,
3839                                           mti->mti_svname, ptr);
3840                         if (rc)
3841                                 goto active_err;
3842                 }
3843
3844                 /* root squash settings are also applied to llite
3845                  * config log (see LU-1778) */
3846                 if (rc == 0 &&
3847                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3848                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3849                         char *cname;
3850                         char *ptr2;
3851
3852                         rc = name_create(&cname, mti->mti_fsname, "-client");
3853                         if (rc)
3854                                 GOTO(end, rc);
3855                         rc = name_create(&logname, mti->mti_fsname, "-client");
3856                         if (rc) {
3857                                 name_destroy(&cname);
3858                                 GOTO(end, rc);
3859                         }
3860                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
3861                         if (rc) {
3862                                 name_destroy(&cname);
3863                                 name_destroy(&logname);
3864                                 GOTO(end, rc);
3865                         }
3866                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3867                                           &mgi->mgi_bufs, cname, ptr2);
3868                         name_destroy(&ptr2);
3869                         name_destroy(&logname);
3870                         name_destroy(&cname);
3871                 }
3872                 GOTO(end, rc);
3873         }
3874
3875         /* All mdd., ost. and osd. params in proc */
3876         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3877             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3878             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3879                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3880                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3881                         GOTO(end, rc = -ENODEV);
3882
3883                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3884                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3885                 GOTO(end, rc);
3886         }
3887
3888         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3889
3890 end:
3891         if (rc)
3892                 CERROR("err %d on param '%s'\n", rc, ptr);
3893
3894         RETURN(rc);
3895 }
3896
3897 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
3898                          struct mgs_target_info *mti, struct fs_db *fsdb)
3899 {
3900         char    *buf, *params;
3901         int      rc = -EINVAL;
3902
3903         ENTRY;
3904
3905         /* set/check the new target index */
3906         rc = mgs_set_index(env, mgs, mti);
3907         if (rc < 0)
3908                 RETURN(rc);
3909
3910         if (rc == EALREADY) {
3911                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3912                               mti->mti_stripe_index, mti->mti_svname);
3913                 /* We would like to mark old log sections as invalid
3914                    and add new log sections in the client and mdt logs.
3915                    But if we add new sections, then live clients will
3916                    get repeat setup instructions for already running
3917                    osc's. So don't update the client/mdt logs. */
3918                 mti->mti_flags &= ~LDD_F_UPDATE;
3919                 rc = 0;
3920         }
3921
3922         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_WRITE_TARGET_DELAY, cfs_fail_val > 0 ?
3923                          cfs_fail_val : 10);
3924
3925         mutex_lock(&fsdb->fsdb_mutex);
3926
3927         if (mti->mti_flags &
3928             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3929                 /* Generate a log from scratch */
3930                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3931                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3932                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3933                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3934                 } else {
3935                         CERROR("Unknown target type %#x, can't create log for "
3936                                "%s\n", mti->mti_flags, mti->mti_svname);
3937                 }
3938                 if (rc) {
3939                         CERROR("Can't write logs for %s (%d)\n",
3940                                mti->mti_svname, rc);
3941                         GOTO(out_up, rc);
3942                 }
3943         } else {
3944                 /* Just update the params from tunefs in mgs_write_log_params */
3945                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3946                 mti->mti_flags |= LDD_F_PARAM;
3947         }
3948
3949         /* allocate temporary buffer, where class_get_next_param will
3950            make copy of a current  parameter */
3951         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3952         if (buf == NULL)
3953                 GOTO(out_up, rc = -ENOMEM);
3954         params = mti->mti_params;
3955         while (params != NULL) {
3956                 rc = class_get_next_param(&params, buf);
3957                 if (rc) {
3958                         if (rc == 1)
3959                                 /* there is no next parameter, that is
3960                                    not an error */
3961                                 rc = 0;
3962                         break;
3963                 }
3964                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3965                        params, buf);
3966                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3967                 if (rc)
3968                         break;
3969         }
3970
3971         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3972
3973 out_up:
3974         mutex_unlock(&fsdb->fsdb_mutex);
3975         RETURN(rc);
3976 }
3977
3978 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3979 {
3980         struct llog_ctxt        *ctxt;
3981         int                      rc = 0;
3982
3983         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3984         if (ctxt == NULL) {
3985                 CERROR("%s: MGS config context doesn't exist\n",
3986                        mgs->mgs_obd->obd_name);
3987                 rc = -ENODEV;
3988         } else {
3989                 rc = llog_erase(env, ctxt, NULL, name);
3990                 /* llog may not exist */
3991                 if (rc == -ENOENT)
3992                         rc = 0;
3993                 llog_ctxt_put(ctxt);
3994         }
3995
3996         if (rc)
3997                 CERROR("%s: failed to clear log %s: %d\n",
3998                        mgs->mgs_obd->obd_name, name, rc);
3999
4000         return rc;
4001 }
4002
4003 /* erase all logs for the given fs */
4004 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs,
4005                    const char *fsname)
4006 {
4007         struct list_head log_list;
4008         struct mgs_direntry *dirent, *n;
4009         int rc, len = strlen(fsname);
4010         char *suffix;
4011         ENTRY;
4012
4013         /* Find all the logs in the CONFIGS directory */
4014         rc = class_dentry_readdir(env, mgs, &log_list);
4015         if (rc)
4016                 RETURN(rc);
4017
4018         mutex_lock(&mgs->mgs_mutex);
4019         /* Delete the fs db */
4020         mgs_remove_fsdb_by_name(mgs, fsname);
4021         mutex_unlock(&mgs->mgs_mutex);
4022
4023         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4024                 list_del_init(&dirent->mde_list);
4025                 suffix = strrchr(dirent->mde_name, '-');
4026                 if (suffix != NULL) {
4027                         if ((len == suffix - dirent->mde_name) &&
4028                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
4029                                 CDEBUG(D_MGS, "Removing log %s\n",
4030                                        dirent->mde_name);
4031                                 mgs_erase_log(env, mgs, dirent->mde_name);
4032                         }
4033                 }
4034                 mgs_direntry_free(dirent);
4035         }
4036
4037         RETURN(rc);
4038 }
4039
4040 /* list all logs for the given fs */
4041 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
4042                   struct obd_ioctl_data *data)
4043 {
4044         struct list_head         log_list;
4045         struct mgs_direntry     *dirent, *n;
4046         char                    *out, *suffix;
4047         int                      l, remains, rc;
4048
4049         ENTRY;
4050
4051         /* Find all the logs in the CONFIGS directory */
4052         rc = class_dentry_readdir(env, mgs, &log_list);
4053         if (rc)
4054                 RETURN(rc);
4055
4056         out = data->ioc_bulk;
4057         remains = data->ioc_inllen1;
4058         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4059                 list_del_init(&dirent->mde_list);
4060                 suffix = strrchr(dirent->mde_name, '-');
4061                 if (suffix != NULL) {
4062                         l = snprintf(out, remains, "config log: $%s\n",
4063                                      dirent->mde_name);
4064                         out += l;
4065                         remains -= l;
4066                 }
4067                 mgs_direntry_free(dirent);
4068                 if (remains <= 0)
4069                         break;
4070         }
4071         RETURN(rc);
4072 }
4073
4074 /* from llog_swab */
4075 static void print_lustre_cfg(struct lustre_cfg *lcfg)
4076 {
4077         int i;
4078         ENTRY;
4079
4080         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
4081         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
4082
4083         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
4084         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
4085         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
4086         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
4087
4088         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
4089         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
4090                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
4091                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
4092                                i, lcfg->lcfg_buflens[i],
4093                                lustre_cfg_string(lcfg, i));
4094                 }
4095         EXIT;
4096 }
4097
4098 /* Setup _mgs fsdb and log
4099  */
4100 int mgs__mgs_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs)
4101 {
4102         struct fs_db *fsdb = NULL;
4103         int rc;
4104         ENTRY;
4105
4106         rc = mgs_find_or_make_fsdb(env, mgs, MGSSELF_NAME, &fsdb);
4107         if (!rc)
4108                 mgs_put_fsdb(mgs, fsdb);
4109
4110         RETURN(rc);
4111 }
4112
4113 /* Setup params fsdb and log
4114  */
4115 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs)
4116 {
4117         struct fs_db *fsdb = NULL;
4118         struct llog_handle *params_llh = NULL;
4119         int rc;
4120         ENTRY;
4121
4122         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
4123         if (!rc) {
4124                 mutex_lock(&fsdb->fsdb_mutex);
4125                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
4126                 if (!rc)
4127                         rc = record_end_log(env, &params_llh);
4128                 mutex_unlock(&fsdb->fsdb_mutex);
4129                 mgs_put_fsdb(mgs, fsdb);
4130         }
4131
4132         RETURN(rc);
4133 }
4134
4135 /* Cleanup params fsdb and log
4136  */
4137 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
4138 {
4139         return mgs_erase_logs(env, mgs, PARAMS_FILENAME);
4140 }
4141
4142 /* Set a permanent (config log) param for a target or fs
4143  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
4144  *             buf1 contains the single parameter
4145  */
4146 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
4147                  struct lustre_cfg *lcfg, char *fsname)
4148 {
4149         struct fs_db *fsdb = NULL;
4150         struct mgs_target_info *mti = NULL;
4151         char *devname, *param;
4152         char *ptr;
4153         const char *tmp;
4154         __u32 index;
4155         int rc = 0;
4156         bool free = false;
4157         ENTRY;
4158
4159         print_lustre_cfg(lcfg);
4160
4161         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
4162         devname = lustre_cfg_string(lcfg, 0);
4163         param = lustre_cfg_string(lcfg, 1);
4164         if (!devname) {
4165                 /* Assume device name embedded in param:
4166                    lustre-OST0000.osc.max_dirty_mb=32 */
4167                 ptr = strchr(param, '.');
4168                 if (ptr) {
4169                         devname = param;
4170                         *ptr = 0;
4171                         param = ptr + 1;
4172                 }
4173         }
4174         if (!devname) {
4175                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
4176                 RETURN(-ENOSYS);
4177         }
4178
4179         rc = mgs_parse_devname(devname, fsname, NULL);
4180         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
4181                 /* param related to llite isn't allowed to set by OST or MDT */
4182                 if (rc == 0 && strncmp(param, PARAM_LLITE,
4183                                        sizeof(PARAM_LLITE) - 1) == 0)
4184                         RETURN(-EINVAL);
4185         } else {
4186                 /* assume devname is the fsname */
4187                 strlcpy(fsname, devname, MTI_NAME_MAXLEN);
4188         }
4189         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
4190
4191         rc = mgs_find_or_make_fsdb(env, mgs,
4192                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
4193                                    PARAMS_FILENAME : fsname, &fsdb);
4194         if (rc)
4195                 RETURN(rc);
4196
4197         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
4198             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
4199             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4200                 CERROR("No filesystem targets for %s. cfg_device from lctl "
4201                        "is '%s'\n", fsname, devname);
4202                 free = true;
4203                 GOTO(out, rc = -EINVAL);
4204         }
4205
4206         /* Create a fake mti to hold everything */
4207         OBD_ALLOC_PTR(mti);
4208         if (!mti)
4209                 GOTO(out, rc = -ENOMEM);
4210         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
4211             >= sizeof(mti->mti_fsname))
4212                 GOTO(out, rc = -E2BIG);
4213         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
4214             >= sizeof(mti->mti_svname))
4215                 GOTO(out, rc = -E2BIG);
4216         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
4217             >= sizeof(mti->mti_params))
4218                 GOTO(out, rc = -E2BIG);
4219         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
4220         if (rc < 0)
4221                 /* Not a valid server; may be only fsname */
4222                 rc = 0;
4223         else
4224                 /* Strip -osc or -mdc suffix from svname */
4225                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
4226                                      mti->mti_svname))
4227                         GOTO(out, rc = -EINVAL);
4228         /*
4229          * Revoke lock so everyone updates.  Should be alright if
4230          * someone was already reading while we were updating the logs,
4231          * so we don't really need to hold the lock while we're
4232          * writing (above).
4233          */
4234         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
4235                 mti->mti_flags = rc | LDD_F_PARAM2;
4236                 mutex_lock(&fsdb->fsdb_mutex);
4237                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
4238                 mutex_unlock(&fsdb->fsdb_mutex);
4239                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
4240         } else {
4241                 mti->mti_flags = rc | LDD_F_PARAM;
4242                 mutex_lock(&fsdb->fsdb_mutex);
4243                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
4244                 mutex_unlock(&fsdb->fsdb_mutex);
4245                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4246         }
4247
4248 out:
4249         if (mti)
4250                 OBD_FREE_PTR(mti);
4251
4252         if (fsdb) {
4253                 if (free)
4254                         mgs_unlink_fsdb(mgs, fsdb);
4255                 mgs_put_fsdb(mgs, fsdb);
4256         }
4257
4258         RETURN(rc);
4259 }
4260
4261 static int mgs_write_log_pool(const struct lu_env *env,
4262                               struct mgs_device *mgs, char *logname,
4263                               struct fs_db *fsdb, char *tgtname,
4264                               enum lcfg_command_type cmd,
4265                               char *fsname, char *poolname,
4266                               char *ostname, char *comment)
4267 {
4268         struct llog_handle *llh = NULL;
4269         int rc;
4270
4271         rc = record_start_log(env, mgs, &llh, logname);
4272         if (rc)
4273                 return rc;
4274         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
4275         if (rc)
4276                 goto out;
4277         rc = record_base(env, llh, tgtname, 0, cmd,
4278                          fsname, poolname, ostname, NULL);
4279         if (rc)
4280                 goto out;
4281         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
4282 out:
4283         record_end_log(env, &llh);
4284         return rc;
4285 }
4286
4287 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
4288                     enum lcfg_command_type cmd, const char *nodemap_name,
4289                     char *param)
4290 {
4291         lnet_nid_t      nid[2];
4292         __u32           idmap[2];
4293         bool            bool_switch;
4294         __u32           int_id;
4295         int             rc = 0;
4296         ENTRY;
4297
4298         switch (cmd) {
4299         case LCFG_NODEMAP_ADD:
4300                 rc = nodemap_add(nodemap_name);
4301                 break;
4302         case LCFG_NODEMAP_DEL:
4303                 rc = nodemap_del(nodemap_name);
4304                 break;
4305         case LCFG_NODEMAP_ADD_RANGE:
4306                 rc = nodemap_parse_range(param, nid);
4307                 if (rc != 0)
4308                         break;
4309                 rc = nodemap_add_range(nodemap_name, nid);
4310                 break;
4311         case LCFG_NODEMAP_DEL_RANGE:
4312                 rc = nodemap_parse_range(param, nid);
4313                 if (rc != 0)
4314                         break;
4315                 rc = nodemap_del_range(nodemap_name, nid);
4316                 break;
4317         case LCFG_NODEMAP_ADMIN:
4318                 bool_switch = simple_strtoul(param, NULL, 10);
4319                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
4320                 break;
4321         case LCFG_NODEMAP_DENY_UNKNOWN:
4322                 bool_switch = simple_strtoul(param, NULL, 10);
4323                 rc = nodemap_set_deny_unknown(nodemap_name, bool_switch);
4324                 break;
4325         case LCFG_NODEMAP_TRUSTED:
4326                 bool_switch = simple_strtoul(param, NULL, 10);
4327                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
4328                 break;
4329         case LCFG_NODEMAP_SQUASH_UID:
4330                 int_id = simple_strtoul(param, NULL, 10);
4331                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
4332                 break;
4333         case LCFG_NODEMAP_SQUASH_GID:
4334                 int_id = simple_strtoul(param, NULL, 10);
4335                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
4336                 break;
4337         case LCFG_NODEMAP_ADD_UIDMAP:
4338         case LCFG_NODEMAP_ADD_GIDMAP:
4339                 rc = nodemap_parse_idmap(param, idmap);
4340                 if (rc != 0)
4341                         break;
4342                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
4343                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
4344                                                idmap);
4345                 else
4346                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
4347                                                idmap);
4348                 break;
4349         case LCFG_NODEMAP_DEL_UIDMAP:
4350         case LCFG_NODEMAP_DEL_GIDMAP:
4351                 rc = nodemap_parse_idmap(param, idmap);
4352                 if (rc != 0)
4353                         break;
4354                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
4355                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
4356                                                idmap);
4357                 else
4358                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
4359                                                idmap);
4360                 break;
4361         case LCFG_NODEMAP_SET_FILESET:
4362                 rc = nodemap_set_fileset(nodemap_name, param);
4363                 break;
4364         default:
4365                 rc = -EINVAL;
4366         }
4367
4368         RETURN(rc);
4369 }
4370
4371 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
4372                  enum lcfg_command_type cmd, char *fsname,
4373                  char *poolname, char *ostname)
4374 {
4375         struct fs_db *fsdb;
4376         char *lovname;
4377         char *logname;
4378         char *label = NULL, *canceled_label = NULL;
4379         int label_sz;
4380         struct mgs_target_info *mti = NULL;
4381         bool checked = false;
4382         bool locked = false;
4383         bool free = false;
4384         int rc, i;
4385         ENTRY;
4386
4387         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
4388         if (rc) {
4389                 CERROR("Can't get db for %s\n", fsname);
4390                 RETURN(rc);
4391         }
4392         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4393                 CERROR("%s is not defined\n", fsname);
4394                 free = true;
4395                 GOTO(out_fsdb, rc = -EINVAL);
4396         }
4397
4398         label_sz = 10 + strlen(fsname) + strlen(poolname);
4399
4400         /* check if ostname match fsname */
4401         if (ostname != NULL) {
4402                 char *ptr;
4403
4404                 ptr = strrchr(ostname, '-');
4405                 if ((ptr == NULL) ||
4406                     (strncmp(fsname, ostname, ptr-ostname) != 0))
4407                         RETURN(-EINVAL);
4408                 label_sz += strlen(ostname);
4409         }
4410
4411         OBD_ALLOC(label, label_sz);
4412         if (!label)
4413                 GOTO(out_fsdb, rc = -ENOMEM);
4414
4415         switch(cmd) {
4416         case LCFG_POOL_NEW:
4417                 sprintf(label,
4418                         "new %s.%s", fsname, poolname);
4419                 break;
4420         case LCFG_POOL_ADD:
4421                 sprintf(label,
4422                         "add %s.%s.%s", fsname, poolname, ostname);
4423                 break;
4424         case LCFG_POOL_REM:
4425                 OBD_ALLOC(canceled_label, label_sz);
4426                 if (canceled_label == NULL)
4427                         GOTO(out_label, rc = -ENOMEM);
4428                 sprintf(label,
4429                         "rem %s.%s.%s", fsname, poolname, ostname);
4430                 sprintf(canceled_label,
4431                         "add %s.%s.%s", fsname, poolname, ostname);
4432                 break;
4433         case LCFG_POOL_DEL:
4434                 OBD_ALLOC(canceled_label, label_sz);
4435                 if (canceled_label == NULL)
4436                         GOTO(out_label, rc = -ENOMEM);
4437                 sprintf(label,
4438                         "del %s.%s", fsname, poolname);
4439                 sprintf(canceled_label,
4440                         "new %s.%s", fsname, poolname);
4441                 break;
4442         default:
4443                 break;
4444         }
4445
4446         OBD_ALLOC_PTR(mti);
4447         if (mti == NULL)
4448                 GOTO(out_cancel, rc = -ENOMEM);
4449         strncpy(mti->mti_svname, "lov pool", sizeof(mti->mti_svname));
4450
4451         mutex_lock(&fsdb->fsdb_mutex);
4452         locked = true;
4453         /* write pool def to all MDT logs */
4454         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4455                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
4456                         rc = name_create_mdt_and_lov(&logname, &lovname,
4457                                                      fsdb, i);
4458                         if (rc)
4459                                 GOTO(out_mti, rc);
4460
4461                         if (!checked && (canceled_label == NULL)) {
4462                                 rc = mgs_check_marker(env, mgs, fsdb, mti,
4463                                                 logname, lovname, label);
4464                                 if (rc) {
4465                                         name_destroy(&logname);
4466                                         name_destroy(&lovname);
4467                                         GOTO(out_mti,
4468                                                 rc = (rc == LLOG_PROC_BREAK ?
4469                                                         -EEXIST : rc));
4470                                 }
4471                                 checked = true;
4472                         }
4473                         if (canceled_label != NULL)
4474                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4475                                                 lovname, canceled_label,
4476                                                 CM_SKIP);
4477
4478                         if (rc >= 0)
4479                                 rc = mgs_write_log_pool(env, mgs, logname,
4480                                                         fsdb, lovname, cmd,
4481                                                         fsname, poolname,
4482                                                         ostname, label);
4483                         name_destroy(&logname);
4484                         name_destroy(&lovname);
4485                         if (rc)
4486                                 GOTO(out_mti, rc);
4487                 }
4488         }
4489
4490         rc = name_create(&logname, fsname, "-client");
4491         if (rc)
4492                 GOTO(out_mti, rc);
4493
4494         if (!checked && (canceled_label == NULL)) {
4495                 rc = mgs_check_marker(env, mgs, fsdb, mti, logname,
4496                                 fsdb->fsdb_clilov, label);
4497                 if (rc) {
4498                         name_destroy(&logname);
4499                         GOTO(out_mti, rc = (rc == LLOG_PROC_BREAK ?
4500                                 -EEXIST : rc));
4501                 }
4502         }
4503         if (canceled_label != NULL) {
4504                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4505                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4506                 if (rc < 0) {
4507                         name_destroy(&logname);
4508                         GOTO(out_mti, rc);
4509                 }
4510         }
4511
4512         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4513                                 cmd, fsname, poolname, ostname, label);
4514         mutex_unlock(&fsdb->fsdb_mutex);
4515         locked = false;
4516         name_destroy(&logname);
4517         /* request for update */
4518         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4519
4520         GOTO(out_mti, rc);
4521
4522 out_mti:
4523         if (locked)
4524                 mutex_unlock(&fsdb->fsdb_mutex);
4525         if (mti != NULL)
4526                 OBD_FREE_PTR(mti);
4527 out_cancel:
4528         if (canceled_label != NULL)
4529                 OBD_FREE(canceled_label, label_sz);
4530 out_label:
4531         OBD_FREE(label, label_sz);
4532 out_fsdb:
4533         if (free)
4534                 mgs_unlink_fsdb(mgs, fsdb);
4535         mgs_put_fsdb(mgs, fsdb);
4536
4537         return rc;
4538 }