Whamcloud - gitweb
LU-1346 libcfs: replace libcfs wrappers with kernel API
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         if (!dt_try_as_dir(env, dir))
71                 GOTO(out, rc = -ENOTDIR);
72
73         LASSERT(dir);
74         LASSERT(dir->do_index_ops);
75
76         iops = &dir->do_index_ops->dio_it;
77         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
78         if (IS_ERR(it))
79                 RETURN(PTR_ERR(it));
80
81         rc = iops->load(env, it, 0);
82         if (rc <= 0)
83                 GOTO(fini, rc = 0);
84
85         /* main cycle */
86         do {
87                 key = (void *)iops->key(env, it);
88                 if (IS_ERR(key)) {
89                         CERROR("%s: key failed when listing %s: rc = %d\n",
90                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
91                                (int) PTR_ERR(key));
92                         goto next;
93                 }
94                 key_sz = iops->key_size(env, it);
95                 LASSERT(key_sz > 0);
96
97                 /* filter out "." and ".." entries */
98                 if (key[0] == '.') {
99                         if (key_sz == 1)
100                                 goto next;
101                         if (key_sz == 2 && key[1] == '.')
102                                 goto next;
103                 }
104
105                 de = mgs_direntry_alloc(key_sz + 1);
106                 if (de == NULL) {
107                         rc = -ENOMEM;
108                         break;
109                 }
110
111                 memcpy(de->name, key, key_sz);
112                 de->name[key_sz] = 0;
113
114                 cfs_list_add(&de->list, list);
115
116 next:
117                 rc = iops->next(env, it);
118         } while (rc == 0);
119         rc = 0;
120
121         iops->put(env, it);
122
123 fini:
124         iops->fini(env, it);
125 out:
126         if (rc)
127                 CERROR("%s: key failed when listing %s: rc = %d\n",
128                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
129         RETURN(rc);
130 }
131
132 /******************** DB functions *********************/
133
134 static inline int name_create(char **newname, char *prefix, char *suffix)
135 {
136         LASSERT(newname);
137         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
138         if (!*newname)
139                 return -ENOMEM;
140         sprintf(*newname, "%s%s", prefix, suffix);
141         return 0;
142 }
143
144 static inline void name_destroy(char **name)
145 {
146         if (*name)
147                 OBD_FREE(*name, strlen(*name) + 1);
148         *name = NULL;
149 }
150
151 struct mgs_fsdb_handler_data
152 {
153         struct fs_db   *fsdb;
154         __u32           ver;
155 };
156
157 /* from the (client) config log, figure out:
158         1. which ost's/mdt's are configured (by index)
159         2. what the last config step is
160         3. COMPAT_18 osc name
161 */
162 /* It might be better to have a separate db file, instead of parsing the info
163    out of the client log.  This is slow and potentially error-prone. */
164 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
165                             struct llog_rec_hdr *rec, void *data)
166 {
167         struct mgs_fsdb_handler_data *d = data;
168         struct fs_db *fsdb = d->fsdb;
169         int cfg_len = rec->lrh_len;
170         char *cfg_buf = (char*) (rec + 1);
171         struct lustre_cfg *lcfg;
172         __u32 index;
173         int rc = 0;
174         ENTRY;
175
176         if (rec->lrh_type != OBD_CFG_REC) {
177                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
178                 RETURN(-EINVAL);
179         }
180
181         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
182         if (rc) {
183                 CERROR("Insane cfg\n");
184                 RETURN(rc);
185         }
186
187         lcfg = (struct lustre_cfg *)cfg_buf;
188
189         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
190                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
191
192         /* Figure out ost indicies */
193         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
194         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
195             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
196                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
197                                        NULL, 10);
198                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
199                        lustre_cfg_string(lcfg, 1), index,
200                        lustre_cfg_string(lcfg, 2));
201                 set_bit(index, fsdb->fsdb_ost_index_map);
202         }
203
204         /* Figure out mdt indicies */
205         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
206         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
207             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
208                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
209                                        &index, NULL);
210                 if (rc != LDD_F_SV_TYPE_MDT) {
211                         CWARN("Unparsable MDC name %s, assuming index 0\n",
212                               lustre_cfg_string(lcfg, 0));
213                         index = 0;
214                 }
215                 rc = 0;
216                 CDEBUG(D_MGS, "MDT index is %u\n", index);
217                 set_bit(index, fsdb->fsdb_mdt_index_map);
218                 fsdb->fsdb_mdt_count ++;
219         }
220
221         /**
222          * figure out the old config. fsdb_gen = 0 means old log
223          * It is obsoleted and not supported anymore
224          */
225         if (fsdb->fsdb_gen == 0) {
226                 CERROR("Old config format is not supported\n");
227                 RETURN(-EINVAL);
228         }
229
230         /*
231          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
232          */
233         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
234             lcfg->lcfg_command == LCFG_ATTACH &&
235             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
236                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
237                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
238                         CWARN("MDT using 1.8 OSC name scheme\n");
239                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
240                 }
241         }
242
243         if (lcfg->lcfg_command == LCFG_MARKER) {
244                 struct cfg_marker *marker;
245                 marker = lustre_cfg_buf(lcfg, 1);
246
247                 d->ver = marker->cm_vers;
248
249                 /* Keep track of the latest marker step */
250                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
251         }
252
253         RETURN(rc);
254 }
255
256 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
257 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
258                                   struct mgs_device *mgs,
259                                   struct fs_db *fsdb)
260 {
261         char                            *logname;
262         struct llog_handle              *loghandle;
263         struct llog_ctxt                *ctxt;
264         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
265         int rc;
266
267         ENTRY;
268
269         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
270         LASSERT(ctxt != NULL);
271         rc = name_create(&logname, fsdb->fsdb_name, "-client");
272         if (rc)
273                 GOTO(out_put, rc);
274         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
275         if (rc)
276                 GOTO(out_pop, rc);
277
278         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
279         if (rc)
280                 GOTO(out_close, rc);
281
282         if (llog_get_size(loghandle) <= 1)
283                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
284
285         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
286         CDEBUG(D_INFO, "get_db = %d\n", rc);
287 out_close:
288         llog_close(env, loghandle);
289 out_pop:
290         name_destroy(&logname);
291 out_put:
292         llog_ctxt_put(ctxt);
293
294         RETURN(rc);
295 }
296
297 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
298 {
299         struct mgs_tgt_srpc_conf *tgtconf;
300
301         /* free target-specific rules */
302         while (fsdb->fsdb_srpc_tgt) {
303                 tgtconf = fsdb->fsdb_srpc_tgt;
304                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
305
306                 LASSERT(tgtconf->mtsc_tgt);
307
308                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
309                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
310                 OBD_FREE_PTR(tgtconf);
311         }
312
313         /* free general rules */
314         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
315 }
316
317 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
318 {
319         struct fs_db *fsdb;
320         cfs_list_t *tmp;
321
322         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
323                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
324                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
325                         return fsdb;
326         }
327         return NULL;
328 }
329
330 /* caller must hold the mgs->mgs_fs_db_lock */
331 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
332                                   struct mgs_device *mgs, char *fsname)
333 {
334         struct fs_db *fsdb;
335         int rc;
336         ENTRY;
337
338         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
339                 CERROR("fsname %s is too long\n", fsname);
340                 RETURN(NULL);
341         }
342
343         OBD_ALLOC_PTR(fsdb);
344         if (!fsdb)
345                 RETURN(NULL);
346
347         strcpy(fsdb->fsdb_name, fsname);
348         mutex_init(&fsdb->fsdb_mutex);
349         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
350         fsdb->fsdb_gen = 1;
351
352         if (strcmp(fsname, MGSSELF_NAME) == 0) {
353                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
354         } else {
355                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
356                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
357                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
358                         CERROR("No memory for index maps\n");
359                         GOTO(err, rc = -ENOMEM);
360                 }
361
362                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
363                 if (rc)
364                         GOTO(err, rc);
365                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
366                 if (rc)
367                         GOTO(err, rc);
368
369                 /* initialise data for NID table */
370                 mgs_ir_init_fs(env, mgs, fsdb);
371
372                 lproc_mgs_add_live(mgs, fsdb);
373         }
374
375         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
376
377         RETURN(fsdb);
378 err:
379         if (fsdb->fsdb_ost_index_map)
380                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
381         if (fsdb->fsdb_mdt_index_map)
382                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
383         name_destroy(&fsdb->fsdb_clilov);
384         name_destroy(&fsdb->fsdb_clilmv);
385         OBD_FREE_PTR(fsdb);
386         RETURN(NULL);
387 }
388
389 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
390 {
391         /* wait for anyone with the sem */
392         mutex_lock(&fsdb->fsdb_mutex);
393         lproc_mgs_del_live(mgs, fsdb);
394         cfs_list_del(&fsdb->fsdb_list);
395
396         /* deinitialize fsr */
397         mgs_ir_fini_fs(mgs, fsdb);
398
399         if (fsdb->fsdb_ost_index_map)
400                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
401         if (fsdb->fsdb_mdt_index_map)
402                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
403         name_destroy(&fsdb->fsdb_clilov);
404         name_destroy(&fsdb->fsdb_clilmv);
405         mgs_free_fsdb_srpc(fsdb);
406         mutex_unlock(&fsdb->fsdb_mutex);
407         OBD_FREE_PTR(fsdb);
408 }
409
410 int mgs_init_fsdb_list(struct mgs_device *mgs)
411 {
412         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
413         return 0;
414 }
415
416 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
417 {
418         struct fs_db *fsdb;
419         cfs_list_t *tmp, *tmp2;
420         mutex_lock(&mgs->mgs_mutex);
421         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
422                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
423                 mgs_free_fsdb(mgs, fsdb);
424         }
425         mutex_unlock(&mgs->mgs_mutex);
426         return 0;
427 }
428
429 int mgs_find_or_make_fsdb(const struct lu_env *env,
430                           struct mgs_device *mgs, char *name,
431                           struct fs_db **dbh)
432 {
433         struct fs_db *fsdb;
434         int rc = 0;
435
436         ENTRY;
437         mutex_lock(&mgs->mgs_mutex);
438         fsdb = mgs_find_fsdb(mgs, name);
439         if (fsdb) {
440                 mutex_unlock(&mgs->mgs_mutex);
441                 *dbh = fsdb;
442                 RETURN(0);
443         }
444
445         CDEBUG(D_MGS, "Creating new db\n");
446         fsdb = mgs_new_fsdb(env, mgs, name);
447         /* lock fsdb_mutex until the db is loaded from llogs */
448         if (fsdb)
449                 mutex_lock(&fsdb->fsdb_mutex);
450         mutex_unlock(&mgs->mgs_mutex);
451         if (!fsdb)
452                 RETURN(-ENOMEM);
453
454         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
455                 /* populate the db from the client llog */
456                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
457                 if (rc) {
458                         CERROR("Can't get db from client log %d\n", rc);
459                         GOTO(out_free, rc);
460                 }
461         }
462
463         /* populate srpc rules from params llog */
464         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
465         if (rc) {
466                 CERROR("Can't get db from params log %d\n", rc);
467                 GOTO(out_free, rc);
468         }
469
470         mutex_unlock(&fsdb->fsdb_mutex);
471         *dbh = fsdb;
472
473         RETURN(0);
474
475 out_free:
476         mutex_unlock(&fsdb->fsdb_mutex);
477         mgs_free_fsdb(mgs, fsdb);
478         return rc;
479 }
480
481 /* 1 = index in use
482    0 = index unused
483    -1= empty client log */
484 int mgs_check_index(const struct lu_env *env,
485                     struct mgs_device *mgs,
486                     struct mgs_target_info *mti)
487 {
488         struct fs_db *fsdb;
489         void *imap;
490         int rc = 0;
491         ENTRY;
492
493         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
494
495         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
496         if (rc) {
497                 CERROR("Can't get db for %s\n", mti->mti_fsname);
498                 RETURN(rc);
499         }
500
501         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
502                 RETURN(-1);
503
504         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
505                 imap = fsdb->fsdb_ost_index_map;
506         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
507                 imap = fsdb->fsdb_mdt_index_map;
508         else
509                 RETURN(-EINVAL);
510
511         if (test_bit(mti->mti_stripe_index, imap))
512                 RETURN(1);
513         RETURN(0);
514 }
515
516 static __inline__ int next_index(void *index_map, int map_len)
517 {
518         int i;
519         for (i = 0; i < map_len * 8; i++)
520                 if (!test_bit(i, index_map)) {
521                          return i;
522                  }
523         CERROR("max index %d exceeded.\n", i);
524         return -1;
525 }
526
527 /* Return codes:
528         0  newly marked as in use
529         <0 err
530         +EALREADY for update of an old index */
531 static int mgs_set_index(const struct lu_env *env,
532                          struct mgs_device *mgs,
533                          struct mgs_target_info *mti)
534 {
535         struct fs_db *fsdb;
536         void *imap;
537         int rc = 0;
538         ENTRY;
539
540         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
541         if (rc) {
542                 CERROR("Can't get db for %s\n", mti->mti_fsname);
543                 RETURN(rc);
544         }
545
546         mutex_lock(&fsdb->fsdb_mutex);
547         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
548                 imap = fsdb->fsdb_ost_index_map;
549         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
550                 imap = fsdb->fsdb_mdt_index_map;
551                 if (fsdb->fsdb_mdt_count >= MAX_MDT_COUNT) {
552                         LCONSOLE_ERROR_MSG(0x13f, "The max mdt count"
553                                            "is %d\n", (int)MAX_MDT_COUNT);
554                         GOTO(out_up, rc = -ERANGE);
555                 }
556         } else {
557                 GOTO(out_up, rc = -EINVAL);
558         }
559
560         if (mti->mti_flags & LDD_F_NEED_INDEX) {
561                 rc = next_index(imap, INDEX_MAP_SIZE);
562                 if (rc == -1)
563                         GOTO(out_up, rc = -ERANGE);
564                 mti->mti_stripe_index = rc;
565                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
566                         fsdb->fsdb_mdt_count ++;
567         }
568
569         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
570                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
571                                    "but the max index is %d.\n",
572                                    mti->mti_svname, mti->mti_stripe_index,
573                                    INDEX_MAP_SIZE * 8);
574                 GOTO(out_up, rc = -ERANGE);
575         }
576
577         if (test_bit(mti->mti_stripe_index, imap)) {
578                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
579                     !(mti->mti_flags & LDD_F_WRITECONF)) {
580                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
581                                            "%d, but that index is already in "
582                                            "use. Use --writeconf to force\n",
583                                            mti->mti_svname,
584                                            mti->mti_stripe_index);
585                         GOTO(out_up, rc = -EADDRINUSE);
586                 } else {
587                         CDEBUG(D_MGS, "Server %s updating index %d\n",
588                                mti->mti_svname, mti->mti_stripe_index);
589                         GOTO(out_up, rc = EALREADY);
590                 }
591         }
592
593         set_bit(mti->mti_stripe_index, imap);
594         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
595         mutex_unlock(&fsdb->fsdb_mutex);
596         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
597                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
598
599         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
600                mti->mti_stripe_index);
601
602         RETURN(0);
603 out_up:
604         mutex_unlock(&fsdb->fsdb_mutex);
605         return rc;
606 }
607
608 struct mgs_modify_lookup {
609         struct cfg_marker mml_marker;
610         int               mml_modified;
611 };
612
613 static int mgs_modify_handler(const struct lu_env *env,
614                               struct llog_handle *llh,
615                               struct llog_rec_hdr *rec, void *data)
616 {
617         struct mgs_modify_lookup *mml = data;
618         struct cfg_marker *marker;
619         struct lustre_cfg *lcfg = (struct lustre_cfg *)(rec + 1);
620         int cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
621                 sizeof(struct llog_rec_tail);
622         int rc;
623         ENTRY;
624
625         if (rec->lrh_type != OBD_CFG_REC) {
626                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
627                 RETURN(-EINVAL);
628         }
629
630         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
631         if (rc) {
632                 CERROR("Insane cfg\n");
633                 RETURN(rc);
634         }
635
636         /* We only care about markers */
637         if (lcfg->lcfg_command != LCFG_MARKER)
638                 RETURN(0);
639
640         marker = lustre_cfg_buf(lcfg, 1);
641         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
642             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
643             !(marker->cm_flags & CM_SKIP)) {
644                 /* Found a non-skipped marker match */
645                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
646                        rec->lrh_index, marker->cm_step,
647                        marker->cm_flags, mml->mml_marker.cm_flags,
648                        marker->cm_tgtname, marker->cm_comment);
649                 /* Overwrite the old marker llog entry */
650                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
651                 marker->cm_flags |= mml->mml_marker.cm_flags;
652                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
653                 /* Header and tail are added back to lrh_len in
654                    llog_lvfs_write_rec */
655                 rec->lrh_len = cfg_len;
656                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
657                                 rec->lrh_index);
658                 if (!rc)
659                          mml->mml_modified++;
660         }
661
662         RETURN(rc);
663 }
664
665 /**
666  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
667  * Return code:
668  * 0 - modified successfully,
669  * 1 - no modification was done
670  * negative - error
671  */
672 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
673                       struct fs_db *fsdb, struct mgs_target_info *mti,
674                       char *logname, char *devname, char *comment, int flags)
675 {
676         struct llog_handle *loghandle;
677         struct llog_ctxt *ctxt;
678         struct mgs_modify_lookup *mml;
679         int rc;
680
681         ENTRY;
682
683         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
684         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
685                flags);
686
687         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
688         LASSERT(ctxt != NULL);
689         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
690         if (rc < 0) {
691                 if (rc == -ENOENT)
692                         rc = 0;
693                 GOTO(out_pop, rc);
694         }
695
696         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
697         if (rc)
698                 GOTO(out_close, rc);
699
700         if (llog_get_size(loghandle) <= 1)
701                 GOTO(out_close, rc = 0);
702
703         OBD_ALLOC_PTR(mml);
704         if (!mml)
705                 GOTO(out_close, rc = -ENOMEM);
706         strcpy(mml->mml_marker.cm_comment, comment);
707         strcpy(mml->mml_marker.cm_tgtname, devname);
708         /* Modify mostly means cancel */
709         mml->mml_marker.cm_flags = flags;
710         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
711         mml->mml_modified = 0;
712         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
713                           NULL);
714         if (!rc && !mml->mml_modified)
715                 rc = 1;
716         OBD_FREE_PTR(mml);
717
718 out_close:
719         llog_close(env, loghandle);
720 out_pop:
721         if (rc < 0)
722                 CERROR("%s: modify %s/%s failed: rc = %d\n",
723                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
724         llog_ctxt_put(ctxt);
725         RETURN(rc);
726 }
727
728 /******************** config log recording functions *********************/
729
730 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
731                        struct lustre_cfg *lcfg)
732 {
733         struct llog_rec_hdr      rec;
734         int                      buflen, rc;
735
736         if (!lcfg || !llh)
737                 return -ENOMEM;
738
739         LASSERT(llh->lgh_ctxt);
740
741         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
742                                 lcfg->lcfg_buflens);
743         rec.lrh_len = llog_data_len(buflen);
744         rec.lrh_type = OBD_CFG_REC;
745
746         /* idx = -1 means append */
747         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
748         if (rc)
749                 CERROR("failed %d\n", rc);
750         return rc;
751 }
752
753 static int record_base(const struct lu_env *env, struct llog_handle *llh,
754                      char *cfgname, lnet_nid_t nid, int cmd,
755                      char *s1, char *s2, char *s3, char *s4)
756 {
757         struct mgs_thread_info *mgi = mgs_env_info(env);
758         struct lustre_cfg     *lcfg;
759         int rc;
760
761         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
762                cmd, s1, s2, s3, s4);
763
764         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
765         if (s1)
766                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
767         if (s2)
768                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
769         if (s3)
770                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
771         if (s4)
772                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
773
774         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
775         if (!lcfg)
776                 return -ENOMEM;
777         lcfg->lcfg_nid = nid;
778
779         rc = record_lcfg(env, llh, lcfg);
780
781         lustre_cfg_free(lcfg);
782
783         if (rc) {
784                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
785                        cmd, s1, s2, s3, s4);
786         }
787         return rc;
788 }
789
790
791 static inline int record_add_uuid(const struct lu_env *env,
792                                   struct llog_handle *llh,
793                                   uint64_t nid, char *uuid)
794 {
795         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
796
797 }
798
799 static inline int record_add_conn(const struct lu_env *env,
800                                   struct llog_handle *llh,
801                                   char *devname, char *uuid)
802 {
803         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
804 }
805
806 static inline int record_attach(const struct lu_env *env,
807                                 struct llog_handle *llh, char *devname,
808                                 char *type, char *uuid)
809 {
810         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
811 }
812
813 static inline int record_setup(const struct lu_env *env,
814                                struct llog_handle *llh, char *devname,
815                                char *s1, char *s2, char *s3, char *s4)
816 {
817         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
818 }
819
820 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
821                             char *devname, struct lov_desc *desc)
822 {
823         struct mgs_thread_info *mgi = mgs_env_info(env);
824         struct lustre_cfg *lcfg;
825         int rc;
826
827         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
828         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
829         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
830         if (!lcfg)
831                 return -ENOMEM;
832         rc = record_lcfg(env, llh, lcfg);
833
834         lustre_cfg_free(lcfg);
835         return rc;
836 }
837
838 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
839                             char *devname, struct lmv_desc *desc)
840 {
841         struct mgs_thread_info *mgi = mgs_env_info(env);
842         struct lustre_cfg *lcfg;
843         int rc;
844
845         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
846         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
847         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
848
849         rc = record_lcfg(env, llh, lcfg);
850
851         lustre_cfg_free(lcfg);
852         return rc;
853 }
854
855 static inline int record_mdc_add(const struct lu_env *env,
856                                  struct llog_handle *llh,
857                                  char *logname, char *mdcuuid,
858                                  char *mdtuuid, char *index,
859                                  char *gen)
860 {
861         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
862                            mdtuuid,index,gen,mdcuuid);
863 }
864
865 static inline int record_lov_add(const struct lu_env *env,
866                                  struct llog_handle *llh,
867                                  char *lov_name, char *ost_uuid,
868                                  char *index, char *gen)
869 {
870         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
871                            ost_uuid,index,gen,0);
872 }
873
874 static inline int record_mount_opt(const struct lu_env *env,
875                                    struct llog_handle *llh,
876                                    char *profile, char *lov_name,
877                                    char *mdc_name)
878 {
879         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
880                            profile,lov_name,mdc_name,0);
881 }
882
883 static int record_marker(const struct lu_env *env,
884                          struct llog_handle *llh,
885                          struct fs_db *fsdb, __u32 flags,
886                          char *tgtname, char *comment)
887 {
888         struct mgs_thread_info *mgi = mgs_env_info(env);
889         struct lustre_cfg *lcfg;
890         int rc;
891
892         if (flags & CM_START)
893                 fsdb->fsdb_gen++;
894         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
895         mgi->mgi_marker.cm_flags = flags;
896         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
897         strncpy(mgi->mgi_marker.cm_tgtname, tgtname,
898                 sizeof(mgi->mgi_marker.cm_tgtname));
899         strncpy(mgi->mgi_marker.cm_comment, comment,
900                 sizeof(mgi->mgi_marker.cm_comment));
901         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
902         mgi->mgi_marker.cm_canceltime = 0;
903         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
904         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
905                             sizeof(mgi->mgi_marker));
906         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
907         if (!lcfg)
908                 return -ENOMEM;
909         rc = record_lcfg(env, llh, lcfg);
910
911         lustre_cfg_free(lcfg);
912         return rc;
913 }
914
915 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
916                             struct llog_handle **llh, char *name)
917 {
918         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
919         struct llog_ctxt        *ctxt;
920         int                      rc = 0;
921
922         if (*llh)
923                 GOTO(out, rc = -EBUSY);
924
925         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
926         if (!ctxt)
927                 GOTO(out, rc = -ENODEV);
928         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
929
930         rc = llog_open_create(env, ctxt, llh, NULL, name);
931         if (rc)
932                 GOTO(out_ctxt, rc);
933         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
934         if (rc)
935                 llog_close(env, *llh);
936 out_ctxt:
937         llog_ctxt_put(ctxt);
938 out:
939         if (rc) {
940                 CERROR("%s: can't start log %s: rc = %d\n",
941                        mgs->mgs_obd->obd_name, name, rc);
942                 *llh = NULL;
943         }
944         RETURN(rc);
945 }
946
947 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
948 {
949         int rc;
950
951         rc = llog_close(env, *llh);
952         *llh = NULL;
953
954         return rc;
955 }
956
957 static int mgs_log_is_empty(const struct lu_env *env,
958                             struct mgs_device *mgs, char *name)
959 {
960         struct llog_handle *llh;
961         struct llog_ctxt *ctxt;
962         int rc = 0;
963
964         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
965         LASSERT(ctxt != NULL);
966         rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
967         if (rc < 0) {
968                 if (rc == -ENOENT)
969                         rc = 0;
970                 GOTO(out_ctxt, rc);
971         }
972
973         llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
974         if (rc)
975                 GOTO(out_close, rc);
976         rc = llog_get_size(llh);
977
978 out_close:
979         llog_close(env, llh);
980 out_ctxt:
981         llog_ctxt_put(ctxt);
982         /* header is record 1 */
983         return (rc <= 1);
984 }
985
986 /******************** config "macros" *********************/
987
988 /* write an lcfg directly into a log (with markers) */
989 static int mgs_write_log_direct(const struct lu_env *env,
990                                 struct mgs_device *mgs, struct fs_db *fsdb,
991                                 char *logname, struct lustre_cfg *lcfg,
992                                 char *devname, char *comment)
993 {
994         struct llog_handle *llh = NULL;
995         int rc;
996         ENTRY;
997
998         if (!lcfg)
999                 RETURN(-ENOMEM);
1000
1001         rc = record_start_log(env, mgs, &llh, logname);
1002         if (rc)
1003                 RETURN(rc);
1004
1005         /* FIXME These should be a single journal transaction */
1006         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1007         if (rc)
1008                 GOTO(out_end, rc);
1009         rc = record_lcfg(env, llh, lcfg);
1010         if (rc)
1011                 GOTO(out_end, rc);
1012         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1013         if (rc)
1014                 GOTO(out_end, rc);
1015 out_end:
1016         record_end_log(env, &llh);
1017         RETURN(rc);
1018 }
1019
1020 /* write the lcfg in all logs for the given fs */
1021 int mgs_write_log_direct_all(const struct lu_env *env,
1022                              struct mgs_device *mgs,
1023                              struct fs_db *fsdb,
1024                              struct mgs_target_info *mti,
1025                              struct lustre_cfg *lcfg,
1026                              char *devname, char *comment,
1027                              int server_only)
1028 {
1029         cfs_list_t list;
1030         struct mgs_direntry *dirent, *n;
1031         char *fsname = mti->mti_fsname;
1032         char *logname;
1033         int rc = 0, len = strlen(fsname);
1034         ENTRY;
1035
1036         /* We need to set params for any future logs
1037            as well. FIXME Append this file to every new log.
1038            Actually, we should store as params (text), not llogs.  Or
1039            in a database. */
1040         rc = name_create(&logname, fsname, "-params");
1041         if (rc)
1042                 RETURN(rc);
1043         if (mgs_log_is_empty(env, mgs, logname)) {
1044                 struct llog_handle *llh = NULL;
1045                 rc = record_start_log(env, mgs, &llh, logname);
1046                 record_end_log(env, &llh);
1047         }
1048         name_destroy(&logname);
1049         if (rc)
1050                 RETURN(rc);
1051
1052         /* Find all the logs in the CONFIGS directory */
1053         rc = class_dentry_readdir(env, mgs, &list);
1054         if (rc)
1055                 RETURN(rc);
1056
1057         /* Could use fsdb index maps instead of directory listing */
1058         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1059                 cfs_list_del(&dirent->list);
1060                 /* don't write to sptlrpc rule log */
1061                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1062                         goto next;
1063
1064                 /* caller wants write server logs only */
1065                 if (server_only && strstr(dirent->name, "-client") != NULL)
1066                         goto next;
1067
1068                 if (strncmp(fsname, dirent->name, len) == 0) {
1069                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1070                         /* Erase any old settings of this same parameter */
1071                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1072                                         devname, comment, CM_SKIP);
1073                         if (rc < 0)
1074                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1075                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1076                         /* Write the new one */
1077                         if (lcfg) {
1078                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1079                                                           dirent->name,
1080                                                           lcfg, devname,
1081                                                           comment);
1082                                 if (rc)
1083                                         CERROR("%s: writing log %s: rc = %d\n",
1084                                                mgs->mgs_obd->obd_name,
1085                                                dirent->name, rc);
1086                         }
1087                 }
1088 next:
1089                 mgs_direntry_free(dirent);
1090         }
1091
1092         RETURN(rc);
1093 }
1094
1095 static int mgs_write_log_mdc_to_mdt(const struct lu_env *env,
1096                                     struct mgs_device *mgs,
1097                                     struct fs_db *fsdb,
1098                                     struct mgs_target_info *mti,
1099                                     char *logname);
1100 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1101                                     struct mgs_device *mgs,
1102                                     struct fs_db *fsdb,
1103                                     struct mgs_target_info *mti,
1104                                     char *logname, char *suffix, char *lovname,
1105                                     enum lustre_sec_part sec_part, int flags);
1106 static int name_create_mdt_and_lov(char **logname, char **lovname,
1107                                    struct fs_db *fsdb, int i);
1108
1109 static int mgs_steal_llog_handler(const struct lu_env *env,
1110                                   struct llog_handle *llh,
1111                                   struct llog_rec_hdr *rec, void *data)
1112 {
1113         struct mgs_device *mgs;
1114         struct obd_device *obd;
1115         struct mgs_target_info *mti, *tmti;
1116         struct fs_db *fsdb;
1117         int cfg_len = rec->lrh_len;
1118         char *cfg_buf = (char*) (rec + 1);
1119         struct lustre_cfg *lcfg;
1120         int rc = 0;
1121         struct llog_handle *mdt_llh = NULL;
1122         static int got_an_osc_or_mdc = 0;
1123         /* 0: not found any osc/mdc;
1124            1: found osc;
1125            2: found mdc;
1126         */
1127         static int last_step = -1;
1128
1129         ENTRY;
1130
1131         mti = ((struct temp_comp*)data)->comp_mti;
1132         tmti = ((struct temp_comp*)data)->comp_tmti;
1133         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1134         obd = ((struct temp_comp *)data)->comp_obd;
1135         mgs = lu2mgs_dev(obd->obd_lu_dev);
1136         LASSERT(mgs);
1137
1138         if (rec->lrh_type != OBD_CFG_REC) {
1139                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1140                 RETURN(-EINVAL);
1141         }
1142
1143         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1144         if (rc) {
1145                 CERROR("Insane cfg\n");
1146                 RETURN(rc);
1147         }
1148
1149         lcfg = (struct lustre_cfg *)cfg_buf;
1150
1151         if (lcfg->lcfg_command == LCFG_MARKER) {
1152                 struct cfg_marker *marker;
1153                 marker = lustre_cfg_buf(lcfg, 1);
1154                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1155                     (marker->cm_flags & CM_START)){
1156                         got_an_osc_or_mdc = 1;
1157                         strncpy(tmti->mti_svname, marker->cm_tgtname,
1158                                 sizeof(tmti->mti_svname));
1159                         rc = record_start_log(env, mgs, &mdt_llh,
1160                                               mti->mti_svname);
1161                         if (rc)
1162                                 RETURN(rc);
1163                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1164                                            mti->mti_svname,"add osc(copied)");
1165                         record_end_log(env, &mdt_llh);
1166                         last_step = marker->cm_step;
1167                         RETURN(rc);
1168                 }
1169                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1170                     (marker->cm_flags & CM_END)){
1171                         LASSERT(last_step == marker->cm_step);
1172                         last_step = -1;
1173                         got_an_osc_or_mdc = 0;
1174                         rc = record_start_log(env, mgs, &mdt_llh,
1175                                               mti->mti_svname);
1176                         if (rc)
1177                                 RETURN(rc);
1178                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1179                                            mti->mti_svname,"add osc(copied)");
1180                         record_end_log(env, &mdt_llh);
1181                         RETURN(rc);
1182                 }
1183                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1184                     (marker->cm_flags & CM_START)){
1185                         got_an_osc_or_mdc = 2;
1186                         last_step = marker->cm_step;
1187                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1188                                strlen(marker->cm_tgtname));
1189
1190                         RETURN(rc);
1191                 }
1192                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1193                     (marker->cm_flags & CM_END)){
1194                         LASSERT(last_step == marker->cm_step);
1195                         last_step = -1;
1196                         got_an_osc_or_mdc = 0;
1197                         RETURN(rc);
1198                 }
1199         }
1200
1201         if (got_an_osc_or_mdc == 0 || last_step < 0)
1202                 RETURN(rc);
1203
1204         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1205                 uint64_t nodenid;
1206                 nodenid = lcfg->lcfg_nid;
1207
1208                 tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1209                 tmti->mti_nid_count++;
1210
1211                 RETURN(rc);
1212         }
1213
1214         if (lcfg->lcfg_command == LCFG_SETUP) {
1215                 char *target;
1216
1217                 target = lustre_cfg_string(lcfg, 1);
1218                 memcpy(tmti->mti_uuid, target, strlen(target));
1219                 RETURN(rc);
1220         }
1221
1222         /* ignore client side sptlrpc_conf_log */
1223         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1224                 RETURN(rc);
1225
1226         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1227                 int index;
1228
1229                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1230                         RETURN (-EINVAL);
1231
1232                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1233                        strlen(mti->mti_fsname));
1234                 tmti->mti_stripe_index = index;
1235
1236                 rc = mgs_write_log_mdc_to_mdt(env, mgs, fsdb, tmti,
1237                                               mti->mti_svname);
1238                 memset(tmti, 0, sizeof(*tmti));
1239                 RETURN(rc);
1240         }
1241
1242         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1243                 int index;
1244                 char mdt_index[9];
1245                 char *logname, *lovname;
1246
1247                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1248                                              mti->mti_stripe_index);
1249                 if (rc)
1250                         RETURN(rc);
1251                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1252
1253                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1254                         name_destroy(&logname);
1255                         name_destroy(&lovname);
1256                         RETURN(-EINVAL);
1257                 }
1258
1259                 tmti->mti_stripe_index = index;
1260                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1261                                          mdt_index, lovname,
1262                                          LUSTRE_SP_MDT, 0);
1263                 name_destroy(&logname);
1264                 name_destroy(&lovname);
1265                 RETURN(rc);
1266         }
1267         RETURN(rc);
1268 }
1269
1270 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1271 /* stealed from mgs_get_fsdb_from_llog*/
1272 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1273                                               struct mgs_device *mgs,
1274                                               char *client_name,
1275                                               struct temp_comp* comp)
1276 {
1277         struct llog_handle *loghandle;
1278         struct mgs_target_info *tmti;
1279         struct llog_ctxt *ctxt;
1280         int rc;
1281
1282         ENTRY;
1283
1284         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1285         LASSERT(ctxt != NULL);
1286
1287         OBD_ALLOC_PTR(tmti);
1288         if (tmti == NULL)
1289                 GOTO(out_ctxt, rc = -ENOMEM);
1290
1291         comp->comp_tmti = tmti;
1292         comp->comp_obd = mgs->mgs_obd;
1293
1294         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1295                        LLOG_OPEN_EXISTS);
1296         if (rc < 0) {
1297                 if (rc == -ENOENT)
1298                         rc = 0;
1299                 GOTO(out_pop, rc);
1300         }
1301
1302         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1303         if (rc)
1304                 GOTO(out_close, rc);
1305
1306         rc = llog_process_or_fork(env, loghandle, mgs_steal_llog_handler,
1307                                   (void *)comp, NULL, false);
1308         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1309 out_close:
1310         llog_close(env, loghandle);
1311 out_pop:
1312         OBD_FREE_PTR(tmti);
1313 out_ctxt:
1314         llog_ctxt_put(ctxt);
1315         RETURN(rc);
1316 }
1317
1318 /* lmv is the second thing for client logs */
1319 /* copied from mgs_write_log_lov. Please refer to that.  */
1320 static int mgs_write_log_lmv(const struct lu_env *env,
1321                              struct mgs_device *mgs,
1322                              struct fs_db *fsdb,
1323                              struct mgs_target_info *mti,
1324                              char *logname, char *lmvname)
1325 {
1326         struct llog_handle *llh = NULL;
1327         struct lmv_desc *lmvdesc;
1328         char *uuid;
1329         int rc = 0;
1330         ENTRY;
1331
1332         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1333
1334         OBD_ALLOC_PTR(lmvdesc);
1335         if (lmvdesc == NULL)
1336                 RETURN(-ENOMEM);
1337         lmvdesc->ld_active_tgt_count = 0;
1338         lmvdesc->ld_tgt_count = 0;
1339         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1340         uuid = (char *)lmvdesc->ld_uuid.uuid;
1341
1342         rc = record_start_log(env, mgs, &llh, logname);
1343         if (rc)
1344                 GOTO(out_free, rc);
1345         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1346         if (rc)
1347                 GOTO(out_end, rc);
1348         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1349         if (rc)
1350                 GOTO(out_end, rc);
1351         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1352         if (rc)
1353                 GOTO(out_end, rc);
1354         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1355         if (rc)
1356                 GOTO(out_end, rc);
1357 out_end:
1358         record_end_log(env, &llh);
1359 out_free:
1360         OBD_FREE_PTR(lmvdesc);
1361         RETURN(rc);
1362 }
1363
1364 /* lov is the first thing in the mdt and client logs */
1365 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1366                              struct fs_db *fsdb, struct mgs_target_info *mti,
1367                              char *logname, char *lovname)
1368 {
1369         struct llog_handle *llh = NULL;
1370         struct lov_desc *lovdesc;
1371         char *uuid;
1372         int rc = 0;
1373         ENTRY;
1374
1375         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1376
1377         /*
1378         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1379         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1380               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1381         */
1382
1383         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1384         OBD_ALLOC_PTR(lovdesc);
1385         if (lovdesc == NULL)
1386                 RETURN(-ENOMEM);
1387         lovdesc->ld_magic = LOV_DESC_MAGIC;
1388         lovdesc->ld_tgt_count = 0;
1389         /* Defaults.  Can be changed later by lcfg config_param */
1390         lovdesc->ld_default_stripe_count = 1;
1391         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1392         lovdesc->ld_default_stripe_size = 1024 * 1024;
1393         lovdesc->ld_default_stripe_offset = -1;
1394         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1395         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1396         /* can these be the same? */
1397         uuid = (char *)lovdesc->ld_uuid.uuid;
1398
1399         /* This should always be the first entry in a log.
1400         rc = mgs_clear_log(obd, logname); */
1401         rc = record_start_log(env, mgs, &llh, logname);
1402         if (rc)
1403                 GOTO(out_free, rc);
1404         /* FIXME these should be a single journal transaction */
1405         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1406         if (rc)
1407                 GOTO(out_end, rc);
1408         rc = record_attach(env, llh, lovname, "lov", uuid);
1409         if (rc)
1410                 GOTO(out_end, rc);
1411         rc = record_lov_setup(env, llh, lovname, lovdesc);
1412         if (rc)
1413                 GOTO(out_end, rc);
1414         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1415         if (rc)
1416                 GOTO(out_end, rc);
1417         EXIT;
1418 out_end:
1419         record_end_log(env, &llh);
1420 out_free:
1421         OBD_FREE_PTR(lovdesc);
1422         return rc;
1423 }
1424
1425 /* add failnids to open log */
1426 static int mgs_write_log_failnids(const struct lu_env *env,
1427                                   struct mgs_target_info *mti,
1428                                   struct llog_handle *llh,
1429                                   char *cliname)
1430 {
1431         char *failnodeuuid = NULL;
1432         char *ptr = mti->mti_params;
1433         lnet_nid_t nid;
1434         int rc = 0;
1435
1436         /*
1437         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1438         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1439         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1440         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1441         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1442         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1443         */
1444
1445         /* Pull failnid info out of params string */
1446         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1447                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1448                         if (failnodeuuid == NULL) {
1449                                 /* We don't know the failover node name,
1450                                    so just use the first nid as the uuid */
1451                                 rc = name_create(&failnodeuuid,
1452                                                  libcfs_nid2str(nid), "");
1453                                 if (rc)
1454                                         return rc;
1455                         }
1456                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1457                                "client %s\n", libcfs_nid2str(nid),
1458                                failnodeuuid, cliname);
1459                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1460                 }
1461                 if (failnodeuuid)
1462                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1463         }
1464
1465         name_destroy(&failnodeuuid);
1466         return rc;
1467 }
1468
1469 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1470                                     struct mgs_device *mgs,
1471                                     struct fs_db *fsdb,
1472                                     struct mgs_target_info *mti,
1473                                     char *logname, char *lmvname)
1474 {
1475         struct llog_handle *llh = NULL;
1476         char *mdcname = NULL;
1477         char *nodeuuid = NULL;
1478         char *mdcuuid = NULL;
1479         char *lmvuuid = NULL;
1480         char index[6];
1481         int i, rc;
1482         ENTRY;
1483
1484         if (mgs_log_is_empty(env, mgs, logname)) {
1485                 CERROR("log is empty! Logical error\n");
1486                 RETURN(-EINVAL);
1487         }
1488
1489         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1490                mti->mti_svname, logname, lmvname);
1491
1492         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1493         if (rc)
1494                 RETURN(rc);
1495         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1496         if (rc)
1497                 GOTO(out_free, rc);
1498         rc = name_create(&mdcuuid, mdcname, "_UUID");
1499         if (rc)
1500                 GOTO(out_free, rc);
1501         rc = name_create(&lmvuuid, lmvname, "_UUID");
1502         if (rc)
1503                 GOTO(out_free, rc);
1504
1505         rc = record_start_log(env, mgs, &llh, logname);
1506         if (rc)
1507                 GOTO(out_free, rc);
1508         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1509                            "add mdc");
1510         if (rc)
1511                 GOTO(out_end, rc);
1512         for (i = 0; i < mti->mti_nid_count; i++) {
1513                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1514                        libcfs_nid2str(mti->mti_nids[i]));
1515
1516                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1517                 if (rc)
1518                         GOTO(out_end, rc);
1519         }
1520
1521         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1522         if (rc)
1523                 GOTO(out_end, rc);
1524         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1525         if (rc)
1526                 GOTO(out_end, rc);
1527         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1528         if (rc)
1529                 GOTO(out_end, rc);
1530         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1531         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1532                             index, "1");
1533         if (rc)
1534                 GOTO(out_end, rc);
1535         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1536                            "add mdc");
1537         if (rc)
1538                 GOTO(out_end, rc);
1539 out_end:
1540         record_end_log(env, &llh);
1541 out_free:
1542         name_destroy(&lmvuuid);
1543         name_destroy(&mdcuuid);
1544         name_destroy(&mdcname);
1545         name_destroy(&nodeuuid);
1546         RETURN(rc);
1547 }
1548
1549 /* add new mdc to already existent MDS */
1550 static int mgs_write_log_mdc_to_mdt(const struct lu_env *env,
1551                                     struct mgs_device *mgs,
1552                                     struct fs_db *fsdb,
1553                                     struct mgs_target_info *mti,
1554                                     char *logname)
1555 {
1556         struct llog_handle *llh = NULL;
1557         char *nodeuuid = NULL;
1558         char *mdcname = NULL;
1559         char *mdcuuid = NULL;
1560         char *mdtuuid = NULL;
1561         int idx = mti->mti_stripe_index;
1562         char index[9];
1563         int i, rc;
1564
1565         ENTRY;
1566         if (mgs_log_is_empty(env, mgs, logname)) {
1567                 CERROR("log is empty! Logical error\n");
1568                 RETURN (-EINVAL);
1569         }
1570
1571         CDEBUG(D_MGS, "adding mdc index %d to %s\n", idx, logname);
1572
1573         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1574         if (rc)
1575                 RETURN(rc);
1576         snprintf(index, sizeof(index), "-mdc%04x", idx);
1577         rc = name_create(&mdcname, logname, index);
1578         if (rc)
1579                 GOTO(out_free, rc);
1580         rc = name_create(&mdcuuid, mdcname, "_UUID");
1581         if (rc)
1582                 GOTO(out_free, rc);
1583         rc = name_create(&mdtuuid, logname, "_UUID");
1584         if (rc)
1585                 GOTO(out_free, rc);
1586
1587         rc = record_start_log(env, mgs, &llh, logname);
1588         if (rc)
1589                 GOTO(out_free, rc);
1590         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname, "add mdc");
1591         if (rc)
1592                 GOTO(out_end, rc);
1593         for (i = 0; i < mti->mti_nid_count; i++) {
1594                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1595                        libcfs_nid2str(mti->mti_nids[i]));
1596                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1597                 if (rc)
1598                         GOTO(out_end, rc);
1599         }
1600         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, mdcuuid);
1601         if (rc)
1602                 GOTO(out_end, rc);
1603         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1604         if (rc)
1605                 GOTO(out_end, rc);
1606         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1607         if (rc)
1608                 GOTO(out_end, rc);
1609         snprintf(index, sizeof(index), "%d", idx);
1610
1611         rc = record_mdc_add(env, llh, logname, mdcuuid, mti->mti_uuid,
1612                             index, "1");
1613         if (rc)
1614                 GOTO(out_end, rc);
1615         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add mdc");
1616         if (rc)
1617                 GOTO(out_end, rc);
1618 out_end:
1619         record_end_log(env, &llh);
1620 out_free:
1621         name_destroy(&mdtuuid);
1622         name_destroy(&mdcuuid);
1623         name_destroy(&mdcname);
1624         name_destroy(&nodeuuid);
1625         RETURN(rc);
1626 }
1627
1628 static int mgs_write_log_mdt0(const struct lu_env *env,
1629                               struct mgs_device *mgs,
1630                               struct fs_db *fsdb,
1631                               struct mgs_target_info *mti)
1632 {
1633         char *log = mti->mti_svname;
1634         struct llog_handle *llh = NULL;
1635         char *uuid, *lovname;
1636         char mdt_index[6];
1637         char *ptr = mti->mti_params;
1638         int rc = 0, failout = 0;
1639         ENTRY;
1640
1641         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
1642         if (uuid == NULL)
1643                 RETURN(-ENOMEM);
1644
1645         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
1646                 failout = (strncmp(ptr, "failout", 7) == 0);
1647
1648         rc = name_create(&lovname, log, "-mdtlov");
1649         if (rc)
1650                 GOTO(out_free, rc);
1651         if (mgs_log_is_empty(env, mgs, log)) {
1652                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
1653                 if (rc)
1654                         GOTO(out_lod, rc);
1655         }
1656
1657         sprintf(mdt_index, "%d", mti->mti_stripe_index);
1658
1659         rc = record_start_log(env, mgs, &llh, log);
1660         if (rc)
1661                 GOTO(out_lod, rc);
1662
1663         /* add MDT itself */
1664
1665         /* FIXME this whole fn should be a single journal transaction */
1666         sprintf(uuid, "%s_UUID", log);
1667         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
1668         if (rc)
1669                 GOTO(out_lod, rc);
1670         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
1671         if (rc)
1672                 GOTO(out_end, rc);
1673         rc = record_mount_opt(env, llh, log, lovname, NULL);
1674         if (rc)
1675                 GOTO(out_end, rc);
1676         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
1677                         failout ? "n" : "f");
1678         if (rc)
1679                 GOTO(out_end, rc);
1680         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
1681         if (rc)
1682                 GOTO(out_end, rc);
1683 out_end:
1684         record_end_log(env, &llh);
1685 out_lod:
1686         name_destroy(&lovname);
1687 out_free:
1688         OBD_FREE(uuid, sizeof(struct obd_uuid));
1689         RETURN(rc);
1690 }
1691
1692 static inline int name_create_mdt(char **logname, char *fsname, int i)
1693 {
1694         char mdt_index[9];
1695
1696         sprintf(mdt_index, "-MDT%04x", i);
1697         return name_create(logname, fsname, mdt_index);
1698 }
1699
1700 static int name_create_mdt_and_lov(char **logname, char **lovname,
1701                                     struct fs_db *fsdb, int i)
1702 {
1703         int rc;
1704
1705         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
1706         if (rc)
1707                 return rc;
1708         /* COMPAT_180 */
1709         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
1710                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
1711         else
1712                 rc = name_create(lovname, *logname, "-mdtlov");
1713         if (rc) {
1714                 name_destroy(logname);
1715                 *logname = NULL;
1716         }
1717         return rc;
1718 }
1719
1720 static inline int name_create_mdt_osc(char **oscname, char *ostname,
1721                                        struct fs_db *fsdb, int i)
1722 {
1723         char suffix[16];
1724
1725         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
1726                 sprintf(suffix, "-osc");
1727         else
1728                 sprintf(suffix, "-osc-MDT%04x", i);
1729         return name_create(oscname, ostname, suffix);
1730 }
1731
1732 /* envelope method for all layers log */
1733 static int mgs_write_log_mdt(const struct lu_env *env,
1734                              struct mgs_device *mgs,
1735                              struct fs_db *fsdb,
1736                              struct mgs_target_info *mti)
1737 {
1738         struct mgs_thread_info *mgi = mgs_env_info(env);
1739         struct llog_handle *llh = NULL;
1740         char *cliname;
1741         int rc, i = 0;
1742         ENTRY;
1743
1744         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
1745
1746         if (mti->mti_uuid[0] == '\0') {
1747                 /* Make up our own uuid */
1748                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
1749                          "%s_UUID", mti->mti_svname);
1750         }
1751
1752         /* add mdt */
1753         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
1754         if (rc)
1755                 RETURN(rc);
1756         /* Append the mdt info to the client log */
1757         rc = name_create(&cliname, mti->mti_fsname, "-client");
1758         if (rc)
1759                 RETURN(rc);
1760
1761         if (mgs_log_is_empty(env, mgs, cliname)) {
1762                 /* Start client log */
1763                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
1764                                        fsdb->fsdb_clilov);
1765                 if (rc)
1766                         GOTO(out_free, rc);
1767                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
1768                                        fsdb->fsdb_clilmv);
1769                 if (rc)
1770                         GOTO(out_free, rc);
1771         }
1772
1773         /*
1774         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
1775         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
1776         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
1777         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
1778         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
1779         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
1780         */
1781
1782                 /* copy client info about lov/lmv */
1783                 mgi->mgi_comp.comp_mti = mti;
1784                 mgi->mgi_comp.comp_fsdb = fsdb;
1785
1786                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
1787                                                         &mgi->mgi_comp);
1788                 if (rc)
1789                         GOTO(out_free, rc);
1790                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
1791                                               fsdb->fsdb_clilmv);
1792                 if (rc)
1793                         GOTO(out_free, rc);
1794
1795                 /* add mountopts */
1796                 rc = record_start_log(env, mgs, &llh, cliname);
1797                 if (rc)
1798                         GOTO(out_free, rc);
1799
1800                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
1801                                    "mount opts");
1802                 if (rc)
1803                         GOTO(out_end, rc);
1804                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
1805                                       fsdb->fsdb_clilmv);
1806                 if (rc)
1807                         GOTO(out_end, rc);
1808                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
1809                                    "mount opts");
1810
1811         if (rc)
1812                 GOTO(out_end, rc);
1813         /* for_all_existing_mdt except current one */
1814         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
1815                 char *mdtname;
1816                 if (i !=  mti->mti_stripe_index &&
1817                     test_bit(i,  fsdb->fsdb_mdt_index_map)) {
1818                         rc = name_create_mdt(&mdtname, mti->mti_fsname, i);
1819                         if (rc)
1820                                 GOTO(out_end, rc);
1821                         rc = mgs_write_log_mdc_to_mdt(env, mgs, fsdb, mti, mdtname);
1822                         name_destroy(&mdtname);
1823                         if (rc)
1824                                 GOTO(out_end, rc);
1825                 }
1826         }
1827 out_end:
1828         record_end_log(env, &llh);
1829 out_free:
1830         name_destroy(&cliname);
1831         RETURN(rc);
1832 }
1833
1834 /* Add the ost info to the client/mdt lov */
1835 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1836                                     struct mgs_device *mgs, struct fs_db *fsdb,
1837                                     struct mgs_target_info *mti,
1838                                     char *logname, char *suffix, char *lovname,
1839                                     enum lustre_sec_part sec_part, int flags)
1840 {
1841         struct llog_handle *llh = NULL;
1842         char *nodeuuid = NULL;
1843         char *oscname = NULL;
1844         char *oscuuid = NULL;
1845         char *lovuuid = NULL;
1846         char *svname = NULL;
1847         char index[6];
1848         int i, rc;
1849
1850         ENTRY;
1851         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
1852                mti->mti_svname, logname);
1853
1854         if (mgs_log_is_empty(env, mgs, logname)) {
1855                 CERROR("log is empty! Logical error\n");
1856                 RETURN (-EINVAL);
1857         }
1858
1859         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1860         if (rc)
1861                 RETURN(rc);
1862         rc = name_create(&svname, mti->mti_svname, "-osc");
1863         if (rc)
1864                 GOTO(out_free, rc);
1865         /* for the system upgraded from old 1.8, keep using the old osc naming
1866          * style for mdt, see name_create_mdt_osc(). LU-1257 */
1867         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
1868                 rc = name_create(&oscname, svname, "");
1869         else
1870                 rc = name_create(&oscname, svname, suffix);
1871         if (rc)
1872                 GOTO(out_free, rc);
1873         rc = name_create(&oscuuid, oscname, "_UUID");
1874         if (rc)
1875                 GOTO(out_free, rc);
1876         rc = name_create(&lovuuid, lovname, "_UUID");
1877         if (rc)
1878                 GOTO(out_free, rc);
1879
1880         /*
1881         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
1882         multihomed (#4)
1883         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
1884         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
1885         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
1886         failover (#6,7)
1887         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
1888         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
1889         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
1890         */
1891
1892         rc = record_start_log(env, mgs, &llh, logname);
1893         if (rc)
1894                 GOTO(out_free, rc);
1895         /* FIXME these should be a single journal transaction */
1896         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
1897                            "add osc");
1898         if (rc)
1899                 GOTO(out_end, rc);
1900         for (i = 0; i < mti->mti_nid_count; i++) {
1901                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
1902                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1903                 if (rc)
1904                         GOTO(out_end, rc);
1905         }
1906         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
1907         if (rc)
1908                 GOTO(out_end, rc);
1909         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
1910         if (rc)
1911                 GOTO(out_end, rc);
1912         rc = mgs_write_log_failnids(env, mti, llh, oscname);
1913         if (rc)
1914                 GOTO(out_end, rc);
1915         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1916         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
1917         if (rc)
1918                 GOTO(out_end, rc);
1919         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
1920                            "add osc");
1921         if (rc)
1922                 GOTO(out_end, rc);
1923 out_end:
1924         record_end_log(env, &llh);
1925 out_free:
1926         name_destroy(&lovuuid);
1927         name_destroy(&oscuuid);
1928         name_destroy(&oscname);
1929         name_destroy(&svname);
1930         name_destroy(&nodeuuid);
1931         RETURN(rc);
1932 }
1933
1934 static int mgs_write_log_ost(const struct lu_env *env,
1935                              struct mgs_device *mgs, struct fs_db *fsdb,
1936                              struct mgs_target_info *mti)
1937 {
1938         struct llog_handle *llh = NULL;
1939         char *logname, *lovname;
1940         char *ptr = mti->mti_params;
1941         int rc, flags = 0, failout = 0, i;
1942         ENTRY;
1943
1944         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
1945
1946         /* The ost startup log */
1947
1948         /* If the ost log already exists, that means that someone reformatted
1949            the ost and it called target_add again. */
1950         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
1951                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
1952                                    "exists, yet the server claims it never "
1953                                    "registered. It may have been reformatted, "
1954                                    "or the index changed. writeconf the MDT to "
1955                                    "regenerate all logs.\n", mti->mti_svname);
1956                 RETURN(-EALREADY);
1957         }
1958
1959         /*
1960         attach obdfilter ost1 ost1_UUID
1961         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
1962         */
1963         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
1964                 failout = (strncmp(ptr, "failout", 7) == 0);
1965         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
1966         if (rc)
1967                 RETURN(rc);
1968         /* FIXME these should be a single journal transaction */
1969         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
1970         if (rc)
1971                 GOTO(out_end, rc);
1972         if (*mti->mti_uuid == '\0')
1973                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
1974                          "%s_UUID", mti->mti_svname);
1975         rc = record_attach(env, llh, mti->mti_svname,
1976                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
1977         if (rc)
1978                 GOTO(out_end, rc);
1979         rc = record_setup(env, llh, mti->mti_svname,
1980                           "dev"/*ignored*/, "type"/*ignored*/,
1981                           failout ? "n" : "f", 0/*options*/);
1982         if (rc)
1983                 GOTO(out_end, rc);
1984         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
1985         if (rc)
1986                 GOTO(out_end, rc);
1987 out_end:
1988         record_end_log(env, &llh);
1989         if (rc)
1990                 RETURN(rc);
1991         /* We also have to update the other logs where this osc is part of
1992            the lov */
1993
1994         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
1995                 /* If we're upgrading, the old mdt log already has our
1996                    entry. Let's do a fake one for fun. */
1997                 /* Note that we can't add any new failnids, since we don't
1998                    know the old osc names. */
1999                 flags = CM_SKIP | CM_UPGRADE146;
2000
2001         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2002                 /* If the update flag isn't set, don't update client/mdt
2003                    logs. */
2004                 flags |= CM_SKIP;
2005                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2006                               "the MDT first to regenerate it.\n",
2007                               mti->mti_svname);
2008         }
2009
2010         /* Add ost to all MDT lov defs */
2011         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2012                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2013                         char mdt_index[9];
2014
2015                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2016                                                      i);
2017                         if (rc)
2018                                 RETURN(rc);
2019                         sprintf(mdt_index, "-MDT%04x", i);
2020                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2021                                                       logname, mdt_index,
2022                                                       lovname, LUSTRE_SP_MDT,
2023                                                       flags);
2024                         name_destroy(&logname);
2025                         name_destroy(&lovname);
2026                         if (rc)
2027                                 RETURN(rc);
2028                 }
2029         }
2030
2031         /* Append ost info to the client log */
2032         rc = name_create(&logname, mti->mti_fsname, "-client");
2033         if (rc)
2034                 RETURN(rc);
2035         if (mgs_log_is_empty(env, mgs, logname)) {
2036                 /* Start client log */
2037                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2038                                        fsdb->fsdb_clilov);
2039                 if (rc)
2040                         GOTO(out_free, rc);
2041                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2042                                        fsdb->fsdb_clilmv);
2043                 if (rc)
2044                         GOTO(out_free, rc);
2045         }
2046         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2047                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2048 out_free:
2049         name_destroy(&logname);
2050         RETURN(rc);
2051 }
2052
2053 static __inline__ int mgs_param_empty(char *ptr)
2054 {
2055         char *tmp;
2056
2057         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2058                 return 1;
2059         return 0;
2060 }
2061
2062 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2063                                           struct mgs_device *mgs,
2064                                           struct fs_db *fsdb,
2065                                           struct mgs_target_info *mti,
2066                                           char *logname, char *cliname)
2067 {
2068         int rc;
2069         struct llog_handle *llh = NULL;
2070
2071         if (mgs_param_empty(mti->mti_params)) {
2072                 /* Remove _all_ failnids */
2073                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2074                                 mti->mti_svname, "add failnid", CM_SKIP);
2075                 return rc < 0 ? rc : 0;
2076         }
2077
2078         /* Otherwise failover nids are additive */
2079         rc = record_start_log(env, mgs, &llh, logname);
2080         if (rc)
2081                 return rc;
2082                 /* FIXME this should be a single journal transaction */
2083         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2084                            "add failnid");
2085         if (rc)
2086                 goto out_end;
2087         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2088         if (rc)
2089                 goto out_end;
2090         rc = record_marker(env, llh, fsdb, CM_END,
2091                            mti->mti_svname, "add failnid");
2092 out_end:
2093         record_end_log(env, &llh);
2094         return rc;
2095 }
2096
2097
2098 /* Add additional failnids to an existing log.
2099    The mdc/osc must have been added to logs first */
2100 /* tcp nids must be in dotted-quad ascii -
2101    we can't resolve hostnames from the kernel. */
2102 static int mgs_write_log_add_failnid(const struct lu_env *env,
2103                                      struct mgs_device *mgs,
2104                                      struct fs_db *fsdb,
2105                                      struct mgs_target_info *mti)
2106 {
2107         char *logname, *cliname;
2108         int rc;
2109         ENTRY;
2110
2111         /* FIXME we currently can't erase the failnids
2112          * given when a target first registers, since they aren't part of
2113          * an "add uuid" stanza */
2114
2115         /* Verify that we know about this target */
2116         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2117                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2118                                    "yet. It must be started before failnids "
2119                                    "can be added.\n", mti->mti_svname);
2120                 RETURN(-ENOENT);
2121         }
2122
2123         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2124         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2125                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2126         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2127                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2128         } else {
2129                 RETURN(-EINVAL);
2130         }
2131         if (rc)
2132                 RETURN(rc);
2133         /* Add failover nids to the client log */
2134         rc = name_create(&logname, mti->mti_fsname, "-client");
2135         if (rc) {
2136                 name_destroy(&cliname);
2137                 RETURN(rc);
2138         }
2139         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2140         name_destroy(&logname);
2141         name_destroy(&cliname);
2142         if (rc)
2143                 RETURN(rc);
2144
2145         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2146                 /* Add OST failover nids to the MDT logs as well */
2147                 int i;
2148
2149                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2150                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2151                                 continue;
2152                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2153                         if (rc)
2154                                 RETURN(rc);
2155                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2156                                                  fsdb, i);
2157                         if (rc) {
2158                                 name_destroy(&logname);
2159                                 RETURN(rc);
2160                         }
2161                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2162                                                             mti, logname,
2163                                                             cliname);
2164                         name_destroy(&cliname);
2165                         name_destroy(&logname);
2166                         if (rc)
2167                                 RETURN(rc);
2168                 }
2169         }
2170
2171         RETURN(rc);
2172 }
2173
2174 static int mgs_wlp_lcfg(const struct lu_env *env,
2175                         struct mgs_device *mgs, struct fs_db *fsdb,
2176                         struct mgs_target_info *mti,
2177                         char *logname, struct lustre_cfg_bufs *bufs,
2178                         char *tgtname, char *ptr)
2179 {
2180         char comment[MTI_NAME_MAXLEN];
2181         char *tmp;
2182         struct lustre_cfg *lcfg;
2183         int rc, del;
2184
2185         /* Erase any old settings of this same parameter */
2186         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2187         comment[MTI_NAME_MAXLEN - 1] = 0;
2188         /* But don't try to match the value. */
2189         if ((tmp = strchr(comment, '=')))
2190             *tmp = 0;
2191         /* FIXME we should skip settings that are the same as old values */
2192         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2193         if (rc < 0)
2194                 return rc;
2195         del = mgs_param_empty(ptr);
2196
2197         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2198                       "Sett" : "Modify", tgtname, comment, logname);
2199         if (del)
2200                 return rc;
2201
2202         lustre_cfg_bufs_reset(bufs, tgtname);
2203         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2204         lcfg = lustre_cfg_new(LCFG_PARAM, bufs);
2205         if (!lcfg)
2206                 return -ENOMEM;
2207         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2208         lustre_cfg_free(lcfg);
2209         return rc;
2210 }
2211
2212 /* write global variable settings into log */
2213 static int mgs_write_log_sys(const struct lu_env *env,
2214                              struct mgs_device *mgs, struct fs_db *fsdb,
2215                              struct mgs_target_info *mti, char *sys, char *ptr)
2216 {
2217         struct mgs_thread_info *mgi = mgs_env_info(env);
2218         struct lustre_cfg *lcfg;
2219         char *tmp, sep;
2220         int rc, cmd, convert = 1;
2221
2222         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2223                 cmd = LCFG_SET_TIMEOUT;
2224         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2225                 cmd = LCFG_SET_LDLM_TIMEOUT;
2226         /* Check for known params here so we can return error to lctl */
2227         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2228                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2229                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2230                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2231                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2232                 cmd = LCFG_PARAM;
2233         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2234                 convert = 0; /* Don't convert string value to integer */
2235                 cmd = LCFG_PARAM;
2236         } else {
2237                 return -EINVAL;
2238         }
2239
2240         if (mgs_param_empty(ptr))
2241                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2242         else
2243                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2244
2245         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2246         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2247         if (!convert && *tmp != '\0')
2248                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2249         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2250         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2251         /* truncate the comment to the parameter name */
2252         ptr = tmp - 1;
2253         sep = *ptr;
2254         *ptr = '\0';
2255         /* modify all servers and clients */
2256         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2257                                       *tmp == '\0' ? NULL : lcfg,
2258                                       mti->mti_fsname, sys, 0);
2259         if (rc == 0 && *tmp != '\0') {
2260                 switch (cmd) {
2261                 case LCFG_SET_TIMEOUT:
2262                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2263                                 class_process_config(lcfg);
2264                         break;
2265                 case LCFG_SET_LDLM_TIMEOUT:
2266                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2267                                 class_process_config(lcfg);
2268                         break;
2269                 default:
2270                         break;
2271                 }
2272         }
2273         *ptr = sep;
2274         lustre_cfg_free(lcfg);
2275         return rc;
2276 }
2277
2278 /* write quota settings into log */
2279 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2280                                struct fs_db *fsdb, struct mgs_target_info *mti,
2281                                char *quota, char *ptr)
2282 {
2283         struct mgs_thread_info  *mgi = mgs_env_info(env);
2284         struct lustre_cfg       *lcfg;
2285         char                    *tmp;
2286         char                     sep;
2287         int                      rc, cmd = LCFG_PARAM;
2288
2289         /* support only 'meta' and 'data' pools so far */
2290         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2291             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2292                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2293                        "& quota.ost are)\n", ptr);
2294                 return -EINVAL;
2295         }
2296
2297         if (*tmp == '\0') {
2298                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2299         } else {
2300                 CDEBUG(D_MGS, "global '%s'\n", quota);
2301
2302                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2303                     strcmp(tmp, "none") != 0) {
2304                         CERROR("enable option(%s) isn't supported\n", tmp);
2305                         return -EINVAL;
2306                 }
2307         }
2308
2309         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2310         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2311         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2312         /* truncate the comment to the parameter name */
2313         ptr = tmp - 1;
2314         sep = *ptr;
2315         *ptr = '\0';
2316
2317         /* XXX we duplicated quota enable information in all server
2318          *     config logs, it should be moved to a separate config
2319          *     log once we cleanup the config log for global param. */
2320         /* modify all servers */
2321         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2322                                       *tmp == '\0' ? NULL : lcfg,
2323                                       mti->mti_fsname, quota, 1);
2324         *ptr = sep;
2325         lustre_cfg_free(lcfg);
2326         return rc;
2327 }
2328
2329 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2330                                    struct mgs_device *mgs,
2331                                    struct fs_db *fsdb,
2332                                    struct mgs_target_info *mti,
2333                                    char *param)
2334 {
2335         struct mgs_thread_info *mgi = mgs_env_info(env);
2336         struct llog_handle     *llh = NULL;
2337         char                   *logname;
2338         char                   *comment, *ptr;
2339         struct lustre_cfg      *lcfg;
2340         int                     rc, len;
2341         ENTRY;
2342
2343         /* get comment */
2344         ptr = strchr(param, '=');
2345         LASSERT(ptr);
2346         len = ptr - param;
2347
2348         OBD_ALLOC(comment, len + 1);
2349         if (comment == NULL)
2350                 RETURN(-ENOMEM);
2351         strncpy(comment, param, len);
2352         comment[len] = '\0';
2353
2354         /* prepare lcfg */
2355         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2356         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2357         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2358         if (lcfg == NULL)
2359                 GOTO(out_comment, rc = -ENOMEM);
2360
2361         /* construct log name */
2362         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2363         if (rc)
2364                 GOTO(out_lcfg, rc);
2365
2366         if (mgs_log_is_empty(env, mgs, logname)) {
2367                 rc = record_start_log(env, mgs, &llh, logname);
2368                 if (rc)
2369                         GOTO(out, rc);
2370                 record_end_log(env, &llh);
2371         }
2372
2373         /* obsolete old one */
2374         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2375                         comment, CM_SKIP);
2376         if (rc < 0)
2377                 GOTO(out, rc);
2378         /* write the new one */
2379         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2380                                   mti->mti_svname, comment);
2381         if (rc)
2382                 CERROR("err %d writing log %s\n", rc, logname);
2383 out:
2384         name_destroy(&logname);
2385 out_lcfg:
2386         lustre_cfg_free(lcfg);
2387 out_comment:
2388         OBD_FREE(comment, len + 1);
2389         RETURN(rc);
2390 }
2391
2392 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2393                                         char *param)
2394 {
2395         char    *ptr;
2396
2397         /* disable the adjustable udesc parameter for now, i.e. use default
2398          * setting that client always ship udesc to MDT if possible. to enable
2399          * it simply remove the following line */
2400         goto error_out;
2401
2402         ptr = strchr(param, '=');
2403         if (ptr == NULL)
2404                 goto error_out;
2405         *ptr++ = '\0';
2406
2407         if (strcmp(param, PARAM_SRPC_UDESC))
2408                 goto error_out;
2409
2410         if (strcmp(ptr, "yes") == 0) {
2411                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2412                 CWARN("Enable user descriptor shipping from client to MDT\n");
2413         } else if (strcmp(ptr, "no") == 0) {
2414                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2415                 CWARN("Disable user descriptor shipping from client to MDT\n");
2416         } else {
2417                 *(ptr - 1) = '=';
2418                 goto error_out;
2419         }
2420         return 0;
2421
2422 error_out:
2423         CERROR("Invalid param: %s\n", param);
2424         return -EINVAL;
2425 }
2426
2427 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2428                                   const char *svname,
2429                                   char *param)
2430 {
2431         struct sptlrpc_rule      rule;
2432         struct sptlrpc_rule_set *rset;
2433         int                      rc;
2434         ENTRY;
2435
2436         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2437                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2438                 RETURN(-EINVAL);
2439         }
2440
2441         if (strncmp(param, PARAM_SRPC_UDESC,
2442                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2443                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2444         }
2445
2446         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2447                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2448                 RETURN(-EINVAL);
2449         }
2450
2451         param += sizeof(PARAM_SRPC_FLVR) - 1;
2452
2453         rc = sptlrpc_parse_rule(param, &rule);
2454         if (rc)
2455                 RETURN(rc);
2456
2457         /* mgs rules implies must be mgc->mgs */
2458         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2459                 if ((rule.sr_from != LUSTRE_SP_MGC &&
2460                      rule.sr_from != LUSTRE_SP_ANY) ||
2461                     (rule.sr_to != LUSTRE_SP_MGS &&
2462                      rule.sr_to != LUSTRE_SP_ANY))
2463                         RETURN(-EINVAL);
2464         }
2465
2466         /* preapre room for this coming rule. svcname format should be:
2467          * - fsname: general rule
2468          * - fsname-tgtname: target-specific rule
2469          */
2470         if (strchr(svname, '-')) {
2471                 struct mgs_tgt_srpc_conf *tgtconf;
2472                 int                       found = 0;
2473
2474                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
2475                      tgtconf = tgtconf->mtsc_next) {
2476                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
2477                                 found = 1;
2478                                 break;
2479                         }
2480                 }
2481
2482                 if (!found) {
2483                         int name_len;
2484
2485                         OBD_ALLOC_PTR(tgtconf);
2486                         if (tgtconf == NULL)
2487                                 RETURN(-ENOMEM);
2488
2489                         name_len = strlen(svname);
2490
2491                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
2492                         if (tgtconf->mtsc_tgt == NULL) {
2493                                 OBD_FREE_PTR(tgtconf);
2494                                 RETURN(-ENOMEM);
2495                         }
2496                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
2497
2498                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
2499                         fsdb->fsdb_srpc_tgt = tgtconf;
2500                 }
2501
2502                 rset = &tgtconf->mtsc_rset;
2503         } else {
2504                 rset = &fsdb->fsdb_srpc_gen;
2505         }
2506
2507         rc = sptlrpc_rule_set_merge(rset, &rule);
2508
2509         RETURN(rc);
2510 }
2511
2512 static int mgs_srpc_set_param(const struct lu_env *env,
2513                               struct mgs_device *mgs,
2514                               struct fs_db *fsdb,
2515                               struct mgs_target_info *mti,
2516                               char *param)
2517 {
2518         char                   *copy;
2519         int                     rc, copy_size;
2520         ENTRY;
2521
2522 #ifndef HAVE_GSS
2523         RETURN(-EINVAL);
2524 #endif
2525         /* keep a copy of original param, which could be destroied
2526          * during parsing */
2527         copy_size = strlen(param) + 1;
2528         OBD_ALLOC(copy, copy_size);
2529         if (copy == NULL)
2530                 return -ENOMEM;
2531         memcpy(copy, param, copy_size);
2532
2533         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
2534         if (rc)
2535                 goto out_free;
2536
2537         /* previous steps guaranteed the syntax is correct */
2538         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
2539         if (rc)
2540                 goto out_free;
2541
2542         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2543                 /*
2544                  * for mgs rules, make them effective immediately.
2545                  */
2546                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
2547                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
2548                                                  &fsdb->fsdb_srpc_gen);
2549         }
2550
2551 out_free:
2552         OBD_FREE(copy, copy_size);
2553         RETURN(rc);
2554 }
2555
2556 struct mgs_srpc_read_data {
2557         struct fs_db   *msrd_fsdb;
2558         int             msrd_skip;
2559 };
2560
2561 static int mgs_srpc_read_handler(const struct lu_env *env,
2562                                  struct llog_handle *llh,
2563                                  struct llog_rec_hdr *rec, void *data)
2564 {
2565         struct mgs_srpc_read_data *msrd = data;
2566         struct cfg_marker         *marker;
2567         struct lustre_cfg         *lcfg = (struct lustre_cfg *)(rec + 1);
2568         char                      *svname, *param;
2569         int                        cfg_len, rc;
2570         ENTRY;
2571
2572         if (rec->lrh_type != OBD_CFG_REC) {
2573                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
2574                 RETURN(-EINVAL);
2575         }
2576
2577         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
2578                   sizeof(struct llog_rec_tail);
2579
2580         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
2581         if (rc) {
2582                 CERROR("Insane cfg\n");
2583                 RETURN(rc);
2584         }
2585
2586         if (lcfg->lcfg_command == LCFG_MARKER) {
2587                 marker = lustre_cfg_buf(lcfg, 1);
2588
2589                 if (marker->cm_flags & CM_START &&
2590                     marker->cm_flags & CM_SKIP)
2591                         msrd->msrd_skip = 1;
2592                 if (marker->cm_flags & CM_END)
2593                         msrd->msrd_skip = 0;
2594
2595                 RETURN(0);
2596         }
2597
2598         if (msrd->msrd_skip)
2599                 RETURN(0);
2600
2601         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
2602                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
2603                 RETURN(0);
2604         }
2605
2606         svname = lustre_cfg_string(lcfg, 0);
2607         if (svname == NULL) {
2608                 CERROR("svname is empty\n");
2609                 RETURN(0);
2610         }
2611
2612         param = lustre_cfg_string(lcfg, 1);
2613         if (param == NULL) {
2614                 CERROR("param is empty\n");
2615                 RETURN(0);
2616         }
2617
2618         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
2619         if (rc)
2620                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
2621
2622         RETURN(0);
2623 }
2624
2625 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
2626                                 struct mgs_device *mgs,
2627                                 struct fs_db *fsdb)
2628 {
2629         struct llog_handle        *llh = NULL;
2630         struct llog_ctxt          *ctxt;
2631         char                      *logname;
2632         struct mgs_srpc_read_data  msrd;
2633         int                        rc;
2634         ENTRY;
2635
2636         /* construct log name */
2637         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
2638         if (rc)
2639                 RETURN(rc);
2640
2641         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
2642         LASSERT(ctxt != NULL);
2643
2644         if (mgs_log_is_empty(env, mgs, logname))
2645                 GOTO(out, rc = 0);
2646
2647         rc = llog_open(env, ctxt, &llh, NULL, logname,
2648                        LLOG_OPEN_EXISTS);
2649         if (rc < 0) {
2650                 if (rc == -ENOENT)
2651                         rc = 0;
2652                 GOTO(out, rc);
2653         }
2654
2655         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
2656         if (rc)
2657                 GOTO(out_close, rc);
2658
2659         if (llog_get_size(llh) <= 1)
2660                 GOTO(out_close, rc = 0);
2661
2662         msrd.msrd_fsdb = fsdb;
2663         msrd.msrd_skip = 0;
2664
2665         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
2666                           NULL);
2667
2668 out_close:
2669         llog_close(env, llh);
2670 out:
2671         llog_ctxt_put(ctxt);
2672         name_destroy(&logname);
2673
2674         if (rc)
2675                 CERROR("failed to read sptlrpc config database: %d\n", rc);
2676         RETURN(rc);
2677 }
2678
2679 /* Permanent settings of all parameters by writing into the appropriate
2680  * configuration logs.
2681  * A parameter with null value ("<param>='\0'") means to erase it out of
2682  * the logs.
2683  */
2684 static int mgs_write_log_param(const struct lu_env *env,
2685                                struct mgs_device *mgs, struct fs_db *fsdb,
2686                                struct mgs_target_info *mti, char *ptr)
2687 {
2688         struct mgs_thread_info *mgi = mgs_env_info(env);
2689         char *logname;
2690         char *tmp;
2691         int rc = 0, rc2 = 0;
2692         ENTRY;
2693
2694         /* For various parameter settings, we have to figure out which logs
2695            care about them (e.g. both mdt and client for lov settings) */
2696         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2697
2698         /* The params are stored in MOUNT_DATA_FILE and modified via
2699            tunefs.lustre, or set using lctl conf_param */
2700
2701         /* Processed in lustre_start_mgc */
2702         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
2703                 GOTO(end, rc);
2704
2705         /* Processed in ost/mdt */
2706         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
2707                 GOTO(end, rc);
2708
2709         /* Processed in mgs_write_log_ost */
2710         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
2711                 if (mti->mti_flags & LDD_F_PARAM) {
2712                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
2713                                            "changed with tunefs.lustre"
2714                                            "and --writeconf\n", ptr);
2715                         rc = -EPERM;
2716                 }
2717                 GOTO(end, rc);
2718         }
2719
2720         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
2721                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
2722                 GOTO(end, rc);
2723         }
2724
2725         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
2726                 /* Add a failover nidlist */
2727                 rc = 0;
2728                 /* We already processed failovers params for new
2729                    targets in mgs_write_log_target */
2730                 if (mti->mti_flags & LDD_F_PARAM) {
2731                         CDEBUG(D_MGS, "Adding failnode\n");
2732                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
2733                 }
2734                 GOTO(end, rc);
2735         }
2736
2737         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
2738                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
2739                 GOTO(end, rc);
2740         }
2741
2742         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
2743                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
2744                 GOTO(end, rc);
2745         }
2746
2747         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
2748                 /* active=0 means off, anything else means on */
2749                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
2750                 int i;
2751
2752                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
2753                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
2754                                            "be (de)activated.\n",
2755                                            mti->mti_svname);
2756                         GOTO(end, rc = -EINVAL);
2757                 }
2758                 LCONSOLE_WARN("Permanently %sactivating %s\n",
2759                               flag ? "de": "re", mti->mti_svname);
2760                 /* Modify clilov */
2761                 rc = name_create(&logname, mti->mti_fsname, "-client");
2762                 if (rc)
2763                         GOTO(end, rc);
2764                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2765                                 mti->mti_svname, "add osc", flag);
2766                 name_destroy(&logname);
2767                 if (rc)
2768                         goto active_err;
2769                 /* Modify mdtlov */
2770                 /* Add to all MDT logs for CMD */
2771                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2772                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2773                                 continue;
2774                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2775                         if (rc)
2776                                 GOTO(end, rc);
2777                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
2778                                         mti->mti_svname, "add osc", flag);
2779                         name_destroy(&logname);
2780                         if (rc)
2781                                 goto active_err;
2782                 }
2783         active_err:
2784                 if (rc) {
2785                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
2786                                            "log (%d). No permanent "
2787                                            "changes were made to the "
2788                                            "config log.\n",
2789                                            mti->mti_svname, rc);
2790                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
2791                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
2792                                                    " because the log"
2793                                                    "is in the old 1.4"
2794                                                    "style. Consider "
2795                                                    " --writeconf to "
2796                                                    "update the logs.\n");
2797                         GOTO(end, rc);
2798                 }
2799                 /* Fall through to osc proc for deactivating live OSC
2800                    on running MDT / clients. */
2801         }
2802         /* Below here, let obd's XXX_process_config methods handle it */
2803
2804         /* All lov. in proc */
2805         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
2806                 char *mdtlovname;
2807
2808                 CDEBUG(D_MGS, "lov param %s\n", ptr);
2809                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
2810                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
2811                                            "set on the MDT, not %s. "
2812                                            "Ignoring.\n",
2813                                            mti->mti_svname);
2814                         GOTO(end, rc = 0);
2815                 }
2816
2817                 /* Modify mdtlov */
2818                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
2819                         GOTO(end, rc = -ENODEV);
2820
2821                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
2822                                              mti->mti_stripe_index);
2823                 if (rc)
2824                         GOTO(end, rc);
2825                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
2826                                   &mgi->mgi_bufs, mdtlovname, ptr);
2827                 name_destroy(&logname);
2828                 name_destroy(&mdtlovname);
2829                 if (rc)
2830                         GOTO(end, rc);
2831
2832                 /* Modify clilov */
2833                 rc = name_create(&logname, mti->mti_fsname, "-client");
2834                 if (rc)
2835                         GOTO(end, rc);
2836                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
2837                                   fsdb->fsdb_clilov, ptr);
2838                 name_destroy(&logname);
2839                 GOTO(end, rc);
2840         }
2841
2842         /* All osc., mdc., llite. params in proc */
2843         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
2844             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
2845             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
2846                 char *cname;
2847
2848                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2849                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
2850                                            " cannot be modified. Consider"
2851                                            " updating the configuration with"
2852                                            " --writeconf\n",
2853                                            mti->mti_svname);
2854                         GOTO(end, rc = -EINVAL);
2855                 }
2856                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
2857                         rc = name_create(&cname, mti->mti_fsname, "-client");
2858                         /* Add the client type to match the obdname in
2859                            class_config_llog_handler */
2860                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2861                         rc = name_create(&cname, mti->mti_svname, "-mdc");
2862                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2863                         rc = name_create(&cname, mti->mti_svname, "-osc");
2864                 } else {
2865                         GOTO(end, rc = -EINVAL);
2866                 }
2867                 if (rc)
2868                         GOTO(end, rc);
2869
2870                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2871
2872                 /* Modify client */
2873                 rc = name_create(&logname, mti->mti_fsname, "-client");
2874                 if (rc) {
2875                         name_destroy(&cname);
2876                         GOTO(end, rc);
2877                 }
2878                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
2879                                   cname, ptr);
2880
2881                 /* osc params affect the MDT as well */
2882                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
2883                         int i;
2884
2885                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2886                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2887                                         continue;
2888                                 name_destroy(&cname);
2889                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
2890                                                          fsdb, i);
2891                                 name_destroy(&logname);
2892                                 if (rc)
2893                                         break;
2894                                 rc = name_create_mdt(&logname,
2895                                                      mti->mti_fsname, i);
2896                                 if (rc)
2897                                         break;
2898                                 if (!mgs_log_is_empty(env, mgs, logname)) {
2899                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
2900                                                           mti, logname,
2901                                                           &mgi->mgi_bufs,
2902                                                           cname, ptr);
2903                                         if (rc)
2904                                                 break;
2905                                 }
2906                         }
2907                 }
2908                 name_destroy(&logname);
2909                 name_destroy(&cname);
2910                 GOTO(end, rc);
2911         }
2912
2913         /* All mdt. params in proc */
2914         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
2915                 int i;
2916                 __u32 idx;
2917
2918                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2919                 if (strncmp(mti->mti_svname, mti->mti_fsname,
2920                             MTI_NAME_MAXLEN) == 0)
2921                         /* device is unspecified completely? */
2922                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
2923                 else
2924                         rc = server_name2index(mti->mti_svname, &idx, NULL);
2925                 if (rc < 0)
2926                         goto active_err;
2927                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
2928                         goto active_err;
2929                 if (rc & LDD_F_SV_ALL) {
2930                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2931                                 if (!test_bit(i,
2932                                                   fsdb->fsdb_mdt_index_map))
2933                                         continue;
2934                                 rc = name_create_mdt(&logname,
2935                                                 mti->mti_fsname, i);
2936                                 if (rc)
2937                                         goto active_err;
2938                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
2939                                                   logname, &mgi->mgi_bufs,
2940                                                   logname, ptr);
2941                                 name_destroy(&logname);
2942                                 if (rc)
2943                                         goto active_err;
2944                         }
2945                 } else {
2946                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
2947                                           mti->mti_svname, &mgi->mgi_bufs,
2948                                           mti->mti_svname, ptr);
2949                         if (rc)
2950                                 goto active_err;
2951                 }
2952                 GOTO(end, rc);
2953         }
2954
2955         /* All mdd., ost. params in proc */
2956         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
2957             (class_match_param(ptr, PARAM_OST, NULL) == 0)) {
2958                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2959                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
2960                         GOTO(end, rc = -ENODEV);
2961
2962                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
2963                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
2964                 GOTO(end, rc);
2965         }
2966
2967         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
2968         rc2 = -ENOSYS;
2969
2970 end:
2971         if (rc)
2972                 CERROR("err %d on param '%s'\n", rc, ptr);
2973
2974         RETURN(rc ?: rc2);
2975 }
2976
2977 /* Not implementing automatic failover nid addition at this time. */
2978 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
2979                       struct mgs_target_info *mti)
2980 {
2981 #if 0
2982         struct fs_db *fsdb;
2983         int rc;
2984         ENTRY;
2985
2986         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
2987         if (rc)
2988                 RETURN(rc);
2989
2990         if (mgs_log_is_empty(obd, mti->mti_svname))
2991                 /* should never happen */
2992                 RETURN(-ENOENT);
2993
2994         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
2995
2996         /* FIXME We can just check mti->params to see if we're already in
2997            the failover list.  Modify mti->params for rewriting back at
2998            server_register_target(). */
2999
3000         mutex_lock(&fsdb->fsdb_mutex);
3001         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3002         mutex_unlock(&fsdb->fsdb_mutex);
3003
3004         RETURN(rc);
3005 #endif
3006         return 0;
3007 }
3008
3009 int mgs_write_log_target(const struct lu_env *env,
3010                          struct mgs_device *mgs,
3011                          struct mgs_target_info *mti,
3012                          struct fs_db *fsdb)
3013 {
3014         int rc = -EINVAL;
3015         char *buf, *params;
3016         ENTRY;
3017
3018         /* set/check the new target index */
3019         rc = mgs_set_index(env, mgs, mti);
3020         if (rc < 0) {
3021                 CERROR("Can't get index (%d)\n", rc);
3022                 RETURN(rc);
3023         }
3024
3025         if (rc == EALREADY) {
3026                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3027                               mti->mti_stripe_index, mti->mti_svname);
3028                 /* We would like to mark old log sections as invalid
3029                    and add new log sections in the client and mdt logs.
3030                    But if we add new sections, then live clients will
3031                    get repeat setup instructions for already running
3032                    osc's. So don't update the client/mdt logs. */
3033                 mti->mti_flags &= ~LDD_F_UPDATE;
3034         }
3035
3036         mutex_lock(&fsdb->fsdb_mutex);
3037
3038         if (mti->mti_flags &
3039             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3040                 /* Generate a log from scratch */
3041                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3042                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3043                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3044                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3045                 } else {
3046                         CERROR("Unknown target type %#x, can't create log for "
3047                                "%s\n", mti->mti_flags, mti->mti_svname);
3048                 }
3049                 if (rc) {
3050                         CERROR("Can't write logs for %s (%d)\n",
3051                                mti->mti_svname, rc);
3052                         GOTO(out_up, rc);
3053                 }
3054         } else {
3055                 /* Just update the params from tunefs in mgs_write_log_params */
3056                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3057                 mti->mti_flags |= LDD_F_PARAM;
3058         }
3059
3060         /* allocate temporary buffer, where class_get_next_param will
3061            make copy of a current  parameter */
3062         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3063         if (buf == NULL)
3064                 GOTO(out_up, rc = -ENOMEM);
3065         params = mti->mti_params;
3066         while (params != NULL) {
3067                 rc = class_get_next_param(&params, buf);
3068                 if (rc) {
3069                         if (rc == 1)
3070                                 /* there is no next parameter, that is
3071                                    not an error */
3072                                 rc = 0;
3073                         break;
3074                 }
3075                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3076                        params, buf);
3077                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3078                 if (rc)
3079                         break;
3080         }
3081
3082         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3083
3084 out_up:
3085         mutex_unlock(&fsdb->fsdb_mutex);
3086         RETURN(rc);
3087 }
3088
3089 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3090 {
3091         struct llog_ctxt        *ctxt;
3092         int                      rc = 0;
3093
3094         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3095         if (ctxt == NULL) {
3096                 CERROR("%s: MGS config context doesn't exist\n",
3097                        mgs->mgs_obd->obd_name);
3098                 rc = -ENODEV;
3099         } else {
3100                 rc = llog_erase(env, ctxt, NULL, name);
3101                 /* llog may not exist */
3102                 if (rc == -ENOENT)
3103                         rc = 0;
3104                 llog_ctxt_put(ctxt);
3105         }
3106
3107         if (rc)
3108                 CERROR("%s: failed to clear log %s: %d\n",
3109                        mgs->mgs_obd->obd_name, name, rc);
3110
3111         return rc;
3112 }
3113
3114 /* erase all logs for the given fs */
3115 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3116 {
3117         struct fs_db *fsdb;
3118         cfs_list_t list;
3119         struct mgs_direntry *dirent, *n;
3120         int rc, len = strlen(fsname);
3121         char *suffix;
3122         ENTRY;
3123
3124         /* Find all the logs in the CONFIGS directory */
3125         rc = class_dentry_readdir(env, mgs, &list);
3126         if (rc)
3127                 RETURN(rc);
3128
3129         mutex_lock(&mgs->mgs_mutex);
3130
3131         /* Delete the fs db */
3132         fsdb = mgs_find_fsdb(mgs, fsname);
3133         if (fsdb)
3134                 mgs_free_fsdb(mgs, fsdb);
3135
3136         mutex_unlock(&mgs->mgs_mutex);
3137
3138         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3139                 cfs_list_del(&dirent->list);
3140                 suffix = strrchr(dirent->name, '-');
3141                 if (suffix != NULL) {
3142                         if ((len == suffix - dirent->name) &&
3143                             (strncmp(fsname, dirent->name, len) == 0)) {
3144                                 CDEBUG(D_MGS, "Removing log %s\n",
3145                                        dirent->name);
3146                                 mgs_erase_log(env, mgs, dirent->name);
3147                         }
3148                 }
3149                 mgs_direntry_free(dirent);
3150         }
3151
3152         RETURN(rc);
3153 }
3154
3155 /* from llog_swab */
3156 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3157 {
3158         int i;
3159         ENTRY;
3160
3161         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3162         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3163
3164         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3165         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3166         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3167         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3168
3169         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3170         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3171                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3172                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3173                                i, lcfg->lcfg_buflens[i],
3174                                lustre_cfg_string(lcfg, i));
3175                 }
3176         EXIT;
3177 }
3178
3179 /* Set a permanent (config log) param for a target or fs
3180  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3181  *             buf1 contains the single parameter
3182  */
3183 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3184                  struct lustre_cfg *lcfg, char *fsname)
3185 {
3186         struct fs_db *fsdb;
3187         struct mgs_target_info *mti;
3188         char *devname, *param;
3189         char *ptr, *tmp;
3190         __u32 index;
3191         int rc = 0;
3192         ENTRY;
3193
3194         print_lustre_cfg(lcfg);
3195
3196         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3197         devname = lustre_cfg_string(lcfg, 0);
3198         param = lustre_cfg_string(lcfg, 1);
3199         if (!devname) {
3200                 /* Assume device name embedded in param:
3201                    lustre-OST0000.osc.max_dirty_mb=32 */
3202                 ptr = strchr(param, '.');
3203                 if (ptr) {
3204                         devname = param;
3205                         *ptr = 0;
3206                         param = ptr + 1;
3207                 }
3208         }
3209         if (!devname) {
3210                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3211                 RETURN(-ENOSYS);
3212         }
3213
3214         /* Extract fsname */
3215         ptr = strrchr(devname, '-');
3216         memset(fsname, 0, MTI_NAME_MAXLEN);
3217         if (ptr && (server_name2index(ptr, &index, NULL) >= 0)) {
3218                 /* param related to llite isn't allowed to set by OST or MDT */
3219                 if (strncmp(param, PARAM_LLITE, sizeof(PARAM_LLITE)) == 0)
3220                         RETURN(-EINVAL);
3221
3222                 strncpy(fsname, devname, ptr - devname);
3223         } else {
3224                 /* assume devname is the fsname */
3225                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3226         }
3227         fsname[MTI_NAME_MAXLEN - 1] = 0;
3228         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3229
3230         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3231         if (rc)
3232                 RETURN(rc);
3233         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3234             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3235                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3236                        "is '%s'\n", fsname, devname);
3237                 mgs_free_fsdb(mgs, fsdb);
3238                 RETURN(-EINVAL);
3239         }
3240
3241         /* Create a fake mti to hold everything */
3242         OBD_ALLOC_PTR(mti);
3243         if (!mti)
3244                 GOTO(out, rc = -ENOMEM);
3245         strncpy(mti->mti_fsname, fsname, MTI_NAME_MAXLEN);
3246         strncpy(mti->mti_svname, devname, MTI_NAME_MAXLEN);
3247         strncpy(mti->mti_params, param, sizeof(mti->mti_params));
3248         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3249         if (rc < 0)
3250                 /* Not a valid server; may be only fsname */
3251                 rc = 0;
3252         else
3253                 /* Strip -osc or -mdc suffix from svname */
3254                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3255                                      mti->mti_svname))
3256                         GOTO(out, rc = -EINVAL);
3257
3258         mti->mti_flags = rc | LDD_F_PARAM;
3259
3260         mutex_lock(&fsdb->fsdb_mutex);
3261         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3262         mutex_unlock(&fsdb->fsdb_mutex);
3263
3264         /*
3265          * Revoke lock so everyone updates.  Should be alright if
3266          * someone was already reading while we were updating the logs,
3267          * so we don't really need to hold the lock while we're
3268          * writing (above).
3269          */
3270         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3271 out:
3272         OBD_FREE_PTR(mti);
3273         RETURN(rc);
3274 }
3275
3276 static int mgs_write_log_pool(const struct lu_env *env,
3277                               struct mgs_device *mgs, char *logname,
3278                               struct fs_db *fsdb, char *lovname,
3279                               enum lcfg_command_type cmd,
3280                               char *poolname, char *fsname,
3281                               char *ostname, char *comment)
3282 {
3283         struct llog_handle *llh = NULL;
3284         int rc;
3285
3286         rc = record_start_log(env, mgs, &llh, logname);
3287         if (rc)
3288                 return rc;
3289         rc = record_marker(env, llh, fsdb, CM_START, lovname, comment);
3290         if (rc)
3291                 goto out;
3292         rc = record_base(env, llh, lovname, 0, cmd, poolname, fsname, ostname, 0);
3293         if (rc)
3294                 goto out;
3295         rc = record_marker(env, llh, fsdb, CM_END, lovname, comment);
3296 out:
3297         record_end_log(env, &llh);
3298         return rc;
3299 }
3300
3301 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3302                  enum lcfg_command_type cmd, char *fsname,
3303                  char *poolname, char *ostname)
3304 {
3305         struct fs_db *fsdb;
3306         char *lovname;
3307         char *logname;
3308         char *label = NULL, *canceled_label = NULL;
3309         int label_sz;
3310         struct mgs_target_info *mti = NULL;
3311         int rc, i;
3312         ENTRY;
3313
3314         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3315         if (rc) {
3316                 CERROR("Can't get db for %s\n", fsname);
3317                 RETURN(rc);
3318         }
3319         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3320                 CERROR("%s is not defined\n", fsname);
3321                 mgs_free_fsdb(mgs, fsdb);
3322                 RETURN(-EINVAL);
3323         }
3324
3325         label_sz = 10 + strlen(fsname) + strlen(poolname);
3326
3327         /* check if ostname match fsname */
3328         if (ostname != NULL) {
3329                 char *ptr;
3330
3331                 ptr = strrchr(ostname, '-');
3332                 if ((ptr == NULL) ||
3333                     (strncmp(fsname, ostname, ptr-ostname) != 0))
3334                         RETURN(-EINVAL);
3335                 label_sz += strlen(ostname);
3336         }
3337
3338         OBD_ALLOC(label, label_sz);
3339         if (label == NULL)
3340                 RETURN(-ENOMEM);
3341
3342         switch(cmd) {
3343         case LCFG_POOL_NEW:
3344                 sprintf(label,
3345                         "new %s.%s", fsname, poolname);
3346                 break;
3347         case LCFG_POOL_ADD:
3348                 sprintf(label,
3349                         "add %s.%s.%s", fsname, poolname, ostname);
3350                 break;
3351         case LCFG_POOL_REM:
3352                 OBD_ALLOC(canceled_label, label_sz);
3353                 if (canceled_label == NULL)
3354                         GOTO(out_label, rc = -ENOMEM);
3355                 sprintf(label,
3356                         "rem %s.%s.%s", fsname, poolname, ostname);
3357                 sprintf(canceled_label,
3358                         "add %s.%s.%s", fsname, poolname, ostname);
3359                 break;
3360         case LCFG_POOL_DEL:
3361                 OBD_ALLOC(canceled_label, label_sz);
3362                 if (canceled_label == NULL)
3363                         GOTO(out_label, rc = -ENOMEM);
3364                 sprintf(label,
3365                         "del %s.%s", fsname, poolname);
3366                 sprintf(canceled_label,
3367                         "new %s.%s", fsname, poolname);
3368                 break;
3369         default:
3370                 break;
3371         }
3372
3373         mutex_lock(&fsdb->fsdb_mutex);
3374
3375         if (canceled_label != NULL) {
3376                 OBD_ALLOC_PTR(mti);
3377                 if (mti == NULL)
3378                         GOTO(out_cancel, rc = -ENOMEM);
3379         }
3380
3381         /* write pool def to all MDT logs */
3382         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3383                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
3384                         rc = name_create_mdt_and_lov(&logname, &lovname,
3385                                                      fsdb, i);
3386                         if (rc) {
3387                                 mutex_unlock(&fsdb->fsdb_mutex);
3388                                 GOTO(out_mti, rc);
3389                         }
3390                         if (canceled_label != NULL) {
3391                                 strcpy(mti->mti_svname, "lov pool");
3392                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3393                                                 lovname, canceled_label,
3394                                                 CM_SKIP);
3395                         }
3396
3397                         if (rc >= 0)
3398                                 rc = mgs_write_log_pool(env, mgs, logname,
3399                                                         fsdb, lovname, cmd,
3400                                                         fsname, poolname,
3401                                                         ostname, label);
3402                         name_destroy(&logname);
3403                         name_destroy(&lovname);
3404                         if (rc) {
3405                                 mutex_unlock(&fsdb->fsdb_mutex);
3406                                 GOTO(out_mti, rc);
3407                         }
3408                 }
3409         }
3410
3411         rc = name_create(&logname, fsname, "-client");
3412         if (rc) {
3413                 mutex_unlock(&fsdb->fsdb_mutex);
3414                 GOTO(out_mti, rc);
3415         }
3416         if (canceled_label != NULL) {
3417                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3418                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
3419                 if (rc < 0) {
3420                         mutex_unlock(&fsdb->fsdb_mutex);
3421                         name_destroy(&logname);
3422                         GOTO(out_mti, rc);
3423                 }
3424         }
3425
3426         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
3427                                 cmd, fsname, poolname, ostname, label);
3428         mutex_unlock(&fsdb->fsdb_mutex);
3429         name_destroy(&logname);
3430         /* request for update */
3431         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3432
3433         EXIT;
3434 out_mti:
3435         if (mti != NULL)
3436                 OBD_FREE_PTR(mti);
3437 out_cancel:
3438         if (canceled_label != NULL)
3439                 OBD_FREE(canceled_label, label_sz);
3440 out_label:
3441         OBD_FREE(label, label_sz);
3442         return rc;
3443 }
3444
3445 #if 0
3446 /******************** unused *********************/
3447 static int mgs_backup_llog(struct obd_device *obd, char* fsname)
3448 {
3449         struct file *filp, *bak_filp;
3450         struct lvfs_run_ctxt saved;
3451         char *logname, *buf;
3452         loff_t soff = 0 , doff = 0;
3453         int count = 4096, len;
3454         int rc = 0;
3455
3456         OBD_ALLOC(logname, PATH_MAX);
3457         if (logname == NULL)
3458                 return -ENOMEM;
3459
3460         OBD_ALLOC(buf, count);
3461         if (!buf)
3462                 GOTO(out , rc = -ENOMEM);
3463
3464         len = snprintf(logname, PATH_MAX, "%s/%s.bak",
3465                        MOUNT_CONFIGS_DIR, fsname);
3466
3467         if (len >= PATH_MAX - 1) {
3468                 GOTO(out, -ENAMETOOLONG);
3469         }
3470
3471         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3472
3473         bak_filp = l_filp_open(logname, O_RDWR|O_CREAT|O_TRUNC, 0660);
3474         if (IS_ERR(bak_filp)) {
3475                 rc = PTR_ERR(bak_filp);
3476                 CERROR("backup logfile open %s: %d\n", logname, rc);
3477                 GOTO(pop, rc);
3478         }
3479         sprintf(logname, "%s/%s", MOUNT_CONFIGS_DIR, fsname);
3480         filp = l_filp_open(logname, O_RDONLY, 0);
3481         if (IS_ERR(filp)) {
3482                 rc = PTR_ERR(filp);
3483                 CERROR("logfile open %s: %d\n", logname, rc);
3484                 GOTO(close1f, rc);
3485         }
3486
3487         while ((rc = lustre_fread(filp, buf, count, &soff)) > 0) {
3488                 rc = lustre_fwrite(bak_filp, buf, count, &doff);
3489                 break;
3490         }
3491
3492         filp_close(filp, 0);
3493 close1f:
3494         filp_close(bak_filp, 0);
3495 pop:
3496         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3497 out:
3498         if (buf)
3499                 OBD_FREE(buf, count);
3500         OBD_FREE(logname, PATH_MAX);
3501         return rc;
3502 }
3503
3504 #endif