Whamcloud - gitweb
d1a9b2c54bd913a39790db1562af7af5db14217d
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MGS
44 #define D_MGS D_CONFIG
45
46 #ifdef __KERNEL__
47 #include <linux/module.h>
48 #include <linux/pagemap.h>
49 #include <linux/fs.h>
50 #endif
51
52 #include <obd.h>
53 #include <obd_lov.h>
54 #include <obd_class.h>
55 #include <lustre_log.h>
56 #include <obd_ost.h>
57 #include <libcfs/list.h>
58 #include <linux/lvfs.h>
59 #include <lustre_fsfilt.h>
60 #include <lustre_disk.h>
61 #include <lustre_param.h>
62 #include <lustre_sec.h>
63 #include <lquota.h>
64 #include "mgs_internal.h"
65
66 /********************** Class functions ********************/
67
68 /* Caller must list_del and OBD_FREE each dentry from the list */
69 int class_dentry_readdir(const struct lu_env *env,
70                          struct mgs_device *mgs, cfs_list_t *dentry_list)
71 {
72         /* see mds_cleanup_pending */
73         struct lvfs_run_ctxt saved;
74         struct file *file;
75         struct dentry *dentry;
76         struct vfsmount *mnt;
77         int rc = 0;
78         ENTRY;
79
80         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
81         dentry = dget(mgs->mgs_configs_dir);
82         if (IS_ERR(dentry))
83                 GOTO(out_pop, rc = PTR_ERR(dentry));
84         mnt = mntget(mgs->mgs_vfsmnt);
85         if (IS_ERR(mnt)) {
86                 l_dput(dentry);
87                 GOTO(out_pop, rc = PTR_ERR(mnt));
88         }
89
90         file = ll_dentry_open(dentry, mnt, O_RDONLY, current_cred());
91         if (IS_ERR(file))
92                 /* dentry_open_it() drops the dentry, mnt refs */
93                 GOTO(out_pop, rc = PTR_ERR(file));
94
95         CFS_INIT_LIST_HEAD(dentry_list);
96         rc = l_readdir(file, dentry_list);
97         filp_close(file, 0);
98         /*  filp_close->fput() drops the dentry, mnt refs */
99
100 out_pop:
101         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
102         RETURN(rc);
103 }
104
105 /******************** DB functions *********************/
106
107 static inline int name_create(char **newname, char *prefix, char *suffix)
108 {
109         LASSERT(newname);
110         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
111         if (!*newname)
112                 return -ENOMEM;
113         sprintf(*newname, "%s%s", prefix, suffix);
114         return 0;
115 }
116
117 static inline void name_destroy(char **name)
118 {
119         if (*name)
120                 OBD_FREE(*name, strlen(*name) + 1);
121         *name = NULL;
122 }
123
124 struct mgs_fsdb_handler_data
125 {
126         struct fs_db   *fsdb;
127         __u32           ver;
128 };
129
130 /* from the (client) config log, figure out:
131         1. which ost's/mdt's are configured (by index)
132         2. what the last config step is
133         3. COMPAT_146 lov name
134         4. COMPAT_146 mdt lov name
135         5. COMPAT_146 mdc name
136         6. COMPAT_18 osc name
137 */
138 /* It might be better to have a separate db file, instead of parsing the info
139    out of the client log.  This is slow and potentially error-prone. */
140 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
141                             struct llog_rec_hdr *rec, void *data)
142 {
143         struct mgs_fsdb_handler_data *d = data;
144         struct fs_db *fsdb = d->fsdb;
145         int cfg_len = rec->lrh_len;
146         char *cfg_buf = (char*) (rec + 1);
147         struct lustre_cfg *lcfg;
148         __u32 index;
149         int rc = 0;
150         ENTRY;
151
152         if (rec->lrh_type != OBD_CFG_REC) {
153                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
154                 RETURN(-EINVAL);
155         }
156
157         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
158         if (rc) {
159                 CERROR("Insane cfg\n");
160                 RETURN(rc);
161         }
162
163         lcfg = (struct lustre_cfg *)cfg_buf;
164
165         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
166                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
167
168         /* Figure out ost indicies */
169         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
170         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
171             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
172                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
173                                        NULL, 10);
174                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
175                        lustre_cfg_string(lcfg, 1), index,
176                        lustre_cfg_string(lcfg, 2));
177                 cfs_set_bit(index, fsdb->fsdb_ost_index_map);
178         }
179
180         /* Figure out mdt indicies */
181         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
182         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
183             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
184                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
185                                        &index, NULL);
186                 if (rc != LDD_F_SV_TYPE_MDT) {
187                         CWARN("Unparsable MDC name %s, assuming index 0\n",
188                               lustre_cfg_string(lcfg, 0));
189                         index = 0;
190                 }
191                 rc = 0;
192                 CDEBUG(D_MGS, "MDT index is %u\n", index);
193                 cfs_set_bit(index, fsdb->fsdb_mdt_index_map);
194                 fsdb->fsdb_mdt_count ++;
195         }
196
197         /* COMPAT_146 */
198         /* figure out the old LOV name. fsdb_gen = 0 means old log */
199         /* #01 L attach 0:lov_mdsA 1:lov 2:cdbe9_lov_mdsA_dc8cf7f3bb */
200         if ((fsdb->fsdb_gen == 0) && (lcfg->lcfg_command == LCFG_ATTACH) &&
201             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_LOV_NAME) == 0)) {
202                 cfs_set_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags);
203                 name_destroy(&fsdb->fsdb_clilov);
204                 rc = name_create(&fsdb->fsdb_clilov,
205                                  lustre_cfg_string(lcfg, 0), "");
206                 if (rc)
207                         RETURN(rc);
208                 CDEBUG(D_MGS, "client lov name is %s\n", fsdb->fsdb_clilov);
209         }
210
211         /* figure out the old MDT lov name from the MDT uuid */
212         if ((fsdb->fsdb_gen == 0) && (lcfg->lcfg_command == LCFG_SETUP) &&
213             (strncmp(lustre_cfg_string(lcfg, 0), "MDC_", 4) == 0)) {
214                 char *ptr;
215                 cfs_set_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags);
216                 ptr = strstr(lustre_cfg_string(lcfg, 1), "_UUID");
217                 if (!ptr) {
218                         CERROR("Can't parse MDT uuid %s\n",
219                                lustre_cfg_string(lcfg, 1));
220                         RETURN(-EINVAL);
221                 }
222                 *ptr = '\0';
223                 name_destroy(&fsdb->fsdb_mdtlov);
224                 rc = name_create(&fsdb->fsdb_mdtlov,
225                                  "lov_", lustre_cfg_string(lcfg, 1));
226                 if (rc)
227                         RETURN(rc);
228                 name_destroy(&fsdb->fsdb_mdc);
229                 rc = name_create(&fsdb->fsdb_mdc,
230                                  lustre_cfg_string(lcfg, 0), "");
231                 if (rc)
232                         RETURN(rc);
233                 CDEBUG(D_MGS, "MDT lov name is %s\n", fsdb->fsdb_mdtlov);
234         }
235         /* end COMPAT_146 */
236
237         /*
238          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
239          */
240         if (!cfs_test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
241             lcfg->lcfg_command == LCFG_ATTACH &&
242             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
243                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
244                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
245                         CWARN("MDT using 1.8 OSC name scheme\n");
246                         cfs_set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
247                 }
248         }
249
250         if (lcfg->lcfg_command == LCFG_MARKER) {
251                 struct cfg_marker *marker;
252                 marker = lustre_cfg_buf(lcfg, 1);
253
254                 d->ver = marker->cm_vers;
255
256                 /* Keep track of the latest marker step */
257                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
258         }
259
260         RETURN(rc);
261 }
262
263 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
264 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
265                                   struct mgs_device *mgs,
266                                   struct fs_db *fsdb)
267 {
268         char *logname;
269         struct llog_handle *loghandle;
270         struct lvfs_run_ctxt saved;
271         struct llog_ctxt *ctxt;
272         struct mgs_fsdb_handler_data d = { fsdb, 0 };
273         int rc, rc2;
274         ENTRY;
275
276         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
277         LASSERT(ctxt != NULL);
278         name_create(&logname, fsdb->fsdb_name, "-client");
279         cfs_mutex_lock(&fsdb->fsdb_mutex);
280         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
281         rc = llog_open_create(NULL, ctxt, &loghandle, NULL, logname);
282         if (rc)
283                 GOTO(out_pop, rc);
284
285         rc = llog_init_handle(NULL, loghandle, LLOG_F_IS_PLAIN, NULL);
286         if (rc)
287                 GOTO(out_close, rc);
288
289         if (llog_get_size(loghandle) <= 1)
290                 cfs_set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
291
292         rc = llog_process(NULL, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
293         CDEBUG(D_INFO, "get_db = %d\n", rc);
294 out_close:
295         rc2 = llog_close(NULL, loghandle);
296         if (!rc)
297                 rc = rc2;
298 out_pop:
299         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
300         cfs_mutex_unlock(&fsdb->fsdb_mutex);
301         name_destroy(&logname);
302         llog_ctxt_put(ctxt);
303
304         RETURN(rc);
305 }
306
307 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
308 {
309         struct mgs_tgt_srpc_conf *tgtconf;
310
311         /* free target-specific rules */
312         while (fsdb->fsdb_srpc_tgt) {
313                 tgtconf = fsdb->fsdb_srpc_tgt;
314                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
315
316                 LASSERT(tgtconf->mtsc_tgt);
317
318                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
319                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
320                 OBD_FREE_PTR(tgtconf);
321         }
322
323         /* free general rules */
324         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
325 }
326
327 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
328 {
329         struct fs_db *fsdb;
330         cfs_list_t *tmp;
331
332         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
333                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
334                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
335                         return fsdb;
336         }
337         return NULL;
338 }
339
340 /* caller must hold the mgs->mgs_fs_db_lock */
341 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
342                                   struct mgs_device *mgs, char *fsname)
343 {
344         struct fs_db *fsdb;
345         int rc;
346         ENTRY;
347
348         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
349                 CERROR("fsname %s is too long\n", fsname);
350                 RETURN(NULL);
351         }
352
353         OBD_ALLOC_PTR(fsdb);
354         if (!fsdb)
355                 RETURN(NULL);
356
357         strcpy(fsdb->fsdb_name, fsname);
358         cfs_mutex_init(&fsdb->fsdb_mutex);
359         cfs_set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
360
361         if (strcmp(fsname, MGSSELF_NAME) == 0) {
362                 cfs_set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
363         } else {
364                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
365                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
366                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
367                         CERROR("No memory for index maps\n");
368                         GOTO(err, 0);
369                 }
370
371                 rc = name_create(&fsdb->fsdb_mdtlov, fsname, "-mdtlov");
372                 if (rc)
373                         GOTO(err, rc);
374                 rc = name_create(&fsdb->fsdb_mdtlmv, fsname, "-mdtlmv");
375                 if (rc)
376                         GOTO(err, rc);
377                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
378                 if (rc)
379                         GOTO(err, rc);
380                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
381                 if (rc)
382                         GOTO(err, rc);
383
384                 /* initialise data for NID table */
385                 mgs_ir_init_fs(env, mgs, fsdb);
386
387                 lproc_mgs_add_live(mgs, fsdb);
388         }
389
390         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
391
392         RETURN(fsdb);
393 err:
394         if (fsdb->fsdb_ost_index_map)
395                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
396         if (fsdb->fsdb_mdt_index_map)
397                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
398         name_destroy(&fsdb->fsdb_clilov);
399         name_destroy(&fsdb->fsdb_clilmv);
400         name_destroy(&fsdb->fsdb_mdtlov);
401         name_destroy(&fsdb->fsdb_mdtlmv);
402         OBD_FREE_PTR(fsdb);
403         RETURN(NULL);
404 }
405
406 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
407 {
408         /* wait for anyone with the sem */
409         cfs_mutex_lock(&fsdb->fsdb_mutex);
410         lproc_mgs_del_live(mgs, fsdb);
411         cfs_list_del(&fsdb->fsdb_list);
412
413         /* deinitialize fsr */
414         mgs_ir_fini_fs(mgs, fsdb);
415
416         if (fsdb->fsdb_ost_index_map)
417                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
418         if (fsdb->fsdb_mdt_index_map)
419                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
420         name_destroy(&fsdb->fsdb_clilov);
421         name_destroy(&fsdb->fsdb_clilmv);
422         name_destroy(&fsdb->fsdb_mdtlov);
423         name_destroy(&fsdb->fsdb_mdtlmv);
424         name_destroy(&fsdb->fsdb_mdc);
425         mgs_free_fsdb_srpc(fsdb);
426         cfs_mutex_unlock(&fsdb->fsdb_mutex);
427         OBD_FREE_PTR(fsdb);
428 }
429
430 int mgs_init_fsdb_list(struct mgs_device *mgs)
431 {
432         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
433         return 0;
434 }
435
436 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
437 {
438         struct fs_db *fsdb;
439         cfs_list_t *tmp, *tmp2;
440         cfs_mutex_lock(&mgs->mgs_mutex);
441         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
442                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
443                 mgs_free_fsdb(mgs, fsdb);
444         }
445         cfs_mutex_unlock(&mgs->mgs_mutex);
446         return 0;
447 }
448
449 int mgs_find_or_make_fsdb(const struct lu_env *env,
450                           struct mgs_device *mgs, char *name,
451                           struct fs_db **dbh)
452 {
453         struct fs_db *fsdb;
454         int rc = 0;
455
456         cfs_mutex_lock(&mgs->mgs_mutex);
457         fsdb = mgs_find_fsdb(mgs, name);
458         if (fsdb) {
459                 cfs_mutex_unlock(&mgs->mgs_mutex);
460                 *dbh = fsdb;
461                 return 0;
462         }
463
464         CDEBUG(D_MGS, "Creating new db\n");
465         fsdb = mgs_new_fsdb(env, mgs, name);
466         cfs_mutex_unlock(&mgs->mgs_mutex);
467         if (!fsdb)
468                 return -ENOMEM;
469
470         if (!cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
471                 /* populate the db from the client llog */
472                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
473                 if (rc) {
474                         CERROR("Can't get db from client log %d\n", rc);
475                         mgs_free_fsdb(mgs, fsdb);
476                         return rc;
477                 }
478         }
479
480         /* populate srpc rules from params llog */
481         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
482         if (rc) {
483                 CERROR("Can't get db from params log %d\n", rc);
484                 mgs_free_fsdb(mgs, fsdb);
485                 return rc;
486         }
487
488         *dbh = fsdb;
489
490         return 0;
491 }
492
493 /* 1 = index in use
494    0 = index unused
495    -1= empty client log */
496 int mgs_check_index(const struct lu_env *env,
497                     struct mgs_device *mgs,
498                     struct mgs_target_info *mti)
499 {
500         struct fs_db *fsdb;
501         void *imap;
502         int rc = 0;
503         ENTRY;
504
505         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
506
507         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
508         if (rc) {
509                 CERROR("Can't get db for %s\n", mti->mti_fsname);
510                 RETURN(rc);
511         }
512
513         if (cfs_test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
514                 RETURN(-1);
515
516         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
517                 imap = fsdb->fsdb_ost_index_map;
518         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
519                 imap = fsdb->fsdb_mdt_index_map;
520         else
521                 RETURN(-EINVAL);
522
523         if (cfs_test_bit(mti->mti_stripe_index, imap))
524                 RETURN(1);
525         RETURN(0);
526 }
527
528 static __inline__ int next_index(void *index_map, int map_len)
529 {
530         int i;
531         for (i = 0; i < map_len * 8; i++)
532                  if (!cfs_test_bit(i, index_map)) {
533                          return i;
534                  }
535         CERROR("max index %d exceeded.\n", i);
536         return -1;
537 }
538
539 /* Return codes:
540         0  newly marked as in use
541         <0 err
542         +EALREADY for update of an old index */
543 static int mgs_set_index(const struct lu_env *env,
544                          struct mgs_device *mgs,
545                          struct mgs_target_info *mti)
546 {
547         struct fs_db *fsdb;
548         void *imap;
549         int rc = 0;
550         ENTRY;
551
552         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
553         if (rc) {
554                 CERROR("Can't get db for %s\n", mti->mti_fsname);
555                 RETURN(rc);
556         }
557
558         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
559                 imap = fsdb->fsdb_ost_index_map;
560         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
561                 imap = fsdb->fsdb_mdt_index_map;
562                 if (fsdb->fsdb_mdt_count >= MAX_MDT_COUNT) {
563                         LCONSOLE_ERROR_MSG(0x13f, "The max mdt count"
564                                            "is %d\n", (int)MAX_MDT_COUNT);
565                         RETURN(-ERANGE);
566                 }
567         } else {
568                 RETURN(-EINVAL);
569         }
570
571         if (mti->mti_flags & LDD_F_NEED_INDEX) {
572                 rc = next_index(imap, INDEX_MAP_SIZE);
573                 if (rc == -1)
574                         RETURN(-ERANGE);
575                 mti->mti_stripe_index = rc;
576                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
577                         fsdb->fsdb_mdt_count ++;
578         }
579
580         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
581                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
582                                    "but the max index is %d.\n",
583                                    mti->mti_svname, mti->mti_stripe_index,
584                                    INDEX_MAP_SIZE * 8);
585                 RETURN(-ERANGE);
586         }
587
588         if (cfs_test_bit(mti->mti_stripe_index, imap)) {
589                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
590                     !(mti->mti_flags & LDD_F_WRITECONF)) {
591                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
592                                            "%d, but that index is already in "
593                                            "use. Use --writeconf to force\n",
594                                            mti->mti_svname,
595                                            mti->mti_stripe_index);
596                         RETURN(-EADDRINUSE);
597                 } else {
598                         CDEBUG(D_MGS, "Server %s updating index %d\n",
599                                mti->mti_svname, mti->mti_stripe_index);
600                         RETURN(EALREADY);
601                 }
602         }
603
604         cfs_set_bit(mti->mti_stripe_index, imap);
605         cfs_clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
606         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
607                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
608
609         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
610                mti->mti_stripe_index);
611
612         RETURN(0);
613 }
614
615 struct mgs_modify_lookup {
616         struct cfg_marker mml_marker;
617         int               mml_modified;
618 };
619
620 static int mgs_modify_handler(const struct lu_env *env,
621                               struct llog_handle *llh,
622                               struct llog_rec_hdr *rec, void *data)
623 {
624         struct mgs_modify_lookup *mml = data;
625         struct cfg_marker *marker;
626         struct lustre_cfg *lcfg = (struct lustre_cfg *)(rec + 1);
627         int cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
628                 sizeof(struct llog_rec_tail);
629         int rc;
630         ENTRY;
631
632         if (rec->lrh_type != OBD_CFG_REC) {
633                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
634                 RETURN(-EINVAL);
635         }
636
637         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
638         if (rc) {
639                 CERROR("Insane cfg\n");
640                 RETURN(rc);
641         }
642
643         /* We only care about markers */
644         if (lcfg->lcfg_command != LCFG_MARKER)
645                 RETURN(0);
646
647         marker = lustre_cfg_buf(lcfg, 1);
648         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
649             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
650             !(marker->cm_flags & CM_SKIP)) {
651                 /* Found a non-skipped marker match */
652                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
653                        rec->lrh_index, marker->cm_step,
654                        marker->cm_flags, mml->mml_marker.cm_flags,
655                        marker->cm_tgtname, marker->cm_comment);
656                 /* Overwrite the old marker llog entry */
657                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
658                 marker->cm_flags |= mml->mml_marker.cm_flags;
659                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
660                 /* Header and tail are added back to lrh_len in
661                    llog_lvfs_write_rec */
662                 rec->lrh_len = cfg_len;
663                 rc = llog_write_rec(NULL, llh, rec, NULL, 0, (void *)lcfg,
664                                     rec->lrh_index);
665                 if (!rc)
666                          mml->mml_modified++;
667         }
668
669         RETURN(rc);
670 }
671
672 /* Modify an existing config log record (for CM_SKIP or CM_EXCLUDE) */
673 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
674                       struct fs_db *fsdb, struct mgs_target_info *mti,
675                       char *logname, char *devname, char *comment, int flags)
676 {
677         struct llog_handle *loghandle;
678         struct lvfs_run_ctxt saved;
679         struct llog_ctxt *ctxt;
680         struct mgs_modify_lookup *mml;
681         int rc, rc2;
682         ENTRY;
683
684         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
685                flags);
686
687         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
688         LASSERT(ctxt != NULL);
689         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
690         rc = llog_open(NULL, ctxt, &loghandle, NULL, logname,
691                        LLOG_OPEN_EXISTS);
692         if (rc < 0) {
693                 if (rc == -ENOENT)
694                         rc = 0;
695                 GOTO(out_pop, rc);
696         }
697
698         rc = llog_init_handle(NULL, loghandle, LLOG_F_IS_PLAIN, NULL);
699         if (rc)
700                 GOTO(out_close, rc);
701
702         if (llog_get_size(loghandle) <= 1)
703                 GOTO(out_close, rc = 0);
704
705         OBD_ALLOC_PTR(mml);
706         if (!mml)
707                 GOTO(out_close, rc = -ENOMEM);
708         strcpy(mml->mml_marker.cm_comment, comment);
709         strcpy(mml->mml_marker.cm_tgtname, devname);
710         /* Modify mostly means cancel */
711         mml->mml_marker.cm_flags = flags;
712         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
713         mml->mml_modified = 0;
714         rc = llog_process(NULL, loghandle, mgs_modify_handler, (void *)mml,
715                           NULL);
716         if (!rc && !mml->mml_modified)
717                 rc = -ENODEV;
718         OBD_FREE_PTR(mml);
719
720 out_close:
721         rc2 = llog_close(NULL, loghandle);
722         if (!rc)
723                 rc = rc2;
724 out_pop:
725         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
726         if (rc && rc != -ENODEV)
727                 CERROR("modify %s/%s failed %d\n",
728                        mti->mti_svname, comment, rc);
729         llog_ctxt_put(ctxt);
730         RETURN(rc);
731 }
732
733 /******************** config log recording functions *********************/
734
735 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
736                          struct lustre_cfg *lcfg)
737 {
738         struct lvfs_run_ctxt   saved;
739         struct llog_rec_hdr    rec;
740         int buflen, rc;
741         struct obd_device *obd = llh->lgh_ctxt->loc_obd;
742
743         if (!lcfg || !llh)
744                 return -ENOMEM;
745
746         LASSERT(llh->lgh_ctxt);
747
748         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
749                                 lcfg->lcfg_buflens);
750         rec.lrh_len = llog_data_len(buflen);
751         rec.lrh_type = OBD_CFG_REC;
752
753         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
754         /* idx = -1 means append */
755         rc = llog_write_rec(NULL, llh, &rec, NULL, 0, (void *)lcfg, -1);
756         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
757         if (rc)
758                 CERROR("failed %d\n", rc);
759         return rc;
760 }
761
762 static int record_base(const struct lu_env *env, struct llog_handle *llh,
763                      char *cfgname, lnet_nid_t nid, int cmd,
764                      char *s1, char *s2, char *s3, char *s4)
765 {
766         struct lustre_cfg_bufs bufs;
767         struct lustre_cfg     *lcfg;
768         int rc;
769
770         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
771                cmd, s1, s2, s3, s4);
772
773         lustre_cfg_bufs_reset(&bufs, cfgname);
774         if (s1)
775                 lustre_cfg_bufs_set_string(&bufs, 1, s1);
776         if (s2)
777                 lustre_cfg_bufs_set_string(&bufs, 2, s2);
778         if (s3)
779                 lustre_cfg_bufs_set_string(&bufs, 3, s3);
780         if (s4)
781                 lustre_cfg_bufs_set_string(&bufs, 4, s4);
782
783         lcfg = lustre_cfg_new(cmd, &bufs);
784         if (!lcfg)
785                 return -ENOMEM;
786         lcfg->lcfg_nid = nid;
787
788         rc = record_lcfg(env, llh, lcfg);
789
790         lustre_cfg_free(lcfg);
791
792         if (rc) {
793                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
794                        cmd, s1, s2, s3, s4);
795         }
796         return rc;
797 }
798
799
800 static inline int record_add_uuid(const struct lu_env *env,
801                                   struct llog_handle *llh,
802                                   uint64_t nid, char *uuid)
803 {
804         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
805
806 }
807
808 static inline int record_add_conn(const struct lu_env *env,
809                                   struct llog_handle *llh,
810                                   char *devname, char *uuid)
811 {
812         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
813 }
814
815 static inline int record_attach(const struct lu_env *env,
816                                 struct llog_handle *llh, char *devname,
817                                 char *type, char *uuid)
818 {
819         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
820 }
821
822 static inline int record_setup(const struct lu_env *env,
823                                struct llog_handle *llh, char *devname,
824                                char *s1, char *s2, char *s3, char *s4)
825 {
826         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
827 }
828
829 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
830                             char *devname, struct lov_desc *desc)
831 {
832         struct lustre_cfg_bufs bufs;
833         struct lustre_cfg *lcfg;
834         int rc;
835
836         lustre_cfg_bufs_reset(&bufs, devname);
837         lustre_cfg_bufs_set(&bufs, 1, desc, sizeof(*desc));
838         lcfg = lustre_cfg_new(LCFG_SETUP, &bufs);
839         if (!lcfg)
840                 return -ENOMEM;
841         rc = record_lcfg(env, llh, lcfg);
842
843         lustre_cfg_free(lcfg);
844         return rc;
845 }
846
847 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
848                             char *devname, struct lmv_desc *desc)
849 {
850         struct lustre_cfg_bufs bufs;
851         struct lustre_cfg *lcfg;
852         int rc;
853
854         lustre_cfg_bufs_reset(&bufs, devname);
855         lustre_cfg_bufs_set(&bufs, 1, desc, sizeof(*desc));
856         lcfg = lustre_cfg_new(LCFG_SETUP, &bufs);
857
858         rc = record_lcfg(env, llh, lcfg);
859
860         lustre_cfg_free(lcfg);
861         return rc;
862 }
863
864 static inline int record_mdc_add(const struct lu_env *env,
865                                  struct llog_handle *llh,
866                                  char *logname, char *mdcuuid,
867                                  char *mdtuuid, char *index,
868                                  char *gen)
869 {
870         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
871                            mdtuuid,index,gen,mdcuuid);
872 }
873
874 static inline int record_lov_add(const struct lu_env *env,
875                                  struct llog_handle *llh,
876                                  char *lov_name, char *ost_uuid,
877                                  char *index, char *gen)
878 {
879         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
880                            ost_uuid,index,gen,0);
881 }
882
883 static inline int record_mount_opt(const struct lu_env *env,
884                                    struct llog_handle *llh,
885                                    char *profile, char *lov_name,
886                                    char *mdc_name)
887 {
888         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
889                            profile,lov_name,mdc_name,0);
890 }
891
892 static int record_marker(const struct lu_env *env,
893                          struct llog_handle *llh,
894                          struct fs_db *fsdb, __u32 flags,
895                          char *tgtname, char *comment)
896 {
897         struct cfg_marker marker;
898         struct lustre_cfg_bufs bufs;
899         struct lustre_cfg *lcfg;
900         int rc;
901
902         if (flags & CM_START)
903                 fsdb->fsdb_gen++;
904         marker.cm_step = fsdb->fsdb_gen;
905         marker.cm_flags = flags;
906         marker.cm_vers = LUSTRE_VERSION_CODE;
907         strncpy(marker.cm_tgtname, tgtname, sizeof(marker.cm_tgtname));
908         strncpy(marker.cm_comment, comment, sizeof(marker.cm_comment));
909         marker.cm_createtime = cfs_time_current_sec();
910         marker.cm_canceltime = 0;
911         lustre_cfg_bufs_reset(&bufs, NULL);
912         lustre_cfg_bufs_set(&bufs, 1, &marker, sizeof(marker));
913         lcfg = lustre_cfg_new(LCFG_MARKER, &bufs);
914         if (!lcfg)
915                 return -ENOMEM;
916         rc = record_lcfg(env, llh, lcfg);
917
918         lustre_cfg_free(lcfg);
919         return rc;
920 }
921
922 static int record_start_log(const struct lu_env *env,
923                             struct mgs_device *mgs,
924                             struct llog_handle **llh, char *name)
925 {
926         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
927         struct lvfs_run_ctxt saved;
928         struct llog_ctxt *ctxt;
929         int rc = 0;
930
931         if (*llh)
932                 GOTO(out, rc = -EBUSY);
933
934         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
935         if (!ctxt)
936                 GOTO(out, rc = -ENODEV);
937         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
938
939         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
940         rc = llog_open_create(NULL, ctxt, llh, NULL, name);
941         if (rc)
942                 GOTO(out_ctxt, rc);
943         rc = llog_init_handle(NULL, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
944         if (rc) {
945                 llog_close(NULL, *llh);
946                 *llh = NULL;
947         }
948 out_ctxt:
949         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
950         llog_ctxt_put(ctxt);
951 out:
952         if (rc)
953                 CERROR("Can't start log %s: %d\n", name, rc);
954         RETURN(rc);
955 }
956
957 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
958 {
959         struct lvfs_run_ctxt saved;
960         struct obd_device *obd = (*llh)->lgh_ctxt->loc_obd;
961         int rc = 0;
962
963         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
964
965         rc = llog_close(NULL, *llh);
966         *llh = NULL;
967
968         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
969         RETURN(rc);
970 }
971
972 static int mgs_log_is_empty(const struct lu_env *env,
973                             struct mgs_device *mgs, char *name)
974 {
975         struct lvfs_run_ctxt saved;
976         struct llog_handle *llh;
977         struct llog_ctxt *ctxt;
978         int rc = 0;
979
980         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
981         LASSERT(ctxt != NULL);
982         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
983         rc = llog_open(NULL, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
984         if (rc < 0) {
985                 if (rc == -ENOENT)
986                         rc = 0;
987                 GOTO(out_ctxt, rc);
988         }
989
990         llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
991         if (rc)
992                 GOTO(out_close, rc);
993         rc = llog_get_size(llh);
994
995 out_close:
996         llog_close(NULL, llh);
997 out_ctxt:
998         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
999         llog_ctxt_put(ctxt);
1000         /* header is record 1 */
1001         return (rc <= 1);
1002 }
1003
1004 /******************** config "macros" *********************/
1005
1006 /* write an lcfg directly into a log (with markers) */
1007 static int mgs_write_log_direct(const struct lu_env *env,
1008                                 struct mgs_device *mgs, struct fs_db *fsdb,
1009                                 char *logname, struct lustre_cfg *lcfg,
1010                                 char *devname, char *comment)
1011 {
1012         struct llog_handle *llh = NULL;
1013         int rc;
1014         ENTRY;
1015
1016         if (!lcfg)
1017                 RETURN(-ENOMEM);
1018
1019         rc = record_start_log(env, mgs, &llh, logname);
1020         if (rc)
1021                 RETURN(rc);
1022
1023         /* FIXME These should be a single journal transaction */
1024         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1025
1026         rc = record_lcfg(env, llh, lcfg);
1027
1028         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1029         rc = record_end_log(env, &llh);
1030
1031         RETURN(rc);
1032 }
1033
1034 /* write the lcfg in all logs for the given fs */
1035 int mgs_write_log_direct_all(const struct lu_env *env,
1036                              struct mgs_device *mgs,
1037                              struct fs_db *fsdb,
1038                              struct mgs_target_info *mti,
1039                              struct lustre_cfg *lcfg,
1040                              char *devname, char *comment,
1041                              int server_only)
1042 {
1043         cfs_list_t dentry_list;
1044         struct l_linux_dirent *dirent, *n;
1045         char *fsname = mti->mti_fsname;
1046         char *logname;
1047         int rc = 0, len = strlen(fsname);
1048         ENTRY;
1049
1050         /* We need to set params for any future logs
1051            as well. FIXME Append this file to every new log.
1052            Actually, we should store as params (text), not llogs.  Or
1053            in a database. */
1054         name_create(&logname, fsname, "-params");
1055         if (mgs_log_is_empty(env, mgs, logname)) {
1056                 struct llog_handle *llh = NULL;
1057                 rc = record_start_log(env, mgs, &llh, logname);
1058                 record_end_log(env, &llh);
1059         }
1060         name_destroy(&logname);
1061         if (rc)
1062                 RETURN(rc);
1063
1064         /* Find all the logs in the CONFIGS directory */
1065         rc = class_dentry_readdir(env, mgs, &dentry_list);
1066         if (rc) {
1067                 CERROR("Can't read %s dir\n", MOUNT_CONFIGS_DIR);
1068                 RETURN(rc);
1069         }
1070
1071         /* Could use fsdb index maps instead of directory listing */
1072         cfs_list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
1073                 cfs_list_del(&dirent->lld_list);
1074                 /* don't write to sptlrpc rule log */
1075                 if (strstr(dirent->lld_name, "-sptlrpc") != NULL)
1076                         goto next;
1077
1078                 /* caller wants write server logs only */
1079                 if (server_only && strstr(dirent->lld_name, "-client") != NULL)
1080                         goto next;
1081
1082                 if (strncmp(fsname, dirent->lld_name, len) == 0) {
1083                         CDEBUG(D_MGS, "Changing log %s\n", dirent->lld_name);
1084                         /* Erase any old settings of this same parameter */
1085                         mgs_modify(env, mgs, fsdb, mti, dirent->lld_name,
1086                                    devname, comment, CM_SKIP);
1087                         /* Write the new one */
1088                         if (lcfg) {
1089                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1090                                                           dirent->lld_name,
1091                                                           lcfg, devname,
1092                                                           comment);
1093                                 if (rc)
1094                                         CERROR("err %d writing log %s\n", rc,
1095                                                dirent->lld_name);
1096                         }
1097                 }
1098 next:
1099                 OBD_FREE(dirent, sizeof(*dirent));
1100         }
1101
1102         RETURN(rc);
1103 }
1104
1105 struct temp_comp
1106 {
1107         struct mgs_target_info   *comp_tmti;
1108         struct mgs_target_info   *comp_mti;
1109         struct fs_db             *comp_fsdb;
1110         struct mgs_device        *comp_mgs;
1111         const struct lu_env      *comp_env;
1112 };
1113
1114 static int mgs_write_log_mdc_to_mdt(const struct lu_env *env,
1115                                     struct mgs_device *mgs,
1116                                     struct fs_db *fsdb,
1117                                     struct mgs_target_info *mti,
1118                                     char *logname);
1119 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1120                                     struct mgs_device *mgs,
1121                                     struct fs_db *fsdb,
1122                                     struct mgs_target_info *mti,
1123                                     char *logname, char *suffix, char *lovname,
1124                                     enum lustre_sec_part sec_part, int flags);
1125 static void name_create_mdt_and_lov(char **logname, char **lovname,
1126                                     struct fs_db *fsdb, int i);
1127
1128 static int mgs_steal_llog_handler(const struct lu_env *env,
1129                                   struct llog_handle *llh,
1130                                   struct llog_rec_hdr *rec, void *data)
1131 {
1132         struct mgs_device *mgs;
1133         struct mgs_target_info *mti, *tmti;
1134         struct fs_db *fsdb;
1135         int cfg_len = rec->lrh_len;
1136         char *cfg_buf = (char*) (rec + 1);
1137         struct lustre_cfg *lcfg;
1138         int rc = 0;
1139         struct llog_handle *mdt_llh = NULL;
1140         static int got_an_osc_or_mdc = 0;
1141         /* 0: not found any osc/mdc;
1142            1: found osc;
1143            2: found mdc;
1144         */
1145         static int last_step = -1;
1146
1147         ENTRY;
1148
1149         mti = ((struct temp_comp*)data)->comp_mti;
1150         tmti = ((struct temp_comp*)data)->comp_tmti;
1151         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1152         mgs = ((struct temp_comp*)data)->comp_mgs;
1153
1154         if (rec->lrh_type != OBD_CFG_REC) {
1155                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1156                 RETURN(-EINVAL);
1157         }
1158
1159         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1160         if (rc) {
1161                 CERROR("Insane cfg\n");
1162                 RETURN(rc);
1163         }
1164
1165         lcfg = (struct lustre_cfg *)cfg_buf;
1166
1167         if (lcfg->lcfg_command == LCFG_MARKER) {
1168                 struct cfg_marker *marker;
1169                 marker = lustre_cfg_buf(lcfg, 1);
1170                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1171                     (marker->cm_flags & CM_START)){
1172                         got_an_osc_or_mdc = 1;
1173                         strncpy(tmti->mti_svname, marker->cm_tgtname,
1174                                 sizeof(tmti->mti_svname));
1175                         rc = record_start_log(env, mgs, &mdt_llh,
1176                                               mti->mti_svname);
1177                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1178                                            mti->mti_svname,"add osc(copied)");
1179                         rc = record_end_log(env, &mdt_llh);
1180                         last_step = marker->cm_step;
1181                         RETURN(rc);
1182                 }
1183                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1184                     (marker->cm_flags & CM_END)){
1185                         LASSERT(last_step == marker->cm_step);
1186                         last_step = -1;
1187                         got_an_osc_or_mdc = 0;
1188                         rc = record_start_log(env, mgs, &mdt_llh,
1189                                               mti->mti_svname);
1190                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1191                                            mti->mti_svname,"add osc(copied)");
1192                         rc = record_end_log(env, &mdt_llh);
1193                         RETURN(rc);
1194                 }
1195                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1196                     (marker->cm_flags & CM_START)){
1197                         got_an_osc_or_mdc = 2;
1198                         last_step = marker->cm_step;
1199                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1200                                strlen(marker->cm_tgtname));
1201
1202                         RETURN(rc);
1203                 }
1204                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1205                     (marker->cm_flags & CM_END)){
1206                         LASSERT(last_step == marker->cm_step);
1207                         last_step = -1;
1208                         got_an_osc_or_mdc = 0;
1209                         RETURN(rc);
1210                 }
1211         }
1212
1213         if (got_an_osc_or_mdc == 0 || last_step < 0)
1214                 RETURN(rc);
1215
1216         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1217                 uint64_t nodenid;
1218                 nodenid = lcfg->lcfg_nid;
1219
1220                 tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1221                 tmti->mti_nid_count++;
1222
1223                 RETURN(rc);
1224         }
1225
1226         if (lcfg->lcfg_command == LCFG_SETUP) {
1227                 char *target;
1228
1229                 target = lustre_cfg_string(lcfg, 1);
1230                 memcpy(tmti->mti_uuid, target, strlen(target));
1231                 RETURN(rc);
1232         }
1233
1234         /* ignore client side sptlrpc_conf_log */
1235         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1236                 RETURN(rc);
1237
1238         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1239                 int index;
1240
1241                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1242                         RETURN (-EINVAL);
1243
1244                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1245                        strlen(mti->mti_fsname));
1246                 tmti->mti_stripe_index = index;
1247
1248                 mgs_write_log_mdc_to_mdt(env, mgs, fsdb,
1249                                          tmti, mti->mti_svname);
1250                 memset(tmti, 0, sizeof(*tmti));
1251                 RETURN(rc);
1252         }
1253
1254         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1255                 int index;
1256                 char mdt_index[9];
1257                 char *logname, *lovname;
1258
1259                 name_create_mdt_and_lov(&logname, &lovname, fsdb,
1260                                         mti->mti_stripe_index);
1261                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1262
1263                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1264                         name_destroy(&logname);
1265                         name_destroy(&lovname);
1266                         RETURN(-EINVAL);
1267                 }
1268
1269                 tmti->mti_stripe_index = index;
1270                 mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1271                                          mdt_index, lovname,
1272                                          LUSTRE_SP_MDT, 0);
1273                 name_destroy(&logname);
1274                 name_destroy(&lovname);
1275                 RETURN(rc);
1276         }
1277         RETURN(rc);
1278 }
1279
1280 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1281 /* stealed from mgs_get_fsdb_from_llog*/
1282 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1283                                               struct mgs_device *mgs,
1284                                               char *client_name,
1285                                               struct temp_comp* comp)
1286 {
1287         struct llog_handle *loghandle;
1288         struct lvfs_run_ctxt saved;
1289         struct mgs_target_info *tmti;
1290         struct llog_ctxt *ctxt;
1291         int rc;
1292
1293         ENTRY;
1294
1295         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1296         LASSERT(ctxt != NULL);
1297
1298         OBD_ALLOC_PTR(tmti);
1299         if (tmti == NULL)
1300                 GOTO(out_ctxt, rc = -ENOMEM);
1301
1302         comp->comp_tmti = tmti;
1303         comp->comp_mgs = mgs;
1304
1305         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
1306
1307         rc = llog_open(NULL, ctxt, &loghandle, NULL, client_name,
1308                        LLOG_OPEN_EXISTS);
1309         if (rc < 0) {
1310                 if (rc == -ENOENT)
1311                         rc = 0;
1312                 GOTO(out_pop, rc);
1313         }
1314
1315         rc = llog_init_handle(NULL, loghandle, LLOG_F_IS_PLAIN, NULL);
1316         if (rc)
1317                 GOTO(out_close, rc);
1318
1319         rc = llog_process(NULL, loghandle, mgs_steal_llog_handler,
1320                           (void *)comp, NULL);
1321         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1322 out_close:
1323         llog_close(NULL, loghandle);
1324 out_pop:
1325         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
1326         OBD_FREE_PTR(tmti);
1327 out_ctxt:
1328         llog_ctxt_put(ctxt);
1329         RETURN(rc);
1330 }
1331
1332 /* lmv is the second thing for client logs */
1333 /* copied from mgs_write_log_lov. Please refer to that.  */
1334 static int mgs_write_log_lmv(const struct lu_env *env,
1335                              struct mgs_device *mgs,
1336                              struct fs_db *fsdb,
1337                              struct mgs_target_info *mti,
1338                              char *logname, char *lmvname)
1339 {
1340         struct llog_handle *llh = NULL;
1341         struct lmv_desc *lmvdesc;
1342         char *uuid;
1343         int rc = 0;
1344         ENTRY;
1345
1346         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1347
1348         OBD_ALLOC_PTR(lmvdesc);
1349         if (lmvdesc == NULL)
1350                 RETURN(-ENOMEM);
1351         lmvdesc->ld_active_tgt_count = 0;
1352         lmvdesc->ld_tgt_count = 0;
1353         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1354         uuid = (char *)lmvdesc->ld_uuid.uuid;
1355
1356         rc = record_start_log(env, mgs, &llh, logname);
1357         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1358         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1359         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1360         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1361         rc = record_end_log(env, &llh);
1362
1363         OBD_FREE_PTR(lmvdesc);
1364         RETURN(rc);
1365 }
1366
1367 /* lov is the first thing in the mdt and client logs */
1368 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1369                              struct fs_db *fsdb, struct mgs_target_info *mti,
1370                              char *logname, char *lovname)
1371 {
1372         struct llog_handle *llh = NULL;
1373         struct lov_desc *lovdesc;
1374         char *uuid;
1375         int rc = 0;
1376         ENTRY;
1377
1378         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1379
1380         /*
1381         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1382         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1383               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1384         */
1385
1386         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1387         OBD_ALLOC_PTR(lovdesc);
1388         if (lovdesc == NULL)
1389                 RETURN(-ENOMEM);
1390         lovdesc->ld_magic = LOV_DESC_MAGIC;
1391         lovdesc->ld_tgt_count = 0;
1392         /* Defaults.  Can be changed later by lcfg config_param */
1393         lovdesc->ld_default_stripe_count = 1;
1394         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1395         lovdesc->ld_default_stripe_size = 1024 * 1024;
1396         lovdesc->ld_default_stripe_offset = -1;
1397         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1398         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1399         /* can these be the same? */
1400         uuid = (char *)lovdesc->ld_uuid.uuid;
1401
1402         /* This should always be the first entry in a log.
1403         rc = mgs_clear_log(obd, logname); */
1404         rc = record_start_log(env, mgs, &llh, logname);
1405         if (rc)
1406                 GOTO(out, rc);
1407         /* FIXME these should be a single journal transaction */
1408         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1409         rc = record_attach(env, llh, lovname, "lov", uuid);
1410         rc = record_lov_setup(env, llh, lovname, lovdesc);
1411         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1412         rc = record_end_log(env, &llh);
1413
1414         EXIT;
1415 out:
1416         OBD_FREE_PTR(lovdesc);
1417         return rc;
1418 }
1419
1420 /* add failnids to open log */
1421 static int mgs_write_log_failnids(const struct lu_env *env,
1422                                   struct mgs_target_info *mti,
1423                                   struct llog_handle *llh,
1424                                   char *cliname)
1425 {
1426         char *failnodeuuid = NULL;
1427         char *ptr = mti->mti_params;
1428         lnet_nid_t nid;
1429         int rc = 0;
1430
1431         /*
1432         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1433         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1434         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1435         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1436         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1437         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1438         */
1439
1440         /* Pull failnid info out of params string */
1441         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1442                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1443                         if (failnodeuuid == NULL) {
1444                                 /* We don't know the failover node name,
1445                                    so just use the first nid as the uuid */
1446                                 rc = name_create(&failnodeuuid,
1447                                                  libcfs_nid2str(nid), "");
1448                                 if (rc)
1449                                         return rc;
1450                         }
1451                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1452                                "client %s\n", libcfs_nid2str(nid),
1453                                failnodeuuid, cliname);
1454                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1455                 }
1456                 if (failnodeuuid) {
1457                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1458                         name_destroy(&failnodeuuid);
1459                         failnodeuuid = NULL;
1460                 }
1461         }
1462
1463         return rc;
1464 }
1465
1466 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1467                                     struct mgs_device *mgs,
1468                                     struct fs_db *fsdb,
1469                                     struct mgs_target_info *mti,
1470                                     char *logname, char *lmvname)
1471 {
1472         struct llog_handle *llh = NULL;
1473         char *mdcname, *nodeuuid, *mdcuuid, *lmvuuid;
1474         char index[6];
1475         int i, rc;
1476         ENTRY;
1477
1478         if (mgs_log_is_empty(env, mgs, logname)) {
1479                 CERROR("log is empty! Logical error\n");
1480                 RETURN(-EINVAL);
1481         }
1482
1483         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1484                mti->mti_svname, logname, lmvname);
1485
1486         name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1487         name_create(&mdcname, mti->mti_svname, "-mdc");
1488         name_create(&mdcuuid, mdcname, "_UUID");
1489         name_create(&lmvuuid, lmvname, "_UUID");
1490
1491         rc = record_start_log(env, mgs, &llh, logname);
1492         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1493                            "add mdc");
1494
1495         for (i = 0; i < mti->mti_nid_count; i++) {
1496                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1497                        libcfs_nid2str(mti->mti_nids[i]));
1498
1499                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1500         }
1501
1502         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1503         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1504         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1505         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1506         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1507                             index, "1");
1508         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1509                            "add mdc");
1510         rc = record_end_log(env, &llh);
1511
1512         name_destroy(&lmvuuid);
1513         name_destroy(&mdcuuid);
1514         name_destroy(&mdcname);
1515         name_destroy(&nodeuuid);
1516         RETURN(rc);
1517 }
1518
1519 /* add new mdc to already existent MDS */
1520 static int mgs_write_log_mdc_to_mdt(const struct lu_env *env,
1521                                     struct mgs_device *mgs,
1522                                     struct fs_db *fsdb,
1523                                     struct mgs_target_info *mti,
1524                                     char *logname)
1525 {
1526         struct llog_handle *llh = NULL;
1527         char *nodeuuid, *mdcname, *mdcuuid, *mdtuuid;
1528         int idx = mti->mti_stripe_index;
1529         char index[9];
1530         int i, rc;
1531
1532         ENTRY;
1533         if (mgs_log_is_empty(env, mgs, logname)) {
1534                 CERROR("log is empty! Logical error\n");
1535                 RETURN (-EINVAL);
1536         }
1537
1538         CDEBUG(D_MGS, "adding mdc index %d to %s\n", idx, logname);
1539
1540         name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1541         snprintf(index, sizeof(index), "-mdc%04x", idx);
1542         name_create(&mdcname, logname, index);
1543         name_create(&mdcuuid, mdcname, "_UUID");
1544         name_create(&mdtuuid, logname, "_UUID");
1545
1546         rc = record_start_log(env, mgs, &llh, logname);
1547         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname, "add mdc");
1548         for (i = 0; i < mti->mti_nid_count; i++) {
1549                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1550                        libcfs_nid2str(mti->mti_nids[i]));
1551                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1552         }
1553         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, mdcuuid);
1554         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1555         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1556         snprintf(index, sizeof(index), "%d", idx);
1557
1558         rc = record_mdc_add(env, llh, logname, mdcuuid, mti->mti_uuid,
1559                             index, "1");
1560         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add mdc");
1561         rc = record_end_log(env, &llh);
1562
1563         name_destroy(&mdcuuid);
1564         name_destroy(&mdcname);
1565         name_destroy(&nodeuuid);
1566         name_destroy(&mdtuuid);
1567         RETURN(rc);
1568 }
1569
1570 static int mgs_write_log_mdt0(const struct lu_env *env,
1571                               struct mgs_device *mgs,
1572                               struct fs_db *fsdb,
1573                               struct mgs_target_info *mti)
1574 {
1575         char *log = mti->mti_svname;
1576         struct llog_handle *llh = NULL;
1577         char *uuid, *lovname;
1578         char mdt_index[6];
1579         char *ptr = mti->mti_params;
1580         int rc = 0, failout = 0;
1581         ENTRY;
1582
1583         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
1584         if (uuid == NULL)
1585                 RETURN(-ENOMEM);
1586
1587         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
1588                 failout = (strncmp(ptr, "failout", 7) == 0);
1589
1590         name_create(&lovname, log, "-mdtlov");
1591         if (mgs_log_is_empty(env, mgs, log))
1592                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
1593
1594         sprintf(uuid, "%s_UUID", log);
1595         sprintf(mdt_index, "%d", mti->mti_stripe_index);
1596
1597         /* add MDT itself */
1598         rc = record_start_log(env, mgs, &llh, log);
1599         if (rc)
1600                 GOTO(out, rc);
1601
1602         /* FIXME this whole fn should be a single journal transaction */
1603         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
1604         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
1605         rc = record_mount_opt(env, llh, log, lovname, NULL);
1606         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
1607                         failout ? "n" : "f");
1608         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
1609         rc = record_end_log(env, &llh);
1610 out:
1611         name_destroy(&lovname);
1612         OBD_FREE(uuid, sizeof(struct obd_uuid));
1613         RETURN(rc);
1614 }
1615
1616 static inline void name_create_mdt(char **logname, char *fsname, int i)
1617 {
1618         char mdt_index[9];
1619
1620         sprintf(mdt_index, "-MDT%04x", i);
1621         name_create(logname, fsname, mdt_index);
1622 }
1623
1624 static void name_create_mdt_and_lov(char **logname, char **lovname,
1625                                     struct fs_db *fsdb, int i)
1626 {
1627         name_create_mdt(logname, fsdb->fsdb_name, i);
1628         /* COMPAT_180 */
1629         if (i == 0 && cfs_test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
1630                 name_create(lovname, fsdb->fsdb_name, "-mdtlov");
1631         else
1632                 name_create(lovname, *logname, "-mdtlov");
1633 }
1634
1635 static inline void name_create_mdt_osc(char **oscname, char *ostname,
1636                                        struct fs_db *fsdb, int i)
1637 {
1638         char suffix[16];
1639
1640         if (i == 0 && cfs_test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
1641                 sprintf(suffix, "-osc");
1642         else
1643                 sprintf(suffix, "-osc-MDT%04x", i);
1644         name_create(oscname, ostname, suffix);
1645 }
1646
1647 /* envelope method for all layers log */
1648 static int mgs_write_log_mdt(const struct lu_env *env,
1649                              struct mgs_device *mgs,
1650                              struct fs_db *fsdb,
1651                              struct mgs_target_info *mti)
1652 {
1653         struct llog_handle *llh = NULL;
1654         char *cliname;
1655         struct temp_comp comp = { 0 };
1656         int rc, i = 0;
1657         ENTRY;
1658
1659         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
1660
1661 #if 0
1662         /* COMPAT_146 */
1663         if (mti->mti_flags & LDD_F_UPGRADE14) {
1664                 /* We're starting with an old uuid.  Assume old name for lov
1665                    as well since the lov entry already exists in the log. */
1666                 CDEBUG(D_MGS, "old mds uuid %s\n", mti->mti_uuid);
1667                 if (strncmp(mti->mti_uuid, fsdb->fsdb_mdtlov + 4,
1668                             strlen(fsdb->fsdb_mdtlov) - 4) != 0) {
1669                         CERROR("old mds uuid %s doesn't match log %s (%s)\n",
1670                                mti->mti_uuid, fsdb->fsdb_mdtlov,
1671                                fsdb->fsdb_mdtlov + 4);
1672                         RETURN(-EINVAL);
1673                 }
1674         }
1675         /* end COMPAT_146 */
1676 #endif
1677         if (mti->mti_uuid[0] == '\0') {
1678                 /* Make up our own uuid */
1679                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
1680                          "%s_UUID", mti->mti_svname);
1681         }
1682
1683         /* add mdt */
1684         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
1685
1686         /* Append the mdt info to the client log */
1687         name_create(&cliname, mti->mti_fsname, "-client");
1688
1689         if (mgs_log_is_empty(env, mgs, cliname)) {
1690                 /* Start client log */
1691                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
1692                                        fsdb->fsdb_clilov);
1693                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
1694                                        fsdb->fsdb_clilmv);
1695         }
1696
1697         /*
1698         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
1699         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
1700         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
1701         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
1702         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
1703         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
1704         */
1705
1706 #if 0
1707         /* COMPAT_146 */
1708         if (mti->mti_flags & LDD_F_UPGRADE14) {
1709                 rc = record_start_log(obd, &llh, cliname);
1710                 if (rc)
1711                         GOTO(out, rc);
1712
1713                 rc = record_marker(obd, llh, fsdb, CM_START,
1714                                    mti->mti_svname,"add mdc");
1715
1716                 /* Old client log already has MDC entry, but needs mount opt
1717                    for new client name (lustre-client) */
1718                 /* FIXME Old MDT log already has an old mount opt
1719                    which we should remove (currently handled by
1720                    class_del_profiles()) */
1721                 rc = record_mount_opt(obd, llh, cliname, fsdb->fsdb_clilov,
1722                                       fsdb->fsdb_mdc);
1723                 /* end COMPAT_146 */
1724
1725                 rc = record_marker(obd, llh, fsdb, CM_END,
1726                                    mti->mti_svname, "add mdc");
1727         } else
1728 #endif
1729         {
1730                 /* copy client info about lov/lmv */
1731                 comp.comp_mti = mti;
1732                 comp.comp_fsdb = fsdb;
1733
1734                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
1735                                                         &comp);
1736
1737                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
1738                                               fsdb->fsdb_clilmv);
1739                 /* add mountopts */
1740                 rc = record_start_log(env, mgs, &llh, cliname);
1741                 if (rc)
1742                         GOTO(out, rc);
1743
1744                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
1745                                    "mount opts");
1746                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
1747                                       fsdb->fsdb_clilmv);
1748                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
1749                                    "mount opts");
1750         }
1751
1752         rc = record_end_log(env, &llh);
1753 out:
1754         name_destroy(&cliname);
1755
1756         // for_all_existing_mdt except current one
1757         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
1758                 char *mdtname;
1759                 if (i !=  mti->mti_stripe_index &&
1760                     cfs_test_bit(i,  fsdb->fsdb_mdt_index_map)) {
1761                         name_create_mdt(&mdtname, mti->mti_fsname, i);
1762                         rc = mgs_write_log_mdc_to_mdt(env, mgs, fsdb,
1763                                                       mti, mdtname);
1764                         name_destroy(&mdtname);
1765                 }
1766         }
1767
1768         RETURN(rc);
1769 }
1770
1771 /* Add the ost info to the client/mdt lov */
1772 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1773                                     struct mgs_device *mgs, struct fs_db *fsdb,
1774                                     struct mgs_target_info *mti,
1775                                     char *logname, char *suffix, char *lovname,
1776                                     enum lustre_sec_part sec_part, int flags)
1777 {
1778         struct llog_handle *llh = NULL;
1779         char *nodeuuid, *oscname, *oscuuid, *lovuuid, *svname;
1780         char index[6];
1781         int i, rc;
1782
1783         ENTRY;
1784         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
1785                mti->mti_svname, logname);
1786
1787         if (mgs_log_is_empty(env, mgs, logname)) {
1788                 CERROR("log is empty! Logical error\n");
1789                 RETURN (-EINVAL);
1790         }
1791
1792         name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1793         name_create(&svname, mti->mti_svname, "-osc");
1794         name_create(&oscname, svname, suffix);
1795         name_create(&oscuuid, oscname, "_UUID");
1796         name_create(&lovuuid, lovname, "_UUID");
1797
1798         /*
1799         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
1800         multihomed (#4)
1801         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
1802         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
1803         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
1804         failover (#6,7)
1805         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
1806         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
1807         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
1808         */
1809
1810         rc = record_start_log(env, mgs, &llh, logname);
1811         if (rc)
1812                 GOTO(out, rc);
1813         /* FIXME these should be a single journal transaction */
1814         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
1815                            "add osc");
1816         for (i = 0; i < mti->mti_nid_count; i++) {
1817                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
1818                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1819         }
1820         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
1821         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
1822         rc = mgs_write_log_failnids(env, mti, llh, oscname);
1823         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1824         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
1825         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
1826                            "add osc");
1827         rc = record_end_log(env, &llh);
1828 out:
1829         name_destroy(&lovuuid);
1830         name_destroy(&oscuuid);
1831         name_destroy(&oscname);
1832         name_destroy(&svname);
1833         name_destroy(&nodeuuid);
1834         RETURN(rc);
1835 }
1836
1837 static int mgs_write_log_ost(const struct lu_env *env,
1838                              struct mgs_device *mgs, struct fs_db *fsdb,
1839                              struct mgs_target_info *mti)
1840 {
1841         struct llog_handle *llh = NULL;
1842         char *logname, *lovname;
1843         char *ptr = mti->mti_params;
1844         int rc, flags = 0, failout = 0, i;
1845         ENTRY;
1846
1847         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
1848
1849         /* The ost startup log */
1850
1851         /* If the ost log already exists, that means that someone reformatted
1852            the ost and it called target_add again. */
1853         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
1854                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
1855                                    "exists, yet the server claims it never "
1856                                    "registered. It may have been reformatted, "
1857                                    "or the index changed. writeconf the MDT to "
1858                                    "regenerate all logs.\n", mti->mti_svname);
1859                 RETURN(-EALREADY);
1860         }
1861
1862         /*
1863         attach obdfilter ost1 ost1_UUID
1864         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
1865         */
1866         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
1867                 failout = (strncmp(ptr, "failout", 7) == 0);
1868         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
1869         if (rc)
1870                 RETURN(rc);
1871         /* FIXME these should be a single journal transaction */
1872         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
1873         if (*mti->mti_uuid == '\0')
1874                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
1875                          "%s_UUID", mti->mti_svname);
1876         rc = record_attach(env, llh, mti->mti_svname,
1877                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
1878         rc = record_setup(env, llh, mti->mti_svname,
1879                           "dev"/*ignored*/, "type"/*ignored*/,
1880                           failout ? "n" : "f", 0/*options*/);
1881         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
1882         rc = record_end_log(env, &llh);
1883
1884         /* We also have to update the other logs where this osc is part of
1885            the lov */
1886
1887         if (cfs_test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
1888                 /* If we're upgrading, the old mdt log already has our
1889                    entry. Let's do a fake one for fun. */
1890                 /* Note that we can't add any new failnids, since we don't
1891                    know the old osc names. */
1892                 flags = CM_SKIP | CM_UPGRADE146;
1893
1894         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
1895                 /* If the update flag isn't set, don't update client/mdt
1896                    logs. */
1897                 flags |= CM_SKIP;
1898                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
1899                               "the MDT first to regenerate it.\n",
1900                               mti->mti_svname);
1901         }
1902
1903         /* Add ost to all MDT lov defs */
1904         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
1905                 if (cfs_test_bit(i, fsdb->fsdb_mdt_index_map)) {
1906                         char mdt_index[9];
1907
1908                         name_create_mdt_and_lov(&logname, &lovname, fsdb, i);
1909                         sprintf(mdt_index, "-MDT%04x", i);
1910                         mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname,
1911                                                  mdt_index, lovname,
1912                                                  LUSTRE_SP_MDT, flags);
1913                         name_destroy(&logname);
1914                         name_destroy(&lovname);
1915                 }
1916         }
1917
1918         /* Append ost info to the client log */
1919         name_create(&logname, mti->mti_fsname, "-client");
1920         if (mgs_log_is_empty(env, mgs, logname)) {
1921                 /* Start client log */
1922                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
1923                                        fsdb->fsdb_clilov);
1924                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
1925                                        fsdb->fsdb_clilmv);
1926         }
1927         mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
1928                                  fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
1929         name_destroy(&logname);
1930         RETURN(rc);
1931 }
1932
1933 static __inline__ int mgs_param_empty(char *ptr)
1934 {
1935         char *tmp;
1936
1937         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
1938                 return 1;
1939         return 0;
1940 }
1941
1942 static int mgs_write_log_failnid_internal(const struct lu_env *env,
1943                                           struct mgs_device *mgs,
1944                                           struct fs_db *fsdb,
1945                                           struct mgs_target_info *mti,
1946                                           char *logname, char *cliname)
1947 {
1948         int rc;
1949         struct llog_handle *llh = NULL;
1950
1951         if (mgs_param_empty(mti->mti_params)) {
1952                 /* Remove _all_ failnids */
1953                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
1954                                 mti->mti_svname, "add failnid", CM_SKIP);
1955                 return rc;
1956         }
1957
1958         /* Otherwise failover nids are additive */
1959         rc = record_start_log(env, mgs, &llh, logname);
1960         if (!rc) {
1961                 /* FIXME this should be a single journal transaction */
1962                 rc = record_marker(env, llh, fsdb, CM_START,
1963                                    mti->mti_svname, "add failnid");
1964                 rc = mgs_write_log_failnids(env, mti, llh, cliname);
1965                 rc = record_marker(env, llh, fsdb, CM_END,
1966                                    mti->mti_svname, "add failnid");
1967                 rc = record_end_log(env, &llh);
1968         }
1969
1970         return rc;
1971 }
1972
1973
1974 /* Add additional failnids to an existing log.
1975    The mdc/osc must have been added to logs first */
1976 /* tcp nids must be in dotted-quad ascii -
1977    we can't resolve hostnames from the kernel. */
1978 static int mgs_write_log_add_failnid(const struct lu_env *env,
1979                                      struct mgs_device *mgs,
1980                                      struct fs_db *fsdb,
1981                                      struct mgs_target_info *mti)
1982 {
1983         char *logname, *cliname;
1984         int rc;
1985         ENTRY;
1986
1987         /* FIXME we currently can't erase the failnids
1988          * given when a target first registers, since they aren't part of
1989          * an "add uuid" stanza */
1990
1991         /* Verify that we know about this target */
1992         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
1993                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
1994                                    "yet. It must be started before failnids "
1995                                    "can be added.\n", mti->mti_svname);
1996                 RETURN(-ENOENT);
1997         }
1998
1999         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2000         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2001                 name_create(&cliname, mti->mti_svname, "-mdc");
2002         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2003                 name_create(&cliname, mti->mti_svname, "-osc");
2004         } else {
2005                 RETURN(-EINVAL);
2006         }
2007
2008         /* Add failover nids to the client log */
2009         name_create(&logname, mti->mti_fsname, "-client");
2010         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2011         name_destroy(&logname);
2012         name_destroy(&cliname);
2013
2014         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2015                 /* Add OST failover nids to the MDT logs as well */
2016                 int i;
2017
2018                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2019                         if (!cfs_test_bit(i, fsdb->fsdb_mdt_index_map))
2020                                 continue;
2021                         name_create_mdt(&logname, mti->mti_fsname, i);
2022                         name_create_mdt_osc(&cliname, mti->mti_svname, fsdb, i);
2023                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb, mti,
2024                                                             logname, cliname);
2025                         name_destroy(&cliname);
2026                         name_destroy(&logname);
2027                 }
2028         }
2029
2030         RETURN(rc);
2031 }
2032
2033 static int mgs_wlp_lcfg(const struct lu_env *env,
2034                         struct mgs_device *mgs, struct fs_db *fsdb,
2035                         struct mgs_target_info *mti,
2036                         char *logname, struct lustre_cfg_bufs *bufs,
2037                         char *tgtname, char *ptr)
2038 {
2039         char comment[MTI_NAME_MAXLEN];
2040         char *tmp;
2041         struct lustre_cfg *lcfg;
2042         int rc, del;
2043
2044         /* Erase any old settings of this same parameter */
2045         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2046         comment[MTI_NAME_MAXLEN - 1] = 0;
2047         /* But don't try to match the value. */
2048         if ((tmp = strchr(comment, '=')))
2049             *tmp = 0;
2050         /* FIXME we should skip settings that are the same as old values */
2051         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2052         del = mgs_param_empty(ptr);
2053
2054         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2055                       "Sett" : "Modify", tgtname, comment, logname);
2056         if (del)
2057                 return rc;
2058
2059         lustre_cfg_bufs_reset(bufs, tgtname);
2060         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2061         lcfg = lustre_cfg_new(LCFG_PARAM, bufs);
2062         if (!lcfg)
2063                 return -ENOMEM;
2064         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2065         lustre_cfg_free(lcfg);
2066         return rc;
2067 }
2068
2069 /* write global variable settings into log */
2070 static int mgs_write_log_sys(const struct lu_env *env,
2071                              struct mgs_device *mgs, struct fs_db *fsdb,
2072                              struct mgs_target_info *mti, char *sys, char *ptr)
2073 {
2074         struct lustre_cfg_bufs bufs;
2075         struct lustre_cfg *lcfg;
2076         char *tmp, sep;
2077         int rc, cmd, convert = 1;
2078
2079         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2080                 cmd = LCFG_SET_TIMEOUT;
2081         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2082                 cmd = LCFG_SET_LDLM_TIMEOUT;
2083         /* Check for known params here so we can return error to lctl */
2084         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2085                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2086                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2087                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2088                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2089                 cmd = LCFG_PARAM;
2090         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2091                 convert = 0; /* Don't convert string value to integer */
2092                 cmd = LCFG_PARAM;
2093         } else {
2094                 return -EINVAL;
2095         }
2096
2097         if (mgs_param_empty(ptr))
2098                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2099         else
2100                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2101
2102         lustre_cfg_bufs_reset(&bufs, NULL);
2103         lustre_cfg_bufs_set_string(&bufs, 1, sys);
2104         if (!convert && *tmp != '\0')
2105                 lustre_cfg_bufs_set_string(&bufs, 2, tmp);
2106         lcfg = lustre_cfg_new(cmd, &bufs);
2107         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2108         /* truncate the comment to the parameter name */
2109         ptr = tmp - 1;
2110         sep = *ptr;
2111         *ptr = '\0';
2112         /* modify all servers and clients */
2113         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2114                                       *tmp == '\0' ? NULL : lcfg,
2115                                       mti->mti_fsname, sys, 0);
2116         if (rc == 0 && *tmp != '\0') {
2117                 switch (cmd) {
2118                 case LCFG_SET_TIMEOUT:
2119                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2120                                 class_process_config(lcfg);
2121                         break;
2122                 case LCFG_SET_LDLM_TIMEOUT:
2123                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2124                                 class_process_config(lcfg);
2125                         break;
2126                 default:
2127                         break;
2128                 }
2129         }
2130         *ptr = sep;
2131         lustre_cfg_free(lcfg);
2132         return rc;
2133 }
2134
2135 /* write quota settings into log */
2136 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2137                                struct fs_db *fsdb, struct mgs_target_info *mti,
2138                                char *quota, char *ptr)
2139 {
2140         struct lustre_cfg_bufs bufs;
2141         struct lustre_cfg *lcfg;
2142         char *tmp;
2143         char sep;
2144         int cmd = LCFG_PARAM;
2145         int rc;
2146
2147         /* support only 'meta' and 'data' pools so far */
2148         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2149             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2150                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2151                        "& quota.ost are)\n", ptr);
2152                 return -EINVAL;
2153         }
2154
2155         if (*tmp == '\0') {
2156                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2157         } else {
2158                 CDEBUG(D_MGS, "global '%s'\n", quota);
2159
2160                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2161                     strcmp(tmp, "none") != 0) {
2162                         CERROR("enable option(%s) isn't supported\n", tmp);
2163                         return -EINVAL;
2164                 }
2165         }
2166
2167         lustre_cfg_bufs_reset(&bufs, NULL);
2168         lustre_cfg_bufs_set_string(&bufs, 1, quota);
2169         lcfg = lustre_cfg_new(cmd, &bufs);
2170         /* truncate the comment to the parameter name */
2171         ptr = tmp - 1;
2172         sep = *ptr;
2173         *ptr = '\0';
2174
2175         /* XXX we duplicated quota enable information in all server
2176          *     config logs, it should be moved to a separate config
2177          *     log once we cleanup the config log for global param. */
2178         /* modify all servers */
2179         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2180                                       *tmp == '\0' ? NULL : lcfg,
2181                                       mti->mti_fsname, quota, 1);
2182         *ptr = sep;
2183         lustre_cfg_free(lcfg);
2184         return rc;
2185 }
2186
2187 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2188                                    struct mgs_device *mgs,
2189                                    struct fs_db *fsdb,
2190                                    struct mgs_target_info *mti,
2191                                    char *param)
2192 {
2193         struct llog_handle     *llh = NULL;
2194         char                   *logname;
2195         char                   *comment, *ptr;
2196         struct lustre_cfg_bufs  bufs;
2197         struct lustre_cfg      *lcfg;
2198         int                     rc, len;
2199         ENTRY;
2200
2201         /* get comment */
2202         ptr = strchr(param, '=');
2203         LASSERT(ptr);
2204         len = ptr - param;
2205
2206         OBD_ALLOC(comment, len + 1);
2207         if (comment == NULL)
2208                 RETURN(-ENOMEM);
2209         strncpy(comment, param, len);
2210         comment[len] = '\0';
2211
2212         /* prepare lcfg */
2213         lustre_cfg_bufs_reset(&bufs, mti->mti_svname);
2214         lustre_cfg_bufs_set_string(&bufs, 1, param);
2215         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &bufs);
2216         if (lcfg == NULL)
2217                 GOTO(out_comment, rc = -ENOMEM);
2218
2219         /* construct log name */
2220         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2221         if (rc)
2222                 GOTO(out_lcfg, rc);
2223
2224         if (mgs_log_is_empty(env, mgs, logname)) {
2225                 rc = record_start_log(env, mgs, &llh, logname);
2226                 record_end_log(env, &llh);
2227                 if (rc)
2228                         GOTO(out, rc);
2229         }
2230
2231         /* obsolete old one */
2232         mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2233                    comment, CM_SKIP);
2234
2235         /* write the new one */
2236         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2237                                   mti->mti_svname, comment);
2238         if (rc)
2239                 CERROR("err %d writing log %s\n", rc, logname);
2240
2241 out:
2242         name_destroy(&logname);
2243 out_lcfg:
2244         lustre_cfg_free(lcfg);
2245 out_comment:
2246         OBD_FREE(comment, len + 1);
2247         RETURN(rc);
2248 }
2249
2250 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2251                                         char *param)
2252 {
2253         char    *ptr;
2254
2255         /* disable the adjustable udesc parameter for now, i.e. use default
2256          * setting that client always ship udesc to MDT if possible. to enable
2257          * it simply remove the following line */
2258         goto error_out;
2259
2260         ptr = strchr(param, '=');
2261         if (ptr == NULL)
2262                 goto error_out;
2263         *ptr++ = '\0';
2264
2265         if (strcmp(param, PARAM_SRPC_UDESC))
2266                 goto error_out;
2267
2268         if (strcmp(ptr, "yes") == 0) {
2269                 cfs_set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2270                 CWARN("Enable user descriptor shipping from client to MDT\n");
2271         } else if (strcmp(ptr, "no") == 0) {
2272                 cfs_clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2273                 CWARN("Disable user descriptor shipping from client to MDT\n");
2274         } else {
2275                 *(ptr - 1) = '=';
2276                 goto error_out;
2277         }
2278         return 0;
2279
2280 error_out:
2281         CERROR("Invalid param: %s\n", param);
2282         return -EINVAL;
2283 }
2284
2285 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2286                                   const char *svname,
2287                                   char *param)
2288 {
2289         struct sptlrpc_rule      rule;
2290         struct sptlrpc_rule_set *rset;
2291         int                      rc;
2292         ENTRY;
2293
2294         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2295                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2296                 RETURN(-EINVAL);
2297         }
2298
2299         if (strncmp(param, PARAM_SRPC_UDESC,
2300                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2301                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2302         }
2303
2304         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2305                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2306                 RETURN(-EINVAL);
2307         }
2308
2309         param += sizeof(PARAM_SRPC_FLVR) - 1;
2310
2311         rc = sptlrpc_parse_rule(param, &rule);
2312         if (rc)
2313                 RETURN(rc);
2314
2315         /* mgs rules implies must be mgc->mgs */
2316         if (cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2317                 if ((rule.sr_from != LUSTRE_SP_MGC &&
2318                      rule.sr_from != LUSTRE_SP_ANY) ||
2319                     (rule.sr_to != LUSTRE_SP_MGS &&
2320                      rule.sr_to != LUSTRE_SP_ANY))
2321                         RETURN(-EINVAL);
2322         }
2323
2324         /* preapre room for this coming rule. svcname format should be:
2325          * - fsname: general rule
2326          * - fsname-tgtname: target-specific rule
2327          */
2328         if (strchr(svname, '-')) {
2329                 struct mgs_tgt_srpc_conf *tgtconf;
2330                 int                       found = 0;
2331
2332                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
2333                      tgtconf = tgtconf->mtsc_next) {
2334                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
2335                                 found = 1;
2336                                 break;
2337                         }
2338                 }
2339
2340                 if (!found) {
2341                         int name_len;
2342
2343                         OBD_ALLOC_PTR(tgtconf);
2344                         if (tgtconf == NULL)
2345                                 RETURN(-ENOMEM);
2346
2347                         name_len = strlen(svname);
2348
2349                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
2350                         if (tgtconf->mtsc_tgt == NULL) {
2351                                 OBD_FREE_PTR(tgtconf);
2352                                 RETURN(-ENOMEM);
2353                         }
2354                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
2355
2356                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
2357                         fsdb->fsdb_srpc_tgt = tgtconf;
2358                 }
2359
2360                 rset = &tgtconf->mtsc_rset;
2361         } else {
2362                 rset = &fsdb->fsdb_srpc_gen;
2363         }
2364
2365         rc = sptlrpc_rule_set_merge(rset, &rule);
2366
2367         RETURN(rc);
2368 }
2369
2370 static int mgs_srpc_set_param(const struct lu_env *env,
2371                               struct mgs_device *mgs,
2372                               struct fs_db *fsdb,
2373                               struct mgs_target_info *mti,
2374                               char *param)
2375 {
2376         char                   *copy;
2377         int                     rc, copy_size;
2378         ENTRY;
2379
2380 #ifndef HAVE_GSS
2381         RETURN(-EINVAL);
2382 #endif
2383         /* keep a copy of original param, which could be destroied
2384          * during parsing */
2385         copy_size = strlen(param) + 1;
2386         OBD_ALLOC(copy, copy_size);
2387         if (copy == NULL)
2388                 return -ENOMEM;
2389         memcpy(copy, param, copy_size);
2390
2391         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
2392         if (rc)
2393                 goto out_free;
2394
2395         /* previous steps guaranteed the syntax is correct */
2396         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
2397         if (rc)
2398                 goto out_free;
2399
2400         if (cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2401                 /*
2402                  * for mgs rules, make them effective immediately.
2403                  */
2404                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
2405                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
2406                                                  &fsdb->fsdb_srpc_gen);
2407         }
2408
2409 out_free:
2410         OBD_FREE(copy, copy_size);
2411         RETURN(rc);
2412 }
2413
2414 struct mgs_srpc_read_data {
2415         struct fs_db   *msrd_fsdb;
2416         int             msrd_skip;
2417 };
2418
2419 static int mgs_srpc_read_handler(const struct lu_env *env,
2420                                  struct llog_handle *llh,
2421                                  struct llog_rec_hdr *rec, void *data)
2422 {
2423         struct mgs_srpc_read_data *msrd = data;
2424         struct cfg_marker         *marker;
2425         struct lustre_cfg         *lcfg = (struct lustre_cfg *)(rec + 1);
2426         char                      *svname, *param;
2427         int                        cfg_len, rc;
2428         ENTRY;
2429
2430         if (rec->lrh_type != OBD_CFG_REC) {
2431                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
2432                 RETURN(-EINVAL);
2433         }
2434
2435         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
2436                   sizeof(struct llog_rec_tail);
2437
2438         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
2439         if (rc) {
2440                 CERROR("Insane cfg\n");
2441                 RETURN(rc);
2442         }
2443
2444         if (lcfg->lcfg_command == LCFG_MARKER) {
2445                 marker = lustre_cfg_buf(lcfg, 1);
2446
2447                 if (marker->cm_flags & CM_START &&
2448                     marker->cm_flags & CM_SKIP)
2449                         msrd->msrd_skip = 1;
2450                 if (marker->cm_flags & CM_END)
2451                         msrd->msrd_skip = 0;
2452
2453                 RETURN(0);
2454         }
2455
2456         if (msrd->msrd_skip)
2457                 RETURN(0);
2458
2459         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
2460                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
2461                 RETURN(0);
2462         }
2463
2464         svname = lustre_cfg_string(lcfg, 0);
2465         if (svname == NULL) {
2466                 CERROR("svname is empty\n");
2467                 RETURN(0);
2468         }
2469
2470         param = lustre_cfg_string(lcfg, 1);
2471         if (param == NULL) {
2472                 CERROR("param is empty\n");
2473                 RETURN(0);
2474         }
2475
2476         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
2477         if (rc)
2478                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
2479
2480         RETURN(0);
2481 }
2482
2483 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
2484                                 struct mgs_device *mgs,
2485                                 struct fs_db *fsdb)
2486 {
2487         struct llog_handle        *llh = NULL;
2488         struct lvfs_run_ctxt       saved;
2489         struct llog_ctxt          *ctxt;
2490         char                      *logname;
2491         struct mgs_srpc_read_data  msrd;
2492         int                        rc;
2493         ENTRY;
2494
2495         /* construct log name */
2496         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
2497         if (rc)
2498                 RETURN(rc);
2499
2500         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
2501         LASSERT(ctxt != NULL);
2502
2503         if (mgs_log_is_empty(env, mgs, logname))
2504                 GOTO(out, rc = 0);
2505
2506         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
2507
2508         rc = llog_open(NULL, ctxt, &llh, NULL, logname,
2509                        LLOG_OPEN_EXISTS);
2510         if (rc < 0) {
2511                 if (rc == -ENOENT)
2512                         rc = 0;
2513                 GOTO(out_pop, rc);
2514         }
2515
2516         rc = llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
2517         if (rc)
2518                 GOTO(out_close, rc);
2519
2520         if (llog_get_size(llh) <= 1)
2521                 GOTO(out_close, rc = 0);
2522
2523         msrd.msrd_fsdb = fsdb;
2524         msrd.msrd_skip = 0;
2525
2526         rc = llog_process(NULL, llh, mgs_srpc_read_handler, (void *)&msrd,
2527                           NULL);
2528
2529 out_close:
2530         llog_close(NULL, llh);
2531 out_pop:
2532         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
2533 out:
2534         llog_ctxt_put(ctxt);
2535         name_destroy(&logname);
2536
2537         if (rc)
2538                 CERROR("failed to read sptlrpc config database: %d\n", rc);
2539         RETURN(rc);
2540 }
2541
2542 /* Permanent settings of all parameters by writing into the appropriate
2543  * configuration logs.
2544  * A parameter with null value ("<param>='\0'") means to erase it out of
2545  * the logs.
2546  */
2547 static int mgs_write_log_param(const struct lu_env *env,
2548                                struct mgs_device *mgs, struct fs_db *fsdb,
2549                                struct mgs_target_info *mti, char *ptr)
2550 {
2551         struct lustre_cfg_bufs bufs;
2552         char *logname;
2553         char *tmp;
2554         int rc = 0, rc2 = 0;
2555         ENTRY;
2556
2557         /* For various parameter settings, we have to figure out which logs
2558            care about them (e.g. both mdt and client for lov settings) */
2559         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2560
2561         /* The params are stored in MOUNT_DATA_FILE and modified via
2562            tunefs.lustre, or set using lctl conf_param */
2563
2564         /* Processed in lustre_start_mgc */
2565         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
2566                 GOTO(end, rc);
2567
2568         /* Processed in ost/mdt */
2569         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
2570                 GOTO(end, rc);
2571
2572         /* Processed in mgs_write_log_ost */
2573         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
2574                 if (mti->mti_flags & LDD_F_PARAM) {
2575                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
2576                                            "changed with tunefs.lustre"
2577                                            "and --writeconf\n", ptr);
2578                         rc = -EPERM;
2579                 }
2580                 GOTO(end, rc);
2581         }
2582
2583         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
2584                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
2585                 GOTO(end, rc);
2586         }
2587
2588         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
2589                 /* Add a failover nidlist */
2590                 rc = 0;
2591                 /* We already processed failovers params for new
2592                    targets in mgs_write_log_target */
2593                 if (mti->mti_flags & LDD_F_PARAM) {
2594                         CDEBUG(D_MGS, "Adding failnode\n");
2595                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
2596                 }
2597                 GOTO(end, rc);
2598         }
2599
2600         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
2601                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
2602                 GOTO(end, rc);
2603         }
2604
2605         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
2606                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
2607                 GOTO(end, rc);
2608         }
2609
2610         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
2611                 /* active=0 means off, anything else means on */
2612                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
2613                 int i;
2614
2615                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
2616                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
2617                                            "be (de)activated.\n",
2618                                            mti->mti_svname);
2619                         GOTO(end, rc = -EINVAL);
2620                 }
2621                 LCONSOLE_WARN("Permanently %sactivating %s\n",
2622                               flag ? "de": "re", mti->mti_svname);
2623                 /* Modify clilov */
2624                 name_create(&logname, mti->mti_fsname, "-client");
2625                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2626                                 mti->mti_svname, "add osc", flag);
2627                 name_destroy(&logname);
2628                 if (rc)
2629                         goto active_err;
2630                 /* Modify mdtlov */
2631                 /* Add to all MDT logs for CMD */
2632                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2633                         if (!cfs_test_bit(i, fsdb->fsdb_mdt_index_map))
2634                                 continue;
2635                         name_create_mdt(&logname, mti->mti_fsname, i);
2636                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
2637                                         mti->mti_svname, "add osc", flag);
2638                         name_destroy(&logname);
2639                         if (rc)
2640                                 goto active_err;
2641                 }
2642         active_err:
2643                 if (rc) {
2644                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
2645                                            "log (%d). No permanent "
2646                                            "changes were made to the "
2647                                            "config log.\n",
2648                                            mti->mti_svname, rc);
2649                         if (cfs_test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
2650                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
2651                                                    " because the log"
2652                                                    "is in the old 1.4"
2653                                                    "style. Consider "
2654                                                    " --writeconf to "
2655                                                    "update the logs.\n");
2656                         GOTO(end, rc);
2657                 }
2658                 /* Fall through to osc proc for deactivating live OSC
2659                    on running MDT / clients. */
2660         }
2661         /* Below here, let obd's XXX_process_config methods handle it */
2662
2663         /* All lov. in proc */
2664         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
2665                 char *mdtlovname;
2666
2667                 CDEBUG(D_MGS, "lov param %s\n", ptr);
2668                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
2669                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
2670                                            "set on the MDT, not %s. "
2671                                            "Ignoring.\n",
2672                                            mti->mti_svname);
2673                         GOTO(end, rc = 0);
2674                 }
2675
2676                 /* Modify mdtlov */
2677                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
2678                         GOTO(end, rc = -ENODEV);
2679
2680                 name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
2681                                         mti->mti_stripe_index);
2682                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
2683                                   &bufs, mdtlovname, ptr);
2684                 name_destroy(&logname);
2685                 name_destroy(&mdtlovname);
2686                 if (rc)
2687                         GOTO(end, rc);
2688
2689                 /* Modify clilov */
2690                 name_create(&logname, mti->mti_fsname, "-client");
2691                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &bufs,
2692                                   fsdb->fsdb_clilov, ptr);
2693                 name_destroy(&logname);
2694                 GOTO(end, rc);
2695         }
2696
2697         /* All osc., mdc., llite. params in proc */
2698         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
2699             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
2700             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
2701                 char *cname;
2702                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
2703                         name_create(&cname, mti->mti_fsname, "-client");
2704                         /* Add the client type to match the obdname in
2705                            class_config_llog_handler */
2706                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2707                         /* COMPAT_146 */
2708                         if (fsdb->fsdb_mdc)
2709                                 name_create(&cname, fsdb->fsdb_mdc, "");
2710                         else
2711                                 name_create(&cname, mti->mti_svname,
2712                                             "-mdc");
2713                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2714                         /* COMPAT_146 */
2715                         if (cfs_test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2716                                 LCONSOLE_ERROR_MSG(0x148, "Upgraded "
2717                                                    "client logs for %s"
2718                                                    " cannot be "
2719                                                    "modified. Consider"
2720                                                    " updating the "
2721                                                    "configuration with"
2722                                                    " --writeconf\n",
2723                                                    mti->mti_svname);
2724                                 /* We don't know the names of all the
2725                                    old oscs*/
2726                                 GOTO(end, rc = -EINVAL);
2727                         }
2728                         name_create(&cname, mti->mti_svname, "-osc");
2729                 } else {
2730                         GOTO(end, rc = -EINVAL);
2731                 }
2732
2733                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2734
2735                 /* Modify client */
2736                 name_create(&logname, mti->mti_fsname, "-client");
2737                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &bufs,
2738                                   cname, ptr);
2739
2740                 /* osc params affect the MDT as well */
2741                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
2742                         int i;
2743
2744                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2745                                 if (!cfs_test_bit(i, fsdb->fsdb_mdt_index_map))
2746                                         continue;
2747                                 name_destroy(&cname);
2748                                 name_create_mdt_osc(&cname, mti->mti_svname,
2749                                                     fsdb, i);
2750                                 name_destroy(&logname);
2751                                 name_create_mdt(&logname, mti->mti_fsname, i);
2752                                 if (!mgs_log_is_empty(env, mgs, logname))
2753                                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
2754                                                           logname, &bufs, cname,
2755                                                           ptr);
2756                                 if (rc)
2757                                         break;
2758                         }
2759                 }
2760                 name_destroy(&logname);
2761                 name_destroy(&cname);
2762                 GOTO(end, rc);
2763         }
2764
2765         /* All mdt. params in proc */
2766         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
2767                 int i;
2768                 __u32 idx;
2769
2770                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2771                 if (strncmp(mti->mti_svname, mti->mti_fsname,
2772                             MTI_NAME_MAXLEN) == 0)
2773                         /* device is unspecified completely? */
2774                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
2775                 else
2776                         rc = server_name2index(mti->mti_svname, &idx, NULL);
2777                 if (rc < 0)
2778                         goto active_err;
2779                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
2780                         goto active_err;
2781                 if (rc & LDD_F_SV_ALL) {
2782                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2783                                 if (!cfs_test_bit(i,
2784                                                   fsdb->fsdb_mdt_index_map))
2785                                         continue;
2786                                 name_create_mdt(&logname, mti->mti_fsname, i);
2787                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
2788                                                   logname, &bufs,
2789                                                   logname, ptr);
2790                                 name_destroy(&logname);
2791                                 if (rc)
2792                                         goto active_err;
2793                         }
2794                 } else {
2795                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
2796                                           mti->mti_svname, &bufs,
2797                                           mti->mti_svname, ptr);
2798                         if (rc)
2799                                 goto active_err;
2800                 }
2801                 GOTO(end, rc);
2802         }
2803
2804         /* All mdd., ost. params in proc */
2805         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
2806             (class_match_param(ptr, PARAM_OST, NULL) == 0)) {
2807                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2808                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
2809                         GOTO(end, rc = -ENODEV);
2810
2811                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
2812                                   &bufs, mti->mti_svname, ptr);
2813                 GOTO(end, rc);
2814         }
2815
2816         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
2817         rc2 = -ENOSYS;
2818
2819 end:
2820         if (rc)
2821                 CERROR("err %d on param '%s'\n", rc, ptr);
2822
2823         RETURN(rc ?: rc2);
2824 }
2825
2826 /* Not implementing automatic failover nid addition at this time. */
2827 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
2828                       struct mgs_target_info *mti)
2829 {
2830 #if 0
2831         struct fs_db *fsdb;
2832         int rc;
2833         ENTRY;
2834
2835         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
2836         if (rc)
2837                 RETURN(rc);
2838
2839         if (mgs_log_is_empty(obd, mti->mti_svname))
2840                 /* should never happen */
2841                 RETURN(-ENOENT);
2842
2843         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
2844
2845         /* FIXME We can just check mti->params to see if we're already in
2846            the failover list.  Modify mti->params for rewriting back at
2847            server_register_target(). */
2848
2849         cfs_mutex_lock(&fsdb->fsdb_mutex);
2850         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
2851         cfs_mutex_unlock(&fsdb->fsdb_mutex);
2852
2853         RETURN(rc);
2854 #endif
2855         return 0;
2856 }
2857
2858 int mgs_write_log_target(const struct lu_env *env,
2859                          struct mgs_device *mgs,
2860                          struct mgs_target_info *mti,
2861                          struct fs_db *fsdb)
2862 {
2863         int rc = -EINVAL;
2864         char *buf, *params;
2865         ENTRY;
2866
2867         /* set/check the new target index */
2868         rc = mgs_set_index(env, mgs, mti);
2869         if (rc < 0) {
2870                 CERROR("Can't get index (%d)\n", rc);
2871                 RETURN(rc);
2872         }
2873
2874         /* COMPAT_146 */
2875         if (mti->mti_flags & LDD_F_UPGRADE14) {
2876                 if (rc == EALREADY) {
2877                         LCONSOLE_INFO("Found index %d for %s 1.4 log, "
2878                                       "upgrading\n", mti->mti_stripe_index,
2879                                       mti->mti_svname);
2880                 } else {
2881                         LCONSOLE_ERROR_MSG(0x149, "Failed to find %s in the old"
2882                                            " client log. Apparently it is not "
2883                                            "part of this filesystem, or the old"
2884                                            " log is wrong.\nUse 'writeconf' on "
2885                                            "the MDT to force log regeneration."
2886                                            "\n", mti->mti_svname);
2887                         /* Not in client log?  Upgrade anyhow...*/
2888                         /* Argument against upgrading: reformat MDT,
2889                            upgrade OST, then OST will start but will be SKIPped
2890                            in client logs.  Maybe error now is better. */
2891                         /* RETURN(-EINVAL); */
2892                 }
2893                 /* end COMPAT_146 */
2894         } else {
2895                 if (rc == EALREADY) {
2896                         LCONSOLE_WARN("Found index %d for %s, updating log\n",
2897                                       mti->mti_stripe_index, mti->mti_svname);
2898                         /* We would like to mark old log sections as invalid
2899                            and add new log sections in the client and mdt logs.
2900                            But if we add new sections, then live clients will
2901                            get repeat setup instructions for already running
2902                            osc's. So don't update the client/mdt logs. */
2903                         mti->mti_flags &= ~LDD_F_UPDATE;
2904                 }
2905         }
2906
2907         cfs_mutex_lock(&fsdb->fsdb_mutex);
2908
2909         if (mti->mti_flags &
2910             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
2911                 /* Generate a log from scratch */
2912                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2913                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
2914                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2915                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
2916                 } else {
2917                         CERROR("Unknown target type %#x, can't create log for "
2918                                "%s\n", mti->mti_flags, mti->mti_svname);
2919                 }
2920                 if (rc) {
2921                         CERROR("Can't write logs for %s (%d)\n",
2922                                mti->mti_svname, rc);
2923                         GOTO(out_up, rc);
2924                 }
2925         } else {
2926                 /* Just update the params from tunefs in mgs_write_log_params */
2927                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
2928                 mti->mti_flags |= LDD_F_PARAM;
2929         }
2930
2931         /* allocate temporary buffer, where class_get_next_param will
2932            make copy of a current  parameter */
2933         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
2934         if (buf == NULL)
2935                 GOTO(out_up, rc = -ENOMEM);
2936         params = mti->mti_params;
2937         while (params != NULL) {
2938                 rc = class_get_next_param(&params, buf);
2939                 if (rc) {
2940                         if (rc == 1)
2941                                 /* there is no next parameter, that is
2942                                    not an error */
2943                                 rc = 0;
2944                         break;
2945                 }
2946                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
2947                        params, buf);
2948                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
2949                 if (rc)
2950                         break;
2951         }
2952
2953         OBD_FREE(buf, strlen(mti->mti_params) + 1);
2954
2955 out_up:
2956         cfs_mutex_unlock(&fsdb->fsdb_mutex);
2957         RETURN(rc);
2958 }
2959
2960 /* COMPAT_146 */
2961 /* verify that we can handle the old config logs */
2962 int mgs_upgrade_sv_14(const struct lu_env *env, struct mgs_device *mgs,
2963                       struct mgs_target_info *mti, struct fs_db *fsdb)
2964 {
2965         int rc = 0;
2966         ENTRY;
2967
2968         /* Create ost log normally, as servers register.  Servers
2969            register with their old uuids (from last_rcvd), so old
2970            (MDT and client) logs should work.
2971          - new MDT won't know about old OSTs, only the ones that have
2972            registered, so we need the old MDT log to get the LOV right
2973            in order for old clients to work.
2974          - Old clients connect to the MDT, not the MGS, for their logs, and
2975            will therefore receive the old client log from the MDT /LOGS dir.
2976          - Old clients can continue to use and connect to old or new OSTs
2977          - New clients will contact the MGS for their log
2978         */
2979
2980         LCONSOLE_INFO("upgrading server %s from pre-1.6\n", mti->mti_svname);
2981         server_mti_print("upgrade", mti);
2982
2983         if (cfs_test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
2984                 LCONSOLE_ERROR_MSG(0x14a, "The old client log %s-client is "
2985                                    "missing.  Was tunefs.lustre successful?\n",
2986                                    mti->mti_fsname);
2987                 RETURN(-ENOENT);
2988         }
2989
2990         if (fsdb->fsdb_gen == 0) {
2991                 /* There were no markers in the client log, meaning we have
2992                    not updated the logs for this fs */
2993                 CDEBUG(D_MGS, "found old, unupdated client log\n");
2994         }
2995
2996         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2997                 if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2998                         LCONSOLE_ERROR_MSG(0x14b, "The old MDT log %s is "
2999                                            "missing. Was tunefs.lustre "
3000                                            "successful?\n",
3001                                            mti->mti_svname);
3002                         RETURN(-ENOENT);
3003                 }
3004                 /* We're starting with an old uuid.  Assume old name for lov
3005                    as well since the lov entry already exists in the log. */
3006                 CDEBUG(D_MGS, "old mds uuid %s\n", mti->mti_uuid);
3007                 if (strncmp(mti->mti_uuid, fsdb->fsdb_mdtlov + 4,
3008                             strlen(fsdb->fsdb_mdtlov) - 4) != 0) {
3009                         CERROR("old mds uuid %s doesn't match log %s (%s)\n",
3010                                mti->mti_uuid, fsdb->fsdb_mdtlov,
3011                                fsdb->fsdb_mdtlov + 4);
3012                         RETURN(-EINVAL);
3013                 }
3014         }
3015
3016         if (!cfs_test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3017                 LCONSOLE_ERROR_MSG(0x14c, "%s-client is supposedly an old "
3018                                    "log, but no old LOV or MDT was found. "
3019                                    "Consider updating the configuration with"
3020                                    " --writeconf.\n", mti->mti_fsname);
3021         }
3022
3023         RETURN(rc);
3024 }
3025 /* end COMPAT_146 */
3026
3027 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3028 {
3029         struct lvfs_run_ctxt     saved;
3030         struct llog_ctxt        *ctxt;
3031         int                      rc = 0;
3032         struct obd_device *obd = mgs->mgs_obd;
3033
3034         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
3035         if (ctxt == NULL) {
3036                 CERROR("%s: MGS config context doesn't exist\n",
3037                        obd->obd_name);
3038                 rc = -ENODEV;
3039         } else {
3040                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3041                 rc = llog_erase(NULL, ctxt, NULL, name);
3042                 /* llog may not exist */
3043                 if (rc == -ENOENT)
3044                         rc = 0;
3045                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3046                 llog_ctxt_put(ctxt);
3047         }
3048
3049         if (rc)
3050                 CERROR("%s: failed to clear log %s: %d\n", obd->obd_name,
3051                        name, rc);
3052
3053         return rc;
3054 }
3055
3056 /* erase all logs for the given fs */
3057 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3058 {
3059         struct fs_db *fsdb;
3060         cfs_list_t dentry_list;
3061         struct l_linux_dirent *dirent, *n;
3062         int rc, len = strlen(fsname);
3063         char *suffix;
3064         ENTRY;
3065
3066         /* Find all the logs in the CONFIGS directory */
3067         rc = class_dentry_readdir(env, mgs, &dentry_list);
3068         if (rc) {
3069                 CERROR("Can't read %s dir\n", MOUNT_CONFIGS_DIR);
3070                 RETURN(rc);
3071         }
3072
3073         cfs_mutex_lock(&mgs->mgs_mutex);
3074
3075         /* Delete the fs db */
3076         fsdb = mgs_find_fsdb(mgs, fsname);
3077         if (fsdb)
3078                 mgs_free_fsdb(mgs, fsdb);
3079
3080         cfs_list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
3081                 cfs_list_del(&dirent->lld_list);
3082                 suffix = strrchr(dirent->lld_name, '-');
3083                 if (suffix != NULL) {
3084                         if ((len == suffix - dirent->lld_name) &&
3085                             (strncmp(fsname, dirent->lld_name, len) == 0)) {
3086                                 CDEBUG(D_MGS, "Removing log %s\n",
3087                                        dirent->lld_name);
3088                                 mgs_erase_log(env, mgs, dirent->lld_name);
3089                         }
3090                 }
3091                 OBD_FREE(dirent, sizeof(*dirent));
3092         }
3093
3094         cfs_mutex_unlock(&mgs->mgs_mutex);
3095
3096         RETURN(rc);
3097 }
3098
3099 /* from llog_swab */
3100 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3101 {
3102         int i;
3103         ENTRY;
3104
3105         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3106         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3107
3108         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3109         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3110         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3111         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3112
3113         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3114         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3115                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3116                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3117                                i, lcfg->lcfg_buflens[i],
3118                                lustre_cfg_string(lcfg, i));
3119                 }
3120         EXIT;
3121 }
3122
3123 /* Set a permanent (config log) param for a target or fs
3124  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3125  *             buf1 contains the single parameter
3126  */
3127 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3128                  struct lustre_cfg *lcfg, char *fsname)
3129 {
3130         struct fs_db *fsdb;
3131         struct mgs_target_info *mti;
3132         char *devname, *param;
3133         char *ptr, *tmp;
3134         __u32 index;
3135         int rc = 0;
3136         ENTRY;
3137
3138         print_lustre_cfg(lcfg);
3139
3140         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3141         devname = lustre_cfg_string(lcfg, 0);
3142         param = lustre_cfg_string(lcfg, 1);
3143         if (!devname) {
3144                 /* Assume device name embedded in param:
3145                    lustre-OST0000.osc.max_dirty_mb=32 */
3146                 ptr = strchr(param, '.');
3147                 if (ptr) {
3148                         devname = param;
3149                         *ptr = 0;
3150                         param = ptr + 1;
3151                 }
3152         }
3153         if (!devname) {
3154                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3155                 RETURN(-ENOSYS);
3156         }
3157
3158         /* Extract fsname */
3159         ptr = strrchr(devname, '-');
3160         memset(fsname, 0, MTI_NAME_MAXLEN);
3161         if (ptr && (server_name2index(ptr, &index, NULL) >= 0)) {
3162                 /* param related to llite isn't allowed to set by OST or MDT */
3163                 if (strncmp(param, PARAM_LLITE, sizeof(PARAM_LLITE)) == 0)
3164                         RETURN(-EINVAL);
3165
3166                 strncpy(fsname, devname, ptr - devname);
3167         } else {
3168                 /* assume devname is the fsname */
3169                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3170         }
3171         fsname[MTI_NAME_MAXLEN - 1] = 0;
3172         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3173
3174         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3175         if (rc)
3176                 RETURN(rc);
3177         if (!cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3178             cfs_test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3179                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3180                        "is '%s'\n", fsname, devname);
3181                 mgs_free_fsdb(mgs, fsdb);
3182                 RETURN(-EINVAL);
3183         }
3184
3185         /* Create a fake mti to hold everything */
3186         OBD_ALLOC_PTR(mti);
3187         if (!mti)
3188                 GOTO(out, rc = -ENOMEM);
3189         strncpy(mti->mti_fsname, fsname, MTI_NAME_MAXLEN);
3190         strncpy(mti->mti_svname, devname, MTI_NAME_MAXLEN);
3191         strncpy(mti->mti_params, param, sizeof(mti->mti_params));
3192         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3193         if (rc < 0)
3194                 /* Not a valid server; may be only fsname */
3195                 rc = 0;
3196         else
3197                 /* Strip -osc or -mdc suffix from svname */
3198                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3199                                      mti->mti_svname))
3200                         GOTO(out, rc = -EINVAL);
3201
3202         mti->mti_flags = rc | LDD_F_PARAM;
3203
3204         cfs_mutex_lock(&fsdb->fsdb_mutex);
3205         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3206         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3207
3208         /*
3209          * Revoke lock so everyone updates.  Should be alright if
3210          * someone was already reading while we were updating the logs,
3211          * so we don't really need to hold the lock while we're
3212          * writing (above).
3213          */
3214         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3215 out:
3216         OBD_FREE_PTR(mti);
3217         RETURN(rc);
3218 }
3219
3220 static int mgs_write_log_pool(const struct lu_env *env,
3221                               struct mgs_device *mgs, char *logname,
3222                               struct fs_db *fsdb, char *lovname,
3223                               enum lcfg_command_type cmd,
3224                               char *poolname, char *fsname,
3225                               char *ostname, char *comment)
3226 {
3227         struct llog_handle *llh = NULL;
3228         int rc;
3229
3230         rc = record_start_log(env, mgs, &llh, logname);
3231         if (rc)
3232                 return rc;
3233         rc = record_marker(env, llh, fsdb, CM_START, lovname, comment);
3234         record_base(env, llh, lovname, 0, cmd, poolname, fsname, ostname, 0);
3235         rc = record_marker(env, llh, fsdb, CM_END, lovname, comment);
3236         rc = record_end_log(env, &llh);
3237
3238         return rc;
3239 }
3240
3241 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3242                  enum lcfg_command_type cmd, char *fsname,
3243                  char *poolname, char *ostname)
3244 {
3245         struct fs_db *fsdb;
3246         char *lovname;
3247         char *logname;
3248         char *label = NULL, *canceled_label = NULL;
3249         int label_sz;
3250         struct mgs_target_info *mti = NULL;
3251         int rc, i;
3252         ENTRY;
3253
3254         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3255         if (rc) {
3256                 CERROR("Can't get db for %s\n", fsname);
3257                 RETURN(rc);
3258         }
3259         if (cfs_test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3260                 CERROR("%s is not defined\n", fsname);
3261                 mgs_free_fsdb(mgs, fsdb);
3262                 RETURN(-EINVAL);
3263         }
3264
3265         label_sz = 10 + strlen(fsname) + strlen(poolname);
3266
3267         /* check if ostname match fsname */
3268         if (ostname != NULL) {
3269                 char *ptr;
3270
3271                 ptr = strrchr(ostname, '-');
3272                 if ((ptr == NULL) ||
3273                     (strncmp(fsname, ostname, ptr-ostname) != 0))
3274                         RETURN(-EINVAL);
3275                 label_sz += strlen(ostname);
3276         }
3277
3278         OBD_ALLOC(label, label_sz);
3279         if (label == NULL)
3280                 GOTO(out, rc = -ENOMEM);
3281
3282         switch(cmd) {
3283         case LCFG_POOL_NEW:
3284                 sprintf(label,
3285                         "new %s.%s", fsname, poolname);
3286                 break;
3287         case LCFG_POOL_ADD:
3288                 sprintf(label,
3289                         "add %s.%s.%s", fsname, poolname, ostname);
3290                 break;
3291         case LCFG_POOL_REM:
3292                 OBD_ALLOC(canceled_label, label_sz);
3293                 if (canceled_label == NULL)
3294                          GOTO(out, rc = -ENOMEM);
3295                 sprintf(label,
3296                         "rem %s.%s.%s", fsname, poolname, ostname);
3297                 sprintf(canceled_label,
3298                         "add %s.%s.%s", fsname, poolname, ostname);
3299                 break;
3300         case LCFG_POOL_DEL:
3301                 OBD_ALLOC(canceled_label, label_sz);
3302                 if (canceled_label == NULL)
3303                          GOTO(out, rc = -ENOMEM);
3304                 sprintf(label,
3305                         "del %s.%s", fsname, poolname);
3306                 sprintf(canceled_label,
3307                         "new %s.%s", fsname, poolname);
3308                 break;
3309         default:
3310                 break;
3311         }
3312
3313         cfs_mutex_lock(&fsdb->fsdb_mutex);
3314
3315         if (canceled_label != NULL) {
3316                 OBD_ALLOC_PTR(mti);
3317                 if (mti == NULL)
3318                         GOTO(out, rc = -ENOMEM);
3319         }
3320
3321         /* write pool def to all MDT logs */
3322         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3323                  if (cfs_test_bit(i,  fsdb->fsdb_mdt_index_map)) {
3324                         name_create_mdt_and_lov(&logname, &lovname, fsdb, i);
3325
3326                         if (canceled_label != NULL) {
3327                                 strcpy(mti->mti_svname, "lov pool");
3328                                 mgs_modify(env, mgs, fsdb, mti, logname,
3329                                            lovname, canceled_label,
3330                                            CM_SKIP);
3331                         }
3332
3333                         mgs_write_log_pool(env, mgs, logname, fsdb, lovname,
3334                                            cmd, fsname, poolname, ostname,
3335                                            label);
3336                         name_destroy(&logname);
3337                         name_destroy(&lovname);
3338                 }
3339         }
3340
3341         name_create(&logname, fsname, "-client");
3342         if (canceled_label != NULL)
3343                 mgs_modify(env, mgs, fsdb, mti, logname, fsdb->fsdb_clilov,
3344                            canceled_label, CM_SKIP);
3345
3346         mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
3347                            cmd, fsname, poolname, ostname, label);
3348         name_destroy(&logname);
3349
3350         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3351         /* request for update */
3352         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3353
3354         EXIT;
3355 out:
3356         if (label != NULL)
3357                 OBD_FREE(label, label_sz);
3358
3359         if (canceled_label != NULL)
3360                 OBD_FREE(canceled_label, label_sz);
3361
3362         if (mti != NULL)
3363                 OBD_FREE_PTR(mti);
3364
3365         return rc;
3366 }
3367
3368 #if 0
3369 /******************** unused *********************/
3370 static int mgs_backup_llog(struct obd_device *obd, char* fsname)
3371 {
3372         struct file *filp, *bak_filp;
3373         struct lvfs_run_ctxt saved;
3374         char *logname, *buf;
3375         loff_t soff = 0 , doff = 0;
3376         int count = 4096, len;
3377         int rc = 0;
3378
3379         OBD_ALLOC(logname, PATH_MAX);
3380         if (logname == NULL)
3381                 return -ENOMEM;
3382
3383         OBD_ALLOC(buf, count);
3384         if (!buf)
3385                 GOTO(out , rc = -ENOMEM);
3386
3387         len = snprintf(logname, PATH_MAX, "%s/%s.bak",
3388                        MOUNT_CONFIGS_DIR, fsname);
3389
3390         if (len >= PATH_MAX - 1) {
3391                 GOTO(out, -ENAMETOOLONG);
3392         }
3393
3394         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3395
3396         bak_filp = l_filp_open(logname, O_RDWR|O_CREAT|O_TRUNC, 0660);
3397         if (IS_ERR(bak_filp)) {
3398                 rc = PTR_ERR(bak_filp);
3399                 CERROR("backup logfile open %s: %d\n", logname, rc);
3400                 GOTO(pop, rc);
3401         }
3402         sprintf(logname, "%s/%s", MOUNT_CONFIGS_DIR, fsname);
3403         filp = l_filp_open(logname, O_RDONLY, 0);
3404         if (IS_ERR(filp)) {
3405                 rc = PTR_ERR(filp);
3406                 CERROR("logfile open %s: %d\n", logname, rc);
3407                 GOTO(close1f, rc);
3408         }
3409
3410         while ((rc = lustre_fread(filp, buf, count, &soff)) > 0) {
3411                 rc = lustre_fwrite(bak_filp, buf, count, &doff);
3412                 break;
3413         }
3414
3415         filp_close(filp, 0);
3416 close1f:
3417         filp_close(bak_filp, 0);
3418 pop:
3419         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3420 out:
3421         if (buf)
3422                 OBD_FREE(buf, count);
3423         OBD_FREE(logname, PATH_MAX);
3424         return rc;
3425 }
3426
3427 #endif