Whamcloud - gitweb
LU-1301 mgs: osd api based setup
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MGS
44 #define D_MGS D_CONFIG
45
46 #include <obd.h>
47 #include <obd_lov.h>
48 #include <lustre_param.h>
49 #include <lustre_sec.h>
50 #include <lquota.h>
51 #include <lustre_log.h>
52
53 #include "mgs_internal.h"
54
55 /********************** Class functions ********************/
56
57 /* Caller must list_del and OBD_FREE each dentry from the list */
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *dentry_list)
60 {
61         /* see mds_cleanup_pending */
62         struct lvfs_run_ctxt saved;
63         struct file *file;
64         struct dentry *dentry;
65         struct vfsmount *mnt;
66         int rc = 0;
67         ENTRY;
68
69         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
70         dentry = dget(mgs->mgs_configs_dir_old);
71         if (IS_ERR(dentry))
72                 GOTO(out_pop, rc = PTR_ERR(dentry));
73         mnt = mntget(mgs->mgs_vfsmnt);
74         if (IS_ERR(mnt)) {
75                 l_dput(dentry);
76                 GOTO(out_pop, rc = PTR_ERR(mnt));
77         }
78
79         file = ll_dentry_open(dentry, mnt, O_RDONLY, current_cred());
80         if (IS_ERR(file))
81                 /* dentry_open_it() drops the dentry, mnt refs */
82                 GOTO(out_pop, rc = PTR_ERR(file));
83
84         CFS_INIT_LIST_HEAD(dentry_list);
85         rc = l_readdir(file, dentry_list);
86         filp_close(file, 0);
87         /*  filp_close->fput() drops the dentry, mnt refs */
88
89 out_pop:
90         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
91         RETURN(rc);
92 }
93
94 /******************** DB functions *********************/
95
96 static inline int name_create(char **newname, char *prefix, char *suffix)
97 {
98         LASSERT(newname);
99         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
100         if (!*newname)
101                 return -ENOMEM;
102         sprintf(*newname, "%s%s", prefix, suffix);
103         return 0;
104 }
105
106 static inline void name_destroy(char **name)
107 {
108         if (*name)
109                 OBD_FREE(*name, strlen(*name) + 1);
110         *name = NULL;
111 }
112
113 struct mgs_fsdb_handler_data
114 {
115         struct fs_db   *fsdb;
116         __u32           ver;
117 };
118
119 /* from the (client) config log, figure out:
120         1. which ost's/mdt's are configured (by index)
121         2. what the last config step is
122         3. COMPAT_18 osc name
123 */
124 /* It might be better to have a separate db file, instead of parsing the info
125    out of the client log.  This is slow and potentially error-prone. */
126 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
127                             struct llog_rec_hdr *rec, void *data)
128 {
129         struct mgs_fsdb_handler_data *d = data;
130         struct fs_db *fsdb = d->fsdb;
131         int cfg_len = rec->lrh_len;
132         char *cfg_buf = (char*) (rec + 1);
133         struct lustre_cfg *lcfg;
134         __u32 index;
135         int rc = 0;
136         ENTRY;
137
138         if (rec->lrh_type != OBD_CFG_REC) {
139                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
140                 RETURN(-EINVAL);
141         }
142
143         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
144         if (rc) {
145                 CERROR("Insane cfg\n");
146                 RETURN(rc);
147         }
148
149         lcfg = (struct lustre_cfg *)cfg_buf;
150
151         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
152                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
153
154         /* Figure out ost indicies */
155         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
156         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
157             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
158                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
159                                        NULL, 10);
160                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
161                        lustre_cfg_string(lcfg, 1), index,
162                        lustre_cfg_string(lcfg, 2));
163                 cfs_set_bit(index, fsdb->fsdb_ost_index_map);
164         }
165
166         /* Figure out mdt indicies */
167         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
168         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
169             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
170                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
171                                        &index, NULL);
172                 if (rc != LDD_F_SV_TYPE_MDT) {
173                         CWARN("Unparsable MDC name %s, assuming index 0\n",
174                               lustre_cfg_string(lcfg, 0));
175                         index = 0;
176                 }
177                 rc = 0;
178                 CDEBUG(D_MGS, "MDT index is %u\n", index);
179                 cfs_set_bit(index, fsdb->fsdb_mdt_index_map);
180                 fsdb->fsdb_mdt_count ++;
181         }
182
183         /**
184          * figure out the old config. fsdb_gen = 0 means old log
185          * It is obsoleted and not supported anymore
186          */
187         if (fsdb->fsdb_gen == 0) {
188                 CERROR("Old config format is not supported\n");
189                 RETURN(-EINVAL);
190         }
191
192         /*
193          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
194          */
195         if (!cfs_test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
196             lcfg->lcfg_command == LCFG_ATTACH &&
197             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
198                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
199                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
200                         CWARN("MDT using 1.8 OSC name scheme\n");
201                         cfs_set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
202                 }
203         }
204
205         if (lcfg->lcfg_command == LCFG_MARKER) {
206                 struct cfg_marker *marker;
207                 marker = lustre_cfg_buf(lcfg, 1);
208
209                 d->ver = marker->cm_vers;
210
211                 /* Keep track of the latest marker step */
212                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
213         }
214
215         RETURN(rc);
216 }
217
218 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
219 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
220                                   struct mgs_device *mgs,
221                                   struct fs_db *fsdb)
222 {
223         char *logname;
224         struct llog_handle *loghandle;
225         struct lvfs_run_ctxt saved;
226         struct llog_ctxt *ctxt;
227         struct mgs_fsdb_handler_data d = { fsdb, 0 };
228         int rc, rc2;
229         ENTRY;
230
231         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
232         LASSERT(ctxt != NULL);
233         rc = name_create(&logname, fsdb->fsdb_name, "-client");
234         if (rc)
235                 GOTO(out_put, rc);
236         cfs_mutex_lock(&fsdb->fsdb_mutex);
237         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
238         rc = llog_open_create(NULL, ctxt, &loghandle, NULL, logname);
239         if (rc)
240                 GOTO(out_pop, rc);
241
242         rc = llog_init_handle(NULL, loghandle, LLOG_F_IS_PLAIN, NULL);
243         if (rc)
244                 GOTO(out_close, rc);
245
246         if (llog_get_size(loghandle) <= 1)
247                 cfs_set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
248
249         rc = llog_process_or_fork(env, loghandle, mgs_fsdb_handler, (void *)&d,
250                                   NULL, false);
251         CDEBUG(D_INFO, "get_db = %d\n", rc);
252 out_close:
253         rc2 = llog_close(NULL, loghandle);
254         if (!rc)
255                 rc = rc2;
256 out_pop:
257         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
258         cfs_mutex_unlock(&fsdb->fsdb_mutex);
259         name_destroy(&logname);
260 out_put:
261         llog_ctxt_put(ctxt);
262
263         RETURN(rc);
264 }
265
266 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
267 {
268         struct mgs_tgt_srpc_conf *tgtconf;
269
270         /* free target-specific rules */
271         while (fsdb->fsdb_srpc_tgt) {
272                 tgtconf = fsdb->fsdb_srpc_tgt;
273                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
274
275                 LASSERT(tgtconf->mtsc_tgt);
276
277                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
278                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
279                 OBD_FREE_PTR(tgtconf);
280         }
281
282         /* free general rules */
283         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
284 }
285
286 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
287 {
288         struct fs_db *fsdb;
289         cfs_list_t *tmp;
290
291         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
292                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
293                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
294                         return fsdb;
295         }
296         return NULL;
297 }
298
299 /* caller must hold the mgs->mgs_fs_db_lock */
300 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
301                                   struct mgs_device *mgs, char *fsname)
302 {
303         struct fs_db *fsdb;
304         int rc;
305         ENTRY;
306
307         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
308                 CERROR("fsname %s is too long\n", fsname);
309                 RETURN(NULL);
310         }
311
312         OBD_ALLOC_PTR(fsdb);
313         if (!fsdb)
314                 RETURN(NULL);
315
316         strcpy(fsdb->fsdb_name, fsname);
317         cfs_mutex_init(&fsdb->fsdb_mutex);
318         cfs_set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
319         fsdb->fsdb_gen = 1;
320
321         if (strcmp(fsname, MGSSELF_NAME) == 0) {
322                 cfs_set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
323         } else {
324                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
325                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
326                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
327                         CERROR("No memory for index maps\n");
328                         GOTO(err, rc = -ENOMEM);
329                 }
330
331                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
332                 if (rc)
333                         GOTO(err, rc);
334                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
335                 if (rc)
336                         GOTO(err, rc);
337
338                 /* initialise data for NID table */
339                 mgs_ir_init_fs(env, mgs, fsdb);
340
341                 lproc_mgs_add_live(mgs, fsdb);
342         }
343
344         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
345
346         RETURN(fsdb);
347 err:
348         if (fsdb->fsdb_ost_index_map)
349                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
350         if (fsdb->fsdb_mdt_index_map)
351                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
352         name_destroy(&fsdb->fsdb_clilov);
353         name_destroy(&fsdb->fsdb_clilmv);
354         OBD_FREE_PTR(fsdb);
355         RETURN(NULL);
356 }
357
358 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
359 {
360         /* wait for anyone with the sem */
361         cfs_mutex_lock(&fsdb->fsdb_mutex);
362         lproc_mgs_del_live(mgs, fsdb);
363         cfs_list_del(&fsdb->fsdb_list);
364
365         /* deinitialize fsr */
366         mgs_ir_fini_fs(mgs, fsdb);
367
368         if (fsdb->fsdb_ost_index_map)
369                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
370         if (fsdb->fsdb_mdt_index_map)
371                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
372         name_destroy(&fsdb->fsdb_clilov);
373         name_destroy(&fsdb->fsdb_clilmv);
374         mgs_free_fsdb_srpc(fsdb);
375         cfs_mutex_unlock(&fsdb->fsdb_mutex);
376         OBD_FREE_PTR(fsdb);
377 }
378
379 int mgs_init_fsdb_list(struct mgs_device *mgs)
380 {
381         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
382         return 0;
383 }
384
385 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
386 {
387         struct fs_db *fsdb;
388         cfs_list_t *tmp, *tmp2;
389         cfs_mutex_lock(&mgs->mgs_mutex);
390         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
391                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
392                 mgs_free_fsdb(mgs, fsdb);
393         }
394         cfs_mutex_unlock(&mgs->mgs_mutex);
395         return 0;
396 }
397
398 int mgs_find_or_make_fsdb(const struct lu_env *env,
399                           struct mgs_device *mgs, char *name,
400                           struct fs_db **dbh)
401 {
402         struct fs_db *fsdb;
403         int rc = 0;
404
405         cfs_mutex_lock(&mgs->mgs_mutex);
406         fsdb = mgs_find_fsdb(mgs, name);
407         if (fsdb) {
408                 cfs_mutex_unlock(&mgs->mgs_mutex);
409                 *dbh = fsdb;
410                 return 0;
411         }
412
413         CDEBUG(D_MGS, "Creating new db\n");
414         fsdb = mgs_new_fsdb(env, mgs, name);
415         cfs_mutex_unlock(&mgs->mgs_mutex);
416         if (!fsdb)
417                 return -ENOMEM;
418
419         if (!cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
420                 /* populate the db from the client llog */
421                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
422                 if (rc) {
423                         CERROR("Can't get db from client log %d\n", rc);
424                         mgs_free_fsdb(mgs, fsdb);
425                         return rc;
426                 }
427         }
428
429         /* populate srpc rules from params llog */
430         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
431         if (rc) {
432                 CERROR("Can't get db from params log %d\n", rc);
433                 mgs_free_fsdb(mgs, fsdb);
434                 return rc;
435         }
436
437         *dbh = fsdb;
438
439         return 0;
440 }
441
442 /* 1 = index in use
443    0 = index unused
444    -1= empty client log */
445 int mgs_check_index(const struct lu_env *env,
446                     struct mgs_device *mgs,
447                     struct mgs_target_info *mti)
448 {
449         struct fs_db *fsdb;
450         void *imap;
451         int rc = 0;
452         ENTRY;
453
454         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
455
456         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
457         if (rc) {
458                 CERROR("Can't get db for %s\n", mti->mti_fsname);
459                 RETURN(rc);
460         }
461
462         if (cfs_test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
463                 RETURN(-1);
464
465         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
466                 imap = fsdb->fsdb_ost_index_map;
467         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
468                 imap = fsdb->fsdb_mdt_index_map;
469         else
470                 RETURN(-EINVAL);
471
472         if (cfs_test_bit(mti->mti_stripe_index, imap))
473                 RETURN(1);
474         RETURN(0);
475 }
476
477 static __inline__ int next_index(void *index_map, int map_len)
478 {
479         int i;
480         for (i = 0; i < map_len * 8; i++)
481                  if (!cfs_test_bit(i, index_map)) {
482                          return i;
483                  }
484         CERROR("max index %d exceeded.\n", i);
485         return -1;
486 }
487
488 /* Return codes:
489         0  newly marked as in use
490         <0 err
491         +EALREADY for update of an old index */
492 static int mgs_set_index(const struct lu_env *env,
493                          struct mgs_device *mgs,
494                          struct mgs_target_info *mti)
495 {
496         struct fs_db *fsdb;
497         void *imap;
498         int rc = 0;
499         ENTRY;
500
501         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
502         if (rc) {
503                 CERROR("Can't get db for %s\n", mti->mti_fsname);
504                 RETURN(rc);
505         }
506
507         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
508                 imap = fsdb->fsdb_ost_index_map;
509         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
510                 imap = fsdb->fsdb_mdt_index_map;
511                 if (fsdb->fsdb_mdt_count >= MAX_MDT_COUNT) {
512                         LCONSOLE_ERROR_MSG(0x13f, "The max mdt count"
513                                            "is %d\n", (int)MAX_MDT_COUNT);
514                         RETURN(-ERANGE);
515                 }
516         } else {
517                 RETURN(-EINVAL);
518         }
519
520         if (mti->mti_flags & LDD_F_NEED_INDEX) {
521                 rc = next_index(imap, INDEX_MAP_SIZE);
522                 if (rc == -1)
523                         RETURN(-ERANGE);
524                 mti->mti_stripe_index = rc;
525                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
526                         fsdb->fsdb_mdt_count ++;
527         }
528
529         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
530                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
531                                    "but the max index is %d.\n",
532                                    mti->mti_svname, mti->mti_stripe_index,
533                                    INDEX_MAP_SIZE * 8);
534                 RETURN(-ERANGE);
535         }
536
537         if (cfs_test_bit(mti->mti_stripe_index, imap)) {
538                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
539                     !(mti->mti_flags & LDD_F_WRITECONF)) {
540                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
541                                            "%d, but that index is already in "
542                                            "use. Use --writeconf to force\n",
543                                            mti->mti_svname,
544                                            mti->mti_stripe_index);
545                         RETURN(-EADDRINUSE);
546                 } else {
547                         CDEBUG(D_MGS, "Server %s updating index %d\n",
548                                mti->mti_svname, mti->mti_stripe_index);
549                         RETURN(EALREADY);
550                 }
551         }
552
553         cfs_set_bit(mti->mti_stripe_index, imap);
554         cfs_clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
555         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
556                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
557
558         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
559                mti->mti_stripe_index);
560
561         RETURN(0);
562 }
563
564 struct mgs_modify_lookup {
565         struct cfg_marker mml_marker;
566         int               mml_modified;
567 };
568
569 static int mgs_modify_handler(const struct lu_env *env,
570                               struct llog_handle *llh,
571                               struct llog_rec_hdr *rec, void *data)
572 {
573         struct mgs_modify_lookup *mml = data;
574         struct cfg_marker *marker;
575         struct lustre_cfg *lcfg = (struct lustre_cfg *)(rec + 1);
576         int cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
577                 sizeof(struct llog_rec_tail);
578         int rc;
579         ENTRY;
580
581         if (rec->lrh_type != OBD_CFG_REC) {
582                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
583                 RETURN(-EINVAL);
584         }
585
586         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
587         if (rc) {
588                 CERROR("Insane cfg\n");
589                 RETURN(rc);
590         }
591
592         /* We only care about markers */
593         if (lcfg->lcfg_command != LCFG_MARKER)
594                 RETURN(0);
595
596         marker = lustre_cfg_buf(lcfg, 1);
597         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
598             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
599             !(marker->cm_flags & CM_SKIP)) {
600                 /* Found a non-skipped marker match */
601                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
602                        rec->lrh_index, marker->cm_step,
603                        marker->cm_flags, mml->mml_marker.cm_flags,
604                        marker->cm_tgtname, marker->cm_comment);
605                 /* Overwrite the old marker llog entry */
606                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
607                 marker->cm_flags |= mml->mml_marker.cm_flags;
608                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
609                 /* Header and tail are added back to lrh_len in
610                    llog_lvfs_write_rec */
611                 rec->lrh_len = cfg_len;
612                 rc = llog_write_rec(NULL, llh, rec, NULL, 0, (void *)lcfg,
613                                     rec->lrh_index);
614                 if (!rc)
615                          mml->mml_modified++;
616         }
617
618         RETURN(rc);
619 }
620
621 /* Modify an existing config log record (for CM_SKIP or CM_EXCLUDE) */
622 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
623                       struct fs_db *fsdb, struct mgs_target_info *mti,
624                       char *logname, char *devname, char *comment, int flags)
625 {
626         struct llog_handle *loghandle;
627         struct lvfs_run_ctxt saved;
628         struct llog_ctxt *ctxt;
629         struct mgs_modify_lookup *mml;
630         int rc, rc2;
631         ENTRY;
632
633         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
634                flags);
635
636         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
637         LASSERT(ctxt != NULL);
638         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
639         rc = llog_open(NULL, ctxt, &loghandle, NULL, logname,
640                        LLOG_OPEN_EXISTS);
641         if (rc < 0) {
642                 if (rc == -ENOENT)
643                         rc = 0;
644                 GOTO(out_pop, rc);
645         }
646
647         rc = llog_init_handle(NULL, loghandle, LLOG_F_IS_PLAIN, NULL);
648         if (rc)
649                 GOTO(out_close, rc);
650
651         if (llog_get_size(loghandle) <= 1)
652                 GOTO(out_close, rc = 0);
653
654         OBD_ALLOC_PTR(mml);
655         if (!mml)
656                 GOTO(out_close, rc = -ENOMEM);
657         strcpy(mml->mml_marker.cm_comment, comment);
658         strcpy(mml->mml_marker.cm_tgtname, devname);
659         /* Modify mostly means cancel */
660         mml->mml_marker.cm_flags = flags;
661         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
662         mml->mml_modified = 0;
663         rc = llog_process_or_fork(env, loghandle, mgs_modify_handler,
664                                   (void *)mml, NULL, false);
665         if (!rc && !mml->mml_modified)
666                 rc = 1;
667         OBD_FREE_PTR(mml);
668
669 out_close:
670         rc2 = llog_close(NULL, loghandle);
671         if (!rc)
672                 rc = rc2;
673 out_pop:
674         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
675         if (rc < 0)
676                 CERROR("modify %s/%s failed %d\n",
677                        mti->mti_svname, comment, rc);
678         llog_ctxt_put(ctxt);
679         RETURN(rc);
680 }
681
682 /******************** config log recording functions *********************/
683
684 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
685                          struct lustre_cfg *lcfg)
686 {
687         struct lvfs_run_ctxt   saved;
688         struct llog_rec_hdr    rec;
689         int buflen, rc;
690         struct obd_device *obd = llh->lgh_ctxt->loc_obd;
691
692         if (!lcfg || !llh)
693                 return -ENOMEM;
694
695         LASSERT(llh->lgh_ctxt);
696
697         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
698                                 lcfg->lcfg_buflens);
699         rec.lrh_len = llog_data_len(buflen);
700         rec.lrh_type = OBD_CFG_REC;
701
702         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
703         /* idx = -1 means append */
704         rc = llog_write_rec(NULL, llh, &rec, NULL, 0, (void *)lcfg, -1);
705         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
706         if (rc)
707                 CERROR("failed %d\n", rc);
708         return rc;
709 }
710
711 static int record_base(const struct lu_env *env, struct llog_handle *llh,
712                      char *cfgname, lnet_nid_t nid, int cmd,
713                      char *s1, char *s2, char *s3, char *s4)
714 {
715         struct mgs_thread_info *mgi = mgs_env_info(env);
716         struct lustre_cfg     *lcfg;
717         int rc;
718
719         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
720                cmd, s1, s2, s3, s4);
721
722         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
723         if (s1)
724                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
725         if (s2)
726                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
727         if (s3)
728                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
729         if (s4)
730                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
731
732         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
733         if (!lcfg)
734                 return -ENOMEM;
735         lcfg->lcfg_nid = nid;
736
737         rc = record_lcfg(env, llh, lcfg);
738
739         lustre_cfg_free(lcfg);
740
741         if (rc) {
742                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
743                        cmd, s1, s2, s3, s4);
744         }
745         return rc;
746 }
747
748
749 static inline int record_add_uuid(const struct lu_env *env,
750                                   struct llog_handle *llh,
751                                   uint64_t nid, char *uuid)
752 {
753         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
754
755 }
756
757 static inline int record_add_conn(const struct lu_env *env,
758                                   struct llog_handle *llh,
759                                   char *devname, char *uuid)
760 {
761         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
762 }
763
764 static inline int record_attach(const struct lu_env *env,
765                                 struct llog_handle *llh, char *devname,
766                                 char *type, char *uuid)
767 {
768         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
769 }
770
771 static inline int record_setup(const struct lu_env *env,
772                                struct llog_handle *llh, char *devname,
773                                char *s1, char *s2, char *s3, char *s4)
774 {
775         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
776 }
777
778 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
779                             char *devname, struct lov_desc *desc)
780 {
781         struct mgs_thread_info *mgi = mgs_env_info(env);
782         struct lustre_cfg *lcfg;
783         int rc;
784
785         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
786         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
787         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
788         if (!lcfg)
789                 return -ENOMEM;
790         rc = record_lcfg(env, llh, lcfg);
791
792         lustre_cfg_free(lcfg);
793         return rc;
794 }
795
796 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
797                             char *devname, struct lmv_desc *desc)
798 {
799         struct mgs_thread_info *mgi = mgs_env_info(env);
800         struct lustre_cfg *lcfg;
801         int rc;
802
803         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
804         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
805         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
806
807         rc = record_lcfg(env, llh, lcfg);
808
809         lustre_cfg_free(lcfg);
810         return rc;
811 }
812
813 static inline int record_mdc_add(const struct lu_env *env,
814                                  struct llog_handle *llh,
815                                  char *logname, char *mdcuuid,
816                                  char *mdtuuid, char *index,
817                                  char *gen)
818 {
819         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
820                            mdtuuid,index,gen,mdcuuid);
821 }
822
823 static inline int record_lov_add(const struct lu_env *env,
824                                  struct llog_handle *llh,
825                                  char *lov_name, char *ost_uuid,
826                                  char *index, char *gen)
827 {
828         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
829                            ost_uuid,index,gen,0);
830 }
831
832 static inline int record_mount_opt(const struct lu_env *env,
833                                    struct llog_handle *llh,
834                                    char *profile, char *lov_name,
835                                    char *mdc_name)
836 {
837         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
838                            profile,lov_name,mdc_name,0);
839 }
840
841 static int record_marker(const struct lu_env *env,
842                          struct llog_handle *llh,
843                          struct fs_db *fsdb, __u32 flags,
844                          char *tgtname, char *comment)
845 {
846         struct mgs_thread_info *mgi = mgs_env_info(env);
847         struct lustre_cfg *lcfg;
848         int rc;
849
850         if (flags & CM_START)
851                 fsdb->fsdb_gen++;
852         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
853         mgi->mgi_marker.cm_flags = flags;
854         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
855         strncpy(mgi->mgi_marker.cm_tgtname, tgtname,
856                 sizeof(mgi->mgi_marker.cm_tgtname));
857         strncpy(mgi->mgi_marker.cm_comment, comment,
858                 sizeof(mgi->mgi_marker.cm_comment));
859         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
860         mgi->mgi_marker.cm_canceltime = 0;
861         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
862         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
863                             sizeof(mgi->mgi_marker));
864         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
865         if (!lcfg)
866                 return -ENOMEM;
867         rc = record_lcfg(env, llh, lcfg);
868
869         lustre_cfg_free(lcfg);
870         return rc;
871 }
872
873 static int record_start_log(const struct lu_env *env,
874                             struct mgs_device *mgs,
875                             struct llog_handle **llh, char *name)
876 {
877         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
878         struct lvfs_run_ctxt saved;
879         struct llog_ctxt *ctxt;
880         int rc = 0;
881
882         if (*llh)
883                 GOTO(out, rc = -EBUSY);
884
885         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
886         if (!ctxt)
887                 GOTO(out, rc = -ENODEV);
888         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
889
890         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
891         rc = llog_open_create(NULL, ctxt, llh, NULL, name);
892         if (rc)
893                 GOTO(out_ctxt, rc);
894         rc = llog_init_handle(NULL, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
895         if (rc) {
896                 llog_close(NULL, *llh);
897                 *llh = NULL;
898         }
899 out_ctxt:
900         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
901         llog_ctxt_put(ctxt);
902 out:
903         if (rc)
904                 CERROR("Can't start log %s: %d\n", name, rc);
905         RETURN(rc);
906 }
907
908 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
909 {
910         struct lvfs_run_ctxt saved;
911         struct obd_device *obd = (*llh)->lgh_ctxt->loc_obd;
912         int rc = 0;
913
914         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
915
916         rc = llog_close(NULL, *llh);
917         *llh = NULL;
918
919         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
920         RETURN(rc);
921 }
922
923 static int mgs_log_is_empty(const struct lu_env *env,
924                             struct mgs_device *mgs, char *name)
925 {
926         struct lvfs_run_ctxt saved;
927         struct llog_handle *llh;
928         struct llog_ctxt *ctxt;
929         int rc = 0;
930
931         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
932         LASSERT(ctxt != NULL);
933         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
934         rc = llog_open(NULL, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
935         if (rc < 0) {
936                 if (rc == -ENOENT)
937                         rc = 0;
938                 GOTO(out_ctxt, rc);
939         }
940
941         llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
942         if (rc)
943                 GOTO(out_close, rc);
944         rc = llog_get_size(llh);
945
946 out_close:
947         llog_close(NULL, llh);
948 out_ctxt:
949         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
950         llog_ctxt_put(ctxt);
951         /* header is record 1 */
952         return (rc <= 1);
953 }
954
955 /******************** config "macros" *********************/
956
957 /* write an lcfg directly into a log (with markers) */
958 static int mgs_write_log_direct(const struct lu_env *env,
959                                 struct mgs_device *mgs, struct fs_db *fsdb,
960                                 char *logname, struct lustre_cfg *lcfg,
961                                 char *devname, char *comment)
962 {
963         struct llog_handle *llh = NULL;
964         int rc;
965         ENTRY;
966
967         if (!lcfg)
968                 RETURN(-ENOMEM);
969
970         rc = record_start_log(env, mgs, &llh, logname);
971         if (rc)
972                 RETURN(rc);
973
974         /* FIXME These should be a single journal transaction */
975         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
976         if (rc)
977                 GOTO(out_end, rc);
978         rc = record_lcfg(env, llh, lcfg);
979         if (rc)
980                 GOTO(out_end, rc);
981         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
982         if (rc)
983                 GOTO(out_end, rc);
984 out_end:
985         record_end_log(env, &llh);
986         RETURN(rc);
987 }
988
989 /* write the lcfg in all logs for the given fs */
990 int mgs_write_log_direct_all(const struct lu_env *env,
991                              struct mgs_device *mgs,
992                              struct fs_db *fsdb,
993                              struct mgs_target_info *mti,
994                              struct lustre_cfg *lcfg,
995                              char *devname, char *comment,
996                              int server_only)
997 {
998         cfs_list_t dentry_list;
999         struct l_linux_dirent *dirent, *n;
1000         char *fsname = mti->mti_fsname;
1001         char *logname;
1002         int rc = 0, len = strlen(fsname);
1003         ENTRY;
1004
1005         /* We need to set params for any future logs
1006            as well. FIXME Append this file to every new log.
1007            Actually, we should store as params (text), not llogs.  Or
1008            in a database. */
1009         rc = name_create(&logname, fsname, "-params");
1010         if (rc)
1011                 RETURN(rc);
1012         if (mgs_log_is_empty(env, mgs, logname)) {
1013                 struct llog_handle *llh = NULL;
1014                 rc = record_start_log(env, mgs, &llh, logname);
1015                 record_end_log(env, &llh);
1016         }
1017         name_destroy(&logname);
1018         if (rc)
1019                 RETURN(rc);
1020
1021         /* Find all the logs in the CONFIGS directory */
1022         rc = class_dentry_readdir(env, mgs, &dentry_list);
1023         if (rc) {
1024                 CERROR("Can't read %s dir\n", MOUNT_CONFIGS_DIR);
1025                 RETURN(rc);
1026         }
1027
1028         /* Could use fsdb index maps instead of directory listing */
1029         cfs_list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
1030                 cfs_list_del(&dirent->lld_list);
1031                 /* don't write to sptlrpc rule log */
1032                 if (strstr(dirent->lld_name, "-sptlrpc") != NULL)
1033                         goto next;
1034
1035                 /* caller wants write server logs only */
1036                 if (server_only && strstr(dirent->lld_name, "-client") != NULL)
1037                         goto next;
1038
1039                 if (strncmp(fsname, dirent->lld_name, len) == 0) {
1040                         CDEBUG(D_MGS, "Changing log %s\n", dirent->lld_name);
1041                         /* Erase any old settings of this same parameter */
1042                         mgs_modify(env, mgs, fsdb, mti, dirent->lld_name,
1043                                    devname, comment, CM_SKIP);
1044                         /* Write the new one */
1045                         if (lcfg) {
1046                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1047                                                           dirent->lld_name,
1048                                                           lcfg, devname,
1049                                                           comment);
1050                                 if (rc)
1051                                         CERROR("err %d writing log %s\n", rc,
1052                                                dirent->lld_name);
1053                         }
1054                 }
1055 next:
1056                 OBD_FREE(dirent, sizeof(*dirent));
1057         }
1058
1059         RETURN(rc);
1060 }
1061
1062 static int mgs_write_log_mdc_to_mdt(const struct lu_env *env,
1063                                     struct mgs_device *mgs,
1064                                     struct fs_db *fsdb,
1065                                     struct mgs_target_info *mti,
1066                                     char *logname);
1067 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1068                                     struct mgs_device *mgs,
1069                                     struct fs_db *fsdb,
1070                                     struct mgs_target_info *mti,
1071                                     char *logname, char *suffix, char *lovname,
1072                                     enum lustre_sec_part sec_part, int flags);
1073 static int name_create_mdt_and_lov(char **logname, char **lovname,
1074                                    struct fs_db *fsdb, int i);
1075
1076 static int mgs_steal_llog_handler(const struct lu_env *env,
1077                                   struct llog_handle *llh,
1078                                   struct llog_rec_hdr *rec, void *data)
1079 {
1080         struct mgs_device *mgs;
1081         struct mgs_target_info *mti, *tmti;
1082         struct fs_db *fsdb;
1083         int cfg_len = rec->lrh_len;
1084         char *cfg_buf = (char*) (rec + 1);
1085         struct lustre_cfg *lcfg;
1086         int rc = 0;
1087         struct llog_handle *mdt_llh = NULL;
1088         static int got_an_osc_or_mdc = 0;
1089         /* 0: not found any osc/mdc;
1090            1: found osc;
1091            2: found mdc;
1092         */
1093         static int last_step = -1;
1094
1095         ENTRY;
1096
1097         mti = ((struct temp_comp*)data)->comp_mti;
1098         tmti = ((struct temp_comp*)data)->comp_tmti;
1099         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1100         mgs = ((struct temp_comp*)data)->comp_mgs;
1101
1102         if (rec->lrh_type != OBD_CFG_REC) {
1103                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1104                 RETURN(-EINVAL);
1105         }
1106
1107         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1108         if (rc) {
1109                 CERROR("Insane cfg\n");
1110                 RETURN(rc);
1111         }
1112
1113         lcfg = (struct lustre_cfg *)cfg_buf;
1114
1115         if (lcfg->lcfg_command == LCFG_MARKER) {
1116                 struct cfg_marker *marker;
1117                 marker = lustre_cfg_buf(lcfg, 1);
1118                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1119                     (marker->cm_flags & CM_START)){
1120                         got_an_osc_or_mdc = 1;
1121                         strncpy(tmti->mti_svname, marker->cm_tgtname,
1122                                 sizeof(tmti->mti_svname));
1123                         rc = record_start_log(env, mgs, &mdt_llh,
1124                                               mti->mti_svname);
1125                         if (rc)
1126                                 RETURN(rc);
1127                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1128                                            mti->mti_svname,"add osc(copied)");
1129                         record_end_log(env, &mdt_llh);
1130                         last_step = marker->cm_step;
1131                         RETURN(rc);
1132                 }
1133                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1134                     (marker->cm_flags & CM_END)){
1135                         LASSERT(last_step == marker->cm_step);
1136                         last_step = -1;
1137                         got_an_osc_or_mdc = 0;
1138                         rc = record_start_log(env, mgs, &mdt_llh,
1139                                               mti->mti_svname);
1140                         if (rc)
1141                                 RETURN(rc);
1142                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1143                                            mti->mti_svname,"add osc(copied)");
1144                         record_end_log(env, &mdt_llh);
1145                         RETURN(rc);
1146                 }
1147                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1148                     (marker->cm_flags & CM_START)){
1149                         got_an_osc_or_mdc = 2;
1150                         last_step = marker->cm_step;
1151                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1152                                strlen(marker->cm_tgtname));
1153
1154                         RETURN(rc);
1155                 }
1156                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1157                     (marker->cm_flags & CM_END)){
1158                         LASSERT(last_step == marker->cm_step);
1159                         last_step = -1;
1160                         got_an_osc_or_mdc = 0;
1161                         RETURN(rc);
1162                 }
1163         }
1164
1165         if (got_an_osc_or_mdc == 0 || last_step < 0)
1166                 RETURN(rc);
1167
1168         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1169                 uint64_t nodenid;
1170                 nodenid = lcfg->lcfg_nid;
1171
1172                 tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1173                 tmti->mti_nid_count++;
1174
1175                 RETURN(rc);
1176         }
1177
1178         if (lcfg->lcfg_command == LCFG_SETUP) {
1179                 char *target;
1180
1181                 target = lustre_cfg_string(lcfg, 1);
1182                 memcpy(tmti->mti_uuid, target, strlen(target));
1183                 RETURN(rc);
1184         }
1185
1186         /* ignore client side sptlrpc_conf_log */
1187         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1188                 RETURN(rc);
1189
1190         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1191                 int index;
1192
1193                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1194                         RETURN (-EINVAL);
1195
1196                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1197                        strlen(mti->mti_fsname));
1198                 tmti->mti_stripe_index = index;
1199
1200                 rc = mgs_write_log_mdc_to_mdt(env, mgs, fsdb, tmti,
1201                                               mti->mti_svname);
1202                 memset(tmti, 0, sizeof(*tmti));
1203                 RETURN(rc);
1204         }
1205
1206         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1207                 int index;
1208                 char mdt_index[9];
1209                 char *logname, *lovname;
1210
1211                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1212                                              mti->mti_stripe_index);
1213                 if (rc)
1214                         RETURN(rc);
1215                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1216
1217                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1218                         name_destroy(&logname);
1219                         name_destroy(&lovname);
1220                         RETURN(-EINVAL);
1221                 }
1222
1223                 tmti->mti_stripe_index = index;
1224                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1225                                          mdt_index, lovname,
1226                                          LUSTRE_SP_MDT, 0);
1227                 name_destroy(&logname);
1228                 name_destroy(&lovname);
1229                 RETURN(rc);
1230         }
1231         RETURN(rc);
1232 }
1233
1234 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1235 /* stealed from mgs_get_fsdb_from_llog*/
1236 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1237                                               struct mgs_device *mgs,
1238                                               char *client_name,
1239                                               struct temp_comp* comp)
1240 {
1241         struct llog_handle *loghandle;
1242         struct lvfs_run_ctxt saved;
1243         struct mgs_target_info *tmti;
1244         struct llog_ctxt *ctxt;
1245         int rc;
1246
1247         ENTRY;
1248
1249         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1250         LASSERT(ctxt != NULL);
1251
1252         OBD_ALLOC_PTR(tmti);
1253         if (tmti == NULL)
1254                 GOTO(out_ctxt, rc = -ENOMEM);
1255
1256         comp->comp_tmti = tmti;
1257         comp->comp_mgs = mgs;
1258
1259         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
1260
1261         rc = llog_open(NULL, ctxt, &loghandle, NULL, client_name,
1262                        LLOG_OPEN_EXISTS);
1263         if (rc < 0) {
1264                 if (rc == -ENOENT)
1265                         rc = 0;
1266                 GOTO(out_pop, rc);
1267         }
1268
1269         rc = llog_init_handle(NULL, loghandle, LLOG_F_IS_PLAIN, NULL);
1270         if (rc)
1271                 GOTO(out_close, rc);
1272
1273         rc = llog_process_or_fork(env, loghandle, mgs_steal_llog_handler,
1274                                   (void *)comp, NULL, false);
1275         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1276 out_close:
1277         llog_close(NULL, loghandle);
1278 out_pop:
1279         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
1280         OBD_FREE_PTR(tmti);
1281 out_ctxt:
1282         llog_ctxt_put(ctxt);
1283         RETURN(rc);
1284 }
1285
1286 /* lmv is the second thing for client logs */
1287 /* copied from mgs_write_log_lov. Please refer to that.  */
1288 static int mgs_write_log_lmv(const struct lu_env *env,
1289                              struct mgs_device *mgs,
1290                              struct fs_db *fsdb,
1291                              struct mgs_target_info *mti,
1292                              char *logname, char *lmvname)
1293 {
1294         struct llog_handle *llh = NULL;
1295         struct lmv_desc *lmvdesc;
1296         char *uuid;
1297         int rc = 0;
1298         ENTRY;
1299
1300         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1301
1302         OBD_ALLOC_PTR(lmvdesc);
1303         if (lmvdesc == NULL)
1304                 RETURN(-ENOMEM);
1305         lmvdesc->ld_active_tgt_count = 0;
1306         lmvdesc->ld_tgt_count = 0;
1307         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1308         uuid = (char *)lmvdesc->ld_uuid.uuid;
1309
1310         rc = record_start_log(env, mgs, &llh, logname);
1311         if (rc)
1312                 GOTO(out_free, rc);
1313         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1314         if (rc)
1315                 GOTO(out_end, rc);
1316         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1317         if (rc)
1318                 GOTO(out_end, rc);
1319         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1320         if (rc)
1321                 GOTO(out_end, rc);
1322         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1323         if (rc)
1324                 GOTO(out_end, rc);
1325 out_end:
1326         record_end_log(env, &llh);
1327 out_free:
1328         OBD_FREE_PTR(lmvdesc);
1329         RETURN(rc);
1330 }
1331
1332 /* lov is the first thing in the mdt and client logs */
1333 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1334                              struct fs_db *fsdb, struct mgs_target_info *mti,
1335                              char *logname, char *lovname)
1336 {
1337         struct llog_handle *llh = NULL;
1338         struct lov_desc *lovdesc;
1339         char *uuid;
1340         int rc = 0;
1341         ENTRY;
1342
1343         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1344
1345         /*
1346         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1347         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1348               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1349         */
1350
1351         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1352         OBD_ALLOC_PTR(lovdesc);
1353         if (lovdesc == NULL)
1354                 RETURN(-ENOMEM);
1355         lovdesc->ld_magic = LOV_DESC_MAGIC;
1356         lovdesc->ld_tgt_count = 0;
1357         /* Defaults.  Can be changed later by lcfg config_param */
1358         lovdesc->ld_default_stripe_count = 1;
1359         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1360         lovdesc->ld_default_stripe_size = 1024 * 1024;
1361         lovdesc->ld_default_stripe_offset = -1;
1362         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1363         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1364         /* can these be the same? */
1365         uuid = (char *)lovdesc->ld_uuid.uuid;
1366
1367         /* This should always be the first entry in a log.
1368         rc = mgs_clear_log(obd, logname); */
1369         rc = record_start_log(env, mgs, &llh, logname);
1370         if (rc)
1371                 GOTO(out_free, rc);
1372         /* FIXME these should be a single journal transaction */
1373         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1374         if (rc)
1375                 GOTO(out_end, rc);
1376         rc = record_attach(env, llh, lovname, "lov", uuid);
1377         if (rc)
1378                 GOTO(out_end, rc);
1379         rc = record_lov_setup(env, llh, lovname, lovdesc);
1380         if (rc)
1381                 GOTO(out_end, rc);
1382         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1383         if (rc)
1384                 GOTO(out_end, rc);
1385         EXIT;
1386 out_end:
1387         record_end_log(env, &llh);
1388 out_free:
1389         OBD_FREE_PTR(lovdesc);
1390         return rc;
1391 }
1392
1393 /* add failnids to open log */
1394 static int mgs_write_log_failnids(const struct lu_env *env,
1395                                   struct mgs_target_info *mti,
1396                                   struct llog_handle *llh,
1397                                   char *cliname)
1398 {
1399         char *failnodeuuid = NULL;
1400         char *ptr = mti->mti_params;
1401         lnet_nid_t nid;
1402         int rc = 0;
1403
1404         /*
1405         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1406         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1407         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1408         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1409         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1410         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1411         */
1412
1413         /* Pull failnid info out of params string */
1414         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1415                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1416                         if (failnodeuuid == NULL) {
1417                                 /* We don't know the failover node name,
1418                                    so just use the first nid as the uuid */
1419                                 rc = name_create(&failnodeuuid,
1420                                                  libcfs_nid2str(nid), "");
1421                                 if (rc)
1422                                         return rc;
1423                         }
1424                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1425                                "client %s\n", libcfs_nid2str(nid),
1426                                failnodeuuid, cliname);
1427                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1428                 }
1429                 if (failnodeuuid)
1430                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1431         }
1432
1433         name_destroy(&failnodeuuid);
1434         return rc;
1435 }
1436
1437 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1438                                     struct mgs_device *mgs,
1439                                     struct fs_db *fsdb,
1440                                     struct mgs_target_info *mti,
1441                                     char *logname, char *lmvname)
1442 {
1443         struct llog_handle *llh = NULL;
1444         char *mdcname = NULL;
1445         char *nodeuuid = NULL;
1446         char *mdcuuid = NULL;
1447         char *lmvuuid = NULL;
1448         char index[6];
1449         int i, rc;
1450         ENTRY;
1451
1452         if (mgs_log_is_empty(env, mgs, logname)) {
1453                 CERROR("log is empty! Logical error\n");
1454                 RETURN(-EINVAL);
1455         }
1456
1457         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1458                mti->mti_svname, logname, lmvname);
1459
1460         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1461         if (rc)
1462                 RETURN(rc);
1463         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1464         if (rc)
1465                 GOTO(out_free, rc);
1466         rc = name_create(&mdcuuid, mdcname, "_UUID");
1467         if (rc)
1468                 GOTO(out_free, rc);
1469         rc = name_create(&lmvuuid, lmvname, "_UUID");
1470         if (rc)
1471                 GOTO(out_free, rc);
1472
1473         rc = record_start_log(env, mgs, &llh, logname);
1474         if (rc)
1475                 GOTO(out_free, rc);
1476         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1477                            "add mdc");
1478         if (rc)
1479                 GOTO(out_end, rc);
1480         for (i = 0; i < mti->mti_nid_count; i++) {
1481                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1482                        libcfs_nid2str(mti->mti_nids[i]));
1483
1484                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1485                 if (rc)
1486                         GOTO(out_end, rc);
1487         }
1488
1489         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1490         if (rc)
1491                 GOTO(out_end, rc);
1492         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1493         if (rc)
1494                 GOTO(out_end, rc);
1495         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1496         if (rc)
1497                 GOTO(out_end, rc);
1498         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1499         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1500                             index, "1");
1501         if (rc)
1502                 GOTO(out_end, rc);
1503         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1504                            "add mdc");
1505         if (rc)
1506                 GOTO(out_end, rc);
1507 out_end:
1508         record_end_log(env, &llh);
1509 out_free:
1510         name_destroy(&lmvuuid);
1511         name_destroy(&mdcuuid);
1512         name_destroy(&mdcname);
1513         name_destroy(&nodeuuid);
1514         RETURN(rc);
1515 }
1516
1517 /* add new mdc to already existent MDS */
1518 static int mgs_write_log_mdc_to_mdt(const struct lu_env *env,
1519                                     struct mgs_device *mgs,
1520                                     struct fs_db *fsdb,
1521                                     struct mgs_target_info *mti,
1522                                     char *logname)
1523 {
1524         struct llog_handle *llh = NULL;
1525         char *nodeuuid = NULL;
1526         char *mdcname = NULL;
1527         char *mdcuuid = NULL;
1528         char *mdtuuid = NULL;
1529         int idx = mti->mti_stripe_index;
1530         char index[9];
1531         int i, rc;
1532
1533         ENTRY;
1534         if (mgs_log_is_empty(env, mgs, logname)) {
1535                 CERROR("log is empty! Logical error\n");
1536                 RETURN (-EINVAL);
1537         }
1538
1539         CDEBUG(D_MGS, "adding mdc index %d to %s\n", idx, logname);
1540
1541         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1542         if (rc)
1543                 RETURN(rc);
1544         snprintf(index, sizeof(index), "-mdc%04x", idx);
1545         rc = name_create(&mdcname, logname, index);
1546         if (rc)
1547                 GOTO(out_free, rc);
1548         rc = name_create(&mdcuuid, mdcname, "_UUID");
1549         if (rc)
1550                 GOTO(out_free, rc);
1551         rc = name_create(&mdtuuid, logname, "_UUID");
1552         if (rc)
1553                 GOTO(out_free, rc);
1554
1555         rc = record_start_log(env, mgs, &llh, logname);
1556         if (rc)
1557                 GOTO(out_free, rc);
1558         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname, "add mdc");
1559         if (rc)
1560                 GOTO(out_end, rc);
1561         for (i = 0; i < mti->mti_nid_count; i++) {
1562                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1563                        libcfs_nid2str(mti->mti_nids[i]));
1564                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1565                 if (rc)
1566                         GOTO(out_end, rc);
1567         }
1568         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, mdcuuid);
1569         if (rc)
1570                 GOTO(out_end, rc);
1571         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1572         if (rc)
1573                 GOTO(out_end, rc);
1574         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1575         if (rc)
1576                 GOTO(out_end, rc);
1577         snprintf(index, sizeof(index), "%d", idx);
1578
1579         rc = record_mdc_add(env, llh, logname, mdcuuid, mti->mti_uuid,
1580                             index, "1");
1581         if (rc)
1582                 GOTO(out_end, rc);
1583         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add mdc");
1584         if (rc)
1585                 GOTO(out_end, rc);
1586 out_end:
1587         record_end_log(env, &llh);
1588 out_free:
1589         name_destroy(&mdtuuid);
1590         name_destroy(&mdcuuid);
1591         name_destroy(&mdcname);
1592         name_destroy(&nodeuuid);
1593         RETURN(rc);
1594 }
1595
1596 static int mgs_write_log_mdt0(const struct lu_env *env,
1597                               struct mgs_device *mgs,
1598                               struct fs_db *fsdb,
1599                               struct mgs_target_info *mti)
1600 {
1601         char *log = mti->mti_svname;
1602         struct llog_handle *llh = NULL;
1603         char *uuid, *lovname;
1604         char mdt_index[6];
1605         char *ptr = mti->mti_params;
1606         int rc = 0, failout = 0;
1607         ENTRY;
1608
1609         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
1610         if (uuid == NULL)
1611                 RETURN(-ENOMEM);
1612
1613         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
1614                 failout = (strncmp(ptr, "failout", 7) == 0);
1615
1616         rc = name_create(&lovname, log, "-mdtlov");
1617         if (rc)
1618                 GOTO(out_free, rc);
1619         if (mgs_log_is_empty(env, mgs, log)) {
1620                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
1621                 if (rc)
1622                         GOTO(out_lod, rc);
1623         }
1624
1625         sprintf(mdt_index, "%d", mti->mti_stripe_index);
1626
1627         rc = record_start_log(env, mgs, &llh, log);
1628         if (rc)
1629                 GOTO(out_lod, rc);
1630
1631         /* add MDT itself */
1632
1633         /* FIXME this whole fn should be a single journal transaction */
1634         sprintf(uuid, "%s_UUID", log);
1635         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
1636         if (rc)
1637                 GOTO(out_lod, rc);
1638         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
1639         if (rc)
1640                 GOTO(out_end, rc);
1641         rc = record_mount_opt(env, llh, log, lovname, NULL);
1642         if (rc)
1643                 GOTO(out_end, rc);
1644         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
1645                         failout ? "n" : "f");
1646         if (rc)
1647                 GOTO(out_end, rc);
1648         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
1649         if (rc)
1650                 GOTO(out_end, rc);
1651 out_end:
1652         record_end_log(env, &llh);
1653 out_lod:
1654         name_destroy(&lovname);
1655 out_free:
1656         OBD_FREE(uuid, sizeof(struct obd_uuid));
1657         RETURN(rc);
1658 }
1659
1660 static inline int name_create_mdt(char **logname, char *fsname, int i)
1661 {
1662         char mdt_index[9];
1663
1664         sprintf(mdt_index, "-MDT%04x", i);
1665         return name_create(logname, fsname, mdt_index);
1666 }
1667
1668 static int name_create_mdt_and_lov(char **logname, char **lovname,
1669                                     struct fs_db *fsdb, int i)
1670 {
1671         int rc;
1672
1673         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
1674         if (rc)
1675                 return rc;
1676         /* COMPAT_180 */
1677         if (i == 0 && cfs_test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
1678                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
1679         else
1680                 rc = name_create(lovname, *logname, "-mdtlov");
1681         if (rc) {
1682                 name_destroy(logname);
1683                 *logname = NULL;
1684         }
1685         return rc;
1686 }
1687
1688 static inline int name_create_mdt_osc(char **oscname, char *ostname,
1689                                        struct fs_db *fsdb, int i)
1690 {
1691         char suffix[16];
1692
1693         if (i == 0 && cfs_test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
1694                 sprintf(suffix, "-osc");
1695         else
1696                 sprintf(suffix, "-osc-MDT%04x", i);
1697         return name_create(oscname, ostname, suffix);
1698 }
1699
1700 /* envelope method for all layers log */
1701 static int mgs_write_log_mdt(const struct lu_env *env,
1702                              struct mgs_device *mgs,
1703                              struct fs_db *fsdb,
1704                              struct mgs_target_info *mti)
1705 {
1706         struct mgs_thread_info *mgi = mgs_env_info(env);
1707         struct llog_handle *llh = NULL;
1708         char *cliname;
1709         int rc, i = 0;
1710         ENTRY;
1711
1712         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
1713
1714         if (mti->mti_uuid[0] == '\0') {
1715                 /* Make up our own uuid */
1716                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
1717                          "%s_UUID", mti->mti_svname);
1718         }
1719
1720         /* add mdt */
1721         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
1722         if (rc)
1723                 RETURN(rc);
1724         /* Append the mdt info to the client log */
1725         rc = name_create(&cliname, mti->mti_fsname, "-client");
1726         if (rc)
1727                 RETURN(rc);
1728
1729         if (mgs_log_is_empty(env, mgs, cliname)) {
1730                 /* Start client log */
1731                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
1732                                        fsdb->fsdb_clilov);
1733                 if (rc)
1734                         GOTO(out_free, rc);
1735                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
1736                                        fsdb->fsdb_clilmv);
1737                 if (rc)
1738                         GOTO(out_free, rc);
1739         }
1740
1741         /*
1742         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
1743         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
1744         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
1745         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
1746         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
1747         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
1748         */
1749
1750                 /* copy client info about lov/lmv */
1751                 mgi->mgi_comp.comp_mti = mti;
1752                 mgi->mgi_comp.comp_fsdb = fsdb;
1753
1754                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
1755                                                         &mgi->mgi_comp);
1756                 if (rc)
1757                         GOTO(out_free, rc);
1758                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
1759                                               fsdb->fsdb_clilmv);
1760                 if (rc)
1761                         GOTO(out_free, rc);
1762
1763                 /* add mountopts */
1764                 rc = record_start_log(env, mgs, &llh, cliname);
1765                 if (rc)
1766                         GOTO(out_free, rc);
1767
1768                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
1769                                    "mount opts");
1770                 if (rc)
1771                         GOTO(out_end, rc);
1772                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
1773                                       fsdb->fsdb_clilmv);
1774                 if (rc)
1775                         GOTO(out_end, rc);
1776                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
1777                                    "mount opts");
1778
1779         if (rc)
1780                 GOTO(out_end, rc);
1781         /* for_all_existing_mdt except current one */
1782         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
1783                 char *mdtname;
1784                 if (i !=  mti->mti_stripe_index &&
1785                     cfs_test_bit(i,  fsdb->fsdb_mdt_index_map)) {
1786                         rc = name_create_mdt(&mdtname, mti->mti_fsname, i);
1787                         if (rc)
1788                                 GOTO(out_end, rc);
1789                         rc = mgs_write_log_mdc_to_mdt(env, mgs, fsdb, mti, mdtname);
1790                         name_destroy(&mdtname);
1791                         if (rc)
1792                                 GOTO(out_end, rc);
1793                 }
1794         }
1795 out_end:
1796         record_end_log(env, &llh);
1797 out_free:
1798         name_destroy(&cliname);
1799         RETURN(rc);
1800 }
1801
1802 /* Add the ost info to the client/mdt lov */
1803 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1804                                     struct mgs_device *mgs, struct fs_db *fsdb,
1805                                     struct mgs_target_info *mti,
1806                                     char *logname, char *suffix, char *lovname,
1807                                     enum lustre_sec_part sec_part, int flags)
1808 {
1809         struct llog_handle *llh = NULL;
1810         char *nodeuuid = NULL;
1811         char *oscname = NULL;
1812         char *oscuuid = NULL;
1813         char *lovuuid = NULL;
1814         char *svname = NULL;
1815         char index[6];
1816         int i, rc;
1817
1818         ENTRY;
1819         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
1820                mti->mti_svname, logname);
1821
1822         if (mgs_log_is_empty(env, mgs, logname)) {
1823                 CERROR("log is empty! Logical error\n");
1824                 RETURN (-EINVAL);
1825         }
1826
1827         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1828         if (rc)
1829                 RETURN(rc);
1830         rc = name_create(&svname, mti->mti_svname, "-osc");
1831         if (rc)
1832                 GOTO(out_free, rc);
1833         rc = name_create(&oscname, svname, suffix);
1834         if (rc)
1835                 GOTO(out_free, rc);
1836         rc = name_create(&oscuuid, oscname, "_UUID");
1837         if (rc)
1838                 GOTO(out_free, rc);
1839         rc = name_create(&lovuuid, lovname, "_UUID");
1840         if (rc)
1841                 GOTO(out_free, rc);
1842
1843         /*
1844         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
1845         multihomed (#4)
1846         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
1847         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
1848         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
1849         failover (#6,7)
1850         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
1851         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
1852         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
1853         */
1854
1855         rc = record_start_log(env, mgs, &llh, logname);
1856         if (rc)
1857                 GOTO(out_free, rc);
1858         /* FIXME these should be a single journal transaction */
1859         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
1860                            "add osc");
1861         if (rc)
1862                 GOTO(out_end, rc);
1863         for (i = 0; i < mti->mti_nid_count; i++) {
1864                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
1865                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1866                 if (rc)
1867                         GOTO(out_end, rc);
1868         }
1869         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
1870         if (rc)
1871                 GOTO(out_end, rc);
1872         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
1873         if (rc)
1874                 GOTO(out_end, rc);
1875         rc = mgs_write_log_failnids(env, mti, llh, oscname);
1876         if (rc)
1877                 GOTO(out_end, rc);
1878         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1879         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
1880         if (rc)
1881                 GOTO(out_end, rc);
1882         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
1883                            "add osc");
1884         if (rc)
1885                 GOTO(out_end, rc);
1886 out_end:
1887         record_end_log(env, &llh);
1888 out_free:
1889         name_destroy(&lovuuid);
1890         name_destroy(&oscuuid);
1891         name_destroy(&oscname);
1892         name_destroy(&svname);
1893         name_destroy(&nodeuuid);
1894         RETURN(rc);
1895 }
1896
1897 static int mgs_write_log_ost(const struct lu_env *env,
1898                              struct mgs_device *mgs, struct fs_db *fsdb,
1899                              struct mgs_target_info *mti)
1900 {
1901         struct llog_handle *llh = NULL;
1902         char *logname, *lovname;
1903         char *ptr = mti->mti_params;
1904         int rc, flags = 0, failout = 0, i;
1905         ENTRY;
1906
1907         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
1908
1909         /* The ost startup log */
1910
1911         /* If the ost log already exists, that means that someone reformatted
1912            the ost and it called target_add again. */
1913         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
1914                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
1915                                    "exists, yet the server claims it never "
1916                                    "registered. It may have been reformatted, "
1917                                    "or the index changed. writeconf the MDT to "
1918                                    "regenerate all logs.\n", mti->mti_svname);
1919                 RETURN(-EALREADY);
1920         }
1921
1922         /*
1923         attach obdfilter ost1 ost1_UUID
1924         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
1925         */
1926         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
1927                 failout = (strncmp(ptr, "failout", 7) == 0);
1928         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
1929         if (rc)
1930                 RETURN(rc);
1931         /* FIXME these should be a single journal transaction */
1932         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
1933         if (rc)
1934                 GOTO(out_end, rc);
1935         if (*mti->mti_uuid == '\0')
1936                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
1937                          "%s_UUID", mti->mti_svname);
1938         rc = record_attach(env, llh, mti->mti_svname,
1939                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
1940         if (rc)
1941                 GOTO(out_end, rc);
1942         rc = record_setup(env, llh, mti->mti_svname,
1943                           "dev"/*ignored*/, "type"/*ignored*/,
1944                           failout ? "n" : "f", 0/*options*/);
1945         if (rc)
1946                 GOTO(out_end, rc);
1947         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
1948         if (rc)
1949                 GOTO(out_end, rc);
1950 out_end:
1951         record_end_log(env, &llh);
1952         if (rc)
1953                 RETURN(rc);
1954         /* We also have to update the other logs where this osc is part of
1955            the lov */
1956
1957         if (cfs_test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
1958                 /* If we're upgrading, the old mdt log already has our
1959                    entry. Let's do a fake one for fun. */
1960                 /* Note that we can't add any new failnids, since we don't
1961                    know the old osc names. */
1962                 flags = CM_SKIP | CM_UPGRADE146;
1963
1964         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
1965                 /* If the update flag isn't set, don't update client/mdt
1966                    logs. */
1967                 flags |= CM_SKIP;
1968                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
1969                               "the MDT first to regenerate it.\n",
1970                               mti->mti_svname);
1971         }
1972
1973         /* Add ost to all MDT lov defs */
1974         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
1975                 if (cfs_test_bit(i, fsdb->fsdb_mdt_index_map)) {
1976                         char mdt_index[9];
1977
1978                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1979                                                      i);
1980                         if (rc)
1981                                 RETURN(rc);
1982                         sprintf(mdt_index, "-MDT%04x", i);
1983                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
1984                                                       logname, mdt_index,
1985                                                       lovname, LUSTRE_SP_MDT,
1986                                                       flags);
1987                         name_destroy(&logname);
1988                         name_destroy(&lovname);
1989                         if (rc)
1990                                 RETURN(rc);
1991                 }
1992         }
1993
1994         /* Append ost info to the client log */
1995         rc = name_create(&logname, mti->mti_fsname, "-client");
1996         if (rc)
1997                 RETURN(rc);
1998         if (mgs_log_is_empty(env, mgs, logname)) {
1999                 /* Start client log */
2000                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2001                                        fsdb->fsdb_clilov);
2002                 if (rc)
2003                         GOTO(out_free, rc);
2004                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2005                                        fsdb->fsdb_clilmv);
2006                 if (rc)
2007                         GOTO(out_free, rc);
2008         }
2009         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2010                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2011 out_free:
2012         name_destroy(&logname);
2013         RETURN(rc);
2014 }
2015
2016 static __inline__ int mgs_param_empty(char *ptr)
2017 {
2018         char *tmp;
2019
2020         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2021                 return 1;
2022         return 0;
2023 }
2024
2025 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2026                                           struct mgs_device *mgs,
2027                                           struct fs_db *fsdb,
2028                                           struct mgs_target_info *mti,
2029                                           char *logname, char *cliname)
2030 {
2031         int rc;
2032         struct llog_handle *llh = NULL;
2033
2034         if (mgs_param_empty(mti->mti_params)) {
2035                 /* Remove _all_ failnids */
2036                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2037                                 mti->mti_svname, "add failnid", CM_SKIP);
2038                 return rc < 0 ? rc : 0;
2039         }
2040
2041         /* Otherwise failover nids are additive */
2042         rc = record_start_log(env, mgs, &llh, logname);
2043         if (rc)
2044                 return rc;
2045                 /* FIXME this should be a single journal transaction */
2046         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2047                            "add failnid");
2048         if (rc)
2049                 goto out_end;
2050         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2051         if (rc)
2052                 goto out_end;
2053         rc = record_marker(env, llh, fsdb, CM_END,
2054                            mti->mti_svname, "add failnid");
2055 out_end:
2056         record_end_log(env, &llh);
2057         return rc;
2058 }
2059
2060
2061 /* Add additional failnids to an existing log.
2062    The mdc/osc must have been added to logs first */
2063 /* tcp nids must be in dotted-quad ascii -
2064    we can't resolve hostnames from the kernel. */
2065 static int mgs_write_log_add_failnid(const struct lu_env *env,
2066                                      struct mgs_device *mgs,
2067                                      struct fs_db *fsdb,
2068                                      struct mgs_target_info *mti)
2069 {
2070         char *logname, *cliname;
2071         int rc;
2072         ENTRY;
2073
2074         /* FIXME we currently can't erase the failnids
2075          * given when a target first registers, since they aren't part of
2076          * an "add uuid" stanza */
2077
2078         /* Verify that we know about this target */
2079         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2080                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2081                                    "yet. It must be started before failnids "
2082                                    "can be added.\n", mti->mti_svname);
2083                 RETURN(-ENOENT);
2084         }
2085
2086         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2087         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2088                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2089         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2090                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2091         } else {
2092                 RETURN(-EINVAL);
2093         }
2094         if (rc)
2095                 RETURN(rc);
2096         /* Add failover nids to the client log */
2097         rc = name_create(&logname, mti->mti_fsname, "-client");
2098         if (rc) {
2099                 name_destroy(&cliname);
2100                 RETURN(rc);
2101         }
2102         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2103         name_destroy(&logname);
2104         name_destroy(&cliname);
2105         if (rc)
2106                 RETURN(rc);
2107
2108         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2109                 /* Add OST failover nids to the MDT logs as well */
2110                 int i;
2111
2112                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2113                         if (!cfs_test_bit(i, fsdb->fsdb_mdt_index_map))
2114                                 continue;
2115                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2116                         if (rc)
2117                                 RETURN(rc);
2118                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2119                                                  fsdb, i);
2120                         if (rc) {
2121                                 name_destroy(&logname);
2122                                 RETURN(rc);
2123                         }
2124                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2125                                                             mti, logname,
2126                                                             cliname);
2127                         name_destroy(&cliname);
2128                         name_destroy(&logname);
2129                         if (rc)
2130                                 RETURN(rc);
2131                 }
2132         }
2133
2134         RETURN(rc);
2135 }
2136
2137 static int mgs_wlp_lcfg(const struct lu_env *env,
2138                         struct mgs_device *mgs, struct fs_db *fsdb,
2139                         struct mgs_target_info *mti,
2140                         char *logname, struct lustre_cfg_bufs *bufs,
2141                         char *tgtname, char *ptr)
2142 {
2143         char comment[MTI_NAME_MAXLEN];
2144         char *tmp;
2145         struct lustre_cfg *lcfg;
2146         int rc, del;
2147
2148         /* Erase any old settings of this same parameter */
2149         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2150         comment[MTI_NAME_MAXLEN - 1] = 0;
2151         /* But don't try to match the value. */
2152         if ((tmp = strchr(comment, '=')))
2153             *tmp = 0;
2154         /* FIXME we should skip settings that are the same as old values */
2155         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2156         if (rc < 0)
2157                 return rc;
2158         del = mgs_param_empty(ptr);
2159
2160         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2161                       "Sett" : "Modify", tgtname, comment, logname);
2162         if (del)
2163                 return rc;
2164
2165         lustre_cfg_bufs_reset(bufs, tgtname);
2166         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2167         lcfg = lustre_cfg_new(LCFG_PARAM, bufs);
2168         if (!lcfg)
2169                 return -ENOMEM;
2170         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2171         lustre_cfg_free(lcfg);
2172         return rc;
2173 }
2174
2175 /* write global variable settings into log */
2176 static int mgs_write_log_sys(const struct lu_env *env,
2177                              struct mgs_device *mgs, struct fs_db *fsdb,
2178                              struct mgs_target_info *mti, char *sys, char *ptr)
2179 {
2180         struct mgs_thread_info *mgi = mgs_env_info(env);
2181         struct lustre_cfg *lcfg;
2182         char *tmp, sep;
2183         int rc, cmd, convert = 1;
2184
2185         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2186                 cmd = LCFG_SET_TIMEOUT;
2187         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2188                 cmd = LCFG_SET_LDLM_TIMEOUT;
2189         /* Check for known params here so we can return error to lctl */
2190         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2191                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2192                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2193                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2194                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2195                 cmd = LCFG_PARAM;
2196         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2197                 convert = 0; /* Don't convert string value to integer */
2198                 cmd = LCFG_PARAM;
2199         } else {
2200                 return -EINVAL;
2201         }
2202
2203         if (mgs_param_empty(ptr))
2204                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2205         else
2206                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2207
2208         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2209         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2210         if (!convert && *tmp != '\0')
2211                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2212         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2213         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2214         /* truncate the comment to the parameter name */
2215         ptr = tmp - 1;
2216         sep = *ptr;
2217         *ptr = '\0';
2218         /* modify all servers and clients */
2219         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2220                                       *tmp == '\0' ? NULL : lcfg,
2221                                       mti->mti_fsname, sys, 0);
2222         if (rc == 0 && *tmp != '\0') {
2223                 switch (cmd) {
2224                 case LCFG_SET_TIMEOUT:
2225                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2226                                 class_process_config(lcfg);
2227                         break;
2228                 case LCFG_SET_LDLM_TIMEOUT:
2229                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2230                                 class_process_config(lcfg);
2231                         break;
2232                 default:
2233                         break;
2234                 }
2235         }
2236         *ptr = sep;
2237         lustre_cfg_free(lcfg);
2238         return rc;
2239 }
2240
2241 /* write quota settings into log */
2242 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2243                                struct fs_db *fsdb, struct mgs_target_info *mti,
2244                                char *quota, char *ptr)
2245 {
2246         struct lustre_cfg_bufs bufs;
2247         struct lustre_cfg *lcfg;
2248         char *tmp;
2249         char sep;
2250         int cmd = LCFG_PARAM;
2251         int rc;
2252
2253         /* support only 'meta' and 'data' pools so far */
2254         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2255             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2256                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2257                        "& quota.ost are)\n", ptr);
2258                 return -EINVAL;
2259         }
2260
2261         if (*tmp == '\0') {
2262                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2263         } else {
2264                 CDEBUG(D_MGS, "global '%s'\n", quota);
2265
2266                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2267                     strcmp(tmp, "none") != 0) {
2268                         CERROR("enable option(%s) isn't supported\n", tmp);
2269                         return -EINVAL;
2270                 }
2271         }
2272
2273         lustre_cfg_bufs_reset(&bufs, NULL);
2274         lustre_cfg_bufs_set_string(&bufs, 1, quota);
2275         lcfg = lustre_cfg_new(cmd, &bufs);
2276         /* truncate the comment to the parameter name */
2277         ptr = tmp - 1;
2278         sep = *ptr;
2279         *ptr = '\0';
2280
2281         /* XXX we duplicated quota enable information in all server
2282          *     config logs, it should be moved to a separate config
2283          *     log once we cleanup the config log for global param. */
2284         /* modify all servers */
2285         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2286                                       *tmp == '\0' ? NULL : lcfg,
2287                                       mti->mti_fsname, quota, 1);
2288         *ptr = sep;
2289         lustre_cfg_free(lcfg);
2290         return rc;
2291 }
2292
2293 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2294                                    struct mgs_device *mgs,
2295                                    struct fs_db *fsdb,
2296                                    struct mgs_target_info *mti,
2297                                    char *param)
2298 {
2299         struct mgs_thread_info *mgi = mgs_env_info(env);
2300         struct llog_handle     *llh = NULL;
2301         char                   *logname;
2302         char                   *comment, *ptr;
2303         struct lustre_cfg      *lcfg;
2304         int                     rc, len;
2305         ENTRY;
2306
2307         /* get comment */
2308         ptr = strchr(param, '=');
2309         LASSERT(ptr);
2310         len = ptr - param;
2311
2312         OBD_ALLOC(comment, len + 1);
2313         if (comment == NULL)
2314                 RETURN(-ENOMEM);
2315         strncpy(comment, param, len);
2316         comment[len] = '\0';
2317
2318         /* prepare lcfg */
2319         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2320         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2321         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2322         if (lcfg == NULL)
2323                 GOTO(out_comment, rc = -ENOMEM);
2324
2325         /* construct log name */
2326         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2327         if (rc)
2328                 GOTO(out_lcfg, rc);
2329
2330         if (mgs_log_is_empty(env, mgs, logname)) {
2331                 rc = record_start_log(env, mgs, &llh, logname);
2332                 if (rc)
2333                         GOTO(out, rc);
2334                 record_end_log(env, &llh);
2335         }
2336
2337         /* obsolete old one */
2338         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2339                         comment, CM_SKIP);
2340         if (rc < 0)
2341                 GOTO(out, rc);
2342         /* write the new one */
2343         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2344                                   mti->mti_svname, comment);
2345         if (rc)
2346                 CERROR("err %d writing log %s\n", rc, logname);
2347 out:
2348         name_destroy(&logname);
2349 out_lcfg:
2350         lustre_cfg_free(lcfg);
2351 out_comment:
2352         OBD_FREE(comment, len + 1);
2353         RETURN(rc);
2354 }
2355
2356 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2357                                         char *param)
2358 {
2359         char    *ptr;
2360
2361         /* disable the adjustable udesc parameter for now, i.e. use default
2362          * setting that client always ship udesc to MDT if possible. to enable
2363          * it simply remove the following line */
2364         goto error_out;
2365
2366         ptr = strchr(param, '=');
2367         if (ptr == NULL)
2368                 goto error_out;
2369         *ptr++ = '\0';
2370
2371         if (strcmp(param, PARAM_SRPC_UDESC))
2372                 goto error_out;
2373
2374         if (strcmp(ptr, "yes") == 0) {
2375                 cfs_set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2376                 CWARN("Enable user descriptor shipping from client to MDT\n");
2377         } else if (strcmp(ptr, "no") == 0) {
2378                 cfs_clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2379                 CWARN("Disable user descriptor shipping from client to MDT\n");
2380         } else {
2381                 *(ptr - 1) = '=';
2382                 goto error_out;
2383         }
2384         return 0;
2385
2386 error_out:
2387         CERROR("Invalid param: %s\n", param);
2388         return -EINVAL;
2389 }
2390
2391 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2392                                   const char *svname,
2393                                   char *param)
2394 {
2395         struct sptlrpc_rule      rule;
2396         struct sptlrpc_rule_set *rset;
2397         int                      rc;
2398         ENTRY;
2399
2400         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2401                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2402                 RETURN(-EINVAL);
2403         }
2404
2405         if (strncmp(param, PARAM_SRPC_UDESC,
2406                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2407                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2408         }
2409
2410         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2411                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2412                 RETURN(-EINVAL);
2413         }
2414
2415         param += sizeof(PARAM_SRPC_FLVR) - 1;
2416
2417         rc = sptlrpc_parse_rule(param, &rule);
2418         if (rc)
2419                 RETURN(rc);
2420
2421         /* mgs rules implies must be mgc->mgs */
2422         if (cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2423                 if ((rule.sr_from != LUSTRE_SP_MGC &&
2424                      rule.sr_from != LUSTRE_SP_ANY) ||
2425                     (rule.sr_to != LUSTRE_SP_MGS &&
2426                      rule.sr_to != LUSTRE_SP_ANY))
2427                         RETURN(-EINVAL);
2428         }
2429
2430         /* preapre room for this coming rule. svcname format should be:
2431          * - fsname: general rule
2432          * - fsname-tgtname: target-specific rule
2433          */
2434         if (strchr(svname, '-')) {
2435                 struct mgs_tgt_srpc_conf *tgtconf;
2436                 int                       found = 0;
2437
2438                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
2439                      tgtconf = tgtconf->mtsc_next) {
2440                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
2441                                 found = 1;
2442                                 break;
2443                         }
2444                 }
2445
2446                 if (!found) {
2447                         int name_len;
2448
2449                         OBD_ALLOC_PTR(tgtconf);
2450                         if (tgtconf == NULL)
2451                                 RETURN(-ENOMEM);
2452
2453                         name_len = strlen(svname);
2454
2455                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
2456                         if (tgtconf->mtsc_tgt == NULL) {
2457                                 OBD_FREE_PTR(tgtconf);
2458                                 RETURN(-ENOMEM);
2459                         }
2460                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
2461
2462                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
2463                         fsdb->fsdb_srpc_tgt = tgtconf;
2464                 }
2465
2466                 rset = &tgtconf->mtsc_rset;
2467         } else {
2468                 rset = &fsdb->fsdb_srpc_gen;
2469         }
2470
2471         rc = sptlrpc_rule_set_merge(rset, &rule);
2472
2473         RETURN(rc);
2474 }
2475
2476 static int mgs_srpc_set_param(const struct lu_env *env,
2477                               struct mgs_device *mgs,
2478                               struct fs_db *fsdb,
2479                               struct mgs_target_info *mti,
2480                               char *param)
2481 {
2482         char                   *copy;
2483         int                     rc, copy_size;
2484         ENTRY;
2485
2486 #ifndef HAVE_GSS
2487         RETURN(-EINVAL);
2488 #endif
2489         /* keep a copy of original param, which could be destroied
2490          * during parsing */
2491         copy_size = strlen(param) + 1;
2492         OBD_ALLOC(copy, copy_size);
2493         if (copy == NULL)
2494                 return -ENOMEM;
2495         memcpy(copy, param, copy_size);
2496
2497         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
2498         if (rc)
2499                 goto out_free;
2500
2501         /* previous steps guaranteed the syntax is correct */
2502         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
2503         if (rc)
2504                 goto out_free;
2505
2506         if (cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2507                 /*
2508                  * for mgs rules, make them effective immediately.
2509                  */
2510                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
2511                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
2512                                                  &fsdb->fsdb_srpc_gen);
2513         }
2514
2515 out_free:
2516         OBD_FREE(copy, copy_size);
2517         RETURN(rc);
2518 }
2519
2520 struct mgs_srpc_read_data {
2521         struct fs_db   *msrd_fsdb;
2522         int             msrd_skip;
2523 };
2524
2525 static int mgs_srpc_read_handler(const struct lu_env *env,
2526                                  struct llog_handle *llh,
2527                                  struct llog_rec_hdr *rec, void *data)
2528 {
2529         struct mgs_srpc_read_data *msrd = data;
2530         struct cfg_marker         *marker;
2531         struct lustre_cfg         *lcfg = (struct lustre_cfg *)(rec + 1);
2532         char                      *svname, *param;
2533         int                        cfg_len, rc;
2534         ENTRY;
2535
2536         if (rec->lrh_type != OBD_CFG_REC) {
2537                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
2538                 RETURN(-EINVAL);
2539         }
2540
2541         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
2542                   sizeof(struct llog_rec_tail);
2543
2544         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
2545         if (rc) {
2546                 CERROR("Insane cfg\n");
2547                 RETURN(rc);
2548         }
2549
2550         if (lcfg->lcfg_command == LCFG_MARKER) {
2551                 marker = lustre_cfg_buf(lcfg, 1);
2552
2553                 if (marker->cm_flags & CM_START &&
2554                     marker->cm_flags & CM_SKIP)
2555                         msrd->msrd_skip = 1;
2556                 if (marker->cm_flags & CM_END)
2557                         msrd->msrd_skip = 0;
2558
2559                 RETURN(0);
2560         }
2561
2562         if (msrd->msrd_skip)
2563                 RETURN(0);
2564
2565         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
2566                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
2567                 RETURN(0);
2568         }
2569
2570         svname = lustre_cfg_string(lcfg, 0);
2571         if (svname == NULL) {
2572                 CERROR("svname is empty\n");
2573                 RETURN(0);
2574         }
2575
2576         param = lustre_cfg_string(lcfg, 1);
2577         if (param == NULL) {
2578                 CERROR("param is empty\n");
2579                 RETURN(0);
2580         }
2581
2582         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
2583         if (rc)
2584                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
2585
2586         RETURN(0);
2587 }
2588
2589 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
2590                                 struct mgs_device *mgs,
2591                                 struct fs_db *fsdb)
2592 {
2593         struct llog_handle        *llh = NULL;
2594         struct lvfs_run_ctxt       saved;
2595         struct llog_ctxt          *ctxt;
2596         char                      *logname;
2597         struct mgs_srpc_read_data  msrd;
2598         int                        rc;
2599         ENTRY;
2600
2601         /* construct log name */
2602         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
2603         if (rc)
2604                 RETURN(rc);
2605
2606         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
2607         LASSERT(ctxt != NULL);
2608
2609         if (mgs_log_is_empty(env, mgs, logname))
2610                 GOTO(out, rc = 0);
2611
2612         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
2613
2614         rc = llog_open(NULL, ctxt, &llh, NULL, logname,
2615                        LLOG_OPEN_EXISTS);
2616         if (rc < 0) {
2617                 if (rc == -ENOENT)
2618                         rc = 0;
2619                 GOTO(out_pop, rc);
2620         }
2621
2622         rc = llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
2623         if (rc)
2624                 GOTO(out_close, rc);
2625
2626         if (llog_get_size(llh) <= 1)
2627                 GOTO(out_close, rc = 0);
2628
2629         msrd.msrd_fsdb = fsdb;
2630         msrd.msrd_skip = 0;
2631
2632         rc = llog_process_or_fork(env, llh, mgs_srpc_read_handler,
2633                                   (void *)&msrd, NULL, false);
2634
2635 out_close:
2636         llog_close(NULL, llh);
2637 out_pop:
2638         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
2639 out:
2640         llog_ctxt_put(ctxt);
2641         name_destroy(&logname);
2642
2643         if (rc)
2644                 CERROR("failed to read sptlrpc config database: %d\n", rc);
2645         RETURN(rc);
2646 }
2647
2648 /* Permanent settings of all parameters by writing into the appropriate
2649  * configuration logs.
2650  * A parameter with null value ("<param>='\0'") means to erase it out of
2651  * the logs.
2652  */
2653 static int mgs_write_log_param(const struct lu_env *env,
2654                                struct mgs_device *mgs, struct fs_db *fsdb,
2655                                struct mgs_target_info *mti, char *ptr)
2656 {
2657         struct mgs_thread_info *mgi = mgs_env_info(env);
2658         char *logname;
2659         char *tmp;
2660         int rc = 0, rc2 = 0;
2661         ENTRY;
2662
2663         /* For various parameter settings, we have to figure out which logs
2664            care about them (e.g. both mdt and client for lov settings) */
2665         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2666
2667         /* The params are stored in MOUNT_DATA_FILE and modified via
2668            tunefs.lustre, or set using lctl conf_param */
2669
2670         /* Processed in lustre_start_mgc */
2671         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
2672                 GOTO(end, rc);
2673
2674         /* Processed in ost/mdt */
2675         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
2676                 GOTO(end, rc);
2677
2678         /* Processed in mgs_write_log_ost */
2679         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
2680                 if (mti->mti_flags & LDD_F_PARAM) {
2681                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
2682                                            "changed with tunefs.lustre"
2683                                            "and --writeconf\n", ptr);
2684                         rc = -EPERM;
2685                 }
2686                 GOTO(end, rc);
2687         }
2688
2689         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
2690                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
2691                 GOTO(end, rc);
2692         }
2693
2694         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
2695                 /* Add a failover nidlist */
2696                 rc = 0;
2697                 /* We already processed failovers params for new
2698                    targets in mgs_write_log_target */
2699                 if (mti->mti_flags & LDD_F_PARAM) {
2700                         CDEBUG(D_MGS, "Adding failnode\n");
2701                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
2702                 }
2703                 GOTO(end, rc);
2704         }
2705
2706         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
2707                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
2708                 GOTO(end, rc);
2709         }
2710
2711         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
2712                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
2713                 GOTO(end, rc);
2714         }
2715
2716         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
2717                 /* active=0 means off, anything else means on */
2718                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
2719                 int i;
2720
2721                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
2722                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
2723                                            "be (de)activated.\n",
2724                                            mti->mti_svname);
2725                         GOTO(end, rc = -EINVAL);
2726                 }
2727                 LCONSOLE_WARN("Permanently %sactivating %s\n",
2728                               flag ? "de": "re", mti->mti_svname);
2729                 /* Modify clilov */
2730                 rc = name_create(&logname, mti->mti_fsname, "-client");
2731                 if (rc)
2732                         GOTO(end, rc);
2733                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2734                                 mti->mti_svname, "add osc", flag);
2735                 name_destroy(&logname);
2736                 if (rc)
2737                         goto active_err;
2738                 /* Modify mdtlov */
2739                 /* Add to all MDT logs for CMD */
2740                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2741                         if (!cfs_test_bit(i, fsdb->fsdb_mdt_index_map))
2742                                 continue;
2743                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2744                         if (rc)
2745                                 GOTO(end, rc);
2746                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
2747                                         mti->mti_svname, "add osc", flag);
2748                         name_destroy(&logname);
2749                         if (rc)
2750                                 goto active_err;
2751                 }
2752         active_err:
2753                 if (rc) {
2754                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
2755                                            "log (%d). No permanent "
2756                                            "changes were made to the "
2757                                            "config log.\n",
2758                                            mti->mti_svname, rc);
2759                         if (cfs_test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
2760                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
2761                                                    " because the log"
2762                                                    "is in the old 1.4"
2763                                                    "style. Consider "
2764                                                    " --writeconf to "
2765                                                    "update the logs.\n");
2766                         GOTO(end, rc);
2767                 }
2768                 /* Fall through to osc proc for deactivating live OSC
2769                    on running MDT / clients. */
2770         }
2771         /* Below here, let obd's XXX_process_config methods handle it */
2772
2773         /* All lov. in proc */
2774         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
2775                 char *mdtlovname;
2776
2777                 CDEBUG(D_MGS, "lov param %s\n", ptr);
2778                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
2779                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
2780                                            "set on the MDT, not %s. "
2781                                            "Ignoring.\n",
2782                                            mti->mti_svname);
2783                         GOTO(end, rc = 0);
2784                 }
2785
2786                 /* Modify mdtlov */
2787                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
2788                         GOTO(end, rc = -ENODEV);
2789
2790                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
2791                                              mti->mti_stripe_index);
2792                 if (rc)
2793                         GOTO(end, rc);
2794                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
2795                                   &mgi->mgi_bufs, mdtlovname, ptr);
2796                 name_destroy(&logname);
2797                 name_destroy(&mdtlovname);
2798                 if (rc)
2799                         GOTO(end, rc);
2800
2801                 /* Modify clilov */
2802                 rc = name_create(&logname, mti->mti_fsname, "-client");
2803                 if (rc)
2804                         GOTO(end, rc);
2805                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
2806                                   fsdb->fsdb_clilov, ptr);
2807                 name_destroy(&logname);
2808                 GOTO(end, rc);
2809         }
2810
2811         /* All osc., mdc., llite. params in proc */
2812         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
2813             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
2814             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
2815                 char *cname;
2816
2817                 if (cfs_test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2818                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
2819                                            " cannot be modified. Consider"
2820                                            " updating the configuration with"
2821                                            " --writeconf\n",
2822                                            mti->mti_svname);
2823                         GOTO(end, rc = -EINVAL);
2824                 }
2825                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
2826                         rc = name_create(&cname, mti->mti_fsname, "-client");
2827                         /* Add the client type to match the obdname in
2828                            class_config_llog_handler */
2829                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2830                         rc = name_create(&cname, mti->mti_svname, "-mdc");
2831                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2832                         rc = name_create(&cname, mti->mti_svname, "-osc");
2833                 } else {
2834                         GOTO(end, rc = -EINVAL);
2835                 }
2836                 if (rc)
2837                         GOTO(end, rc);
2838
2839                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2840
2841                 /* Modify client */
2842                 rc = name_create(&logname, mti->mti_fsname, "-client");
2843                 if (rc) {
2844                         name_destroy(&cname);
2845                         GOTO(end, rc);
2846                 }
2847                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
2848                                   cname, ptr);
2849
2850                 /* osc params affect the MDT as well */
2851                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
2852                         int i;
2853
2854                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2855                                 if (!cfs_test_bit(i, fsdb->fsdb_mdt_index_map))
2856                                         continue;
2857                                 name_destroy(&cname);
2858                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
2859                                                          fsdb, i);
2860                                 name_destroy(&logname);
2861                                 if (rc)
2862                                         break;
2863                                 rc = name_create_mdt(&logname,
2864                                                      mti->mti_fsname, i);
2865                                 if (rc)
2866                                         break;
2867                                 if (!mgs_log_is_empty(env, mgs, logname)) {
2868                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
2869                                                           mti, logname,
2870                                                           &mgi->mgi_bufs,
2871                                                           cname, ptr);
2872                                         if (rc)
2873                                                 break;
2874                                 }
2875                         }
2876                 }
2877                 name_destroy(&logname);
2878                 name_destroy(&cname);
2879                 GOTO(end, rc);
2880         }
2881
2882         /* All mdt. params in proc */
2883         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
2884                 int i;
2885                 __u32 idx;
2886
2887                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2888                 if (strncmp(mti->mti_svname, mti->mti_fsname,
2889                             MTI_NAME_MAXLEN) == 0)
2890                         /* device is unspecified completely? */
2891                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
2892                 else
2893                         rc = server_name2index(mti->mti_svname, &idx, NULL);
2894                 if (rc < 0)
2895                         goto active_err;
2896                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
2897                         goto active_err;
2898                 if (rc & LDD_F_SV_ALL) {
2899                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2900                                 if (!cfs_test_bit(i,
2901                                                   fsdb->fsdb_mdt_index_map))
2902                                         continue;
2903                                 rc = name_create_mdt(&logname,
2904                                                 mti->mti_fsname, i);
2905                                 if (rc)
2906                                         goto active_err;
2907                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
2908                                                   logname, &mgi->mgi_bufs,
2909                                                   logname, ptr);
2910                                 name_destroy(&logname);
2911                                 if (rc)
2912                                         goto active_err;
2913                         }
2914                 } else {
2915                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
2916                                           mti->mti_svname, &mgi->mgi_bufs,
2917                                           mti->mti_svname, ptr);
2918                         if (rc)
2919                                 goto active_err;
2920                 }
2921                 GOTO(end, rc);
2922         }
2923
2924         /* All mdd., ost. params in proc */
2925         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
2926             (class_match_param(ptr, PARAM_OST, NULL) == 0)) {
2927                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2928                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
2929                         GOTO(end, rc = -ENODEV);
2930
2931                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
2932                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
2933                 GOTO(end, rc);
2934         }
2935
2936         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
2937         rc2 = -ENOSYS;
2938
2939 end:
2940         if (rc)
2941                 CERROR("err %d on param '%s'\n", rc, ptr);
2942
2943         RETURN(rc ?: rc2);
2944 }
2945
2946 /* Not implementing automatic failover nid addition at this time. */
2947 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
2948                       struct mgs_target_info *mti)
2949 {
2950 #if 0
2951         struct fs_db *fsdb;
2952         int rc;
2953         ENTRY;
2954
2955         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
2956         if (rc)
2957                 RETURN(rc);
2958
2959         if (mgs_log_is_empty(obd, mti->mti_svname))
2960                 /* should never happen */
2961                 RETURN(-ENOENT);
2962
2963         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
2964
2965         /* FIXME We can just check mti->params to see if we're already in
2966            the failover list.  Modify mti->params for rewriting back at
2967            server_register_target(). */
2968
2969         cfs_mutex_lock(&fsdb->fsdb_mutex);
2970         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
2971         cfs_mutex_unlock(&fsdb->fsdb_mutex);
2972
2973         RETURN(rc);
2974 #endif
2975         return 0;
2976 }
2977
2978 int mgs_write_log_target(const struct lu_env *env,
2979                          struct mgs_device *mgs,
2980                          struct mgs_target_info *mti,
2981                          struct fs_db *fsdb)
2982 {
2983         int rc = -EINVAL;
2984         char *buf, *params;
2985         ENTRY;
2986
2987         /* set/check the new target index */
2988         rc = mgs_set_index(env, mgs, mti);
2989         if (rc < 0) {
2990                 CERROR("Can't get index (%d)\n", rc);
2991                 RETURN(rc);
2992         }
2993
2994         if (rc == EALREADY) {
2995                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
2996                               mti->mti_stripe_index, mti->mti_svname);
2997                 /* We would like to mark old log sections as invalid
2998                    and add new log sections in the client and mdt logs.
2999                    But if we add new sections, then live clients will
3000                    get repeat setup instructions for already running
3001                    osc's. So don't update the client/mdt logs. */
3002                 mti->mti_flags &= ~LDD_F_UPDATE;
3003         }
3004
3005         cfs_mutex_lock(&fsdb->fsdb_mutex);
3006
3007         if (mti->mti_flags &
3008             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3009                 /* Generate a log from scratch */
3010                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3011                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3012                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3013                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3014                 } else {
3015                         CERROR("Unknown target type %#x, can't create log for "
3016                                "%s\n", mti->mti_flags, mti->mti_svname);
3017                 }
3018                 if (rc) {
3019                         CERROR("Can't write logs for %s (%d)\n",
3020                                mti->mti_svname, rc);
3021                         GOTO(out_up, rc);
3022                 }
3023         } else {
3024                 /* Just update the params from tunefs in mgs_write_log_params */
3025                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3026                 mti->mti_flags |= LDD_F_PARAM;
3027         }
3028
3029         /* allocate temporary buffer, where class_get_next_param will
3030            make copy of a current  parameter */
3031         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3032         if (buf == NULL)
3033                 GOTO(out_up, rc = -ENOMEM);
3034         params = mti->mti_params;
3035         while (params != NULL) {
3036                 rc = class_get_next_param(&params, buf);
3037                 if (rc) {
3038                         if (rc == 1)
3039                                 /* there is no next parameter, that is
3040                                    not an error */
3041                                 rc = 0;
3042                         break;
3043                 }
3044                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3045                        params, buf);
3046                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3047                 if (rc)
3048                         break;
3049         }
3050
3051         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3052
3053 out_up:
3054         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3055         RETURN(rc);
3056 }
3057
3058 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3059 {
3060         struct lvfs_run_ctxt     saved;
3061         struct llog_ctxt        *ctxt;
3062         int                      rc = 0;
3063         struct obd_device *obd = mgs->mgs_obd;
3064
3065         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
3066         if (ctxt == NULL) {
3067                 CERROR("%s: MGS config context doesn't exist\n",
3068                        obd->obd_name);
3069                 rc = -ENODEV;
3070         } else {
3071                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3072                 rc = llog_erase(NULL, ctxt, NULL, name);
3073                 /* llog may not exist */
3074                 if (rc == -ENOENT)
3075                         rc = 0;
3076                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3077                 llog_ctxt_put(ctxt);
3078         }
3079
3080         if (rc)
3081                 CERROR("%s: failed to clear log %s: %d\n", obd->obd_name,
3082                        name, rc);
3083
3084         return rc;
3085 }
3086
3087 /* erase all logs for the given fs */
3088 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3089 {
3090         struct fs_db *fsdb;
3091         cfs_list_t dentry_list;
3092         struct l_linux_dirent *dirent, *n;
3093         int rc, len = strlen(fsname);
3094         char *suffix;
3095         ENTRY;
3096
3097         /* Find all the logs in the CONFIGS directory */
3098         rc = class_dentry_readdir(env, mgs, &dentry_list);
3099         if (rc) {
3100                 CERROR("Can't read %s dir\n", MOUNT_CONFIGS_DIR);
3101                 RETURN(rc);
3102         }
3103
3104         cfs_mutex_lock(&mgs->mgs_mutex);
3105
3106         /* Delete the fs db */
3107         fsdb = mgs_find_fsdb(mgs, fsname);
3108         if (fsdb)
3109                 mgs_free_fsdb(mgs, fsdb);
3110
3111         cfs_list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
3112                 cfs_list_del(&dirent->lld_list);
3113                 suffix = strrchr(dirent->lld_name, '-');
3114                 if (suffix != NULL) {
3115                         if ((len == suffix - dirent->lld_name) &&
3116                             (strncmp(fsname, dirent->lld_name, len) == 0)) {
3117                                 CDEBUG(D_MGS, "Removing log %s\n",
3118                                        dirent->lld_name);
3119                                 mgs_erase_log(env, mgs, dirent->lld_name);
3120                         }
3121                 }
3122                 OBD_FREE(dirent, sizeof(*dirent));
3123         }
3124
3125         cfs_mutex_unlock(&mgs->mgs_mutex);
3126
3127         RETURN(rc);
3128 }
3129
3130 /* from llog_swab */
3131 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3132 {
3133         int i;
3134         ENTRY;
3135
3136         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3137         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3138
3139         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3140         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3141         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3142         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3143
3144         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3145         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3146                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3147                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3148                                i, lcfg->lcfg_buflens[i],
3149                                lustre_cfg_string(lcfg, i));
3150                 }
3151         EXIT;
3152 }
3153
3154 /* Set a permanent (config log) param for a target or fs
3155  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3156  *             buf1 contains the single parameter
3157  */
3158 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3159                  struct lustre_cfg *lcfg, char *fsname)
3160 {
3161         struct fs_db *fsdb;
3162         struct mgs_target_info *mti;
3163         char *devname, *param;
3164         char *ptr, *tmp;
3165         __u32 index;
3166         int rc = 0;
3167         ENTRY;
3168
3169         print_lustre_cfg(lcfg);
3170
3171         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3172         devname = lustre_cfg_string(lcfg, 0);
3173         param = lustre_cfg_string(lcfg, 1);
3174         if (!devname) {
3175                 /* Assume device name embedded in param:
3176                    lustre-OST0000.osc.max_dirty_mb=32 */
3177                 ptr = strchr(param, '.');
3178                 if (ptr) {
3179                         devname = param;
3180                         *ptr = 0;
3181                         param = ptr + 1;
3182                 }
3183         }
3184         if (!devname) {
3185                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3186                 RETURN(-ENOSYS);
3187         }
3188
3189         /* Extract fsname */
3190         ptr = strrchr(devname, '-');
3191         memset(fsname, 0, MTI_NAME_MAXLEN);
3192         if (ptr && (server_name2index(ptr, &index, NULL) >= 0)) {
3193                 /* param related to llite isn't allowed to set by OST or MDT */
3194                 if (strncmp(param, PARAM_LLITE, sizeof(PARAM_LLITE)) == 0)
3195                         RETURN(-EINVAL);
3196
3197                 strncpy(fsname, devname, ptr - devname);
3198         } else {
3199                 /* assume devname is the fsname */
3200                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3201         }
3202         fsname[MTI_NAME_MAXLEN - 1] = 0;
3203         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3204
3205         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3206         if (rc)
3207                 RETURN(rc);
3208         if (!cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3209             cfs_test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3210                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3211                        "is '%s'\n", fsname, devname);
3212                 mgs_free_fsdb(mgs, fsdb);
3213                 RETURN(-EINVAL);
3214         }
3215
3216         /* Create a fake mti to hold everything */
3217         OBD_ALLOC_PTR(mti);
3218         if (!mti)
3219                 GOTO(out, rc = -ENOMEM);
3220         strncpy(mti->mti_fsname, fsname, MTI_NAME_MAXLEN);
3221         strncpy(mti->mti_svname, devname, MTI_NAME_MAXLEN);
3222         strncpy(mti->mti_params, param, sizeof(mti->mti_params));
3223         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3224         if (rc < 0)
3225                 /* Not a valid server; may be only fsname */
3226                 rc = 0;
3227         else
3228                 /* Strip -osc or -mdc suffix from svname */
3229                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3230                                      mti->mti_svname))
3231                         GOTO(out, rc = -EINVAL);
3232
3233         mti->mti_flags = rc | LDD_F_PARAM;
3234
3235         cfs_mutex_lock(&fsdb->fsdb_mutex);
3236         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3237         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3238
3239         /*
3240          * Revoke lock so everyone updates.  Should be alright if
3241          * someone was already reading while we were updating the logs,
3242          * so we don't really need to hold the lock while we're
3243          * writing (above).
3244          */
3245         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3246 out:
3247         OBD_FREE_PTR(mti);
3248         RETURN(rc);
3249 }
3250
3251 static int mgs_write_log_pool(const struct lu_env *env,
3252                               struct mgs_device *mgs, char *logname,
3253                               struct fs_db *fsdb, char *lovname,
3254                               enum lcfg_command_type cmd,
3255                               char *poolname, char *fsname,
3256                               char *ostname, char *comment)
3257 {
3258         struct llog_handle *llh = NULL;
3259         int rc;
3260
3261         rc = record_start_log(env, mgs, &llh, logname);
3262         if (rc)
3263                 return rc;
3264         rc = record_marker(env, llh, fsdb, CM_START, lovname, comment);
3265         if (rc)
3266                 goto out;
3267         rc = record_base(env, llh, lovname, 0, cmd, poolname, fsname, ostname, 0);
3268         if (rc)
3269                 goto out;
3270         rc = record_marker(env, llh, fsdb, CM_END, lovname, comment);
3271 out:
3272         record_end_log(env, &llh);
3273         return rc;
3274 }
3275
3276 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3277                  enum lcfg_command_type cmd, char *fsname,
3278                  char *poolname, char *ostname)
3279 {
3280         struct fs_db *fsdb;
3281         char *lovname;
3282         char *logname;
3283         char *label = NULL, *canceled_label = NULL;
3284         int label_sz;
3285         struct mgs_target_info *mti = NULL;
3286         int rc, i;
3287         ENTRY;
3288
3289         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3290         if (rc) {
3291                 CERROR("Can't get db for %s\n", fsname);
3292                 RETURN(rc);
3293         }
3294         if (cfs_test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3295                 CERROR("%s is not defined\n", fsname);
3296                 mgs_free_fsdb(mgs, fsdb);
3297                 RETURN(-EINVAL);
3298         }
3299
3300         label_sz = 10 + strlen(fsname) + strlen(poolname);
3301
3302         /* check if ostname match fsname */
3303         if (ostname != NULL) {
3304                 char *ptr;
3305
3306                 ptr = strrchr(ostname, '-');
3307                 if ((ptr == NULL) ||
3308                     (strncmp(fsname, ostname, ptr-ostname) != 0))
3309                         RETURN(-EINVAL);
3310                 label_sz += strlen(ostname);
3311         }
3312
3313         OBD_ALLOC(label, label_sz);
3314         if (label == NULL)
3315                 RETURN(-ENOMEM);
3316
3317         switch(cmd) {
3318         case LCFG_POOL_NEW:
3319                 sprintf(label,
3320                         "new %s.%s", fsname, poolname);
3321                 break;
3322         case LCFG_POOL_ADD:
3323                 sprintf(label,
3324                         "add %s.%s.%s", fsname, poolname, ostname);
3325                 break;
3326         case LCFG_POOL_REM:
3327                 OBD_ALLOC(canceled_label, label_sz);
3328                 if (canceled_label == NULL)
3329                         GOTO(out_label, rc = -ENOMEM);
3330                 sprintf(label,
3331                         "rem %s.%s.%s", fsname, poolname, ostname);
3332                 sprintf(canceled_label,
3333                         "add %s.%s.%s", fsname, poolname, ostname);
3334                 break;
3335         case LCFG_POOL_DEL:
3336                 OBD_ALLOC(canceled_label, label_sz);
3337                 if (canceled_label == NULL)
3338                         GOTO(out_label, rc = -ENOMEM);
3339                 sprintf(label,
3340                         "del %s.%s", fsname, poolname);
3341                 sprintf(canceled_label,
3342                         "new %s.%s", fsname, poolname);
3343                 break;
3344         default:
3345                 break;
3346         }
3347
3348         cfs_mutex_lock(&fsdb->fsdb_mutex);
3349
3350         if (canceled_label != NULL) {
3351                 OBD_ALLOC_PTR(mti);
3352                 if (mti == NULL)
3353                         GOTO(out_cancel, rc = -ENOMEM);
3354         }
3355
3356         /* write pool def to all MDT logs */
3357         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3358                  if (cfs_test_bit(i,  fsdb->fsdb_mdt_index_map)) {
3359                         rc = name_create_mdt_and_lov(&logname, &lovname,
3360                                                      fsdb, i);
3361                         if (rc) {
3362                                 cfs_mutex_unlock(&fsdb->fsdb_mutex);
3363                                 GOTO(out_mti, rc);
3364                         }
3365                         if (canceled_label != NULL) {
3366                                 strcpy(mti->mti_svname, "lov pool");
3367                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3368                                                 lovname, canceled_label,
3369                                                 CM_SKIP);
3370                         }
3371
3372                         if (rc >= 0)
3373                                 rc = mgs_write_log_pool(env, mgs, logname,
3374                                                         fsdb, lovname, cmd,
3375                                                         fsname, poolname,
3376                                                         ostname, label);
3377                         name_destroy(&logname);
3378                         name_destroy(&lovname);
3379                         if (rc) {
3380                                 cfs_mutex_unlock(&fsdb->fsdb_mutex);
3381                                 GOTO(out_mti, rc);
3382                         }
3383                 }
3384         }
3385
3386         rc = name_create(&logname, fsname, "-client");
3387         if (rc) {
3388                 cfs_mutex_unlock(&fsdb->fsdb_mutex);
3389                 GOTO(out_mti, rc);
3390         }
3391         if (canceled_label != NULL) {
3392                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3393                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
3394                 if (rc < 0) {
3395                         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3396                         name_destroy(&logname);
3397                         GOTO(out_mti, rc);
3398                 }
3399         }
3400
3401         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
3402                                 cmd, fsname, poolname, ostname, label);
3403         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3404         name_destroy(&logname);
3405         /* request for update */
3406         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3407
3408         EXIT;
3409 out_mti:
3410         if (mti != NULL)
3411                 OBD_FREE_PTR(mti);
3412 out_cancel:
3413         if (canceled_label != NULL)
3414                 OBD_FREE(canceled_label, label_sz);
3415 out_label:
3416         OBD_FREE(label, label_sz);
3417         return rc;
3418 }
3419
3420 #if 0
3421 /******************** unused *********************/
3422 static int mgs_backup_llog(struct obd_device *obd, char* fsname)
3423 {
3424         struct file *filp, *bak_filp;
3425         struct lvfs_run_ctxt saved;
3426         char *logname, *buf;
3427         loff_t soff = 0 , doff = 0;
3428         int count = 4096, len;
3429         int rc = 0;
3430
3431         OBD_ALLOC(logname, PATH_MAX);
3432         if (logname == NULL)
3433                 return -ENOMEM;
3434
3435         OBD_ALLOC(buf, count);
3436         if (!buf)
3437                 GOTO(out , rc = -ENOMEM);
3438
3439         len = snprintf(logname, PATH_MAX, "%s/%s.bak",
3440                        MOUNT_CONFIGS_DIR, fsname);
3441
3442         if (len >= PATH_MAX - 1) {
3443                 GOTO(out, -ENAMETOOLONG);
3444         }
3445
3446         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3447
3448         bak_filp = l_filp_open(logname, O_RDWR|O_CREAT|O_TRUNC, 0660);
3449         if (IS_ERR(bak_filp)) {
3450                 rc = PTR_ERR(bak_filp);
3451                 CERROR("backup logfile open %s: %d\n", logname, rc);
3452                 GOTO(pop, rc);
3453         }
3454         sprintf(logname, "%s/%s", MOUNT_CONFIGS_DIR, fsname);
3455         filp = l_filp_open(logname, O_RDONLY, 0);
3456         if (IS_ERR(filp)) {
3457                 rc = PTR_ERR(filp);
3458                 CERROR("logfile open %s: %d\n", logname, rc);
3459                 GOTO(close1f, rc);
3460         }
3461
3462         while ((rc = lustre_fread(filp, buf, count, &soff)) > 0) {
3463                 rc = lustre_fwrite(bak_filp, buf, count, &doff);
3464                 break;
3465         }
3466
3467         filp_close(filp, 0);
3468 close1f:
3469         filp_close(bak_filp, 0);
3470 pop:
3471         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3472 out:
3473         if (buf)
3474                 OBD_FREE(buf, count);
3475         OBD_FREE(logname, PATH_MAX);
3476         return rc;
3477 }
3478
3479 #endif