Whamcloud - gitweb
LU-1169 mgs: Fix race during new fsdb creation.
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         if (!dt_try_as_dir(env, dir))
71                 GOTO(out, rc = -ENOTDIR);
72
73         LASSERT(dir);
74         LASSERT(dir->do_index_ops);
75
76         iops = &dir->do_index_ops->dio_it;
77         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
78         if (IS_ERR(it))
79                 RETURN(PTR_ERR(it));
80
81         rc = iops->load(env, it, 0);
82         if (rc <= 0)
83                 GOTO(fini, rc = 0);
84
85         /* main cycle */
86         do {
87                 key = (void *)iops->key(env, it);
88                 if (IS_ERR(key)) {
89                         CERROR("%s: key failed when listing %s: rc = %d\n",
90                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
91                                (int) PTR_ERR(key));
92                         goto next;
93                 }
94                 key_sz = iops->key_size(env, it);
95                 LASSERT(key_sz > 0);
96
97                 /* filter out "." and ".." entries */
98                 if (key[0] == '.') {
99                         if (key_sz == 1)
100                                 goto next;
101                         if (key_sz == 2 && key[1] == '.')
102                                 goto next;
103                 }
104
105                 de = mgs_direntry_alloc(key_sz + 1);
106                 if (de == NULL) {
107                         rc = -ENOMEM;
108                         break;
109                 }
110
111                 memcpy(de->name, key, key_sz);
112                 de->name[key_sz] = 0;
113
114                 cfs_list_add(&de->list, list);
115
116 next:
117                 rc = iops->next(env, it);
118         } while (rc == 0);
119         rc = 0;
120
121         iops->put(env, it);
122
123 fini:
124         iops->fini(env, it);
125 out:
126         if (rc)
127                 CERROR("%s: key failed when listing %s: rc = %d\n",
128                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
129         RETURN(rc);
130 }
131
132 /******************** DB functions *********************/
133
134 static inline int name_create(char **newname, char *prefix, char *suffix)
135 {
136         LASSERT(newname);
137         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
138         if (!*newname)
139                 return -ENOMEM;
140         sprintf(*newname, "%s%s", prefix, suffix);
141         return 0;
142 }
143
144 static inline void name_destroy(char **name)
145 {
146         if (*name)
147                 OBD_FREE(*name, strlen(*name) + 1);
148         *name = NULL;
149 }
150
151 struct mgs_fsdb_handler_data
152 {
153         struct fs_db   *fsdb;
154         __u32           ver;
155 };
156
157 /* from the (client) config log, figure out:
158         1. which ost's/mdt's are configured (by index)
159         2. what the last config step is
160         3. COMPAT_18 osc name
161 */
162 /* It might be better to have a separate db file, instead of parsing the info
163    out of the client log.  This is slow and potentially error-prone. */
164 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
165                             struct llog_rec_hdr *rec, void *data)
166 {
167         struct mgs_fsdb_handler_data *d = data;
168         struct fs_db *fsdb = d->fsdb;
169         int cfg_len = rec->lrh_len;
170         char *cfg_buf = (char*) (rec + 1);
171         struct lustre_cfg *lcfg;
172         __u32 index;
173         int rc = 0;
174         ENTRY;
175
176         if (rec->lrh_type != OBD_CFG_REC) {
177                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
178                 RETURN(-EINVAL);
179         }
180
181         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
182         if (rc) {
183                 CERROR("Insane cfg\n");
184                 RETURN(rc);
185         }
186
187         lcfg = (struct lustre_cfg *)cfg_buf;
188
189         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
190                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
191
192         /* Figure out ost indicies */
193         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
194         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
195             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
196                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
197                                        NULL, 10);
198                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
199                        lustre_cfg_string(lcfg, 1), index,
200                        lustre_cfg_string(lcfg, 2));
201                 cfs_set_bit(index, fsdb->fsdb_ost_index_map);
202         }
203
204         /* Figure out mdt indicies */
205         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
206         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
207             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
208                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
209                                        &index, NULL);
210                 if (rc != LDD_F_SV_TYPE_MDT) {
211                         CWARN("Unparsable MDC name %s, assuming index 0\n",
212                               lustre_cfg_string(lcfg, 0));
213                         index = 0;
214                 }
215                 rc = 0;
216                 CDEBUG(D_MGS, "MDT index is %u\n", index);
217                 cfs_set_bit(index, fsdb->fsdb_mdt_index_map);
218                 fsdb->fsdb_mdt_count ++;
219         }
220
221         /**
222          * figure out the old config. fsdb_gen = 0 means old log
223          * It is obsoleted and not supported anymore
224          */
225         if (fsdb->fsdb_gen == 0) {
226                 CERROR("Old config format is not supported\n");
227                 RETURN(-EINVAL);
228         }
229
230         /*
231          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
232          */
233         if (!cfs_test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
234             lcfg->lcfg_command == LCFG_ATTACH &&
235             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
236                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
237                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
238                         CWARN("MDT using 1.8 OSC name scheme\n");
239                         cfs_set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
240                 }
241         }
242
243         if (lcfg->lcfg_command == LCFG_MARKER) {
244                 struct cfg_marker *marker;
245                 marker = lustre_cfg_buf(lcfg, 1);
246
247                 d->ver = marker->cm_vers;
248
249                 /* Keep track of the latest marker step */
250                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
251         }
252
253         RETURN(rc);
254 }
255
256 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
257 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
258                                   struct mgs_device *mgs,
259                                   struct fs_db *fsdb)
260 {
261         char                            *logname;
262         struct llog_handle              *loghandle;
263         struct llog_ctxt                *ctxt;
264         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
265         int rc;
266
267         ENTRY;
268
269         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
270         LASSERT(ctxt != NULL);
271         rc = name_create(&logname, fsdb->fsdb_name, "-client");
272         if (rc)
273                 GOTO(out_put, rc);
274         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
275         if (rc)
276                 GOTO(out_pop, rc);
277
278         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
279         if (rc)
280                 GOTO(out_close, rc);
281
282         if (llog_get_size(loghandle) <= 1)
283                 cfs_set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
284
285         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
286         CDEBUG(D_INFO, "get_db = %d\n", rc);
287 out_close:
288         llog_close(env, loghandle);
289 out_pop:
290         name_destroy(&logname);
291 out_put:
292         llog_ctxt_put(ctxt);
293
294         RETURN(rc);
295 }
296
297 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
298 {
299         struct mgs_tgt_srpc_conf *tgtconf;
300
301         /* free target-specific rules */
302         while (fsdb->fsdb_srpc_tgt) {
303                 tgtconf = fsdb->fsdb_srpc_tgt;
304                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
305
306                 LASSERT(tgtconf->mtsc_tgt);
307
308                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
309                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
310                 OBD_FREE_PTR(tgtconf);
311         }
312
313         /* free general rules */
314         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
315 }
316
317 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
318 {
319         struct fs_db *fsdb;
320         cfs_list_t *tmp;
321
322         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
323                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
324                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
325                         return fsdb;
326         }
327         return NULL;
328 }
329
330 /* caller must hold the mgs->mgs_fs_db_lock */
331 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
332                                   struct mgs_device *mgs, char *fsname)
333 {
334         struct fs_db *fsdb;
335         int rc;
336         ENTRY;
337
338         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
339                 CERROR("fsname %s is too long\n", fsname);
340                 RETURN(NULL);
341         }
342
343         OBD_ALLOC_PTR(fsdb);
344         if (!fsdb)
345                 RETURN(NULL);
346
347         strcpy(fsdb->fsdb_name, fsname);
348         cfs_mutex_init(&fsdb->fsdb_mutex);
349         cfs_set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
350         fsdb->fsdb_gen = 1;
351
352         if (strcmp(fsname, MGSSELF_NAME) == 0) {
353                 cfs_set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
354         } else {
355                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
356                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
357                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
358                         CERROR("No memory for index maps\n");
359                         GOTO(err, rc = -ENOMEM);
360                 }
361
362                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
363                 if (rc)
364                         GOTO(err, rc);
365                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
366                 if (rc)
367                         GOTO(err, rc);
368
369                 /* initialise data for NID table */
370                 mgs_ir_init_fs(env, mgs, fsdb);
371
372                 lproc_mgs_add_live(mgs, fsdb);
373         }
374
375         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
376
377         RETURN(fsdb);
378 err:
379         if (fsdb->fsdb_ost_index_map)
380                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
381         if (fsdb->fsdb_mdt_index_map)
382                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
383         name_destroy(&fsdb->fsdb_clilov);
384         name_destroy(&fsdb->fsdb_clilmv);
385         OBD_FREE_PTR(fsdb);
386         RETURN(NULL);
387 }
388
389 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
390 {
391         /* wait for anyone with the sem */
392         cfs_mutex_lock(&fsdb->fsdb_mutex);
393         lproc_mgs_del_live(mgs, fsdb);
394         cfs_list_del(&fsdb->fsdb_list);
395
396         /* deinitialize fsr */
397         mgs_ir_fini_fs(mgs, fsdb);
398
399         if (fsdb->fsdb_ost_index_map)
400                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
401         if (fsdb->fsdb_mdt_index_map)
402                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
403         name_destroy(&fsdb->fsdb_clilov);
404         name_destroy(&fsdb->fsdb_clilmv);
405         mgs_free_fsdb_srpc(fsdb);
406         cfs_mutex_unlock(&fsdb->fsdb_mutex);
407         OBD_FREE_PTR(fsdb);
408 }
409
410 int mgs_init_fsdb_list(struct mgs_device *mgs)
411 {
412         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
413         return 0;
414 }
415
416 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
417 {
418         struct fs_db *fsdb;
419         cfs_list_t *tmp, *tmp2;
420         cfs_mutex_lock(&mgs->mgs_mutex);
421         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
422                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
423                 mgs_free_fsdb(mgs, fsdb);
424         }
425         cfs_mutex_unlock(&mgs->mgs_mutex);
426         return 0;
427 }
428
429 int mgs_find_or_make_fsdb(const struct lu_env *env,
430                           struct mgs_device *mgs, char *name,
431                           struct fs_db **dbh)
432 {
433         struct fs_db *fsdb;
434         int rc = 0;
435
436         ENTRY;
437         cfs_mutex_lock(&mgs->mgs_mutex);
438         fsdb = mgs_find_fsdb(mgs, name);
439         if (fsdb) {
440                 cfs_mutex_unlock(&mgs->mgs_mutex);
441                 *dbh = fsdb;
442                 RETURN(0);
443         }
444
445         CDEBUG(D_MGS, "Creating new db\n");
446         fsdb = mgs_new_fsdb(env, mgs, name);
447         /* lock fsdb_mutex until the db is loaded from llogs */
448         if (fsdb)
449                 cfs_mutex_lock(&fsdb->fsdb_mutex);
450         cfs_mutex_unlock(&mgs->mgs_mutex);
451         if (!fsdb)
452                 RETURN(-ENOMEM);
453
454         if (!cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
455                 /* populate the db from the client llog */
456                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
457                 if (rc) {
458                         CERROR("Can't get db from client log %d\n", rc);
459                         GOTO(out_free, rc);
460                 }
461         }
462
463         /* populate srpc rules from params llog */
464         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
465         if (rc) {
466                 CERROR("Can't get db from params log %d\n", rc);
467                 GOTO(out_free, rc);
468         }
469
470         cfs_mutex_unlock(&fsdb->fsdb_mutex);
471         *dbh = fsdb;
472
473         RETURN(0);
474
475 out_free:
476         cfs_mutex_unlock(&fsdb->fsdb_mutex);
477         mgs_free_fsdb(mgs, fsdb);
478         return rc;
479 }
480
481 /* 1 = index in use
482    0 = index unused
483    -1= empty client log */
484 int mgs_check_index(const struct lu_env *env,
485                     struct mgs_device *mgs,
486                     struct mgs_target_info *mti)
487 {
488         struct fs_db *fsdb;
489         void *imap;
490         int rc = 0;
491         ENTRY;
492
493         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
494
495         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
496         if (rc) {
497                 CERROR("Can't get db for %s\n", mti->mti_fsname);
498                 RETURN(rc);
499         }
500
501         if (cfs_test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
502                 RETURN(-1);
503
504         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
505                 imap = fsdb->fsdb_ost_index_map;
506         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
507                 imap = fsdb->fsdb_mdt_index_map;
508         else
509                 RETURN(-EINVAL);
510
511         if (cfs_test_bit(mti->mti_stripe_index, imap))
512                 RETURN(1);
513         RETURN(0);
514 }
515
516 static __inline__ int next_index(void *index_map, int map_len)
517 {
518         int i;
519         for (i = 0; i < map_len * 8; i++)
520                  if (!cfs_test_bit(i, index_map)) {
521                          return i;
522                  }
523         CERROR("max index %d exceeded.\n", i);
524         return -1;
525 }
526
527 /* Return codes:
528         0  newly marked as in use
529         <0 err
530         +EALREADY for update of an old index */
531 static int mgs_set_index(const struct lu_env *env,
532                          struct mgs_device *mgs,
533                          struct mgs_target_info *mti)
534 {
535         struct fs_db *fsdb;
536         void *imap;
537         int rc = 0;
538         ENTRY;
539
540         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
541         if (rc) {
542                 CERROR("Can't get db for %s\n", mti->mti_fsname);
543                 RETURN(rc);
544         }
545
546         cfs_mutex_lock(&fsdb->fsdb_mutex);
547         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
548                 imap = fsdb->fsdb_ost_index_map;
549         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
550                 imap = fsdb->fsdb_mdt_index_map;
551                 if (fsdb->fsdb_mdt_count >= MAX_MDT_COUNT) {
552                         LCONSOLE_ERROR_MSG(0x13f, "The max mdt count"
553                                            "is %d\n", (int)MAX_MDT_COUNT);
554                         GOTO(out_up, rc = -ERANGE);
555                 }
556         } else {
557                 GOTO(out_up, rc = -EINVAL);
558         }
559
560         if (mti->mti_flags & LDD_F_NEED_INDEX) {
561                 rc = next_index(imap, INDEX_MAP_SIZE);
562                 if (rc == -1)
563                         GOTO(out_up, rc = -ERANGE);
564                 mti->mti_stripe_index = rc;
565                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
566                         fsdb->fsdb_mdt_count ++;
567         }
568
569         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
570                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
571                                    "but the max index is %d.\n",
572                                    mti->mti_svname, mti->mti_stripe_index,
573                                    INDEX_MAP_SIZE * 8);
574                 GOTO(out_up, rc = -ERANGE);
575         }
576
577         if (cfs_test_bit(mti->mti_stripe_index, imap)) {
578                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
579                     !(mti->mti_flags & LDD_F_WRITECONF)) {
580                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
581                                            "%d, but that index is already in "
582                                            "use. Use --writeconf to force\n",
583                                            mti->mti_svname,
584                                            mti->mti_stripe_index);
585                         GOTO(out_up, rc = -EADDRINUSE);
586                 } else {
587                         CDEBUG(D_MGS, "Server %s updating index %d\n",
588                                mti->mti_svname, mti->mti_stripe_index);
589                         GOTO(out_up, rc = EALREADY);
590                 }
591         }
592
593         cfs_set_bit(mti->mti_stripe_index, imap);
594         cfs_clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
595         cfs_mutex_unlock(&fsdb->fsdb_mutex);
596         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
597                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
598
599         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
600                mti->mti_stripe_index);
601
602         RETURN(0);
603 out_up:
604         cfs_mutex_unlock(&fsdb->fsdb_mutex);
605         return rc;
606 }
607
608 struct mgs_modify_lookup {
609         struct cfg_marker mml_marker;
610         int               mml_modified;
611 };
612
613 static int mgs_modify_handler(const struct lu_env *env,
614                               struct llog_handle *llh,
615                               struct llog_rec_hdr *rec, void *data)
616 {
617         struct mgs_modify_lookup *mml = data;
618         struct cfg_marker *marker;
619         struct lustre_cfg *lcfg = (struct lustre_cfg *)(rec + 1);
620         int cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
621                 sizeof(struct llog_rec_tail);
622         int rc;
623         ENTRY;
624
625         if (rec->lrh_type != OBD_CFG_REC) {
626                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
627                 RETURN(-EINVAL);
628         }
629
630         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
631         if (rc) {
632                 CERROR("Insane cfg\n");
633                 RETURN(rc);
634         }
635
636         /* We only care about markers */
637         if (lcfg->lcfg_command != LCFG_MARKER)
638                 RETURN(0);
639
640         marker = lustre_cfg_buf(lcfg, 1);
641         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
642             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
643             !(marker->cm_flags & CM_SKIP)) {
644                 /* Found a non-skipped marker match */
645                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
646                        rec->lrh_index, marker->cm_step,
647                        marker->cm_flags, mml->mml_marker.cm_flags,
648                        marker->cm_tgtname, marker->cm_comment);
649                 /* Overwrite the old marker llog entry */
650                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
651                 marker->cm_flags |= mml->mml_marker.cm_flags;
652                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
653                 /* Header and tail are added back to lrh_len in
654                    llog_lvfs_write_rec */
655                 rec->lrh_len = cfg_len;
656                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
657                                 rec->lrh_index);
658                 if (!rc)
659                          mml->mml_modified++;
660         }
661
662         RETURN(rc);
663 }
664
665 /**
666  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
667  * Return code:
668  * 0 - modified successfully,
669  * 1 - no modification was done
670  * negative - error
671  */
672 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
673                       struct fs_db *fsdb, struct mgs_target_info *mti,
674                       char *logname, char *devname, char *comment, int flags)
675 {
676         struct llog_handle *loghandle;
677         struct llog_ctxt *ctxt;
678         struct mgs_modify_lookup *mml;
679         int rc;
680
681         ENTRY;
682
683         LASSERT(cfs_mutex_is_locked(&fsdb->fsdb_mutex));
684         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
685                flags);
686
687         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
688         LASSERT(ctxt != NULL);
689         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
690         if (rc < 0) {
691                 if (rc == -ENOENT)
692                         rc = 0;
693                 GOTO(out_pop, rc);
694         }
695
696         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
697         if (rc)
698                 GOTO(out_close, rc);
699
700         if (llog_get_size(loghandle) <= 1)
701                 GOTO(out_close, rc = 0);
702
703         OBD_ALLOC_PTR(mml);
704         if (!mml)
705                 GOTO(out_close, rc = -ENOMEM);
706         strcpy(mml->mml_marker.cm_comment, comment);
707         strcpy(mml->mml_marker.cm_tgtname, devname);
708         /* Modify mostly means cancel */
709         mml->mml_marker.cm_flags = flags;
710         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
711         mml->mml_modified = 0;
712         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
713                           NULL);
714         if (!rc && !mml->mml_modified)
715                 rc = 1;
716         OBD_FREE_PTR(mml);
717
718 out_close:
719         llog_close(env, loghandle);
720 out_pop:
721         if (rc < 0)
722                 CERROR("%s: modify %s/%s failed: rc = %d\n",
723                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
724         llog_ctxt_put(ctxt);
725         RETURN(rc);
726 }
727
728 /******************** config log recording functions *********************/
729
730 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
731                        struct lustre_cfg *lcfg)
732 {
733         struct llog_rec_hdr      rec;
734         int                      buflen, rc;
735
736         if (!lcfg || !llh)
737                 return -ENOMEM;
738
739         LASSERT(llh->lgh_ctxt);
740
741         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
742                                 lcfg->lcfg_buflens);
743         rec.lrh_len = llog_data_len(buflen);
744         rec.lrh_type = OBD_CFG_REC;
745
746         /* idx = -1 means append */
747         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
748         if (rc)
749                 CERROR("failed %d\n", rc);
750         return rc;
751 }
752
753 static int record_base(const struct lu_env *env, struct llog_handle *llh,
754                      char *cfgname, lnet_nid_t nid, int cmd,
755                      char *s1, char *s2, char *s3, char *s4)
756 {
757         struct mgs_thread_info *mgi = mgs_env_info(env);
758         struct lustre_cfg     *lcfg;
759         int rc;
760
761         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
762                cmd, s1, s2, s3, s4);
763
764         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
765         if (s1)
766                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
767         if (s2)
768                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
769         if (s3)
770                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
771         if (s4)
772                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
773
774         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
775         if (!lcfg)
776                 return -ENOMEM;
777         lcfg->lcfg_nid = nid;
778
779         rc = record_lcfg(env, llh, lcfg);
780
781         lustre_cfg_free(lcfg);
782
783         if (rc) {
784                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
785                        cmd, s1, s2, s3, s4);
786         }
787         return rc;
788 }
789
790
791 static inline int record_add_uuid(const struct lu_env *env,
792                                   struct llog_handle *llh,
793                                   uint64_t nid, char *uuid)
794 {
795         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
796
797 }
798
799 static inline int record_add_conn(const struct lu_env *env,
800                                   struct llog_handle *llh,
801                                   char *devname, char *uuid)
802 {
803         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
804 }
805
806 static inline int record_attach(const struct lu_env *env,
807                                 struct llog_handle *llh, char *devname,
808                                 char *type, char *uuid)
809 {
810         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
811 }
812
813 static inline int record_setup(const struct lu_env *env,
814                                struct llog_handle *llh, char *devname,
815                                char *s1, char *s2, char *s3, char *s4)
816 {
817         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
818 }
819
820 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
821                             char *devname, struct lov_desc *desc)
822 {
823         struct mgs_thread_info *mgi = mgs_env_info(env);
824         struct lustre_cfg *lcfg;
825         int rc;
826
827         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
828         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
829         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
830         if (!lcfg)
831                 return -ENOMEM;
832         rc = record_lcfg(env, llh, lcfg);
833
834         lustre_cfg_free(lcfg);
835         return rc;
836 }
837
838 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
839                             char *devname, struct lmv_desc *desc)
840 {
841         struct mgs_thread_info *mgi = mgs_env_info(env);
842         struct lustre_cfg *lcfg;
843         int rc;
844
845         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
846         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
847         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
848
849         rc = record_lcfg(env, llh, lcfg);
850
851         lustre_cfg_free(lcfg);
852         return rc;
853 }
854
855 static inline int record_mdc_add(const struct lu_env *env,
856                                  struct llog_handle *llh,
857                                  char *logname, char *mdcuuid,
858                                  char *mdtuuid, char *index,
859                                  char *gen)
860 {
861         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
862                            mdtuuid,index,gen,mdcuuid);
863 }
864
865 static inline int record_lov_add(const struct lu_env *env,
866                                  struct llog_handle *llh,
867                                  char *lov_name, char *ost_uuid,
868                                  char *index, char *gen)
869 {
870         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
871                            ost_uuid,index,gen,0);
872 }
873
874 static inline int record_mount_opt(const struct lu_env *env,
875                                    struct llog_handle *llh,
876                                    char *profile, char *lov_name,
877                                    char *mdc_name)
878 {
879         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
880                            profile,lov_name,mdc_name,0);
881 }
882
883 static int record_marker(const struct lu_env *env,
884                          struct llog_handle *llh,
885                          struct fs_db *fsdb, __u32 flags,
886                          char *tgtname, char *comment)
887 {
888         struct mgs_thread_info *mgi = mgs_env_info(env);
889         struct lustre_cfg *lcfg;
890         int rc;
891
892         if (flags & CM_START)
893                 fsdb->fsdb_gen++;
894         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
895         mgi->mgi_marker.cm_flags = flags;
896         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
897         strncpy(mgi->mgi_marker.cm_tgtname, tgtname,
898                 sizeof(mgi->mgi_marker.cm_tgtname));
899         strncpy(mgi->mgi_marker.cm_comment, comment,
900                 sizeof(mgi->mgi_marker.cm_comment));
901         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
902         mgi->mgi_marker.cm_canceltime = 0;
903         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
904         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
905                             sizeof(mgi->mgi_marker));
906         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
907         if (!lcfg)
908                 return -ENOMEM;
909         rc = record_lcfg(env, llh, lcfg);
910
911         lustre_cfg_free(lcfg);
912         return rc;
913 }
914
915 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
916                             struct llog_handle **llh, char *name)
917 {
918         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
919         struct llog_ctxt        *ctxt;
920         int                      rc = 0;
921
922         if (*llh)
923                 GOTO(out, rc = -EBUSY);
924
925         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
926         if (!ctxt)
927                 GOTO(out, rc = -ENODEV);
928         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
929
930         rc = llog_open_create(env, ctxt, llh, NULL, name);
931         if (rc)
932                 GOTO(out_ctxt, rc);
933         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
934         if (rc)
935                 llog_close(env, *llh);
936 out_ctxt:
937         llog_ctxt_put(ctxt);
938 out:
939         if (rc) {
940                 CERROR("%s: can't start log %s: rc = %d\n",
941                        mgs->mgs_obd->obd_name, name, rc);
942                 *llh = NULL;
943         }
944         RETURN(rc);
945 }
946
947 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
948 {
949         int rc;
950
951         rc = llog_close(env, *llh);
952         *llh = NULL;
953
954         return rc;
955 }
956
957 static int mgs_log_is_empty(const struct lu_env *env,
958                             struct mgs_device *mgs, char *name)
959 {
960         struct llog_handle *llh;
961         struct llog_ctxt *ctxt;
962         int rc = 0;
963
964         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
965         LASSERT(ctxt != NULL);
966         rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
967         if (rc < 0) {
968                 if (rc == -ENOENT)
969                         rc = 0;
970                 GOTO(out_ctxt, rc);
971         }
972
973         llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
974         if (rc)
975                 GOTO(out_close, rc);
976         rc = llog_get_size(llh);
977
978 out_close:
979         llog_close(env, llh);
980 out_ctxt:
981         llog_ctxt_put(ctxt);
982         /* header is record 1 */
983         return (rc <= 1);
984 }
985
986 /******************** config "macros" *********************/
987
988 /* write an lcfg directly into a log (with markers) */
989 static int mgs_write_log_direct(const struct lu_env *env,
990                                 struct mgs_device *mgs, struct fs_db *fsdb,
991                                 char *logname, struct lustre_cfg *lcfg,
992                                 char *devname, char *comment)
993 {
994         struct llog_handle *llh = NULL;
995         int rc;
996         ENTRY;
997
998         if (!lcfg)
999                 RETURN(-ENOMEM);
1000
1001         rc = record_start_log(env, mgs, &llh, logname);
1002         if (rc)
1003                 RETURN(rc);
1004
1005         /* FIXME These should be a single journal transaction */
1006         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1007         if (rc)
1008                 GOTO(out_end, rc);
1009         rc = record_lcfg(env, llh, lcfg);
1010         if (rc)
1011                 GOTO(out_end, rc);
1012         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1013         if (rc)
1014                 GOTO(out_end, rc);
1015 out_end:
1016         record_end_log(env, &llh);
1017         RETURN(rc);
1018 }
1019
1020 /* write the lcfg in all logs for the given fs */
1021 int mgs_write_log_direct_all(const struct lu_env *env,
1022                              struct mgs_device *mgs,
1023                              struct fs_db *fsdb,
1024                              struct mgs_target_info *mti,
1025                              struct lustre_cfg *lcfg,
1026                              char *devname, char *comment,
1027                              int server_only)
1028 {
1029         cfs_list_t list;
1030         struct mgs_direntry *dirent, *n;
1031         char *fsname = mti->mti_fsname;
1032         char *logname;
1033         int rc = 0, len = strlen(fsname);
1034         ENTRY;
1035
1036         /* We need to set params for any future logs
1037            as well. FIXME Append this file to every new log.
1038            Actually, we should store as params (text), not llogs.  Or
1039            in a database. */
1040         rc = name_create(&logname, fsname, "-params");
1041         if (rc)
1042                 RETURN(rc);
1043         if (mgs_log_is_empty(env, mgs, logname)) {
1044                 struct llog_handle *llh = NULL;
1045                 rc = record_start_log(env, mgs, &llh, logname);
1046                 record_end_log(env, &llh);
1047         }
1048         name_destroy(&logname);
1049         if (rc)
1050                 RETURN(rc);
1051
1052         /* Find all the logs in the CONFIGS directory */
1053         rc = class_dentry_readdir(env, mgs, &list);
1054         if (rc)
1055                 RETURN(rc);
1056
1057         /* Could use fsdb index maps instead of directory listing */
1058         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1059                 cfs_list_del(&dirent->list);
1060                 /* don't write to sptlrpc rule log */
1061                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1062                         goto next;
1063
1064                 /* caller wants write server logs only */
1065                 if (server_only && strstr(dirent->name, "-client") != NULL)
1066                         goto next;
1067
1068                 if (strncmp(fsname, dirent->name, len) == 0) {
1069                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1070                         /* Erase any old settings of this same parameter */
1071                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1072                                         devname, comment, CM_SKIP);
1073                         if (rc < 0)
1074                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1075                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1076                         /* Write the new one */
1077                         if (lcfg) {
1078                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1079                                                           dirent->name,
1080                                                           lcfg, devname,
1081                                                           comment);
1082                                 if (rc)
1083                                         CERROR("%s: writing log %s: rc = %d\n",
1084                                                mgs->mgs_obd->obd_name,
1085                                                dirent->name, rc);
1086                         }
1087                 }
1088 next:
1089                 mgs_direntry_free(dirent);
1090         }
1091
1092         RETURN(rc);
1093 }
1094
1095 static int mgs_write_log_mdc_to_mdt(const struct lu_env *env,
1096                                     struct mgs_device *mgs,
1097                                     struct fs_db *fsdb,
1098                                     struct mgs_target_info *mti,
1099                                     char *logname);
1100 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1101                                     struct mgs_device *mgs,
1102                                     struct fs_db *fsdb,
1103                                     struct mgs_target_info *mti,
1104                                     char *logname, char *suffix, char *lovname,
1105                                     enum lustre_sec_part sec_part, int flags);
1106 static int name_create_mdt_and_lov(char **logname, char **lovname,
1107                                    struct fs_db *fsdb, int i);
1108
1109 static int mgs_steal_llog_handler(const struct lu_env *env,
1110                                   struct llog_handle *llh,
1111                                   struct llog_rec_hdr *rec, void *data)
1112 {
1113         struct mgs_device *mgs;
1114         struct obd_device *obd;
1115         struct mgs_target_info *mti, *tmti;
1116         struct fs_db *fsdb;
1117         int cfg_len = rec->lrh_len;
1118         char *cfg_buf = (char*) (rec + 1);
1119         struct lustre_cfg *lcfg;
1120         int rc = 0;
1121         struct llog_handle *mdt_llh = NULL;
1122         static int got_an_osc_or_mdc = 0;
1123         /* 0: not found any osc/mdc;
1124            1: found osc;
1125            2: found mdc;
1126         */
1127         static int last_step = -1;
1128
1129         ENTRY;
1130
1131         mti = ((struct temp_comp*)data)->comp_mti;
1132         tmti = ((struct temp_comp*)data)->comp_tmti;
1133         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1134         obd = ((struct temp_comp *)data)->comp_obd;
1135         mgs = lu2mgs_dev(obd->obd_lu_dev);
1136         LASSERT(mgs);
1137
1138         if (rec->lrh_type != OBD_CFG_REC) {
1139                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1140                 RETURN(-EINVAL);
1141         }
1142
1143         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1144         if (rc) {
1145                 CERROR("Insane cfg\n");
1146                 RETURN(rc);
1147         }
1148
1149         lcfg = (struct lustre_cfg *)cfg_buf;
1150
1151         if (lcfg->lcfg_command == LCFG_MARKER) {
1152                 struct cfg_marker *marker;
1153                 marker = lustre_cfg_buf(lcfg, 1);
1154                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1155                     (marker->cm_flags & CM_START)){
1156                         got_an_osc_or_mdc = 1;
1157                         strncpy(tmti->mti_svname, marker->cm_tgtname,
1158                                 sizeof(tmti->mti_svname));
1159                         rc = record_start_log(env, mgs, &mdt_llh,
1160                                               mti->mti_svname);
1161                         if (rc)
1162                                 RETURN(rc);
1163                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1164                                            mti->mti_svname,"add osc(copied)");
1165                         record_end_log(env, &mdt_llh);
1166                         last_step = marker->cm_step;
1167                         RETURN(rc);
1168                 }
1169                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1170                     (marker->cm_flags & CM_END)){
1171                         LASSERT(last_step == marker->cm_step);
1172                         last_step = -1;
1173                         got_an_osc_or_mdc = 0;
1174                         rc = record_start_log(env, mgs, &mdt_llh,
1175                                               mti->mti_svname);
1176                         if (rc)
1177                                 RETURN(rc);
1178                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1179                                            mti->mti_svname,"add osc(copied)");
1180                         record_end_log(env, &mdt_llh);
1181                         RETURN(rc);
1182                 }
1183                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1184                     (marker->cm_flags & CM_START)){
1185                         got_an_osc_or_mdc = 2;
1186                         last_step = marker->cm_step;
1187                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1188                                strlen(marker->cm_tgtname));
1189
1190                         RETURN(rc);
1191                 }
1192                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1193                     (marker->cm_flags & CM_END)){
1194                         LASSERT(last_step == marker->cm_step);
1195                         last_step = -1;
1196                         got_an_osc_or_mdc = 0;
1197                         RETURN(rc);
1198                 }
1199         }
1200
1201         if (got_an_osc_or_mdc == 0 || last_step < 0)
1202                 RETURN(rc);
1203
1204         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1205                 uint64_t nodenid;
1206                 nodenid = lcfg->lcfg_nid;
1207
1208                 tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1209                 tmti->mti_nid_count++;
1210
1211                 RETURN(rc);
1212         }
1213
1214         if (lcfg->lcfg_command == LCFG_SETUP) {
1215                 char *target;
1216
1217                 target = lustre_cfg_string(lcfg, 1);
1218                 memcpy(tmti->mti_uuid, target, strlen(target));
1219                 RETURN(rc);
1220         }
1221
1222         /* ignore client side sptlrpc_conf_log */
1223         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1224                 RETURN(rc);
1225
1226         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1227                 int index;
1228
1229                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1230                         RETURN (-EINVAL);
1231
1232                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1233                        strlen(mti->mti_fsname));
1234                 tmti->mti_stripe_index = index;
1235
1236                 rc = mgs_write_log_mdc_to_mdt(env, mgs, fsdb, tmti,
1237                                               mti->mti_svname);
1238                 memset(tmti, 0, sizeof(*tmti));
1239                 RETURN(rc);
1240         }
1241
1242         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1243                 int index;
1244                 char mdt_index[9];
1245                 char *logname, *lovname;
1246
1247                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1248                                              mti->mti_stripe_index);
1249                 if (rc)
1250                         RETURN(rc);
1251                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1252
1253                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1254                         name_destroy(&logname);
1255                         name_destroy(&lovname);
1256                         RETURN(-EINVAL);
1257                 }
1258
1259                 tmti->mti_stripe_index = index;
1260                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1261                                          mdt_index, lovname,
1262                                          LUSTRE_SP_MDT, 0);
1263                 name_destroy(&logname);
1264                 name_destroy(&lovname);
1265                 RETURN(rc);
1266         }
1267         RETURN(rc);
1268 }
1269
1270 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1271 /* stealed from mgs_get_fsdb_from_llog*/
1272 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1273                                               struct mgs_device *mgs,
1274                                               char *client_name,
1275                                               struct temp_comp* comp)
1276 {
1277         struct llog_handle *loghandle;
1278         struct mgs_target_info *tmti;
1279         struct llog_ctxt *ctxt;
1280         int rc;
1281
1282         ENTRY;
1283
1284         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1285         LASSERT(ctxt != NULL);
1286
1287         OBD_ALLOC_PTR(tmti);
1288         if (tmti == NULL)
1289                 GOTO(out_ctxt, rc = -ENOMEM);
1290
1291         comp->comp_tmti = tmti;
1292         comp->comp_obd = mgs->mgs_obd;
1293
1294         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1295                        LLOG_OPEN_EXISTS);
1296         if (rc < 0) {
1297                 if (rc == -ENOENT)
1298                         rc = 0;
1299                 GOTO(out_pop, rc);
1300         }
1301
1302         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1303         if (rc)
1304                 GOTO(out_close, rc);
1305
1306         rc = llog_process_or_fork(env, loghandle, mgs_steal_llog_handler,
1307                                   (void *)comp, NULL, false);
1308         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1309 out_close:
1310         llog_close(env, loghandle);
1311 out_pop:
1312         OBD_FREE_PTR(tmti);
1313 out_ctxt:
1314         llog_ctxt_put(ctxt);
1315         RETURN(rc);
1316 }
1317
1318 /* lmv is the second thing for client logs */
1319 /* copied from mgs_write_log_lov. Please refer to that.  */
1320 static int mgs_write_log_lmv(const struct lu_env *env,
1321                              struct mgs_device *mgs,
1322                              struct fs_db *fsdb,
1323                              struct mgs_target_info *mti,
1324                              char *logname, char *lmvname)
1325 {
1326         struct llog_handle *llh = NULL;
1327         struct lmv_desc *lmvdesc;
1328         char *uuid;
1329         int rc = 0;
1330         ENTRY;
1331
1332         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1333
1334         OBD_ALLOC_PTR(lmvdesc);
1335         if (lmvdesc == NULL)
1336                 RETURN(-ENOMEM);
1337         lmvdesc->ld_active_tgt_count = 0;
1338         lmvdesc->ld_tgt_count = 0;
1339         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1340         uuid = (char *)lmvdesc->ld_uuid.uuid;
1341
1342         rc = record_start_log(env, mgs, &llh, logname);
1343         if (rc)
1344                 GOTO(out_free, rc);
1345         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1346         if (rc)
1347                 GOTO(out_end, rc);
1348         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1349         if (rc)
1350                 GOTO(out_end, rc);
1351         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1352         if (rc)
1353                 GOTO(out_end, rc);
1354         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1355         if (rc)
1356                 GOTO(out_end, rc);
1357 out_end:
1358         record_end_log(env, &llh);
1359 out_free:
1360         OBD_FREE_PTR(lmvdesc);
1361         RETURN(rc);
1362 }
1363
1364 /* lov is the first thing in the mdt and client logs */
1365 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1366                              struct fs_db *fsdb, struct mgs_target_info *mti,
1367                              char *logname, char *lovname)
1368 {
1369         struct llog_handle *llh = NULL;
1370         struct lov_desc *lovdesc;
1371         char *uuid;
1372         int rc = 0;
1373         ENTRY;
1374
1375         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1376
1377         /*
1378         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1379         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1380               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1381         */
1382
1383         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1384         OBD_ALLOC_PTR(lovdesc);
1385         if (lovdesc == NULL)
1386                 RETURN(-ENOMEM);
1387         lovdesc->ld_magic = LOV_DESC_MAGIC;
1388         lovdesc->ld_tgt_count = 0;
1389         /* Defaults.  Can be changed later by lcfg config_param */
1390         lovdesc->ld_default_stripe_count = 1;
1391         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1392         lovdesc->ld_default_stripe_size = 1024 * 1024;
1393         lovdesc->ld_default_stripe_offset = -1;
1394         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1395         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1396         /* can these be the same? */
1397         uuid = (char *)lovdesc->ld_uuid.uuid;
1398
1399         /* This should always be the first entry in a log.
1400         rc = mgs_clear_log(obd, logname); */
1401         rc = record_start_log(env, mgs, &llh, logname);
1402         if (rc)
1403                 GOTO(out_free, rc);
1404         /* FIXME these should be a single journal transaction */
1405         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1406         if (rc)
1407                 GOTO(out_end, rc);
1408         rc = record_attach(env, llh, lovname, "lov", uuid);
1409         if (rc)
1410                 GOTO(out_end, rc);
1411         rc = record_lov_setup(env, llh, lovname, lovdesc);
1412         if (rc)
1413                 GOTO(out_end, rc);
1414         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1415         if (rc)
1416                 GOTO(out_end, rc);
1417         EXIT;
1418 out_end:
1419         record_end_log(env, &llh);
1420 out_free:
1421         OBD_FREE_PTR(lovdesc);
1422         return rc;
1423 }
1424
1425 /* add failnids to open log */
1426 static int mgs_write_log_failnids(const struct lu_env *env,
1427                                   struct mgs_target_info *mti,
1428                                   struct llog_handle *llh,
1429                                   char *cliname)
1430 {
1431         char *failnodeuuid = NULL;
1432         char *ptr = mti->mti_params;
1433         lnet_nid_t nid;
1434         int rc = 0;
1435
1436         /*
1437         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1438         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1439         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1440         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1441         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1442         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1443         */
1444
1445         /* Pull failnid info out of params string */
1446         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1447                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1448                         if (failnodeuuid == NULL) {
1449                                 /* We don't know the failover node name,
1450                                    so just use the first nid as the uuid */
1451                                 rc = name_create(&failnodeuuid,
1452                                                  libcfs_nid2str(nid), "");
1453                                 if (rc)
1454                                         return rc;
1455                         }
1456                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1457                                "client %s\n", libcfs_nid2str(nid),
1458                                failnodeuuid, cliname);
1459                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1460                 }
1461                 if (failnodeuuid)
1462                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1463         }
1464
1465         name_destroy(&failnodeuuid);
1466         return rc;
1467 }
1468
1469 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1470                                     struct mgs_device *mgs,
1471                                     struct fs_db *fsdb,
1472                                     struct mgs_target_info *mti,
1473                                     char *logname, char *lmvname)
1474 {
1475         struct llog_handle *llh = NULL;
1476         char *mdcname = NULL;
1477         char *nodeuuid = NULL;
1478         char *mdcuuid = NULL;
1479         char *lmvuuid = NULL;
1480         char index[6];
1481         int i, rc;
1482         ENTRY;
1483
1484         if (mgs_log_is_empty(env, mgs, logname)) {
1485                 CERROR("log is empty! Logical error\n");
1486                 RETURN(-EINVAL);
1487         }
1488
1489         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1490                mti->mti_svname, logname, lmvname);
1491
1492         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1493         if (rc)
1494                 RETURN(rc);
1495         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1496         if (rc)
1497                 GOTO(out_free, rc);
1498         rc = name_create(&mdcuuid, mdcname, "_UUID");
1499         if (rc)
1500                 GOTO(out_free, rc);
1501         rc = name_create(&lmvuuid, lmvname, "_UUID");
1502         if (rc)
1503                 GOTO(out_free, rc);
1504
1505         rc = record_start_log(env, mgs, &llh, logname);
1506         if (rc)
1507                 GOTO(out_free, rc);
1508         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1509                            "add mdc");
1510         if (rc)
1511                 GOTO(out_end, rc);
1512         for (i = 0; i < mti->mti_nid_count; i++) {
1513                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1514                        libcfs_nid2str(mti->mti_nids[i]));
1515
1516                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1517                 if (rc)
1518                         GOTO(out_end, rc);
1519         }
1520
1521         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1522         if (rc)
1523                 GOTO(out_end, rc);
1524         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1525         if (rc)
1526                 GOTO(out_end, rc);
1527         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1528         if (rc)
1529                 GOTO(out_end, rc);
1530         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1531         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1532                             index, "1");
1533         if (rc)
1534                 GOTO(out_end, rc);
1535         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1536                            "add mdc");
1537         if (rc)
1538                 GOTO(out_end, rc);
1539 out_end:
1540         record_end_log(env, &llh);
1541 out_free:
1542         name_destroy(&lmvuuid);
1543         name_destroy(&mdcuuid);
1544         name_destroy(&mdcname);
1545         name_destroy(&nodeuuid);
1546         RETURN(rc);
1547 }
1548
1549 /* add new mdc to already existent MDS */
1550 static int mgs_write_log_mdc_to_mdt(const struct lu_env *env,
1551                                     struct mgs_device *mgs,
1552                                     struct fs_db *fsdb,
1553                                     struct mgs_target_info *mti,
1554                                     char *logname)
1555 {
1556         struct llog_handle *llh = NULL;
1557         char *nodeuuid = NULL;
1558         char *mdcname = NULL;
1559         char *mdcuuid = NULL;
1560         char *mdtuuid = NULL;
1561         int idx = mti->mti_stripe_index;
1562         char index[9];
1563         int i, rc;
1564
1565         ENTRY;
1566         if (mgs_log_is_empty(env, mgs, logname)) {
1567                 CERROR("log is empty! Logical error\n");
1568                 RETURN (-EINVAL);
1569         }
1570
1571         CDEBUG(D_MGS, "adding mdc index %d to %s\n", idx, logname);
1572
1573         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1574         if (rc)
1575                 RETURN(rc);
1576         snprintf(index, sizeof(index), "-mdc%04x", idx);
1577         rc = name_create(&mdcname, logname, index);
1578         if (rc)
1579                 GOTO(out_free, rc);
1580         rc = name_create(&mdcuuid, mdcname, "_UUID");
1581         if (rc)
1582                 GOTO(out_free, rc);
1583         rc = name_create(&mdtuuid, logname, "_UUID");
1584         if (rc)
1585                 GOTO(out_free, rc);
1586
1587         rc = record_start_log(env, mgs, &llh, logname);
1588         if (rc)
1589                 GOTO(out_free, rc);
1590         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname, "add mdc");
1591         if (rc)
1592                 GOTO(out_end, rc);
1593         for (i = 0; i < mti->mti_nid_count; i++) {
1594                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1595                        libcfs_nid2str(mti->mti_nids[i]));
1596                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1597                 if (rc)
1598                         GOTO(out_end, rc);
1599         }
1600         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, mdcuuid);
1601         if (rc)
1602                 GOTO(out_end, rc);
1603         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1604         if (rc)
1605                 GOTO(out_end, rc);
1606         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1607         if (rc)
1608                 GOTO(out_end, rc);
1609         snprintf(index, sizeof(index), "%d", idx);
1610
1611         rc = record_mdc_add(env, llh, logname, mdcuuid, mti->mti_uuid,
1612                             index, "1");
1613         if (rc)
1614                 GOTO(out_end, rc);
1615         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add mdc");
1616         if (rc)
1617                 GOTO(out_end, rc);
1618 out_end:
1619         record_end_log(env, &llh);
1620 out_free:
1621         name_destroy(&mdtuuid);
1622         name_destroy(&mdcuuid);
1623         name_destroy(&mdcname);
1624         name_destroy(&nodeuuid);
1625         RETURN(rc);
1626 }
1627
1628 static int mgs_write_log_mdt0(const struct lu_env *env,
1629                               struct mgs_device *mgs,
1630                               struct fs_db *fsdb,
1631                               struct mgs_target_info *mti)
1632 {
1633         char *log = mti->mti_svname;
1634         struct llog_handle *llh = NULL;
1635         char *uuid, *lovname;
1636         char mdt_index[6];
1637         char *ptr = mti->mti_params;
1638         int rc = 0, failout = 0;
1639         ENTRY;
1640
1641         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
1642         if (uuid == NULL)
1643                 RETURN(-ENOMEM);
1644
1645         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
1646                 failout = (strncmp(ptr, "failout", 7) == 0);
1647
1648         rc = name_create(&lovname, log, "-mdtlov");
1649         if (rc)
1650                 GOTO(out_free, rc);
1651         if (mgs_log_is_empty(env, mgs, log)) {
1652                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
1653                 if (rc)
1654                         GOTO(out_lod, rc);
1655         }
1656
1657         sprintf(mdt_index, "%d", mti->mti_stripe_index);
1658
1659         rc = record_start_log(env, mgs, &llh, log);
1660         if (rc)
1661                 GOTO(out_lod, rc);
1662
1663         /* add MDT itself */
1664
1665         /* FIXME this whole fn should be a single journal transaction */
1666         sprintf(uuid, "%s_UUID", log);
1667         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
1668         if (rc)
1669                 GOTO(out_lod, rc);
1670         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
1671         if (rc)
1672                 GOTO(out_end, rc);
1673         rc = record_mount_opt(env, llh, log, lovname, NULL);
1674         if (rc)
1675                 GOTO(out_end, rc);
1676         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
1677                         failout ? "n" : "f");
1678         if (rc)
1679                 GOTO(out_end, rc);
1680         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
1681         if (rc)
1682                 GOTO(out_end, rc);
1683 out_end:
1684         record_end_log(env, &llh);
1685 out_lod:
1686         name_destroy(&lovname);
1687 out_free:
1688         OBD_FREE(uuid, sizeof(struct obd_uuid));
1689         RETURN(rc);
1690 }
1691
1692 static inline int name_create_mdt(char **logname, char *fsname, int i)
1693 {
1694         char mdt_index[9];
1695
1696         sprintf(mdt_index, "-MDT%04x", i);
1697         return name_create(logname, fsname, mdt_index);
1698 }
1699
1700 static int name_create_mdt_and_lov(char **logname, char **lovname,
1701                                     struct fs_db *fsdb, int i)
1702 {
1703         int rc;
1704
1705         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
1706         if (rc)
1707                 return rc;
1708         /* COMPAT_180 */
1709         if (i == 0 && cfs_test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
1710                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
1711         else
1712                 rc = name_create(lovname, *logname, "-mdtlov");
1713         if (rc) {
1714                 name_destroy(logname);
1715                 *logname = NULL;
1716         }
1717         return rc;
1718 }
1719
1720 static inline int name_create_mdt_osc(char **oscname, char *ostname,
1721                                        struct fs_db *fsdb, int i)
1722 {
1723         char suffix[16];
1724
1725         if (i == 0 && cfs_test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
1726                 sprintf(suffix, "-osc");
1727         else
1728                 sprintf(suffix, "-osc-MDT%04x", i);
1729         return name_create(oscname, ostname, suffix);
1730 }
1731
1732 /* envelope method for all layers log */
1733 static int mgs_write_log_mdt(const struct lu_env *env,
1734                              struct mgs_device *mgs,
1735                              struct fs_db *fsdb,
1736                              struct mgs_target_info *mti)
1737 {
1738         struct mgs_thread_info *mgi = mgs_env_info(env);
1739         struct llog_handle *llh = NULL;
1740         char *cliname;
1741         int rc, i = 0;
1742         ENTRY;
1743
1744         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
1745
1746         if (mti->mti_uuid[0] == '\0') {
1747                 /* Make up our own uuid */
1748                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
1749                          "%s_UUID", mti->mti_svname);
1750         }
1751
1752         /* add mdt */
1753         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
1754         if (rc)
1755                 RETURN(rc);
1756         /* Append the mdt info to the client log */
1757         rc = name_create(&cliname, mti->mti_fsname, "-client");
1758         if (rc)
1759                 RETURN(rc);
1760
1761         if (mgs_log_is_empty(env, mgs, cliname)) {
1762                 /* Start client log */
1763                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
1764                                        fsdb->fsdb_clilov);
1765                 if (rc)
1766                         GOTO(out_free, rc);
1767                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
1768                                        fsdb->fsdb_clilmv);
1769                 if (rc)
1770                         GOTO(out_free, rc);
1771         }
1772
1773         /*
1774         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
1775         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
1776         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
1777         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
1778         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
1779         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
1780         */
1781
1782                 /* copy client info about lov/lmv */
1783                 mgi->mgi_comp.comp_mti = mti;
1784                 mgi->mgi_comp.comp_fsdb = fsdb;
1785
1786                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
1787                                                         &mgi->mgi_comp);
1788                 if (rc)
1789                         GOTO(out_free, rc);
1790                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
1791                                               fsdb->fsdb_clilmv);
1792                 if (rc)
1793                         GOTO(out_free, rc);
1794
1795                 /* add mountopts */
1796                 rc = record_start_log(env, mgs, &llh, cliname);
1797                 if (rc)
1798                         GOTO(out_free, rc);
1799
1800                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
1801                                    "mount opts");
1802                 if (rc)
1803                         GOTO(out_end, rc);
1804                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
1805                                       fsdb->fsdb_clilmv);
1806                 if (rc)
1807                         GOTO(out_end, rc);
1808                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
1809                                    "mount opts");
1810
1811         if (rc)
1812                 GOTO(out_end, rc);
1813         /* for_all_existing_mdt except current one */
1814         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
1815                 char *mdtname;
1816                 if (i !=  mti->mti_stripe_index &&
1817                     cfs_test_bit(i,  fsdb->fsdb_mdt_index_map)) {
1818                         rc = name_create_mdt(&mdtname, mti->mti_fsname, i);
1819                         if (rc)
1820                                 GOTO(out_end, rc);
1821                         rc = mgs_write_log_mdc_to_mdt(env, mgs, fsdb, mti, mdtname);
1822                         name_destroy(&mdtname);
1823                         if (rc)
1824                                 GOTO(out_end, rc);
1825                 }
1826         }
1827 out_end:
1828         record_end_log(env, &llh);
1829 out_free:
1830         name_destroy(&cliname);
1831         RETURN(rc);
1832 }
1833
1834 /* Add the ost info to the client/mdt lov */
1835 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1836                                     struct mgs_device *mgs, struct fs_db *fsdb,
1837                                     struct mgs_target_info *mti,
1838                                     char *logname, char *suffix, char *lovname,
1839                                     enum lustre_sec_part sec_part, int flags)
1840 {
1841         struct llog_handle *llh = NULL;
1842         char *nodeuuid = NULL;
1843         char *oscname = NULL;
1844         char *oscuuid = NULL;
1845         char *lovuuid = NULL;
1846         char *svname = NULL;
1847         char index[6];
1848         int i, rc;
1849
1850         ENTRY;
1851         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
1852                mti->mti_svname, logname);
1853
1854         if (mgs_log_is_empty(env, mgs, logname)) {
1855                 CERROR("log is empty! Logical error\n");
1856                 RETURN (-EINVAL);
1857         }
1858
1859         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1860         if (rc)
1861                 RETURN(rc);
1862         rc = name_create(&svname, mti->mti_svname, "-osc");
1863         if (rc)
1864                 GOTO(out_free, rc);
1865         rc = name_create(&oscname, svname, suffix);
1866         if (rc)
1867                 GOTO(out_free, rc);
1868         rc = name_create(&oscuuid, oscname, "_UUID");
1869         if (rc)
1870                 GOTO(out_free, rc);
1871         rc = name_create(&lovuuid, lovname, "_UUID");
1872         if (rc)
1873                 GOTO(out_free, rc);
1874
1875         /*
1876         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
1877         multihomed (#4)
1878         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
1879         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
1880         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
1881         failover (#6,7)
1882         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
1883         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
1884         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
1885         */
1886
1887         rc = record_start_log(env, mgs, &llh, logname);
1888         if (rc)
1889                 GOTO(out_free, rc);
1890         /* FIXME these should be a single journal transaction */
1891         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
1892                            "add osc");
1893         if (rc)
1894                 GOTO(out_end, rc);
1895         for (i = 0; i < mti->mti_nid_count; i++) {
1896                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
1897                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1898                 if (rc)
1899                         GOTO(out_end, rc);
1900         }
1901         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
1902         if (rc)
1903                 GOTO(out_end, rc);
1904         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
1905         if (rc)
1906                 GOTO(out_end, rc);
1907         rc = mgs_write_log_failnids(env, mti, llh, oscname);
1908         if (rc)
1909                 GOTO(out_end, rc);
1910         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1911         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
1912         if (rc)
1913                 GOTO(out_end, rc);
1914         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
1915                            "add osc");
1916         if (rc)
1917                 GOTO(out_end, rc);
1918 out_end:
1919         record_end_log(env, &llh);
1920 out_free:
1921         name_destroy(&lovuuid);
1922         name_destroy(&oscuuid);
1923         name_destroy(&oscname);
1924         name_destroy(&svname);
1925         name_destroy(&nodeuuid);
1926         RETURN(rc);
1927 }
1928
1929 static int mgs_write_log_ost(const struct lu_env *env,
1930                              struct mgs_device *mgs, struct fs_db *fsdb,
1931                              struct mgs_target_info *mti)
1932 {
1933         struct llog_handle *llh = NULL;
1934         char *logname, *lovname;
1935         char *ptr = mti->mti_params;
1936         int rc, flags = 0, failout = 0, i;
1937         ENTRY;
1938
1939         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
1940
1941         /* The ost startup log */
1942
1943         /* If the ost log already exists, that means that someone reformatted
1944            the ost and it called target_add again. */
1945         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
1946                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
1947                                    "exists, yet the server claims it never "
1948                                    "registered. It may have been reformatted, "
1949                                    "or the index changed. writeconf the MDT to "
1950                                    "regenerate all logs.\n", mti->mti_svname);
1951                 RETURN(-EALREADY);
1952         }
1953
1954         /*
1955         attach obdfilter ost1 ost1_UUID
1956         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
1957         */
1958         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
1959                 failout = (strncmp(ptr, "failout", 7) == 0);
1960         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
1961         if (rc)
1962                 RETURN(rc);
1963         /* FIXME these should be a single journal transaction */
1964         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
1965         if (rc)
1966                 GOTO(out_end, rc);
1967         if (*mti->mti_uuid == '\0')
1968                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
1969                          "%s_UUID", mti->mti_svname);
1970         rc = record_attach(env, llh, mti->mti_svname,
1971                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
1972         if (rc)
1973                 GOTO(out_end, rc);
1974         rc = record_setup(env, llh, mti->mti_svname,
1975                           "dev"/*ignored*/, "type"/*ignored*/,
1976                           failout ? "n" : "f", 0/*options*/);
1977         if (rc)
1978                 GOTO(out_end, rc);
1979         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
1980         if (rc)
1981                 GOTO(out_end, rc);
1982 out_end:
1983         record_end_log(env, &llh);
1984         if (rc)
1985                 RETURN(rc);
1986         /* We also have to update the other logs where this osc is part of
1987            the lov */
1988
1989         if (cfs_test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
1990                 /* If we're upgrading, the old mdt log already has our
1991                    entry. Let's do a fake one for fun. */
1992                 /* Note that we can't add any new failnids, since we don't
1993                    know the old osc names. */
1994                 flags = CM_SKIP | CM_UPGRADE146;
1995
1996         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
1997                 /* If the update flag isn't set, don't update client/mdt
1998                    logs. */
1999                 flags |= CM_SKIP;
2000                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2001                               "the MDT first to regenerate it.\n",
2002                               mti->mti_svname);
2003         }
2004
2005         /* Add ost to all MDT lov defs */
2006         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2007                 if (cfs_test_bit(i, fsdb->fsdb_mdt_index_map)) {
2008                         char mdt_index[9];
2009
2010                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2011                                                      i);
2012                         if (rc)
2013                                 RETURN(rc);
2014                         sprintf(mdt_index, "-MDT%04x", i);
2015                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2016                                                       logname, mdt_index,
2017                                                       lovname, LUSTRE_SP_MDT,
2018                                                       flags);
2019                         name_destroy(&logname);
2020                         name_destroy(&lovname);
2021                         if (rc)
2022                                 RETURN(rc);
2023                 }
2024         }
2025
2026         /* Append ost info to the client log */
2027         rc = name_create(&logname, mti->mti_fsname, "-client");
2028         if (rc)
2029                 RETURN(rc);
2030         if (mgs_log_is_empty(env, mgs, logname)) {
2031                 /* Start client log */
2032                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2033                                        fsdb->fsdb_clilov);
2034                 if (rc)
2035                         GOTO(out_free, rc);
2036                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2037                                        fsdb->fsdb_clilmv);
2038                 if (rc)
2039                         GOTO(out_free, rc);
2040         }
2041         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2042                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2043 out_free:
2044         name_destroy(&logname);
2045         RETURN(rc);
2046 }
2047
2048 static __inline__ int mgs_param_empty(char *ptr)
2049 {
2050         char *tmp;
2051
2052         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2053                 return 1;
2054         return 0;
2055 }
2056
2057 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2058                                           struct mgs_device *mgs,
2059                                           struct fs_db *fsdb,
2060                                           struct mgs_target_info *mti,
2061                                           char *logname, char *cliname)
2062 {
2063         int rc;
2064         struct llog_handle *llh = NULL;
2065
2066         if (mgs_param_empty(mti->mti_params)) {
2067                 /* Remove _all_ failnids */
2068                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2069                                 mti->mti_svname, "add failnid", CM_SKIP);
2070                 return rc < 0 ? rc : 0;
2071         }
2072
2073         /* Otherwise failover nids are additive */
2074         rc = record_start_log(env, mgs, &llh, logname);
2075         if (rc)
2076                 return rc;
2077                 /* FIXME this should be a single journal transaction */
2078         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2079                            "add failnid");
2080         if (rc)
2081                 goto out_end;
2082         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2083         if (rc)
2084                 goto out_end;
2085         rc = record_marker(env, llh, fsdb, CM_END,
2086                            mti->mti_svname, "add failnid");
2087 out_end:
2088         record_end_log(env, &llh);
2089         return rc;
2090 }
2091
2092
2093 /* Add additional failnids to an existing log.
2094    The mdc/osc must have been added to logs first */
2095 /* tcp nids must be in dotted-quad ascii -
2096    we can't resolve hostnames from the kernel. */
2097 static int mgs_write_log_add_failnid(const struct lu_env *env,
2098                                      struct mgs_device *mgs,
2099                                      struct fs_db *fsdb,
2100                                      struct mgs_target_info *mti)
2101 {
2102         char *logname, *cliname;
2103         int rc;
2104         ENTRY;
2105
2106         /* FIXME we currently can't erase the failnids
2107          * given when a target first registers, since they aren't part of
2108          * an "add uuid" stanza */
2109
2110         /* Verify that we know about this target */
2111         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2112                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2113                                    "yet. It must be started before failnids "
2114                                    "can be added.\n", mti->mti_svname);
2115                 RETURN(-ENOENT);
2116         }
2117
2118         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2119         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2120                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2121         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2122                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2123         } else {
2124                 RETURN(-EINVAL);
2125         }
2126         if (rc)
2127                 RETURN(rc);
2128         /* Add failover nids to the client log */
2129         rc = name_create(&logname, mti->mti_fsname, "-client");
2130         if (rc) {
2131                 name_destroy(&cliname);
2132                 RETURN(rc);
2133         }
2134         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2135         name_destroy(&logname);
2136         name_destroy(&cliname);
2137         if (rc)
2138                 RETURN(rc);
2139
2140         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2141                 /* Add OST failover nids to the MDT logs as well */
2142                 int i;
2143
2144                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2145                         if (!cfs_test_bit(i, fsdb->fsdb_mdt_index_map))
2146                                 continue;
2147                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2148                         if (rc)
2149                                 RETURN(rc);
2150                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2151                                                  fsdb, i);
2152                         if (rc) {
2153                                 name_destroy(&logname);
2154                                 RETURN(rc);
2155                         }
2156                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2157                                                             mti, logname,
2158                                                             cliname);
2159                         name_destroy(&cliname);
2160                         name_destroy(&logname);
2161                         if (rc)
2162                                 RETURN(rc);
2163                 }
2164         }
2165
2166         RETURN(rc);
2167 }
2168
2169 static int mgs_wlp_lcfg(const struct lu_env *env,
2170                         struct mgs_device *mgs, struct fs_db *fsdb,
2171                         struct mgs_target_info *mti,
2172                         char *logname, struct lustre_cfg_bufs *bufs,
2173                         char *tgtname, char *ptr)
2174 {
2175         char comment[MTI_NAME_MAXLEN];
2176         char *tmp;
2177         struct lustre_cfg *lcfg;
2178         int rc, del;
2179
2180         /* Erase any old settings of this same parameter */
2181         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2182         comment[MTI_NAME_MAXLEN - 1] = 0;
2183         /* But don't try to match the value. */
2184         if ((tmp = strchr(comment, '=')))
2185             *tmp = 0;
2186         /* FIXME we should skip settings that are the same as old values */
2187         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2188         if (rc < 0)
2189                 return rc;
2190         del = mgs_param_empty(ptr);
2191
2192         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2193                       "Sett" : "Modify", tgtname, comment, logname);
2194         if (del)
2195                 return rc;
2196
2197         lustre_cfg_bufs_reset(bufs, tgtname);
2198         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2199         lcfg = lustre_cfg_new(LCFG_PARAM, bufs);
2200         if (!lcfg)
2201                 return -ENOMEM;
2202         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2203         lustre_cfg_free(lcfg);
2204         return rc;
2205 }
2206
2207 /* write global variable settings into log */
2208 static int mgs_write_log_sys(const struct lu_env *env,
2209                              struct mgs_device *mgs, struct fs_db *fsdb,
2210                              struct mgs_target_info *mti, char *sys, char *ptr)
2211 {
2212         struct mgs_thread_info *mgi = mgs_env_info(env);
2213         struct lustre_cfg *lcfg;
2214         char *tmp, sep;
2215         int rc, cmd, convert = 1;
2216
2217         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2218                 cmd = LCFG_SET_TIMEOUT;
2219         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2220                 cmd = LCFG_SET_LDLM_TIMEOUT;
2221         /* Check for known params here so we can return error to lctl */
2222         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2223                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2224                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2225                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2226                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2227                 cmd = LCFG_PARAM;
2228         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2229                 convert = 0; /* Don't convert string value to integer */
2230                 cmd = LCFG_PARAM;
2231         } else {
2232                 return -EINVAL;
2233         }
2234
2235         if (mgs_param_empty(ptr))
2236                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2237         else
2238                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2239
2240         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2241         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2242         if (!convert && *tmp != '\0')
2243                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2244         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2245         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2246         /* truncate the comment to the parameter name */
2247         ptr = tmp - 1;
2248         sep = *ptr;
2249         *ptr = '\0';
2250         /* modify all servers and clients */
2251         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2252                                       *tmp == '\0' ? NULL : lcfg,
2253                                       mti->mti_fsname, sys, 0);
2254         if (rc == 0 && *tmp != '\0') {
2255                 switch (cmd) {
2256                 case LCFG_SET_TIMEOUT:
2257                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2258                                 class_process_config(lcfg);
2259                         break;
2260                 case LCFG_SET_LDLM_TIMEOUT:
2261                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2262                                 class_process_config(lcfg);
2263                         break;
2264                 default:
2265                         break;
2266                 }
2267         }
2268         *ptr = sep;
2269         lustre_cfg_free(lcfg);
2270         return rc;
2271 }
2272
2273 /* write quota settings into log */
2274 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2275                                struct fs_db *fsdb, struct mgs_target_info *mti,
2276                                char *quota, char *ptr)
2277 {
2278         struct mgs_thread_info  *mgi = mgs_env_info(env);
2279         struct lustre_cfg       *lcfg;
2280         char                    *tmp;
2281         char                     sep;
2282         int                      rc, cmd = LCFG_PARAM;
2283
2284         /* support only 'meta' and 'data' pools so far */
2285         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2286             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2287                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2288                        "& quota.ost are)\n", ptr);
2289                 return -EINVAL;
2290         }
2291
2292         if (*tmp == '\0') {
2293                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2294         } else {
2295                 CDEBUG(D_MGS, "global '%s'\n", quota);
2296
2297                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2298                     strcmp(tmp, "none") != 0) {
2299                         CERROR("enable option(%s) isn't supported\n", tmp);
2300                         return -EINVAL;
2301                 }
2302         }
2303
2304         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2305         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2306         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2307         /* truncate the comment to the parameter name */
2308         ptr = tmp - 1;
2309         sep = *ptr;
2310         *ptr = '\0';
2311
2312         /* XXX we duplicated quota enable information in all server
2313          *     config logs, it should be moved to a separate config
2314          *     log once we cleanup the config log for global param. */
2315         /* modify all servers */
2316         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2317                                       *tmp == '\0' ? NULL : lcfg,
2318                                       mti->mti_fsname, quota, 1);
2319         *ptr = sep;
2320         lustre_cfg_free(lcfg);
2321         return rc;
2322 }
2323
2324 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2325                                    struct mgs_device *mgs,
2326                                    struct fs_db *fsdb,
2327                                    struct mgs_target_info *mti,
2328                                    char *param)
2329 {
2330         struct mgs_thread_info *mgi = mgs_env_info(env);
2331         struct llog_handle     *llh = NULL;
2332         char                   *logname;
2333         char                   *comment, *ptr;
2334         struct lustre_cfg      *lcfg;
2335         int                     rc, len;
2336         ENTRY;
2337
2338         /* get comment */
2339         ptr = strchr(param, '=');
2340         LASSERT(ptr);
2341         len = ptr - param;
2342
2343         OBD_ALLOC(comment, len + 1);
2344         if (comment == NULL)
2345                 RETURN(-ENOMEM);
2346         strncpy(comment, param, len);
2347         comment[len] = '\0';
2348
2349         /* prepare lcfg */
2350         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2351         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2352         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2353         if (lcfg == NULL)
2354                 GOTO(out_comment, rc = -ENOMEM);
2355
2356         /* construct log name */
2357         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2358         if (rc)
2359                 GOTO(out_lcfg, rc);
2360
2361         if (mgs_log_is_empty(env, mgs, logname)) {
2362                 rc = record_start_log(env, mgs, &llh, logname);
2363                 if (rc)
2364                         GOTO(out, rc);
2365                 record_end_log(env, &llh);
2366         }
2367
2368         /* obsolete old one */
2369         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2370                         comment, CM_SKIP);
2371         if (rc < 0)
2372                 GOTO(out, rc);
2373         /* write the new one */
2374         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2375                                   mti->mti_svname, comment);
2376         if (rc)
2377                 CERROR("err %d writing log %s\n", rc, logname);
2378 out:
2379         name_destroy(&logname);
2380 out_lcfg:
2381         lustre_cfg_free(lcfg);
2382 out_comment:
2383         OBD_FREE(comment, len + 1);
2384         RETURN(rc);
2385 }
2386
2387 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2388                                         char *param)
2389 {
2390         char    *ptr;
2391
2392         /* disable the adjustable udesc parameter for now, i.e. use default
2393          * setting that client always ship udesc to MDT if possible. to enable
2394          * it simply remove the following line */
2395         goto error_out;
2396
2397         ptr = strchr(param, '=');
2398         if (ptr == NULL)
2399                 goto error_out;
2400         *ptr++ = '\0';
2401
2402         if (strcmp(param, PARAM_SRPC_UDESC))
2403                 goto error_out;
2404
2405         if (strcmp(ptr, "yes") == 0) {
2406                 cfs_set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2407                 CWARN("Enable user descriptor shipping from client to MDT\n");
2408         } else if (strcmp(ptr, "no") == 0) {
2409                 cfs_clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2410                 CWARN("Disable user descriptor shipping from client to MDT\n");
2411         } else {
2412                 *(ptr - 1) = '=';
2413                 goto error_out;
2414         }
2415         return 0;
2416
2417 error_out:
2418         CERROR("Invalid param: %s\n", param);
2419         return -EINVAL;
2420 }
2421
2422 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2423                                   const char *svname,
2424                                   char *param)
2425 {
2426         struct sptlrpc_rule      rule;
2427         struct sptlrpc_rule_set *rset;
2428         int                      rc;
2429         ENTRY;
2430
2431         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2432                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2433                 RETURN(-EINVAL);
2434         }
2435
2436         if (strncmp(param, PARAM_SRPC_UDESC,
2437                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2438                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2439         }
2440
2441         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2442                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2443                 RETURN(-EINVAL);
2444         }
2445
2446         param += sizeof(PARAM_SRPC_FLVR) - 1;
2447
2448         rc = sptlrpc_parse_rule(param, &rule);
2449         if (rc)
2450                 RETURN(rc);
2451
2452         /* mgs rules implies must be mgc->mgs */
2453         if (cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2454                 if ((rule.sr_from != LUSTRE_SP_MGC &&
2455                      rule.sr_from != LUSTRE_SP_ANY) ||
2456                     (rule.sr_to != LUSTRE_SP_MGS &&
2457                      rule.sr_to != LUSTRE_SP_ANY))
2458                         RETURN(-EINVAL);
2459         }
2460
2461         /* preapre room for this coming rule. svcname format should be:
2462          * - fsname: general rule
2463          * - fsname-tgtname: target-specific rule
2464          */
2465         if (strchr(svname, '-')) {
2466                 struct mgs_tgt_srpc_conf *tgtconf;
2467                 int                       found = 0;
2468
2469                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
2470                      tgtconf = tgtconf->mtsc_next) {
2471                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
2472                                 found = 1;
2473                                 break;
2474                         }
2475                 }
2476
2477                 if (!found) {
2478                         int name_len;
2479
2480                         OBD_ALLOC_PTR(tgtconf);
2481                         if (tgtconf == NULL)
2482                                 RETURN(-ENOMEM);
2483
2484                         name_len = strlen(svname);
2485
2486                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
2487                         if (tgtconf->mtsc_tgt == NULL) {
2488                                 OBD_FREE_PTR(tgtconf);
2489                                 RETURN(-ENOMEM);
2490                         }
2491                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
2492
2493                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
2494                         fsdb->fsdb_srpc_tgt = tgtconf;
2495                 }
2496
2497                 rset = &tgtconf->mtsc_rset;
2498         } else {
2499                 rset = &fsdb->fsdb_srpc_gen;
2500         }
2501
2502         rc = sptlrpc_rule_set_merge(rset, &rule);
2503
2504         RETURN(rc);
2505 }
2506
2507 static int mgs_srpc_set_param(const struct lu_env *env,
2508                               struct mgs_device *mgs,
2509                               struct fs_db *fsdb,
2510                               struct mgs_target_info *mti,
2511                               char *param)
2512 {
2513         char                   *copy;
2514         int                     rc, copy_size;
2515         ENTRY;
2516
2517 #ifndef HAVE_GSS
2518         RETURN(-EINVAL);
2519 #endif
2520         /* keep a copy of original param, which could be destroied
2521          * during parsing */
2522         copy_size = strlen(param) + 1;
2523         OBD_ALLOC(copy, copy_size);
2524         if (copy == NULL)
2525                 return -ENOMEM;
2526         memcpy(copy, param, copy_size);
2527
2528         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
2529         if (rc)
2530                 goto out_free;
2531
2532         /* previous steps guaranteed the syntax is correct */
2533         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
2534         if (rc)
2535                 goto out_free;
2536
2537         if (cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2538                 /*
2539                  * for mgs rules, make them effective immediately.
2540                  */
2541                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
2542                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
2543                                                  &fsdb->fsdb_srpc_gen);
2544         }
2545
2546 out_free:
2547         OBD_FREE(copy, copy_size);
2548         RETURN(rc);
2549 }
2550
2551 struct mgs_srpc_read_data {
2552         struct fs_db   *msrd_fsdb;
2553         int             msrd_skip;
2554 };
2555
2556 static int mgs_srpc_read_handler(const struct lu_env *env,
2557                                  struct llog_handle *llh,
2558                                  struct llog_rec_hdr *rec, void *data)
2559 {
2560         struct mgs_srpc_read_data *msrd = data;
2561         struct cfg_marker         *marker;
2562         struct lustre_cfg         *lcfg = (struct lustre_cfg *)(rec + 1);
2563         char                      *svname, *param;
2564         int                        cfg_len, rc;
2565         ENTRY;
2566
2567         if (rec->lrh_type != OBD_CFG_REC) {
2568                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
2569                 RETURN(-EINVAL);
2570         }
2571
2572         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
2573                   sizeof(struct llog_rec_tail);
2574
2575         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
2576         if (rc) {
2577                 CERROR("Insane cfg\n");
2578                 RETURN(rc);
2579         }
2580
2581         if (lcfg->lcfg_command == LCFG_MARKER) {
2582                 marker = lustre_cfg_buf(lcfg, 1);
2583
2584                 if (marker->cm_flags & CM_START &&
2585                     marker->cm_flags & CM_SKIP)
2586                         msrd->msrd_skip = 1;
2587                 if (marker->cm_flags & CM_END)
2588                         msrd->msrd_skip = 0;
2589
2590                 RETURN(0);
2591         }
2592
2593         if (msrd->msrd_skip)
2594                 RETURN(0);
2595
2596         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
2597                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
2598                 RETURN(0);
2599         }
2600
2601         svname = lustre_cfg_string(lcfg, 0);
2602         if (svname == NULL) {
2603                 CERROR("svname is empty\n");
2604                 RETURN(0);
2605         }
2606
2607         param = lustre_cfg_string(lcfg, 1);
2608         if (param == NULL) {
2609                 CERROR("param is empty\n");
2610                 RETURN(0);
2611         }
2612
2613         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
2614         if (rc)
2615                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
2616
2617         RETURN(0);
2618 }
2619
2620 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
2621                                 struct mgs_device *mgs,
2622                                 struct fs_db *fsdb)
2623 {
2624         struct llog_handle        *llh = NULL;
2625         struct llog_ctxt          *ctxt;
2626         char                      *logname;
2627         struct mgs_srpc_read_data  msrd;
2628         int                        rc;
2629         ENTRY;
2630
2631         /* construct log name */
2632         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
2633         if (rc)
2634                 RETURN(rc);
2635
2636         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
2637         LASSERT(ctxt != NULL);
2638
2639         if (mgs_log_is_empty(env, mgs, logname))
2640                 GOTO(out, rc = 0);
2641
2642         rc = llog_open(env, ctxt, &llh, NULL, logname,
2643                        LLOG_OPEN_EXISTS);
2644         if (rc < 0) {
2645                 if (rc == -ENOENT)
2646                         rc = 0;
2647                 GOTO(out, rc);
2648         }
2649
2650         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
2651         if (rc)
2652                 GOTO(out_close, rc);
2653
2654         if (llog_get_size(llh) <= 1)
2655                 GOTO(out_close, rc = 0);
2656
2657         msrd.msrd_fsdb = fsdb;
2658         msrd.msrd_skip = 0;
2659
2660         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
2661                           NULL);
2662
2663 out_close:
2664         llog_close(env, llh);
2665 out:
2666         llog_ctxt_put(ctxt);
2667         name_destroy(&logname);
2668
2669         if (rc)
2670                 CERROR("failed to read sptlrpc config database: %d\n", rc);
2671         RETURN(rc);
2672 }
2673
2674 /* Permanent settings of all parameters by writing into the appropriate
2675  * configuration logs.
2676  * A parameter with null value ("<param>='\0'") means to erase it out of
2677  * the logs.
2678  */
2679 static int mgs_write_log_param(const struct lu_env *env,
2680                                struct mgs_device *mgs, struct fs_db *fsdb,
2681                                struct mgs_target_info *mti, char *ptr)
2682 {
2683         struct mgs_thread_info *mgi = mgs_env_info(env);
2684         char *logname;
2685         char *tmp;
2686         int rc = 0, rc2 = 0;
2687         ENTRY;
2688
2689         /* For various parameter settings, we have to figure out which logs
2690            care about them (e.g. both mdt and client for lov settings) */
2691         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2692
2693         /* The params are stored in MOUNT_DATA_FILE and modified via
2694            tunefs.lustre, or set using lctl conf_param */
2695
2696         /* Processed in lustre_start_mgc */
2697         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
2698                 GOTO(end, rc);
2699
2700         /* Processed in ost/mdt */
2701         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
2702                 GOTO(end, rc);
2703
2704         /* Processed in mgs_write_log_ost */
2705         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
2706                 if (mti->mti_flags & LDD_F_PARAM) {
2707                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
2708                                            "changed with tunefs.lustre"
2709                                            "and --writeconf\n", ptr);
2710                         rc = -EPERM;
2711                 }
2712                 GOTO(end, rc);
2713         }
2714
2715         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
2716                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
2717                 GOTO(end, rc);
2718         }
2719
2720         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
2721                 /* Add a failover nidlist */
2722                 rc = 0;
2723                 /* We already processed failovers params for new
2724                    targets in mgs_write_log_target */
2725                 if (mti->mti_flags & LDD_F_PARAM) {
2726                         CDEBUG(D_MGS, "Adding failnode\n");
2727                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
2728                 }
2729                 GOTO(end, rc);
2730         }
2731
2732         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
2733                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
2734                 GOTO(end, rc);
2735         }
2736
2737         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
2738                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
2739                 GOTO(end, rc);
2740         }
2741
2742         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
2743                 /* active=0 means off, anything else means on */
2744                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
2745                 int i;
2746
2747                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
2748                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
2749                                            "be (de)activated.\n",
2750                                            mti->mti_svname);
2751                         GOTO(end, rc = -EINVAL);
2752                 }
2753                 LCONSOLE_WARN("Permanently %sactivating %s\n",
2754                               flag ? "de": "re", mti->mti_svname);
2755                 /* Modify clilov */
2756                 rc = name_create(&logname, mti->mti_fsname, "-client");
2757                 if (rc)
2758                         GOTO(end, rc);
2759                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2760                                 mti->mti_svname, "add osc", flag);
2761                 name_destroy(&logname);
2762                 if (rc)
2763                         goto active_err;
2764                 /* Modify mdtlov */
2765                 /* Add to all MDT logs for CMD */
2766                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2767                         if (!cfs_test_bit(i, fsdb->fsdb_mdt_index_map))
2768                                 continue;
2769                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2770                         if (rc)
2771                                 GOTO(end, rc);
2772                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
2773                                         mti->mti_svname, "add osc", flag);
2774                         name_destroy(&logname);
2775                         if (rc)
2776                                 goto active_err;
2777                 }
2778         active_err:
2779                 if (rc) {
2780                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
2781                                            "log (%d). No permanent "
2782                                            "changes were made to the "
2783                                            "config log.\n",
2784                                            mti->mti_svname, rc);
2785                         if (cfs_test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
2786                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
2787                                                    " because the log"
2788                                                    "is in the old 1.4"
2789                                                    "style. Consider "
2790                                                    " --writeconf to "
2791                                                    "update the logs.\n");
2792                         GOTO(end, rc);
2793                 }
2794                 /* Fall through to osc proc for deactivating live OSC
2795                    on running MDT / clients. */
2796         }
2797         /* Below here, let obd's XXX_process_config methods handle it */
2798
2799         /* All lov. in proc */
2800         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
2801                 char *mdtlovname;
2802
2803                 CDEBUG(D_MGS, "lov param %s\n", ptr);
2804                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
2805                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
2806                                            "set on the MDT, not %s. "
2807                                            "Ignoring.\n",
2808                                            mti->mti_svname);
2809                         GOTO(end, rc = 0);
2810                 }
2811
2812                 /* Modify mdtlov */
2813                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
2814                         GOTO(end, rc = -ENODEV);
2815
2816                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
2817                                              mti->mti_stripe_index);
2818                 if (rc)
2819                         GOTO(end, rc);
2820                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
2821                                   &mgi->mgi_bufs, mdtlovname, ptr);
2822                 name_destroy(&logname);
2823                 name_destroy(&mdtlovname);
2824                 if (rc)
2825                         GOTO(end, rc);
2826
2827                 /* Modify clilov */
2828                 rc = name_create(&logname, mti->mti_fsname, "-client");
2829                 if (rc)
2830                         GOTO(end, rc);
2831                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
2832                                   fsdb->fsdb_clilov, ptr);
2833                 name_destroy(&logname);
2834                 GOTO(end, rc);
2835         }
2836
2837         /* All osc., mdc., llite. params in proc */
2838         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
2839             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
2840             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
2841                 char *cname;
2842
2843                 if (cfs_test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2844                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
2845                                            " cannot be modified. Consider"
2846                                            " updating the configuration with"
2847                                            " --writeconf\n",
2848                                            mti->mti_svname);
2849                         GOTO(end, rc = -EINVAL);
2850                 }
2851                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
2852                         rc = name_create(&cname, mti->mti_fsname, "-client");
2853                         /* Add the client type to match the obdname in
2854                            class_config_llog_handler */
2855                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2856                         rc = name_create(&cname, mti->mti_svname, "-mdc");
2857                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2858                         rc = name_create(&cname, mti->mti_svname, "-osc");
2859                 } else {
2860                         GOTO(end, rc = -EINVAL);
2861                 }
2862                 if (rc)
2863                         GOTO(end, rc);
2864
2865                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2866
2867                 /* Modify client */
2868                 rc = name_create(&logname, mti->mti_fsname, "-client");
2869                 if (rc) {
2870                         name_destroy(&cname);
2871                         GOTO(end, rc);
2872                 }
2873                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
2874                                   cname, ptr);
2875
2876                 /* osc params affect the MDT as well */
2877                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
2878                         int i;
2879
2880                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2881                                 if (!cfs_test_bit(i, fsdb->fsdb_mdt_index_map))
2882                                         continue;
2883                                 name_destroy(&cname);
2884                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
2885                                                          fsdb, i);
2886                                 name_destroy(&logname);
2887                                 if (rc)
2888                                         break;
2889                                 rc = name_create_mdt(&logname,
2890                                                      mti->mti_fsname, i);
2891                                 if (rc)
2892                                         break;
2893                                 if (!mgs_log_is_empty(env, mgs, logname)) {
2894                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
2895                                                           mti, logname,
2896                                                           &mgi->mgi_bufs,
2897                                                           cname, ptr);
2898                                         if (rc)
2899                                                 break;
2900                                 }
2901                         }
2902                 }
2903                 name_destroy(&logname);
2904                 name_destroy(&cname);
2905                 GOTO(end, rc);
2906         }
2907
2908         /* All mdt. params in proc */
2909         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
2910                 int i;
2911                 __u32 idx;
2912
2913                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2914                 if (strncmp(mti->mti_svname, mti->mti_fsname,
2915                             MTI_NAME_MAXLEN) == 0)
2916                         /* device is unspecified completely? */
2917                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
2918                 else
2919                         rc = server_name2index(mti->mti_svname, &idx, NULL);
2920                 if (rc < 0)
2921                         goto active_err;
2922                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
2923                         goto active_err;
2924                 if (rc & LDD_F_SV_ALL) {
2925                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2926                                 if (!cfs_test_bit(i,
2927                                                   fsdb->fsdb_mdt_index_map))
2928                                         continue;
2929                                 rc = name_create_mdt(&logname,
2930                                                 mti->mti_fsname, i);
2931                                 if (rc)
2932                                         goto active_err;
2933                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
2934                                                   logname, &mgi->mgi_bufs,
2935                                                   logname, ptr);
2936                                 name_destroy(&logname);
2937                                 if (rc)
2938                                         goto active_err;
2939                         }
2940                 } else {
2941                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
2942                                           mti->mti_svname, &mgi->mgi_bufs,
2943                                           mti->mti_svname, ptr);
2944                         if (rc)
2945                                 goto active_err;
2946                 }
2947                 GOTO(end, rc);
2948         }
2949
2950         /* All mdd., ost. params in proc */
2951         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
2952             (class_match_param(ptr, PARAM_OST, NULL) == 0)) {
2953                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2954                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
2955                         GOTO(end, rc = -ENODEV);
2956
2957                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
2958                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
2959                 GOTO(end, rc);
2960         }
2961
2962         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
2963         rc2 = -ENOSYS;
2964
2965 end:
2966         if (rc)
2967                 CERROR("err %d on param '%s'\n", rc, ptr);
2968
2969         RETURN(rc ?: rc2);
2970 }
2971
2972 /* Not implementing automatic failover nid addition at this time. */
2973 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
2974                       struct mgs_target_info *mti)
2975 {
2976 #if 0
2977         struct fs_db *fsdb;
2978         int rc;
2979         ENTRY;
2980
2981         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
2982         if (rc)
2983                 RETURN(rc);
2984
2985         if (mgs_log_is_empty(obd, mti->mti_svname))
2986                 /* should never happen */
2987                 RETURN(-ENOENT);
2988
2989         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
2990
2991         /* FIXME We can just check mti->params to see if we're already in
2992            the failover list.  Modify mti->params for rewriting back at
2993            server_register_target(). */
2994
2995         cfs_mutex_lock(&fsdb->fsdb_mutex);
2996         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
2997         cfs_mutex_unlock(&fsdb->fsdb_mutex);
2998
2999         RETURN(rc);
3000 #endif
3001         return 0;
3002 }
3003
3004 int mgs_write_log_target(const struct lu_env *env,
3005                          struct mgs_device *mgs,
3006                          struct mgs_target_info *mti,
3007                          struct fs_db *fsdb)
3008 {
3009         int rc = -EINVAL;
3010         char *buf, *params;
3011         ENTRY;
3012
3013         /* set/check the new target index */
3014         rc = mgs_set_index(env, mgs, mti);
3015         if (rc < 0) {
3016                 CERROR("Can't get index (%d)\n", rc);
3017                 RETURN(rc);
3018         }
3019
3020         if (rc == EALREADY) {
3021                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3022                               mti->mti_stripe_index, mti->mti_svname);
3023                 /* We would like to mark old log sections as invalid
3024                    and add new log sections in the client and mdt logs.
3025                    But if we add new sections, then live clients will
3026                    get repeat setup instructions for already running
3027                    osc's. So don't update the client/mdt logs. */
3028                 mti->mti_flags &= ~LDD_F_UPDATE;
3029         }
3030
3031         cfs_mutex_lock(&fsdb->fsdb_mutex);
3032
3033         if (mti->mti_flags &
3034             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3035                 /* Generate a log from scratch */
3036                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3037                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3038                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3039                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3040                 } else {
3041                         CERROR("Unknown target type %#x, can't create log for "
3042                                "%s\n", mti->mti_flags, mti->mti_svname);
3043                 }
3044                 if (rc) {
3045                         CERROR("Can't write logs for %s (%d)\n",
3046                                mti->mti_svname, rc);
3047                         GOTO(out_up, rc);
3048                 }
3049         } else {
3050                 /* Just update the params from tunefs in mgs_write_log_params */
3051                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3052                 mti->mti_flags |= LDD_F_PARAM;
3053         }
3054
3055         /* allocate temporary buffer, where class_get_next_param will
3056            make copy of a current  parameter */
3057         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3058         if (buf == NULL)
3059                 GOTO(out_up, rc = -ENOMEM);
3060         params = mti->mti_params;
3061         while (params != NULL) {
3062                 rc = class_get_next_param(&params, buf);
3063                 if (rc) {
3064                         if (rc == 1)
3065                                 /* there is no next parameter, that is
3066                                    not an error */
3067                                 rc = 0;
3068                         break;
3069                 }
3070                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3071                        params, buf);
3072                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3073                 if (rc)
3074                         break;
3075         }
3076
3077         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3078
3079 out_up:
3080         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3081         RETURN(rc);
3082 }
3083
3084 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3085 {
3086         struct llog_ctxt        *ctxt;
3087         int                      rc = 0;
3088
3089         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3090         if (ctxt == NULL) {
3091                 CERROR("%s: MGS config context doesn't exist\n",
3092                        mgs->mgs_obd->obd_name);
3093                 rc = -ENODEV;
3094         } else {
3095                 rc = llog_erase(env, ctxt, NULL, name);
3096                 /* llog may not exist */
3097                 if (rc == -ENOENT)
3098                         rc = 0;
3099                 llog_ctxt_put(ctxt);
3100         }
3101
3102         if (rc)
3103                 CERROR("%s: failed to clear log %s: %d\n",
3104                        mgs->mgs_obd->obd_name, name, rc);
3105
3106         return rc;
3107 }
3108
3109 /* erase all logs for the given fs */
3110 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3111 {
3112         struct fs_db *fsdb;
3113         cfs_list_t list;
3114         struct mgs_direntry *dirent, *n;
3115         int rc, len = strlen(fsname);
3116         char *suffix;
3117         ENTRY;
3118
3119         /* Find all the logs in the CONFIGS directory */
3120         rc = class_dentry_readdir(env, mgs, &list);
3121         if (rc)
3122                 RETURN(rc);
3123
3124         cfs_mutex_lock(&mgs->mgs_mutex);
3125
3126         /* Delete the fs db */
3127         fsdb = mgs_find_fsdb(mgs, fsname);
3128         if (fsdb)
3129                 mgs_free_fsdb(mgs, fsdb);
3130
3131         cfs_mutex_unlock(&mgs->mgs_mutex);
3132
3133         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3134                 cfs_list_del(&dirent->list);
3135                 suffix = strrchr(dirent->name, '-');
3136                 if (suffix != NULL) {
3137                         if ((len == suffix - dirent->name) &&
3138                             (strncmp(fsname, dirent->name, len) == 0)) {
3139                                 CDEBUG(D_MGS, "Removing log %s\n",
3140                                        dirent->name);
3141                                 mgs_erase_log(env, mgs, dirent->name);
3142                         }
3143                 }
3144                 mgs_direntry_free(dirent);
3145         }
3146
3147         RETURN(rc);
3148 }
3149
3150 /* from llog_swab */
3151 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3152 {
3153         int i;
3154         ENTRY;
3155
3156         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3157         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3158
3159         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3160         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3161         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3162         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3163
3164         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3165         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3166                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3167                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3168                                i, lcfg->lcfg_buflens[i],
3169                                lustre_cfg_string(lcfg, i));
3170                 }
3171         EXIT;
3172 }
3173
3174 /* Set a permanent (config log) param for a target or fs
3175  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3176  *             buf1 contains the single parameter
3177  */
3178 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3179                  struct lustre_cfg *lcfg, char *fsname)
3180 {
3181         struct fs_db *fsdb;
3182         struct mgs_target_info *mti;
3183         char *devname, *param;
3184         char *ptr, *tmp;
3185         __u32 index;
3186         int rc = 0;
3187         ENTRY;
3188
3189         print_lustre_cfg(lcfg);
3190
3191         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3192         devname = lustre_cfg_string(lcfg, 0);
3193         param = lustre_cfg_string(lcfg, 1);
3194         if (!devname) {
3195                 /* Assume device name embedded in param:
3196                    lustre-OST0000.osc.max_dirty_mb=32 */
3197                 ptr = strchr(param, '.');
3198                 if (ptr) {
3199                         devname = param;
3200                         *ptr = 0;
3201                         param = ptr + 1;
3202                 }
3203         }
3204         if (!devname) {
3205                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3206                 RETURN(-ENOSYS);
3207         }
3208
3209         /* Extract fsname */
3210         ptr = strrchr(devname, '-');
3211         memset(fsname, 0, MTI_NAME_MAXLEN);
3212         if (ptr && (server_name2index(ptr, &index, NULL) >= 0)) {
3213                 /* param related to llite isn't allowed to set by OST or MDT */
3214                 if (strncmp(param, PARAM_LLITE, sizeof(PARAM_LLITE)) == 0)
3215                         RETURN(-EINVAL);
3216
3217                 strncpy(fsname, devname, ptr - devname);
3218         } else {
3219                 /* assume devname is the fsname */
3220                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3221         }
3222         fsname[MTI_NAME_MAXLEN - 1] = 0;
3223         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3224
3225         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3226         if (rc)
3227                 RETURN(rc);
3228         if (!cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3229             cfs_test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3230                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3231                        "is '%s'\n", fsname, devname);
3232                 mgs_free_fsdb(mgs, fsdb);
3233                 RETURN(-EINVAL);
3234         }
3235
3236         /* Create a fake mti to hold everything */
3237         OBD_ALLOC_PTR(mti);
3238         if (!mti)
3239                 GOTO(out, rc = -ENOMEM);
3240         strncpy(mti->mti_fsname, fsname, MTI_NAME_MAXLEN);
3241         strncpy(mti->mti_svname, devname, MTI_NAME_MAXLEN);
3242         strncpy(mti->mti_params, param, sizeof(mti->mti_params));
3243         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3244         if (rc < 0)
3245                 /* Not a valid server; may be only fsname */
3246                 rc = 0;
3247         else
3248                 /* Strip -osc or -mdc suffix from svname */
3249                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3250                                      mti->mti_svname))
3251                         GOTO(out, rc = -EINVAL);
3252
3253         mti->mti_flags = rc | LDD_F_PARAM;
3254
3255         cfs_mutex_lock(&fsdb->fsdb_mutex);
3256         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3257         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3258
3259         /*
3260          * Revoke lock so everyone updates.  Should be alright if
3261          * someone was already reading while we were updating the logs,
3262          * so we don't really need to hold the lock while we're
3263          * writing (above).
3264          */
3265         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3266 out:
3267         OBD_FREE_PTR(mti);
3268         RETURN(rc);
3269 }
3270
3271 static int mgs_write_log_pool(const struct lu_env *env,
3272                               struct mgs_device *mgs, char *logname,
3273                               struct fs_db *fsdb, char *lovname,
3274                               enum lcfg_command_type cmd,
3275                               char *poolname, char *fsname,
3276                               char *ostname, char *comment)
3277 {
3278         struct llog_handle *llh = NULL;
3279         int rc;
3280
3281         rc = record_start_log(env, mgs, &llh, logname);
3282         if (rc)
3283                 return rc;
3284         rc = record_marker(env, llh, fsdb, CM_START, lovname, comment);
3285         if (rc)
3286                 goto out;
3287         rc = record_base(env, llh, lovname, 0, cmd, poolname, fsname, ostname, 0);
3288         if (rc)
3289                 goto out;
3290         rc = record_marker(env, llh, fsdb, CM_END, lovname, comment);
3291 out:
3292         record_end_log(env, &llh);
3293         return rc;
3294 }
3295
3296 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3297                  enum lcfg_command_type cmd, char *fsname,
3298                  char *poolname, char *ostname)
3299 {
3300         struct fs_db *fsdb;
3301         char *lovname;
3302         char *logname;
3303         char *label = NULL, *canceled_label = NULL;
3304         int label_sz;
3305         struct mgs_target_info *mti = NULL;
3306         int rc, i;
3307         ENTRY;
3308
3309         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3310         if (rc) {
3311                 CERROR("Can't get db for %s\n", fsname);
3312                 RETURN(rc);
3313         }
3314         if (cfs_test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3315                 CERROR("%s is not defined\n", fsname);
3316                 mgs_free_fsdb(mgs, fsdb);
3317                 RETURN(-EINVAL);
3318         }
3319
3320         label_sz = 10 + strlen(fsname) + strlen(poolname);
3321
3322         /* check if ostname match fsname */
3323         if (ostname != NULL) {
3324                 char *ptr;
3325
3326                 ptr = strrchr(ostname, '-');
3327                 if ((ptr == NULL) ||
3328                     (strncmp(fsname, ostname, ptr-ostname) != 0))
3329                         RETURN(-EINVAL);
3330                 label_sz += strlen(ostname);
3331         }
3332
3333         OBD_ALLOC(label, label_sz);
3334         if (label == NULL)
3335                 RETURN(-ENOMEM);
3336
3337         switch(cmd) {
3338         case LCFG_POOL_NEW:
3339                 sprintf(label,
3340                         "new %s.%s", fsname, poolname);
3341                 break;
3342         case LCFG_POOL_ADD:
3343                 sprintf(label,
3344                         "add %s.%s.%s", fsname, poolname, ostname);
3345                 break;
3346         case LCFG_POOL_REM:
3347                 OBD_ALLOC(canceled_label, label_sz);
3348                 if (canceled_label == NULL)
3349                         GOTO(out_label, rc = -ENOMEM);
3350                 sprintf(label,
3351                         "rem %s.%s.%s", fsname, poolname, ostname);
3352                 sprintf(canceled_label,
3353                         "add %s.%s.%s", fsname, poolname, ostname);
3354                 break;
3355         case LCFG_POOL_DEL:
3356                 OBD_ALLOC(canceled_label, label_sz);
3357                 if (canceled_label == NULL)
3358                         GOTO(out_label, rc = -ENOMEM);
3359                 sprintf(label,
3360                         "del %s.%s", fsname, poolname);
3361                 sprintf(canceled_label,
3362                         "new %s.%s", fsname, poolname);
3363                 break;
3364         default:
3365                 break;
3366         }
3367
3368         cfs_mutex_lock(&fsdb->fsdb_mutex);
3369
3370         if (canceled_label != NULL) {
3371                 OBD_ALLOC_PTR(mti);
3372                 if (mti == NULL)
3373                         GOTO(out_cancel, rc = -ENOMEM);
3374         }
3375
3376         /* write pool def to all MDT logs */
3377         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3378                  if (cfs_test_bit(i,  fsdb->fsdb_mdt_index_map)) {
3379                         rc = name_create_mdt_and_lov(&logname, &lovname,
3380                                                      fsdb, i);
3381                         if (rc) {
3382                                 cfs_mutex_unlock(&fsdb->fsdb_mutex);
3383                                 GOTO(out_mti, rc);
3384                         }
3385                         if (canceled_label != NULL) {
3386                                 strcpy(mti->mti_svname, "lov pool");
3387                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3388                                                 lovname, canceled_label,
3389                                                 CM_SKIP);
3390                         }
3391
3392                         if (rc >= 0)
3393                                 rc = mgs_write_log_pool(env, mgs, logname,
3394                                                         fsdb, lovname, cmd,
3395                                                         fsname, poolname,
3396                                                         ostname, label);
3397                         name_destroy(&logname);
3398                         name_destroy(&lovname);
3399                         if (rc) {
3400                                 cfs_mutex_unlock(&fsdb->fsdb_mutex);
3401                                 GOTO(out_mti, rc);
3402                         }
3403                 }
3404         }
3405
3406         rc = name_create(&logname, fsname, "-client");
3407         if (rc) {
3408                 cfs_mutex_unlock(&fsdb->fsdb_mutex);
3409                 GOTO(out_mti, rc);
3410         }
3411         if (canceled_label != NULL) {
3412                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3413                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
3414                 if (rc < 0) {
3415                         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3416                         name_destroy(&logname);
3417                         GOTO(out_mti, rc);
3418                 }
3419         }
3420
3421         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
3422                                 cmd, fsname, poolname, ostname, label);
3423         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3424         name_destroy(&logname);
3425         /* request for update */
3426         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3427
3428         EXIT;
3429 out_mti:
3430         if (mti != NULL)
3431                 OBD_FREE_PTR(mti);
3432 out_cancel:
3433         if (canceled_label != NULL)
3434                 OBD_FREE(canceled_label, label_sz);
3435 out_label:
3436         OBD_FREE(label, label_sz);
3437         return rc;
3438 }
3439
3440 #if 0
3441 /******************** unused *********************/
3442 static int mgs_backup_llog(struct obd_device *obd, char* fsname)
3443 {
3444         struct file *filp, *bak_filp;
3445         struct lvfs_run_ctxt saved;
3446         char *logname, *buf;
3447         loff_t soff = 0 , doff = 0;
3448         int count = 4096, len;
3449         int rc = 0;
3450
3451         OBD_ALLOC(logname, PATH_MAX);
3452         if (logname == NULL)
3453                 return -ENOMEM;
3454
3455         OBD_ALLOC(buf, count);
3456         if (!buf)
3457                 GOTO(out , rc = -ENOMEM);
3458
3459         len = snprintf(logname, PATH_MAX, "%s/%s.bak",
3460                        MOUNT_CONFIGS_DIR, fsname);
3461
3462         if (len >= PATH_MAX - 1) {
3463                 GOTO(out, -ENAMETOOLONG);
3464         }
3465
3466         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3467
3468         bak_filp = l_filp_open(logname, O_RDWR|O_CREAT|O_TRUNC, 0660);
3469         if (IS_ERR(bak_filp)) {
3470                 rc = PTR_ERR(bak_filp);
3471                 CERROR("backup logfile open %s: %d\n", logname, rc);
3472                 GOTO(pop, rc);
3473         }
3474         sprintf(logname, "%s/%s", MOUNT_CONFIGS_DIR, fsname);
3475         filp = l_filp_open(logname, O_RDONLY, 0);
3476         if (IS_ERR(filp)) {
3477                 rc = PTR_ERR(filp);
3478                 CERROR("logfile open %s: %d\n", logname, rc);
3479                 GOTO(close1f, rc);
3480         }
3481
3482         while ((rc = lustre_fread(filp, buf, count, &soff)) > 0) {
3483                 rc = lustre_fwrite(bak_filp, buf, count, &doff);
3484                 break;
3485         }
3486
3487         filp_close(filp, 0);
3488 close1f:
3489         filp_close(bak_filp, 0);
3490 pop:
3491         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3492 out:
3493         if (buf)
3494                 OBD_FREE(buf, count);
3495         OBD_FREE(logname, PATH_MAX);
3496         return rc;
3497 }
3498
3499 #endif