Whamcloud - gitweb
b=15699
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 #define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MGS
47 #define D_MGS D_CONFIG /*|D_WARNING*/
48
49 #ifdef __KERNEL__
50 #include <linux/module.h>
51 #include <linux/pagemap.h>
52 #include <linux/fs.h>
53 #endif
54
55 #include <obd.h>
56 #include <obd_lov.h>
57 #include <obd_class.h>
58 #include <lustre_log.h>
59 #include <obd_ost.h>
60 #include <libcfs/list.h>
61 #include <linux/lvfs.h>
62 #include <lustre_fsfilt.h>
63 #include <lustre_disk.h>
64 #include <lustre_param.h>
65 #include <lustre_sec.h>
66 #include "mgs_internal.h"
67
68 static int mgs_get_fsdb_srpc_from_llog(struct obd_device *obd,
69                                        struct fs_db *fsdb);
70 static int mgs_get_srpc_conf_log(struct fs_db *fsdb, const char *tgt,
71                                  enum lustre_sec_part from,
72                                  enum lustre_sec_part to,
73                                  struct sptlrpc_conf_log *log);
74
75 /********************** Class functions ********************/
76
77 /* Caller must list_del and OBD_FREE each dentry from the list */
78 int class_dentry_readdir(struct obd_device *obd, struct dentry *dir,
79                                 struct vfsmount *inmnt,
80                                 struct list_head *dentry_list){
81         /* see mds_cleanup_pending */
82         struct lvfs_run_ctxt saved;
83         struct file *file;
84         struct dentry *dentry;
85         struct vfsmount *mnt;
86         int rc = 0;
87         ENTRY;
88
89         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
90         dentry = dget(dir);
91         if (IS_ERR(dentry))
92                 GOTO(out_pop, rc = PTR_ERR(dentry));
93         mnt = mntget(inmnt);
94         if (IS_ERR(mnt)) {
95                 l_dput(dentry);
96                 GOTO(out_pop, rc = PTR_ERR(mnt));
97         }
98
99         file = dentry_open(dentry, mnt, O_RDONLY);
100         if (IS_ERR(file))
101                 /* dentry_open_it() drops the dentry, mnt refs */
102                 GOTO(out_pop, rc = PTR_ERR(file));
103
104         CFS_INIT_LIST_HEAD(dentry_list);
105         rc = l_readdir(file, dentry_list);
106         filp_close(file, 0);
107         /*  filp_close->fput() drops the dentry, mnt refs */
108
109 out_pop:
110         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
111         RETURN(rc);
112 }
113
114 /******************** DB functions *********************/
115
116 static inline int name_create(char **newname, char *prefix, char *suffix)
117 {
118         LASSERT(newname);
119         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
120         if (!*newname) 
121                 return -ENOMEM;
122         sprintf(*newname, "%s%s", prefix, suffix);
123         return 0;
124 }
125
126 static inline void name_destroy(char **name)
127 {        
128         if (*name)
129                 OBD_FREE(*name, strlen(*name) + 1);
130         *name = NULL;
131 }
132
133 /* from the (client) config log, figure out:
134         1. which ost's/mdt's are configured (by index)
135         2. what the last config step is
136         3. COMPAT_146 lov name
137         4. COMPAT_146 mdt lov name
138         5. COMPAT_146 mdc name 
139 */
140 /* It might be better to have a separate db file, instead of parsing the info
141    out of the client log.  This is slow and potentially error-prone. */
142 static int mgs_fsdb_handler(struct llog_handle *llh, struct llog_rec_hdr *rec, 
143                             void *data)
144 {
145         struct fs_db *fsdb = (struct fs_db *)data;
146         int cfg_len = rec->lrh_len;
147         char *cfg_buf = (char*) (rec + 1);
148         struct lustre_cfg *lcfg;
149         __u32 index;
150         int rc = 0;
151         ENTRY;
152
153         if (rec->lrh_type != OBD_CFG_REC) {
154                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
155                 RETURN(-EINVAL);
156         }
157
158         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
159         if (rc) {
160                 CERROR("Insane cfg\n");
161                 RETURN(rc);
162         }
163
164         lcfg = (struct lustre_cfg *)cfg_buf;
165
166         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
167                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
168
169         /* Figure out ost indicies */
170         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
171         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
172             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
173                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
174                                        NULL, 10);
175                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
176                        lustre_cfg_string(lcfg, 1), index,
177                        lustre_cfg_string(lcfg, 2));
178                 set_bit(index, fsdb->fsdb_ost_index_map);
179         }
180
181         /* Figure out mdt indicies */
182         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
183         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
184             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
185                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
186                                        &index, NULL);
187                 if (rc != LDD_F_SV_TYPE_MDT) {
188                         CWARN("Unparsable MDC name %s, assuming index 0\n",
189                               lustre_cfg_string(lcfg, 0));
190                         index = 0;
191                 }
192                 rc = 0;
193                 CDEBUG(D_MGS, "MDT index is %u\n", index);
194                 set_bit(index, fsdb->fsdb_mdt_index_map);
195         }
196
197         /* COMPAT_146 */
198         /* figure out the old LOV name. fsdb_gen = 0 means old log */
199         /* #01 L attach 0:lov_mdsA 1:lov 2:cdbe9_lov_mdsA_dc8cf7f3bb */
200         if ((fsdb->fsdb_gen == 0) && (lcfg->lcfg_command == LCFG_ATTACH) &&
201             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_LOV_NAME) == 0)) {
202                 fsdb->fsdb_flags |= FSDB_OLDLOG14;
203                 name_destroy(&fsdb->fsdb_clilov);
204                 rc = name_create(&fsdb->fsdb_clilov, 
205                                  lustre_cfg_string(lcfg, 0), "");
206                 if (rc) 
207                         RETURN(rc);
208                 CDEBUG(D_MGS, "client lov name is %s\n", fsdb->fsdb_clilov);
209         }
210
211         /* figure out the old MDT lov name from the MDT uuid */
212         if ((fsdb->fsdb_gen == 0) && (lcfg->lcfg_command == LCFG_SETUP) &&
213             (strncmp(lustre_cfg_string(lcfg, 0), "MDC_", 4) == 0)) {
214                 char *ptr;
215                 fsdb->fsdb_flags |= FSDB_OLDLOG14;
216                 ptr = strstr(lustre_cfg_string(lcfg, 1), "_UUID");
217                 if (!ptr) {
218                         CERROR("Can't parse MDT uuid %s\n", 
219                                lustre_cfg_string(lcfg, 1));
220                         RETURN(-EINVAL);
221                 }
222                 *ptr = '\0';
223                 name_destroy(&fsdb->fsdb_mdtlov);
224                 rc = name_create(&fsdb->fsdb_mdtlov, 
225                                  "lov_", lustre_cfg_string(lcfg, 1));
226                 if (rc) 
227                         RETURN(rc);
228                 name_destroy(&fsdb->fsdb_mdc);
229                 rc = name_create(&fsdb->fsdb_mdc, 
230                                  lustre_cfg_string(lcfg, 0), "");
231                 if (rc) 
232                         RETURN(rc);
233                 CDEBUG(D_MGS, "MDT lov name is %s\n", fsdb->fsdb_mdtlov);
234         }
235         /* end COMPAT_146 */
236
237         /* Keep track of the latest marker step */
238         if (lcfg->lcfg_command == LCFG_MARKER) {
239                 struct cfg_marker *marker;
240                 marker = lustre_cfg_buf(lcfg, 1);
241                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
242         }
243
244         RETURN(rc);
245 }
246
247 /* fsdb->fsdb_sem is already held  in mgs_find_or_make_fsdb*/
248 static int mgs_get_fsdb_from_llog(struct obd_device *obd, struct fs_db *fsdb)
249 {
250         char *logname;
251         struct llog_handle *loghandle;
252         struct lvfs_run_ctxt saved;
253         struct llog_ctxt *ctxt;
254         int rc, rc2;
255         ENTRY;
256
257         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
258         LASSERT(ctxt != NULL);
259         name_create(&logname, fsdb->fsdb_name, "-client");
260         down(&fsdb->fsdb_sem);
261         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
262         rc = llog_create(ctxt, &loghandle, NULL, logname);
263         if (rc)
264                 GOTO(out_pop, rc);
265
266         rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
267         if (rc)
268                 GOTO(out_close, rc);
269
270         if (llog_get_size(loghandle) <= 1)
271                 fsdb->fsdb_flags |= FSDB_LOG_EMPTY;
272
273         rc = llog_process(loghandle, mgs_fsdb_handler, (void *)fsdb, NULL);
274         CDEBUG(D_INFO, "get_db = %d\n", rc);
275 out_close:
276         rc2 = llog_close(loghandle);
277         if (!rc)
278                 rc = rc2;
279 out_pop:
280         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
281         up(&fsdb->fsdb_sem);
282         name_destroy(&logname);
283         llog_ctxt_put(ctxt);
284
285         RETURN(rc);
286 }
287
288 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
289 {
290         struct mgs_tgt_srpc_conf *tgtconf;
291
292         /* free target-specific rules */
293         while (fsdb->fsdb_srpc_tgt) {
294                 tgtconf = fsdb->fsdb_srpc_tgt;
295                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
296
297                 LASSERT(tgtconf->mtsc_tgt);
298
299                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
300                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
301                 OBD_FREE_PTR(tgtconf);
302         }
303
304         /* free general rules */
305         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
306 }
307
308 static struct fs_db *mgs_find_fsdb(struct obd_device *obd, char *fsname)
309 {
310         struct mgs_obd *mgs = &obd->u.mgs;
311         struct fs_db *fsdb;
312         struct list_head *tmp;
313
314         list_for_each(tmp, &mgs->mgs_fs_db_list) {
315                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
316                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
317                         return fsdb;
318         }
319         return NULL;
320 }
321
322 /* caller must hold the mgs->mgs_fs_db_lock */
323 static struct fs_db *mgs_new_fsdb(struct obd_device *obd, char *fsname)
324 {
325         struct mgs_obd *mgs = &obd->u.mgs;
326         struct fs_db *fsdb;
327         int rc;
328         ENTRY;
329
330         OBD_ALLOC_PTR(fsdb);
331         if (!fsdb)
332                 RETURN(NULL);
333
334         OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
335         OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
336         if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
337                 CERROR("No memory for index maps\n");
338                 GOTO(err, 0);
339         }
340
341         strncpy(fsdb->fsdb_name, fsname, sizeof(fsdb->fsdb_name));
342         fsdb->fsdb_name[sizeof(fsdb->fsdb_name) - 1] = 0;
343         rc = name_create(&fsdb->fsdb_mdtlov, fsname, "-mdtlov");
344         if (rc) 
345                 GOTO(err, rc);
346         rc = name_create(&fsdb->fsdb_mdtlmv, fsname, "-mdtlmv");
347         if (rc) 
348                 GOTO(err, rc);
349         rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
350         if (rc) 
351                 GOTO(err, rc);
352
353         rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
354         if (rc) 
355                 GOTO(err, rc);
356
357         fsdb->fsdb_srpc_fl_udesc = 1;
358         sema_init(&fsdb->fsdb_sem, 1);
359         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
360         lproc_mgs_add_live(obd, fsdb);
361
362         RETURN(fsdb);
363 err:
364         if (fsdb->fsdb_ost_index_map)
365                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
366         if (fsdb->fsdb_mdt_index_map)
367                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
368         name_destroy(&fsdb->fsdb_clilov);
369         name_destroy(&fsdb->fsdb_clilmv);
370         name_destroy(&fsdb->fsdb_mdtlov); 
371         name_destroy(&fsdb->fsdb_mdtlmv); 
372         OBD_FREE_PTR(fsdb);
373         RETURN(NULL);
374 }
375
376 static void mgs_free_fsdb(struct obd_device *obd, struct fs_db *fsdb)
377 {
378         /* wait for anyone with the sem */
379         down(&fsdb->fsdb_sem);
380         lproc_mgs_del_live(obd, fsdb);
381         list_del(&fsdb->fsdb_list);
382         OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
383         OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
384         name_destroy(&fsdb->fsdb_clilov); 
385         name_destroy(&fsdb->fsdb_clilmv); 
386         name_destroy(&fsdb->fsdb_mdtlov); 
387         name_destroy(&fsdb->fsdb_mdtlmv); 
388         name_destroy(&fsdb->fsdb_mdc); 
389         mgs_free_fsdb_srpc(fsdb);
390         OBD_FREE_PTR(fsdb);
391 }
392
393 int mgs_init_fsdb_list(struct obd_device *obd)
394 {
395         struct mgs_obd *mgs = &obd->u.mgs;
396         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
397         return 0;
398 }
399
400 int mgs_cleanup_fsdb_list(struct obd_device *obd)
401 {
402         struct mgs_obd *mgs = &obd->u.mgs;
403         struct fs_db *fsdb;
404         struct list_head *tmp, *tmp2;
405         down(&mgs->mgs_sem);
406         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
407                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
408                 mgs_free_fsdb(obd, fsdb);
409         }
410         up(&mgs->mgs_sem);
411         return 0;
412 }
413
414 static int mgs_find_or_make_fsdb(struct obd_device *obd, char *name, 
415                                struct fs_db **dbh)
416 {
417         struct mgs_obd *mgs = &obd->u.mgs;
418         struct fs_db *fsdb;
419         int rc = 0;
420
421         down(&mgs->mgs_sem);
422         fsdb = mgs_find_fsdb(obd, name);
423         if (fsdb) {
424                 up(&mgs->mgs_sem);
425                 *dbh = fsdb;
426                 return 0;
427         }
428
429         CDEBUG(D_MGS, "Creating new db\n");
430         fsdb = mgs_new_fsdb(obd, name);
431         up(&mgs->mgs_sem);
432         if (!fsdb)
433                 return -ENOMEM;
434
435         /* populate the db from the client llog */
436         rc = mgs_get_fsdb_from_llog(obd, fsdb);
437         if (rc) {
438                 CERROR("Can't get db from client log %d\n", rc);
439                 mgs_free_fsdb(obd, fsdb);
440                 return rc;
441         }
442
443         /* populate srpc rules from params llog */
444         rc = mgs_get_fsdb_srpc_from_llog(obd, fsdb);
445         if (rc) {
446                 CERROR("Can't get db from params log %d\n", rc);
447                 mgs_free_fsdb(obd, fsdb);
448                 return rc;
449         }
450
451         *dbh = fsdb;
452
453         return 0;
454 }
455
456 /* 1 = index in use
457    0 = index unused
458    -1= empty client log */
459 int mgs_check_index(struct obd_device *obd, struct mgs_target_info *mti)
460 {
461         struct fs_db *fsdb;
462         void *imap;
463         int rc = 0;
464         ENTRY;
465
466         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
467
468         rc = mgs_find_or_make_fsdb(obd, mti->mti_fsname, &fsdb);
469         if (rc) {
470                 CERROR("Can't get db for %s\n", mti->mti_fsname);
471                 RETURN(rc);
472         }
473
474         if (fsdb->fsdb_flags & FSDB_LOG_EMPTY)
475                 RETURN(-1);
476
477         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
478                 imap = fsdb->fsdb_ost_index_map;
479         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
480                 imap = fsdb->fsdb_mdt_index_map;
481         else
482                 RETURN(-EINVAL);
483
484         if (test_bit(mti->mti_stripe_index, imap))
485                 RETURN(1);
486         RETURN(0);
487 }
488
/*
 * Return the lowest bit number that is clear in \a index_map, which is
 * \a map_len bytes long, or -1 (after logging an error) if every bit is
 * set.
 */
static __inline__ int next_index(void *index_map, int map_len)
{
        int bit = 0;

        while (bit < map_len * 8) {
                if (!test_bit(bit, index_map))
                        return bit;
                bit++;
        }

        CERROR("max index %d exceeded.\n", bit);
        return -1;
}
499
500 /* Return codes:
501         0  newly marked as in use
502         <0 err
503         +EALREADY for update of an old index */
504 int mgs_set_index(struct obd_device *obd, struct mgs_target_info *mti)
505 {
506         struct fs_db *fsdb;
507         void *imap;
508         int rc = 0;
509         ENTRY;
510
511         rc = mgs_find_or_make_fsdb(obd, mti->mti_fsname, &fsdb);
512         if (rc) {
513                 CERROR("Can't get db for %s\n", mti->mti_fsname);
514                 RETURN(rc);
515         }
516
517         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
518                 imap = fsdb->fsdb_ost_index_map;
519         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
520                 imap = fsdb->fsdb_mdt_index_map;
521         else
522                 RETURN(-EINVAL);
523
524         if (mti->mti_flags & LDD_F_NEED_INDEX) {
525                 rc = next_index(imap, INDEX_MAP_SIZE);
526                 if (rc == -1)
527                         RETURN(-ERANGE);
528                 mti->mti_stripe_index = rc;
529         }
530
531         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
532                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
533                                    "but the max index is %d.\n",
534                                    mti->mti_svname, mti->mti_stripe_index,
535                                    INDEX_MAP_SIZE * 8);
536                 RETURN(-ERANGE);
537         }
538
539         if (test_bit(mti->mti_stripe_index, imap)) {
540                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
541                     !(mti->mti_flags & LDD_F_WRITECONF)) {
542                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
543                                            "%d, but that index is already in "
544                                            "use. Use --writeconf to force\n",
545                                            mti->mti_svname,
546                                            mti->mti_stripe_index);
547                         RETURN(-EADDRINUSE);
548                 } else {
549                         CDEBUG(D_MGS, "Server %s updating index %d\n",
550                                mti->mti_svname, mti->mti_stripe_index);
551                         RETURN(EALREADY);
552                 }
553         }
554
555         set_bit(mti->mti_stripe_index, imap);
556         fsdb->fsdb_flags &= ~FSDB_LOG_EMPTY;
557         server_make_name(mti->mti_flags, mti->mti_stripe_index,
558                          mti->mti_fsname, mti->mti_svname);
559
560         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
561                mti->mti_stripe_index);
562
563         RETURN(0);
564 }
565
566 struct mgs_modify_lookup {
567         struct cfg_marker mml_marker;
568         int               mml_modified;
569 };
570
571 static int mgs_modify_handler(struct llog_handle *llh, struct llog_rec_hdr *rec, 
572                               void *data)
573 {
574         struct mgs_modify_lookup *mml = (struct mgs_modify_lookup *)data;
575         struct cfg_marker *marker;
576         struct lustre_cfg *lcfg = (struct lustre_cfg *)(rec + 1);
577         int cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) - 
578                 sizeof(struct llog_rec_tail);
579         int rc;
580         ENTRY;
581
582         if (rec->lrh_type != OBD_CFG_REC) {
583                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
584                 RETURN(-EINVAL);
585         }
586
587         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
588         if (rc) {
589                 CERROR("Insane cfg\n");
590                 RETURN(rc);
591         }
592
593         /* We only care about markers */
594         if (lcfg->lcfg_command != LCFG_MARKER)
595                 RETURN(0); 
596         
597         marker = lustre_cfg_buf(lcfg, 1);
598         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) && 
599             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
600             !(marker->cm_flags & CM_SKIP)) {
601                 /* Found a non-skipped marker match */
602                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
603                        rec->lrh_index, marker->cm_step, 
604                        marker->cm_flags, mml->mml_marker.cm_flags,
605                        marker->cm_tgtname, marker->cm_comment);
606                 /* Overwrite the old marker llog entry */
607                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
608                 marker->cm_flags |= mml->mml_marker.cm_flags;
609                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
610                 /* Header and tail are added back to lrh_len in 
611                    llog_lvfs_write_rec */
612                 rec->lrh_len = cfg_len; 
613                 rc = llog_write_rec(llh, rec, NULL, 0, (void *)lcfg, 
614                                     rec->lrh_index);
615                 if (!rc) 
616                          mml->mml_modified++;
617         }
618
619         RETURN(rc);
620 }
621
622 /* Modify an existing config log record (for CM_SKIP or CM_EXCLUDE) */
623 static int mgs_modify(struct obd_device *obd, struct fs_db *fsdb,
624                       struct mgs_target_info *mti, char *logname, 
625                       char *devname, char *comment, int flags)
626 {
627         struct llog_handle *loghandle;
628         struct lvfs_run_ctxt saved;
629         struct llog_ctxt *ctxt;
630         struct mgs_modify_lookup *mml;
631         int rc, rc2;
632         ENTRY;
633
634         CDEBUG(D_MGS, "modify %s/%s/%s\n", logname, devname, comment);
635
636         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
637         
638         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
639         LASSERT(ctxt != NULL);
640         rc = llog_create(ctxt, &loghandle, NULL, logname);
641         if (rc)
642                 GOTO(out_pop, rc);
643
644         rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
645         if (rc)
646                 GOTO(out_close, rc);
647
648         if (llog_get_size(loghandle) <= 1)
649                 GOTO(out_close, rc = 0);
650
651         OBD_ALLOC_PTR(mml);
652         if (!mml) 
653                 GOTO(out_close, rc = -ENOMEM);
654         strcpy(mml->mml_marker.cm_comment, comment);
655         strcpy(mml->mml_marker.cm_tgtname, devname);
656         /* Modify mostly means cancel */
657         mml->mml_marker.cm_flags = flags;
658         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
659         mml->mml_modified = 0;
660         rc = llog_process(loghandle, mgs_modify_handler, (void *)mml, NULL);
661         if (!rc && !mml->mml_modified) 
662                 rc = -ENODEV;
663         OBD_FREE_PTR(mml);
664
665 out_close:
666         rc2 = llog_close(loghandle);
667         if (!rc)
668                 rc = rc2;
669 out_pop:
670         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
671         if (rc && rc != -ENODEV) 
672                 CERROR("modify %s/%s failed %d\n",
673                        mti->mti_svname, comment, rc);
674         llog_ctxt_put(ctxt);
675         RETURN(rc);
676 }
677
678 /******************** config log recording functions *********************/
679
680 static int record_lcfg(struct obd_device *obd, struct llog_handle *llh,
681                          struct lustre_cfg *lcfg)
682 {
683         struct lvfs_run_ctxt   saved;
684         struct llog_rec_hdr    rec;
685         int buflen, rc;
686
687         if (!lcfg || !llh) 
688                 return -ENOMEM;
689
690         LASSERT(llh->lgh_ctxt);        
691
692         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
693                                 lcfg->lcfg_buflens);
694         rec.lrh_len = llog_data_len(buflen);
695         rec.lrh_type = OBD_CFG_REC;
696
697         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
698         /* idx = -1 means append */
699         rc = llog_write_rec(llh, &rec, NULL, 0, (void *)lcfg, -1);
700         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
701         if (rc) 
702                 CERROR("failed %d\n", rc);
703         return rc;
704 }
705
706 static int record_base(struct obd_device *obd, struct llog_handle *llh,
707                      char *cfgname, lnet_nid_t nid, int cmd,
708                      char *s1, char *s2, char *s3, char *s4)
709 {
710         struct lustre_cfg_bufs bufs;
711         struct lustre_cfg     *lcfg;
712         int rc;
713
714         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
715                cmd, s1, s2, s3, s4);
716
717         lustre_cfg_bufs_reset(&bufs, cfgname);
718         if (s1)
719                 lustre_cfg_bufs_set_string(&bufs, 1, s1);
720         if (s2)
721                 lustre_cfg_bufs_set_string(&bufs, 2, s2);
722         if (s3)
723                 lustre_cfg_bufs_set_string(&bufs, 3, s3);
724         if (s4)
725                 lustre_cfg_bufs_set_string(&bufs, 4, s4);
726
727         lcfg = lustre_cfg_new(cmd, &bufs);
728         if (!lcfg) 
729                 return -ENOMEM;
730         lcfg->lcfg_nid = nid;
731
732         rc = record_lcfg(obd, llh, lcfg);
733
734         lustre_cfg_free(lcfg);
735
736         if (rc) {
737                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
738                        cmd, s1, s2, s3, s4);
739         }
740         return(rc);
741 }
742
743
744 static inline int record_add_uuid(struct obd_device *obd,
745                                   struct llog_handle *llh,
746                                   uint64_t nid, char *uuid)
747 {
748         return record_base(obd,llh,NULL,nid,LCFG_ADD_UUID,uuid,0,0,0);
749
750 }
751
752 static inline int record_add_conn(struct obd_device *obd,
753                                   struct llog_handle *llh,
754                                   char *devname,
755                                   char *uuid)
756 {
757         return record_base(obd,llh,devname,0,LCFG_ADD_CONN,uuid,0,0,0);
758 }
759
760 static inline int record_attach(struct obd_device *obd, struct llog_handle *llh,
761                                 char *devname, char *type, char *uuid)
762 {
763         return record_base(obd,llh,devname,0,LCFG_ATTACH,type,uuid,0,0);
764 }
765
766 static inline int record_setup(struct obd_device *obd, struct llog_handle *llh,
767                                char *devname,
768                                char *s1, char *s2, char *s3, char *s4)
769 {
770         return record_base(obd,llh,devname,0,LCFG_SETUP,s1,s2,s3,s4);
771 }
772
773 static inline int record_sptlrpc_conf(struct obd_device *obd,
774                                       struct llog_handle *llh,
775                                       char *devname,
776                                       struct sptlrpc_conf_log *srpc_log)
777 {
778         struct lustre_cfg_bufs bufs;
779         struct lustre_cfg *lcfg;
780         int rc;
781
782         lustre_cfg_bufs_reset(&bufs, devname);
783         lustre_cfg_bufs_set(&bufs, 1, srpc_log, sizeof(*srpc_log));
784         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &bufs);
785
786         rc = record_lcfg(obd, llh, lcfg);
787
788         lustre_cfg_free(lcfg);
789         return rc;
790 }
791
792 static int record_lov_setup(struct obd_device *obd, struct llog_handle *llh,
793                             char *devname, struct lov_desc *desc)
794 {
795         struct lustre_cfg_bufs bufs;
796         struct lustre_cfg *lcfg;
797         int rc;
798
799         lustre_cfg_bufs_reset(&bufs, devname);
800         lustre_cfg_bufs_set(&bufs, 1, desc, sizeof(*desc));
801         lcfg = lustre_cfg_new(LCFG_SETUP, &bufs);
802         if (!lcfg) 
803                 return -ENOMEM;
804         rc = record_lcfg(obd, llh, lcfg);
805
806         lustre_cfg_free(lcfg);
807         return rc;
808 }
809
810 static int record_lmv_setup(struct obd_device *obd, struct llog_handle *llh,
811                             char *devname, struct lmv_desc *desc)
812 {
813         struct lustre_cfg_bufs bufs;
814         struct lustre_cfg *lcfg;
815         int rc;
816
817         lustre_cfg_bufs_reset(&bufs, devname);
818         lustre_cfg_bufs_set(&bufs, 1, desc, sizeof(*desc));
819         lcfg = lustre_cfg_new(LCFG_SETUP, &bufs);
820
821         rc = record_lcfg(obd, llh, lcfg);
822
823         lustre_cfg_free(lcfg);
824         return rc;
825 }
826
827 static inline int record_mdc_add(struct obd_device *obd,
828                                  struct llog_handle *llh,
829                                  char *logname, char *mdcuuid,
830                                  char *mdtuuid, char *index,
831                                  char *gen)
832 {
833         return record_base(obd,llh,logname,0,LCFG_ADD_MDC,
834                            mdtuuid,index,gen,mdcuuid);
835 }
836
837 static inline int record_lov_add(struct obd_device *obd,
838                                  struct llog_handle *llh,
839                                  char *lov_name, char *ost_uuid,
840                                  char *index, char *gen)
841 {
842         return record_base(obd,llh,lov_name,0,LCFG_LOV_ADD_OBD,
843                            ost_uuid,index,gen,0);
844 }
845
846 static inline int record_mount_opt(struct obd_device *obd,
847                                    struct llog_handle *llh,
848                                    char *profile, char *lov_name,
849                                    char *mdc_name)
850 {
851         return record_base(obd,llh,NULL,0,LCFG_MOUNTOPT,
852                            profile,lov_name,mdc_name,0);
853 }
854
855 static int record_marker(struct obd_device *obd, struct llog_handle *llh,
856                          struct fs_db *fsdb, __u32 flags,
857                          char *tgtname, char *comment)
858 {
859         struct cfg_marker marker;
860         struct lustre_cfg_bufs bufs;
861         struct lustre_cfg *lcfg;
862         int rc;
863
864         if (flags & CM_START)
865                 fsdb->fsdb_gen++;
866         marker.cm_step = fsdb->fsdb_gen;
867         marker.cm_flags = flags;
868         marker.cm_vers = LUSTRE_VERSION_CODE;
869         strncpy(marker.cm_tgtname, tgtname, sizeof(marker.cm_tgtname)); 
870         strncpy(marker.cm_comment, comment, sizeof(marker.cm_comment)); 
871         marker.cm_createtime = cfs_time_current_sec();
872         marker.cm_canceltime = 0;
873         lustre_cfg_bufs_reset(&bufs, NULL);
874         lustre_cfg_bufs_set(&bufs, 1, &marker, sizeof(marker));
875         lcfg = lustre_cfg_new(LCFG_MARKER, &bufs);
876         if (!lcfg) 
877                 return -ENOMEM;
878         rc = record_lcfg(obd, llh, lcfg);
879
880         lustre_cfg_free(lcfg);
881         return rc;
882 }
883
884 static int record_start_log(struct obd_device *obd,
885                             struct llog_handle **llh, char *name)
886 {
887         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
888         struct lvfs_run_ctxt saved;
889         struct llog_ctxt *ctxt;
890         int rc = 0;
891
892         if (*llh) 
893                 GOTO(out, rc = -EBUSY);
894
895         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
896         if (!ctxt)
897                 GOTO(out, rc = -ENODEV);
898
899         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
900         rc = llog_create(ctxt, llh, NULL, name);
901         if (rc == 0)
902                 llog_init_handle(*llh, LLOG_F_IS_PLAIN, &cfg_uuid);
903         else
904                 *llh = NULL;
905
906         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
907         llog_ctxt_put(ctxt);
908
909 out:
910         if (rc) {
911                 CERROR("Can't start log %s: %d\n", name, rc);
912         }
913         RETURN(rc);
914 }
915
916 static int record_end_log(struct obd_device *obd, struct llog_handle **llh)
917 {
918         struct lvfs_run_ctxt saved;
919         int rc = 0;
920
921         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
922
923         rc = llog_close(*llh);
924         *llh = NULL;
925
926         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
927         RETURN(rc);
928 }
929
930 static int mgs_log_is_empty(struct obd_device *obd, char *name)
931 {
932         struct lvfs_run_ctxt saved;
933         struct llog_handle *llh;
934         struct llog_ctxt *ctxt;
935         int rc = 0;
936
937         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
938         LASSERT(ctxt != NULL);
939         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
940         rc = llog_create(ctxt, &llh, NULL, name);
941         if (rc == 0) {
942                 llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
943                 rc = llog_get_size(llh);
944                 llog_close(llh);
945         }
946         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
947         llog_ctxt_put(ctxt);
948         /* header is record 1 */
949         return(rc <= 1);
950 }
951
952 /******************** config "macros" *********************/
953
954 /* write an lcfg directly into a log (with markers) */
955 static int mgs_write_log_direct(struct obd_device *obd, struct fs_db *fsdb,
956                                 char *logname, struct lustre_cfg *lcfg, 
957                                 char *devname, char *comment)
958 {
959         struct llog_handle *llh = NULL;
960         int rc;
961         ENTRY;
962
963         if (!lcfg) 
964                 RETURN(-ENOMEM);
965
966         rc = record_start_log(obd, &llh, logname);
967         if (rc) 
968                 RETURN(rc);
969
970         /* FIXME These should be a single journal transaction */
971         rc = record_marker(obd, llh, fsdb, CM_START, devname, comment); 
972         
973         rc = record_lcfg(obd, llh, lcfg);
974
975         rc = record_marker(obd, llh, fsdb, CM_END, devname, comment);
976         rc = record_end_log(obd, &llh);
977
978         RETURN(rc);
979 }
980
981 /* write the lcfg in all logs for the given fs */
982 int mgs_write_log_direct_all(struct obd_device *obd, struct fs_db *fsdb,
983                              struct mgs_target_info *mti, 
984                              struct lustre_cfg *lcfg,
985                              char *devname, char *comment)
986 {
987         struct mgs_obd *mgs = &obd->u.mgs;
988         struct list_head dentry_list;
989         struct l_linux_dirent *dirent, *n;
990         char *fsname = mti->mti_fsname;
991         char *logname;
992         int rc = 0, len = strlen(fsname);
993         ENTRY;
994         
995         /* We need to set params for any future logs 
996            as well. FIXME Append this file to every new log. 
997            Actually, we should store as params (text), not llogs.  Or
998            in a database. */
999         name_create(&logname, fsname, "-params");
1000         if (mgs_log_is_empty(obd, logname)) {
1001                 struct llog_handle *llh = NULL;
1002                 rc = record_start_log(obd, &llh, logname);
1003                 record_end_log(obd, &llh);
1004         }
1005         name_destroy(&logname);
1006         if (rc) 
1007                 RETURN(rc);
1008
1009         /* Find all the logs in the CONFIGS directory */
1010         rc = class_dentry_readdir(obd, mgs->mgs_configs_dir,
1011                                   mgs->mgs_vfsmnt, &dentry_list);
1012         if (rc) {
1013                 CERROR("Can't read %s dir\n", MOUNT_CONFIGS_DIR);
1014                 RETURN(rc);
1015         }
1016
1017         /* Could use fsdb index maps instead of directory listing */
1018         list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
1019                 list_del(&dirent->lld_list);
1020                 /* don't write to sptlrpc rule log */
1021                 if (strncmp(fsname, dirent->lld_name, len) == 0 &&
1022                     strstr(dirent->lld_name, "-sptlrpc") == NULL) {
1023                         CDEBUG(D_MGS, "Changing log %s\n", dirent->lld_name);
1024                         /* Erase any old settings of this same parameter */
1025                         mgs_modify(obd, fsdb, mti, dirent->lld_name, devname, 
1026                                    comment, CM_SKIP);
1027                         /* Write the new one */
1028                         rc = mgs_write_log_direct(obd, fsdb, dirent->lld_name,
1029                                                   lcfg, devname, comment);
1030                         if (rc)
1031                                 CERROR("err %d writing log %s\n", rc, 
1032                                        dirent->lld_name);
1033                 }
1034                 OBD_FREE(dirent, sizeof(*dirent));
1035         }
1036
1037         RETURN(rc);
1038 }
1039
/*
 * Context handed to mgs_steal_llog_handler() through the opaque llog
 * callback argument.
 */
struct temp_comp {
        /* scratch target info; allocated and freed by
         * mgs_steal_llog_for_mdt_from_client() */
        struct mgs_target_info *comp_tmti;
        /* the MDT being registered; set by mgs_write_log_mdt() */
        struct mgs_target_info *comp_mti;
        /* filesystem database the markers are recorded against */
        struct fs_db *comp_fsdb;
        /* MGS obd device */
        struct obd_device *comp_obd;
};
1047
/* Defined further down; mgs_steal_llog_handler() needs it earlier. */
static int mgs_write_log_mdc_to_mdt(struct obd_device *obd,
                                    struct fs_db *fsdb,
                                    struct mgs_target_info *mti,
                                    char *logname);
1050
1051 static int mgs_steal_llog_handler(struct llog_handle *llh,
1052                                   struct llog_rec_hdr *rec,
1053                                   void *data)
1054 {
1055         struct obd_device * obd;
1056         struct mgs_target_info *mti, *tmti;
1057         struct fs_db *fsdb;
1058         int cfg_len = rec->lrh_len;
1059         char *cfg_buf = (char*) (rec + 1);
1060         struct lustre_cfg *lcfg;
1061         int rc = 0;
1062         struct llog_handle *mdt_llh = NULL;
1063         static int got_an_osc_or_mdc = 0;
1064         /* 0: not found any osc/mdc;
1065            1: found osc;
1066            2: found mdc;
1067         */
1068         static int last_step = -1;
1069
1070         ENTRY;
1071
1072         mti = ((struct temp_comp*)data)->comp_mti;
1073         tmti = ((struct temp_comp*)data)->comp_tmti;
1074         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1075         obd = ((struct temp_comp*)data)->comp_obd;
1076
1077         if (rec->lrh_type != OBD_CFG_REC) {
1078                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1079                 RETURN(-EINVAL);
1080         }
1081
1082         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1083         if (rc) {
1084                 CERROR("Insane cfg\n");
1085                 RETURN(rc);
1086         }
1087
1088         lcfg = (struct lustre_cfg *)cfg_buf;
1089
1090         if (lcfg->lcfg_command == LCFG_MARKER) {
1091                 struct cfg_marker *marker;
1092                 marker = lustre_cfg_buf(lcfg, 1);
1093                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1094                     (marker->cm_flags & CM_START)){
1095                         got_an_osc_or_mdc = 1;
1096                         rc = record_start_log(obd, &mdt_llh, mti->mti_svname);
1097                         rc = record_marker(obd, mdt_llh, fsdb, CM_START,
1098                                            mti->mti_svname,"add osc(copied)");
1099                         rc = record_end_log(obd, &mdt_llh);
1100                         last_step = marker->cm_step;
1101                         RETURN(rc);
1102                 }
1103                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1104                     (marker->cm_flags & CM_END)){
1105                         LASSERT(last_step == marker->cm_step);
1106                         last_step = -1;
1107                         got_an_osc_or_mdc = 0;
1108                         rc = record_start_log(obd, &mdt_llh, mti->mti_svname);
1109                         rc = record_marker(obd, mdt_llh, fsdb, CM_END,
1110                                            mti->mti_svname,"add osc(copied)");
1111                         rc = record_end_log(obd, &mdt_llh);
1112                         RETURN(rc);
1113                 }
1114                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1115                     (marker->cm_flags & CM_START)){
1116                         got_an_osc_or_mdc = 2;
1117                         last_step = marker->cm_step;
1118                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1119                                strlen(marker->cm_tgtname));
1120
1121                         RETURN(rc);
1122                 }
1123                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1124                     (marker->cm_flags & CM_END)){
1125                         LASSERT(last_step == marker->cm_step);
1126                         last_step = -1;
1127                         got_an_osc_or_mdc = 0;
1128                         RETURN(rc);
1129                 }
1130         }
1131
1132         if (got_an_osc_or_mdc == 0 || last_step < 0)
1133                 RETURN(rc);
1134         
1135         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1136                 uint64_t nodenid;
1137                 nodenid = lcfg->lcfg_nid;
1138                 
1139                 tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1140                 tmti->mti_nid_count++;
1141
1142                 RETURN(rc);
1143         }
1144
1145         if (lcfg->lcfg_command == LCFG_SETUP) {
1146                 char *target;
1147
1148                 target = lustre_cfg_string(lcfg, 1);
1149                 memcpy(tmti->mti_uuid, target, strlen(target));
1150                 RETURN(rc);
1151         }
1152
1153         /* ignore client side sptlrpc_conf_log */
1154         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1155                 RETURN(rc);
1156
1157         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1158                 int index;
1159
1160                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1161                         RETURN (-EINVAL);
1162
1163                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1164                        strlen(mti->mti_fsname));
1165                 tmti->mti_stripe_index = index;
1166
1167                 mgs_write_log_mdc_to_mdt(obd, fsdb, tmti, mti->mti_svname);
1168                 memset(tmti, 0, sizeof(*tmti));
1169                 RETURN(rc);
1170         }
1171         RETURN(rc);
1172 }
1173
1174 /* fsdb->fsdb_sem is already held  in mgs_write_log_target*/
1175 /* stealed from mgs_get_fsdb_from_llog*/
1176 static int mgs_steal_llog_for_mdt_from_client(struct obd_device *obd,
1177                                               char *client_name,
1178                                               struct temp_comp* comp)
1179 {
1180         struct llog_handle *loghandle;
1181         struct lvfs_run_ctxt saved;
1182         struct mgs_target_info *tmti;
1183         struct llog_ctxt *ctxt;
1184         int rc, rc2;
1185         ENTRY;
1186
1187         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
1188         LASSERT(ctxt != NULL);
1189
1190         OBD_ALLOC_PTR(tmti);
1191         if (tmti == NULL)
1192                 RETURN(-ENOMEM);
1193
1194         comp->comp_tmti = tmti;
1195         comp->comp_obd = obd;
1196
1197         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1198
1199         rc = llog_create(ctxt, &loghandle, NULL, client_name);
1200         if (rc)
1201                 GOTO(out_pop, rc);
1202
1203         rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
1204         if (rc)
1205                 GOTO(out_close, rc);
1206
1207         rc = llog_process(loghandle, mgs_steal_llog_handler, (void *)comp, NULL);
1208         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1209 out_close:
1210         rc2 = llog_close(loghandle);
1211         if (!rc)
1212                 rc = rc2;
1213 out_pop:
1214         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1215         OBD_FREE_PTR(tmti);
1216         llog_ctxt_put(ctxt);
1217         RETURN(rc);
1218 }
1219
1220 /* lmv is the second thing for client logs */
1221 /* copied from mgs_write_log_lov. Please refer to that.  */
1222 static int mgs_write_log_lmv(struct obd_device *obd, struct fs_db *fsdb,
1223                              struct mgs_target_info *mti,
1224                              char *logname, char *lmvname)
1225 {
1226         struct llog_handle *llh = NULL;
1227         struct lmv_desc *lmvdesc;
1228         char *uuid;
1229         int rc = 0;
1230         ENTRY;
1231
1232         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1233
1234         OBD_ALLOC_PTR(lmvdesc);
1235         if (lmvdesc == NULL)
1236                 RETURN(-ENOMEM);
1237         lmvdesc->ld_active_tgt_count = 0;
1238         lmvdesc->ld_tgt_count = 0;
1239         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1240         uuid = (char *)lmvdesc->ld_uuid.uuid;
1241
1242         rc = record_start_log(obd, &llh, logname);
1243         rc = record_marker(obd, llh, fsdb, CM_START, lmvname, "lmv setup");
1244         rc = record_attach(obd, llh, lmvname, "lmv", uuid);
1245         rc = record_lmv_setup(obd, llh, lmvname, lmvdesc);
1246         rc = record_marker(obd, llh, fsdb, CM_END, lmvname, "lmv setup");
1247         rc = record_end_log(obd, &llh);
1248
1249         OBD_FREE_PTR(lmvdesc);
1250         RETURN(rc);
1251 }
1252
1253 /* lov is the first thing in the mdt and client logs */
1254 static int mgs_write_log_lov(struct obd_device *obd, struct fs_db *fsdb,
1255                              struct mgs_target_info *mti,
1256                              char *logname, char *lovname)
1257 {
1258         struct llog_handle *llh = NULL;
1259         struct lov_desc *lovdesc;
1260         char *uuid;
1261         int rc = 0;
1262         ENTRY;
1263
1264         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1265
1266         /*
1267         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1268         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1269               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1270         */
1271
1272         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1273         OBD_ALLOC_PTR(lovdesc);
1274         if (lovdesc == NULL)
1275                 RETURN(-ENOMEM);
1276         lovdesc->ld_magic = LOV_DESC_MAGIC;
1277         lovdesc->ld_tgt_count = 0;
1278         /* Defaults.  Can be changed later by lcfg config_param */
1279         lovdesc->ld_default_stripe_count = 1;
1280         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1281         lovdesc->ld_default_stripe_size = 1024 * 1024;
1282         lovdesc->ld_default_stripe_offset = 0;
1283         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1284         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1285         /* can these be the same? */
1286         uuid = (char *)lovdesc->ld_uuid.uuid;
1287
1288         /* This should always be the first entry in a log.
1289         rc = mgs_clear_log(obd, logname); */
1290         rc = record_start_log(obd, &llh, logname);
1291         if (rc) 
1292                 GOTO(out, rc);
1293         /* FIXME these should be a single journal transaction */
1294         rc = record_marker(obd, llh, fsdb, CM_START, lovname, "lov setup"); 
1295         rc = record_attach(obd, llh, lovname, "lov", uuid);
1296         rc = record_lov_setup(obd, llh, lovname, lovdesc);
1297         rc = record_marker(obd, llh, fsdb, CM_END, lovname, "lov setup");
1298         rc = record_end_log(obd, &llh);
1299
1300         EXIT;
1301 out:
1302         OBD_FREE_PTR(lovdesc);
1303         return rc;
1304 }
1305
1306 /* add failnids to open log */
1307 static int mgs_write_log_failnids(struct obd_device *obd,
1308                                   struct mgs_target_info *mti,
1309                                   struct llog_handle *llh,
1310                                   char *cliname)
1311 {
1312         char *failnodeuuid = NULL;
1313         char *ptr = mti->mti_params;
1314         lnet_nid_t nid;
1315         int rc = 0;
1316
1317         /*
1318         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1319         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1320         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1321         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1322         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1323         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1324         */
1325
1326         /* Pull failnid info out of params string */
1327         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1328                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1329                         if (failnodeuuid == NULL) {
1330                                 /* We don't know the failover node name,
1331                                    so just use the first nid as the uuid */
1332                                 rc = name_create(&failnodeuuid,
1333                                                  libcfs_nid2str(nid), "");
1334                                 if (rc) 
1335                                         return rc;
1336                         }
1337                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1338                                "client %s\n", libcfs_nid2str(nid),
1339                                failnodeuuid, cliname);
1340                         rc = record_add_uuid(obd, llh, nid, failnodeuuid);
1341                 }
1342                 if (failnodeuuid) {
1343                         rc = record_add_conn(obd, llh, cliname, failnodeuuid);
1344                         name_destroy(&failnodeuuid);
1345                         failnodeuuid = NULL;
1346                 }
1347         }
1348
1349         return rc;
1350 }
1351
1352 static int mgs_write_log_mdc_to_lmv(struct obd_device *obd, struct fs_db *fsdb,
1353                                     struct mgs_target_info *mti,
1354                                     char *logname, char *lmvname)
1355 {
1356         struct llog_handle *llh = NULL;
1357         struct sptlrpc_conf_log *srpc_log;
1358         char *mdcname, *nodeuuid, *mdcuuid, *lmvuuid;
1359         char index[5];
1360         int i, rc;
1361         ENTRY;
1362         
1363         if (mgs_log_is_empty(obd, logname)) {
1364                 CERROR("log is empty! Logical error\n");
1365                 RETURN(-EINVAL);
1366         }
1367
1368         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1369                mti->mti_svname, logname, lmvname);
1370
1371         srpc_log = sptlrpc_conf_log_alloc();
1372         if (IS_ERR(srpc_log))
1373                 RETURN(PTR_ERR(srpc_log));
1374         srpc_log->scl_part = LUSTRE_SP_CLI;
1375
1376         rc = mgs_get_srpc_conf_log(fsdb, mti->mti_svname,
1377                                    LUSTRE_SP_CLI, LUSTRE_SP_MDT, srpc_log);
1378         if (rc)
1379                 goto out_srpc;
1380
1381         name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1382         name_create(&mdcname, mti->mti_svname, "-mdc");
1383         name_create(&mdcuuid, mdcname, "_UUID");
1384         name_create(&lmvuuid, lmvname, "_UUID");
1385
1386         rc = record_start_log(obd, &llh, logname);
1387         rc = record_marker(obd, llh, fsdb, CM_START, mti->mti_svname,
1388                            "add mdc");
1389
1390         for (i = 0; i < mti->mti_nid_count; i++) {
1391                 CDEBUG(D_MGS, "add nid %s for mdt\n", 
1392                        libcfs_nid2str(mti->mti_nids[i]));
1393                        
1394                 rc = record_add_uuid(obd, llh, mti->mti_nids[i], nodeuuid);
1395         }
1396
1397         rc = record_attach(obd, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1398         rc = record_setup(obd, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1399         rc = record_sptlrpc_conf(obd, llh, mdcname, srpc_log);
1400         rc = mgs_write_log_failnids(obd, mti, llh, mdcname);
1401         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1402         rc = record_mdc_add(obd, llh, lmvname, mdcuuid, mti->mti_uuid,
1403                             index, "1");
1404         rc = record_marker(obd, llh, fsdb, CM_END, mti->mti_svname,
1405                            "add mdc"); 
1406         rc = record_end_log(obd, &llh);
1407
1408         name_destroy(&lmvuuid);
1409         name_destroy(&mdcuuid);
1410         name_destroy(&mdcname);
1411         name_destroy(&nodeuuid);
1412 out_srpc:
1413         sptlrpc_conf_log_free(srpc_log);
1414         RETURN(rc);
1415 }
1416
1417 /* add new mdc to already existent MDS */
1418 static int mgs_write_log_mdc_to_mdt(struct obd_device *obd, struct fs_db *fsdb,
1419                                     struct mgs_target_info *mti, char *logname)
1420 {
1421         struct llog_handle *llh = NULL;
1422         struct sptlrpc_conf_log *srpc_log;
1423         char *nodeuuid, *mdcname, *mdcuuid, *mdtuuid;
1424         int idx = mti->mti_stripe_index;
1425         char index[9];
1426         int i, rc;
1427
1428         ENTRY;
1429         if (mgs_log_is_empty(obd, mti->mti_svname)) {
1430                 CERROR("log is empty! Logical error\n");
1431                 RETURN (-EINVAL);
1432         }
1433
1434         CDEBUG(D_MGS, "adding mdc index %d to %s\n", idx, logname);
1435
1436         srpc_log = sptlrpc_conf_log_alloc();
1437         if (IS_ERR(srpc_log))
1438                 RETURN(PTR_ERR(srpc_log));
1439         srpc_log->scl_part = LUSTRE_SP_MDT;
1440
1441         rc = mgs_get_srpc_conf_log(fsdb, mti->mti_svname,
1442                                    LUSTRE_SP_MDT, LUSTRE_SP_MDT, srpc_log);
1443         if (rc)
1444                 goto out_srpc;
1445
1446         name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1447         snprintf(index, sizeof(index), "-mdc%04x", idx);
1448         name_create(&mdcname, logname, index);
1449         name_create(&mdcuuid, mdcname, "_UUID");
1450         name_create(&mdtuuid, logname, "_UUID");
1451
1452         rc = record_start_log(obd, &llh, logname);
1453         rc = record_marker(obd, llh, fsdb, CM_START, mti->mti_svname, "add mdc");
1454         for (i = 0; i < mti->mti_nid_count; i++) {
1455                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1456                        libcfs_nid2str(mti->mti_nids[i]));
1457                 rc = record_add_uuid(obd, llh, mti->mti_nids[i], nodeuuid);
1458         }
1459         rc = record_attach(obd, llh, mdcname, LUSTRE_MDC_NAME, mdcuuid);
1460         rc = record_setup(obd, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1461         rc = record_sptlrpc_conf(obd, llh, mdcname, srpc_log);
1462         rc = mgs_write_log_failnids(obd, mti, llh, mdcname);
1463         snprintf(index, sizeof(index), "%d", idx);
1464
1465         rc = record_mdc_add(obd, llh, logname, mdcuuid, mti->mti_uuid,
1466                             index, "1");
1467         rc = record_marker(obd, llh, fsdb, CM_END, mti->mti_svname, "add mdc"); 
1468         rc = record_end_log(obd, &llh);
1469
1470         name_destroy(&mdcuuid);
1471         name_destroy(&mdcname);
1472         name_destroy(&nodeuuid);
1473         name_destroy(&mdtuuid);
1474 out_srpc:
1475         sptlrpc_conf_log_free(srpc_log);
1476         RETURN(rc);
1477 }
1478
1479 static int mgs_write_log_mdt0(struct obd_device *obd, struct fs_db *fsdb,
1480                               struct mgs_target_info *mti)
1481 {
1482         char *log = mti->mti_svname;
1483         struct llog_handle *llh = NULL;
1484         char *uuid, *lovname;
1485         char mdt_index[5];
1486         struct sptlrpc_conf_log *srpc_log;
1487         char *ptr = mti->mti_params;
1488         int rc = 0, failout = 0;
1489         ENTRY;
1490
1491         srpc_log = sptlrpc_conf_log_alloc();
1492         if (IS_ERR(srpc_log))
1493                 RETURN(PTR_ERR(srpc_log));
1494         srpc_log->scl_part = LUSTRE_SP_MDT;
1495
1496         rc = mgs_get_srpc_conf_log(fsdb, mti->mti_svname,
1497                                    LUSTRE_SP_ANY, LUSTRE_SP_MDT, srpc_log);
1498         if (rc)
1499                 GOTO(out_srpc, rc);
1500
1501         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
1502         if (uuid == NULL)
1503                 GOTO(out_srpc, rc = -ENOMEM);
1504
1505         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0) 
1506                 failout = (strncmp(ptr, "failout", 7) == 0);
1507
1508         name_create(&lovname, log, "-mdtlov");
1509         if (mgs_log_is_empty(obd, log))
1510                 rc = mgs_write_log_lov(obd, fsdb, mti, log, lovname);
1511
1512         sprintf(uuid, "%s_UUID", log);
1513         sprintf(mdt_index,"%d",mti->mti_stripe_index);        
1514
1515         /* add MDT itself */
1516         rc = record_start_log(obd, &llh, log);
1517         if (rc) 
1518                 GOTO(out, rc);
1519         
1520         /* FIXME this whole fn should be a single journal transaction */
1521         rc = record_marker(obd, llh, fsdb, CM_START, log, "add mdt");
1522         rc = record_attach(obd, llh, log, LUSTRE_MDT_NAME, uuid);
1523         rc = record_mount_opt(obd, llh, log, lovname, NULL);
1524         rc = record_setup(obd, llh, log, uuid, mdt_index, lovname, 
1525                         failout ? "n" : "f");
1526         rc = record_sptlrpc_conf(obd, llh, log, srpc_log);
1527         rc = record_marker(obd, llh, fsdb, CM_END, log, "add mdt");
1528         rc = record_end_log(obd, &llh);
1529 out:
1530         name_destroy(&lovname);
1531         OBD_FREE(uuid, sizeof(struct obd_uuid));
1532 out_srpc:
1533         sptlrpc_conf_log_free(srpc_log);
1534         RETURN(rc);
1535 }
1536
1537 /* envelope method for all layers log */
1538 static int mgs_write_log_mdt(struct obd_device *obd, struct fs_db *fsdb,
1539                               struct mgs_target_info *mti)
1540 {
1541         struct llog_handle *llh = NULL;
1542         char *cliname;
1543         struct temp_comp comp = { 0 };
1544         char mdt_index[9];
1545         int rc, i = 0;
1546         ENTRY;
1547
1548         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
1549
1550 #if 0
1551         /* COMPAT_146 */
1552         if (mti->mti_flags & LDD_F_UPGRADE14) {
1553                 /* We're starting with an old uuid.  Assume old name for lov
1554                    as well since the lov entry already exists in the log. */
1555                 CDEBUG(D_MGS, "old mds uuid %s\n", mti->mti_uuid);
1556                 if (strncmp(mti->mti_uuid, fsdb->fsdb_mdtlov + 4, 
1557                             strlen(fsdb->fsdb_mdtlov) - 4) != 0) {
1558                         CERROR("old mds uuid %s doesn't match log %s (%s)\n",
1559                                mti->mti_uuid, fsdb->fsdb_mdtlov, 
1560                                fsdb->fsdb_mdtlov + 4);
1561                         RETURN(-EINVAL);
1562                 }
1563         }
1564         /* end COMPAT_146 */
1565 #endif
1566         if (mti->mti_uuid[0] == '\0') {
1567                 /* Make up our own uuid */
1568                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
1569                          "%s_UUID", mti->mti_svname);
1570         }
1571
1572         /* add mdt */
1573         rc = mgs_write_log_mdt0(obd, fsdb, mti);
1574         
1575         /* Append the mdt info to the client log */
1576         name_create(&cliname, mti->mti_fsname, "-client");
1577         
1578         if (mgs_log_is_empty(obd, cliname)) { 
1579                 /* Start client log */
1580                 rc = mgs_write_log_lov(obd, fsdb, mti, cliname, 
1581                                        fsdb->fsdb_clilov);
1582                 rc = mgs_write_log_lmv(obd, fsdb, mti, cliname, 
1583                                        fsdb->fsdb_clilmv);
1584         }
1585
1586         /* 
1587         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
1588         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
1589         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
1590         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
1591         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
1592         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
1593         */
1594         
1595 #if 0
1596         /* COMPAT_146 */
1597         if (mti->mti_flags & LDD_F_UPGRADE14) { 
1598                 rc = record_start_log(obd, &llh, cliname);
1599                 if (rc) 
1600                         GOTO(out, rc);
1601         
1602                 rc = record_marker(obd, llh, fsdb, CM_START, 
1603                                    mti->mti_svname,"add mdc");
1604                                    
1605                 /* Old client log already has MDC entry, but needs mount opt 
1606                    for new client name (lustre-client) */
1607                 /* FIXME Old MDT log already has an old mount opt 
1608                    which we should remove (currently handled by
1609                    class_del_profiles()) */
1610                 rc = record_mount_opt(obd, llh, cliname, fsdb->fsdb_clilov,
1611                                       fsdb->fsdb_mdc);
1612                 /* end COMPAT_146 */
1613                 
1614                 rc = record_marker(obd, llh, fsdb, CM_END, 
1615                                    mti->mti_svname, "add mdc");
1616         } else
1617 #endif
1618         {
1619                 /* copy client info about lov/lmv */
1620                 comp.comp_mti = mti;
1621                 comp.comp_fsdb = fsdb;
1622                 
1623                 rc = mgs_steal_llog_for_mdt_from_client(obd, cliname, 
1624                                                         &comp);
1625
1626                 rc = mgs_write_log_mdc_to_lmv(obd, fsdb, mti, cliname,
1627                                               fsdb->fsdb_clilmv);
1628                 /* add mountopts */
1629                 rc = record_start_log(obd, &llh, cliname);
1630                 if (rc) 
1631                         GOTO(out, rc);
1632
1633                 rc = record_marker(obd, llh, fsdb, CM_START, cliname, 
1634                                    "mount opts");
1635                 rc = record_mount_opt(obd, llh, cliname, fsdb->fsdb_clilov,
1636                                       fsdb->fsdb_clilmv);
1637                 rc = record_marker(obd, llh, fsdb, CM_END, cliname, 
1638                                    "mount opts"); 
1639         }
1640                            
1641         rc = record_end_log(obd, &llh);
1642 out:
1643         name_destroy(&cliname);
1644         
1645         // for_all_existing_mdt except current one
1646         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
1647                 char *mdtname;
1648                 if (i !=  mti->mti_stripe_index &&
1649                     test_bit(i,  fsdb->fsdb_mdt_index_map)) {
1650                         sprintf(mdt_index,"-MDT%04x",i);
1651                         
1652                         name_create(&mdtname, mti->mti_fsname, mdt_index);
1653                         rc = mgs_write_log_mdc_to_mdt(obd, fsdb, mti, mdtname);
1654                         name_destroy(&mdtname);
1655                 }
1656         }
1657         
1658         RETURN(rc);
1659 }
1660
1661 /* Add the ost info to the client/mdt lov */
1662 static int mgs_write_log_osc_to_lov(struct obd_device *obd, struct fs_db *fsdb,
1663                                     struct mgs_target_info *mti,
1664                                     char *logname, char *suffix, char *lovname,
1665                                     enum lustre_sec_part sec_part, int flags)
1666 {
1667         struct llog_handle *llh = NULL;
1668         struct sptlrpc_conf_log *srpc_log;
1669         char *nodeuuid, *oscname, *oscuuid, *lovuuid, *svname;
1670         char index[5];
1671         int i, rc;
1672
1673         ENTRY;
1674         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
1675                mti->mti_svname, logname);
1676         
1677         srpc_log = sptlrpc_conf_log_alloc();
1678         if (IS_ERR(srpc_log))
1679                 RETURN(PTR_ERR(srpc_log));
1680         srpc_log->scl_part = sec_part;
1681
1682         rc = mgs_get_srpc_conf_log(fsdb, mti->mti_svname,
1683                                    sec_part, LUSTRE_SP_OST, srpc_log);
1684         if (rc)
1685                 goto out_srpc;
1686
1687         if (mgs_log_is_empty(obd, logname)) {
1688                 /* The first item in the log must be the lov, so we have
1689                    somewhere to add our osc. */
1690                 rc = mgs_write_log_lov(obd, fsdb, mti, logname, lovname);
1691         }
1692   
1693         name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1694         name_create(&svname, mti->mti_svname, "-osc");
1695         name_create(&oscname, svname, suffix);
1696         name_create(&oscuuid, oscname, "_UUID");
1697         name_create(&lovuuid, lovname, "_UUID");
1698
1699         /*
1700         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
1701         multihomed (#4)
1702         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
1703         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
1704         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
1705         failover (#6,7)
1706         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
1707         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
1708         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
1709         */
1710         
1711         rc = record_start_log(obd, &llh, logname);
1712         if (rc) 
1713                 GOTO(out, rc);
1714         /* FIXME these should be a single journal transaction */
1715         rc = record_marker(obd, llh, fsdb, CM_START | flags, mti->mti_svname,
1716                            "add osc"); 
1717         for (i = 0; i < mti->mti_nid_count; i++) {
1718                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
1719                 rc = record_add_uuid(obd, llh, mti->mti_nids[i], nodeuuid);
1720         }
1721         rc = record_attach(obd, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
1722         rc = record_setup(obd, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
1723         rc = record_sptlrpc_conf(obd, llh, oscname, srpc_log);
1724         rc = mgs_write_log_failnids(obd, mti, llh, oscname);
1725         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1726         rc = record_lov_add(obd, llh, lovname, mti->mti_uuid, index, "1");
1727         rc = record_marker(obd, llh, fsdb, CM_END | flags, mti->mti_svname,
1728                            "add osc"); 
1729         rc = record_end_log(obd, &llh);
1730 out:        
1731         name_destroy(&lovuuid);
1732         name_destroy(&oscuuid);
1733         name_destroy(&oscname);
1734         name_destroy(&svname);
1735         name_destroy(&nodeuuid);
1736 out_srpc:
1737         sptlrpc_conf_log_free(srpc_log);
1738         RETURN(rc);
1739 }
1740
1741 static int mgs_write_log_ost(struct obd_device *obd, struct fs_db *fsdb,
1742                              struct mgs_target_info *mti)
1743 {
1744         struct llog_handle *llh = NULL;
1745         struct sptlrpc_conf_log *srpc_log;
1746         char *logname, *lovname;
1747         char mdt_index[9];
1748         char *ptr = mti->mti_params;
1749         int rc, flags = 0, failout = 0, i;
1750         ENTRY;
1751         
1752         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
1753
1754         /* The ost startup log */
1755
1756         /* If the ost log already exists, that means that someone reformatted
1757            the ost and it called target_add again. */
1758         if (!mgs_log_is_empty(obd, mti->mti_svname)) {
1759                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
1760                                    "exists, yet the server claims it never "
1761                                    "registered. It may have been reformatted, "
1762                                    "or the index changed. writeconf the MDT to "
1763                                    "regenerate all logs.\n", mti->mti_svname);
1764                 RETURN(-EALREADY);
1765         }
1766
1767         srpc_log = sptlrpc_conf_log_alloc();
1768         if (IS_ERR(srpc_log))
1769                 RETURN(PTR_ERR(srpc_log));
1770         srpc_log->scl_part = LUSTRE_SP_OST;
1771
1772         rc = mgs_get_srpc_conf_log(fsdb, mti->mti_svname,
1773                                    LUSTRE_SP_ANY, LUSTRE_SP_OST, srpc_log);
1774         if (rc)
1775                 goto out_srpc;
1776
1777         /*
1778         attach obdfilter ost1 ost1_UUID
1779         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
1780         */
1781         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0) 
1782                 failout = (strncmp(ptr, "failout", 7) == 0);
1783         rc = record_start_log(obd, &llh, mti->mti_svname);
1784         if (rc) 
1785                 RETURN(rc);
1786         /* FIXME these should be a single journal transaction */
1787         rc = record_marker(obd, llh, fsdb, CM_START, mti->mti_svname,"add ost"); 
1788         if (*mti->mti_uuid == '\0') 
1789                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
1790                          "%s_UUID", mti->mti_svname);
1791         rc = record_attach(obd, llh, mti->mti_svname,
1792                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
1793         rc = record_setup(obd, llh, mti->mti_svname,
1794                           "dev"/*ignored*/, "type"/*ignored*/,
1795                           failout ? "n" : "f", 0/*options*/);
1796         rc = record_sptlrpc_conf(obd, llh, mti->mti_svname, srpc_log);
1797         rc = record_marker(obd, llh, fsdb, CM_END, mti->mti_svname, "add ost"); 
1798         rc = record_end_log(obd, &llh);
1799
1800         /* We also have to update the other logs where this osc is part of 
1801            the lov */
1802
1803         if (fsdb->fsdb_flags & FSDB_OLDLOG14) {
1804                 /* If we're upgrading, the old mdt log already has our
1805                    entry. Let's do a fake one for fun. */
1806                 /* Note that we can't add any new failnids, since we don't
1807                    know the old osc names. */
1808                 flags = CM_SKIP | CM_UPGRADE146;
1809         
1810         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
1811                 /* If the update flag isn't set, don't update client/mdt
1812                    logs. */
1813                 flags |= CM_SKIP;
1814                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
1815                               "the MDT first to regenerate it.\n",
1816                               mti->mti_svname);
1817         }
1818
1819         // for_all_existing_mdt
1820         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
1821                  if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
1822                         sprintf(mdt_index,"-MDT%04x",i);
1823                         name_create(&logname, mti->mti_fsname, mdt_index);
1824                         name_create(&lovname, logname, "-mdtlov");
1825                         mgs_write_log_osc_to_lov(obd, fsdb, mti, logname,
1826                                                  mdt_index, lovname,
1827                                                  LUSTRE_SP_MDT, flags);
1828                         name_destroy(&logname);
1829                         name_destroy(&lovname);
1830                 }
1831         }
1832     
1833         /* Append ost info to the client log */
1834         name_create(&logname, mti->mti_fsname, "-client");
1835         mgs_write_log_osc_to_lov(obd, fsdb, mti, logname, "",
1836                                  fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
1837         name_destroy(&logname);
1838 out_srpc:
1839         sptlrpc_conf_log_free(srpc_log);
1840         RETURN(rc);
1841 }
1842
1843 /* Add additional failnids to an existing log.  
1844    The mdc/osc must have been added to logs first */
1845 /* tcp nids must be in dotted-quad ascii -
1846    we can't resolve hostnames from the kernel. */
1847 static int mgs_write_log_add_failnid(struct obd_device *obd, struct fs_db *fsdb,
1848                                      struct mgs_target_info *mti)
1849 {
1850         char *logname, *cliname;
1851         struct llog_handle *llh = NULL;
1852         int rc;
1853         ENTRY;
1854
1855         /* FIXME how do we delete a failnid? Currently --writeconf is the
1856            only way.  Maybe make --erase-params pass a flag to really 
1857            erase all params from logs - except it can't erase the failnids
1858            given when a target first registers, since they aren't processed
1859            as params... */
1860
1861         /* Verify that we know about this target */
1862         if (mgs_log_is_empty(obd, mti->mti_svname)) {
1863                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
1864                                    "yet. It must be started before failnids "
1865                                    "can be added.\n", mti->mti_svname);
1866                 RETURN(-ENOENT);
1867         }
1868
1869         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
1870         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
1871                 name_create(&cliname, mti->mti_svname, "-mdc");
1872         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
1873                 name_create(&cliname, mti->mti_svname, "-osc");
1874         } else {
1875                 RETURN(-EINVAL);
1876         }
1877         
1878         /* Add failover nids to client log */
1879         name_create(&logname, mti->mti_fsname, "-client");
1880         rc = record_start_log(obd, &llh, logname);
1881         if (!rc) { 
1882                 /* FIXME this fn should be a single journal transaction */
1883                 rc = record_marker(obd, llh, fsdb, CM_START, mti->mti_svname,
1884                                    "add failnid");
1885                 rc = mgs_write_log_failnids(obd, mti, llh, cliname);
1886                 rc = record_marker(obd, llh, fsdb, CM_END, mti->mti_svname,
1887                                    "add failnid"); 
1888                 rc = record_end_log(obd, &llh);
1889         }
1890         name_destroy(&logname);
1891
1892         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
1893                 /* Add OST failover nids to the MDT log as well */
1894                 name_create(&logname, mti->mti_fsname, "-MDT0000");
1895                 rc = record_start_log(obd, &llh, logname);
1896                 if (!rc) {
1897                         rc = record_marker(obd, llh, fsdb, CM_START, 
1898                                            mti->mti_svname, "add failnid");
1899                         rc = mgs_write_log_failnids(obd, mti, llh, cliname);
1900                         rc = record_marker(obd, llh, fsdb, CM_END, 
1901                                            mti->mti_svname, "add failnid"); 
1902                         rc = record_end_log(obd, &llh);
1903                 }
1904                 name_destroy(&logname);
1905         }
1906
1907         name_destroy(&cliname);
1908         RETURN(rc);
1909 }
1910
1911 static int mgs_wlp_lcfg(struct obd_device *obd, struct fs_db *fsdb, 
1912                         struct mgs_target_info *mti,
1913                         char *logname, struct lustre_cfg_bufs *bufs,
1914                         char *tgtname, char *ptr)
1915 {
1916         char comment[MTI_NAME_MAXLEN];
1917         char *tmp;
1918         struct lustre_cfg *lcfg;
1919         int rc;
1920         
1921         /* Erase any old settings of this same parameter */
1922         memcpy(comment, ptr, MTI_NAME_MAXLEN);
1923         comment[MTI_NAME_MAXLEN - 1] = 0;
1924         /* But don't try to match the value. */
1925         if ((tmp = strchr(comment, '=')))
1926             *tmp = 0;
1927         /* FIXME we should skip settings that are the same as old values */
1928         rc = mgs_modify(obd, fsdb, mti, logname, tgtname, comment, CM_SKIP);
1929         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", rc ?
1930                       "Sett" : "Modify", tgtname, comment, logname);
1931
1932         lustre_cfg_bufs_reset(bufs, tgtname);
1933         lustre_cfg_bufs_set_string(bufs, 1, ptr);
1934         lcfg = lustre_cfg_new(LCFG_PARAM, bufs);
1935         if (!lcfg) 
1936                 return -ENOMEM;
1937         rc = mgs_write_log_direct(obd, fsdb, logname, lcfg, tgtname, comment);
1938         lustre_cfg_free(lcfg);
1939         return rc;
1940 }
1941
1942 /*
1943  * populate rules which applied to a target device
1944  */
1945 static int mgs_get_srpc_conf_log(struct fs_db *fsdb, const char *tgt,
1946                                  enum lustre_sec_part from,
1947                                  enum lustre_sec_part to,
1948                                  struct sptlrpc_conf_log *log)
1949 {
1950         struct mgs_tgt_srpc_conf *tgtconf;
1951         struct sptlrpc_rule_set  *tgt_rset;
1952         int                       found_tgt = 0, rc;
1953
1954         for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf;
1955              tgtconf = tgtconf->mtsc_next) {
1956                 if (!strcmp(tgt, tgtconf->mtsc_tgt)) {
1957                         found_tgt = 1;
1958                         break;
1959                 }
1960         }
1961
1962         if (found_tgt)
1963                 tgt_rset = &tgtconf->mtsc_rset;
1964         else
1965                 tgt_rset = NULL;
1966
1967         rc = sptlrpc_conf_log_populate(&fsdb->fsdb_srpc_gen, tgt_rset,
1968                                        from, to, fsdb->fsdb_srpc_fl_udesc, log);
1969         if (rc)
1970                 CERROR("failed to populate srpc log for %s: %d\n", tgt, rc);
1971
1972         return rc;
1973 }
1974
1975 struct mgs_msl_data {
1976         struct obd_device      *mmd_obd;
1977         struct fs_db           *mmd_fsdb;
1978         struct mgs_target_info *mmd_mti;
1979         int                     mmd_skip;
1980         int                     mmd_attached;
1981         int                     mmd_server;
1982         enum lustre_sec_part    mmd_tgtpart;
1983         char                    mmd_tgtname[MTI_NAME_MAXLEN];
1984 };
1985
1986 static void mgs_msl_data_cleanup(struct mgs_msl_data *mmd)
1987 {
1988         mmd->mmd_attached = 0;
1989         mmd->mmd_tgtname[0] = '\0';
1990 }
1991
1992 static int mgs_msl_tgt_uuid2name(char *tgtname, char *tgtuuid)
1993 {
1994         char    *ptr;
1995
1996         if (tgtuuid == NULL) {
1997                 CERROR("missing target UUID???\n");
1998                 return -EINVAL;
1999         }
2000
2001         ptr = strstr(tgtuuid, "_UUID");
2002         if (ptr == NULL) {
2003                 CERROR("unrecognized UUID: %s\n", tgtuuid);
2004                 return -EINVAL;
2005         }
2006
2007         *ptr = '\0';;
2008         strncpy(tgtname, tgtuuid, MTI_NAME_MAXLEN);
2009         tgtname[MTI_NAME_MAXLEN - 1] = '\0';
2010
2011         return 0;
2012 }
2013
2014 static int mgs_modify_srpc_log_handler(struct llog_handle *llh,
2015                                        struct llog_rec_hdr *rec, 
2016                                        void *data)
2017 {
2018         struct mgs_msl_data *mmd = (struct mgs_msl_data *)data;
2019         struct cfg_marker   *marker;
2020         struct lustre_cfg   *lcfg = (struct lustre_cfg *)(rec + 1);
2021         int                  cfg_len, rc;
2022         ENTRY;
2023
2024         if (rec->lrh_type != OBD_CFG_REC) {
2025                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
2026                 RETURN(-EINVAL);
2027         }
2028
2029         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) - 
2030                   sizeof(struct llog_rec_tail);
2031
2032         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
2033         if (rc) {
2034                 CERROR("Insane cfg\n");
2035                 RETURN(rc);
2036         }
2037
2038         if (lcfg->lcfg_command == LCFG_MARKER) {
2039                 marker = lustre_cfg_buf(lcfg, 1);
2040
2041                 if (marker->cm_flags & CM_START &&
2042                     marker->cm_flags & CM_SKIP)
2043                         mmd->mmd_skip = 1;
2044                 if (marker->cm_flags & CM_END)
2045                         mmd->mmd_skip = 0;
2046
2047                 RETURN(0);
2048         }
2049
2050         if (mmd->mmd_skip)
2051                 RETURN(0);
2052
2053         switch (lcfg->lcfg_command) {
2054         case LCFG_ATTACH:
2055                 mmd->mmd_attached = 1;
2056
2057                 if (!strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OST_NAME)) {
2058                         mmd->mmd_server = 1;
2059                         mmd->mmd_tgtpart = LUSTRE_SP_OST;
2060                 } else if (!strcmp(lustre_cfg_string(lcfg, 1),
2061                                    LUSTRE_MDT_NAME)) {
2062                         mmd->mmd_server = 1;
2063                         mmd->mmd_tgtpart = LUSTRE_SP_MDT;
2064                 } else if (!strcmp(lustre_cfg_string(lcfg, 1),
2065                                    LUSTRE_OSC_NAME)) {
2066                         mmd->mmd_server = 0;
2067                         mmd->mmd_tgtpart = LUSTRE_SP_OST;
2068                 } else if (!strcmp(lustre_cfg_string(lcfg, 1),
2069                                    LUSTRE_MDC_NAME)) {
2070                         mmd->mmd_server = 0;
2071                         mmd->mmd_tgtpart = LUSTRE_SP_MDT;
2072                 } else {
2073                         mmd->mmd_attached = 0;
2074                 }
2075
2076                 if (mmd->mmd_attached && mmd->mmd_server) {
2077                         rc = mgs_msl_tgt_uuid2name(mmd->mmd_tgtname,
2078                                                    lustre_cfg_string(lcfg, 2));
2079                         if (rc) {
2080                                 mgs_msl_data_cleanup(mmd);
2081                                 break;
2082                         }
2083                 }
2084
2085                 break;
2086         case LCFG_SETUP:
2087                 if (!mmd->mmd_attached)
2088                         break;
2089
2090                 /* already got tgtname at LCFG_ATTACH */
2091                 if (mmd->mmd_server)
2092                         break;
2093
2094                 rc = mgs_msl_tgt_uuid2name(mmd->mmd_tgtname,
2095                                            lustre_cfg_string(lcfg, 1));
2096                 if (rc) {
2097                         mgs_msl_data_cleanup(mmd);
2098                         break;
2099                 }
2100
2101                 break;
2102         case LCFG_SPTLRPC_CONF: {
2103                 struct sptlrpc_conf_log *log;
2104                 enum lustre_sec_part from;
2105
2106                 if (!mmd->mmd_attached)
2107                         break;
2108
2109                 log = sptlrpc_conf_log_extract(lcfg);
2110                 if (log == NULL) {
2111                         CERROR("missing sptlrpc config log???\n");
2112                         mgs_msl_data_cleanup(mmd);
2113                         break;
2114                 }
2115
2116                 if (mmd->mmd_server)
2117                         from = LUSTRE_SP_ANY;
2118                 else
2119                         from = log->scl_part;
2120
2121                 /* cleanup the old log */
2122                 sptlrpc_conf_log_cleanup(log);
2123
2124                 /* populate new log */
2125                 rc = mgs_get_srpc_conf_log(mmd->mmd_fsdb, mmd->mmd_tgtname,
2126                                            from, mmd->mmd_tgtpart, log);
2127                 if (rc) {
2128                         mgs_msl_data_cleanup(mmd);
2129                         break;
2130                 }
2131
2132                 /* Overwrite the log */
2133                 rec->lrh_len = cfg_len; 
2134                 rc = llog_write_rec(llh, rec, NULL, 0, (void *)lcfg, 
2135                                     rec->lrh_index);
2136                 if (rc)
2137                         CERROR("overwrite sptlrpc conf log failed: %d\n", rc);
2138
2139                 /* append new one */
2140                 rc = record_marker(mmd->mmd_obd, llh, mmd->mmd_fsdb, CM_START,
2141                                    mmd->mmd_mti->mti_svname, "sptlrpc config");
2142                 rc = record_sptlrpc_conf(mmd->mmd_obd, llh,
2143                                          lustre_cfg_string(lcfg, 0), log);
2144                 rc = record_marker(mmd->mmd_obd, llh, mmd->mmd_fsdb, CM_END,
2145                                    mmd->mmd_mti->mti_svname, "sptlrpc config");
2146
2147                 mgs_msl_data_cleanup(mmd);
2148                 break;
2149         }
2150         default:
2151                 /* ignore all others */
2152                 break;
2153         }
2154
2155         RETURN(rc);
2156 }
2157
2158 static int mgs_modify_srpc_log(struct obd_device *obd,
2159                                struct fs_db *fsdb,
2160                                struct mgs_target_info *mti,
2161                                char *logname)
2162 {
2163         struct llog_handle   *llh;
2164         struct lvfs_run_ctxt  saved;
2165         struct llog_ctxt     *ctxt;
2166         struct mgs_msl_data  *mmd;
2167         int rc, rc2;
2168         ENTRY;
2169
2170         CDEBUG(D_MGS, "modify sptlrpc log for %s\n", logname);
2171
2172         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2173         
2174         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
2175         LASSERT(ctxt != NULL);
2176         rc = llog_create(ctxt, &llh, NULL, logname);
2177         if (rc)
2178                 GOTO(out_pop, rc);
2179
2180         rc = llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
2181         if (rc)
2182                 GOTO(out_close, rc);
2183
2184         if (llog_get_size(llh) <= 1)
2185                 GOTO(out_close, rc = 0);
2186
2187         OBD_ALLOC_PTR(mmd);
2188         if (!mmd) 
2189                 GOTO(out_close, rc = -ENOMEM);
2190
2191         mmd->mmd_obd = obd;
2192         mmd->mmd_fsdb = fsdb;
2193         mmd->mmd_mti = mti;
2194
2195         rc = llog_process(llh, mgs_modify_srpc_log_handler, (void *) mmd, NULL);
2196
2197         OBD_FREE_PTR(mmd);
2198
2199 out_close:
2200         rc2 = llog_close(llh);
2201         if (!rc)
2202                 rc = rc2;
2203
2204 out_pop:
2205         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2206         llog_ctxt_put(ctxt);
2207
2208         if (rc) 
2209                 CERROR("modify sptlrpc log %s failed %d\n", logname, rc);
2210         RETURN(rc);
2211 }
2212
2213 /*
2214  * for each of log, remove old conf at first
2215  */
2216 static int mgs_modify_srpc_log_all(struct obd_device *obd,
2217                                    struct fs_db *fsdb,
2218                                    struct mgs_target_info *mti)
2219 {
2220         char             tgt_index[9];
2221         char            *logname;
2222         int              i, rc = 0, rc2;
2223         ENTRY;
2224
2225         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2226                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
2227                         sprintf(tgt_index,"-MDT%04x",i);
2228
2229                         name_create(&logname, mti->mti_fsname, tgt_index);
2230                         rc2 = mgs_modify(obd, fsdb, mti, logname,
2231                                          mti->mti_fsname, "sptlrpc config",
2232                                          CM_SKIP);
2233                         rc2 = mgs_modify_srpc_log(obd, fsdb, mti, logname);
2234                         name_destroy(&logname);
2235
2236                         if (rc2 && rc == 0)
2237                                 rc = rc2;
2238                 }
2239         }
2240
2241         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2242                 if (test_bit(i,  fsdb->fsdb_ost_index_map)) {
2243                         sprintf(tgt_index,"-OST%04x",i);
2244
2245                         name_create(&logname, mti->mti_fsname, tgt_index);
2246                         rc2 = mgs_modify(obd, fsdb, mti, logname,
2247                                          mti->mti_fsname, "sptlrpc config",
2248                                          CM_SKIP);
2249                         rc2 = mgs_modify_srpc_log(obd, fsdb, mti, logname);
2250                         name_destroy(&logname);
2251
2252                         if (rc2 && rc == 0)
2253                                 rc = rc2;
2254                 }
2255         }
2256
2257         name_create(&logname, mti->mti_fsname, "-client");
2258         rc2 = mgs_modify(obd, fsdb, mti, logname,
2259                          mti->mti_fsname, "sptlrpc config", CM_SKIP);
2260         rc2 = mgs_modify_srpc_log(obd, fsdb, mti, logname);
2261         name_destroy(&logname);
2262
2263         if (rc2 && rc == 0)
2264                 rc = rc2;
2265
2266         RETURN(rc);
2267 }
2268
2269 static int mgs_srpc_set_param_disk(struct obd_device *obd,
2270                                    struct fs_db *fsdb,
2271                                    struct mgs_target_info *mti,
2272                                    char *param)
2273 {
2274         struct llog_handle     *llh = NULL;
2275         char                   *logname;
2276         char                   *comment, *ptr;
2277         struct lustre_cfg_bufs  bufs;
2278         struct lustre_cfg      *lcfg;
2279         int                     rc, len;
2280         ENTRY;
2281
2282         /* get comment */
2283         ptr = strchr(param, '=');
2284         LASSERT(ptr);
2285         len = ptr - param;
2286
2287         OBD_ALLOC(comment, len + 1);
2288         if (comment == NULL)
2289                 RETURN(-ENOMEM);
2290         strncpy(comment, param, len);
2291         comment[len] = '\0';
2292
2293         /* prepare lcfg */
2294         lustre_cfg_bufs_reset(&bufs, mti->mti_svname);
2295         lustre_cfg_bufs_set_string(&bufs, 1, param);
2296         lcfg = lustre_cfg_new(0, &bufs);
2297         if (lcfg == NULL)
2298                 GOTO(out_comment, rc = -ENOMEM);
2299
2300         /* construct log name */
2301         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2302         if (rc)
2303                 GOTO(out_lcfg, rc);
2304
2305         if (mgs_log_is_empty(obd, logname)) {
2306                 rc = record_start_log(obd, &llh, logname);
2307                 record_end_log(obd, &llh);
2308                 if (rc) 
2309                         GOTO(out, rc);
2310         }
2311
2312         /* obsolete old one */
2313         mgs_modify(obd, fsdb, mti, logname, mti->mti_svname, comment, CM_SKIP);
2314
2315         /* write the new one */
2316         rc = mgs_write_log_direct(obd, fsdb, logname, lcfg,
2317                                   mti->mti_svname, comment);
2318         if (rc)
2319                 CERROR("err %d writing log %s\n", rc, logname);
2320
2321 out:
2322         name_destroy(&logname);
2323 out_lcfg:
2324         lustre_cfg_free(lcfg);
2325 out_comment:
2326         OBD_FREE(comment, len + 1);
2327         RETURN(rc);
2328 }
2329
2330 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2331                                         char *param)
2332 {
2333         char    *ptr;
2334
2335         /* disable the adjustable udesc parameter for now, i.e. use default
2336          * setting that client always ship udesc to MDT if possible. to enable
2337          * it simply remove the following line */
2338         goto error_out;
2339
2340         ptr = strchr(param, '=');
2341         if (ptr == NULL)
2342                 goto error_out;
2343         *ptr++ = '\0';
2344
2345         if (strcmp(param, PARAM_SRPC_UDESC))
2346                 goto error_out;
2347
2348         if (strcmp(ptr, "yes") == 0) {
2349                 fsdb->fsdb_srpc_fl_udesc = 1;
2350                 CWARN("Enable user descriptor shipping from client to MDT\n");
2351         } else if (strcmp(ptr, "no") == 0) {
2352                 fsdb->fsdb_srpc_fl_udesc = 0;
2353                 CWARN("Disable user descriptor shipping from client to MDT\n");
2354         } else {
2355                 *(ptr - 1) = '=';
2356                 goto error_out;
2357         }
2358         return 0;
2359
2360 error_out:
2361         CERROR("Invalid param: %s\n", param);
2362         return -EINVAL;
2363 }
2364
2365 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2366                                   const char *svname,
2367                                   char *param)
2368 {
2369         struct sptlrpc_rule      rule;
2370         struct sptlrpc_rule_set *rset;
2371         int                      rc;
2372         ENTRY;
2373
2374         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2375                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2376                 RETURN(-EINVAL);
2377         }
2378
2379         if (strncmp(param, PARAM_SRPC_UDESC,
2380                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2381                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2382         }
2383
2384         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2385                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2386                 RETURN(-EINVAL);
2387         }
2388
2389         param += sizeof(PARAM_SRPC_FLVR) - 1;
2390
2391         rc = sptlrpc_parse_rule(param, &rule);
2392         if (rc)
2393                 RETURN(rc);
2394
2395         /* preapre room for this coming rule. svcname format should be:
2396          * - fsname: general rule
2397          * - fsname-tgtname: target-specific rule
2398          */
2399         if (strchr(svname, '-')) {
2400                 struct mgs_tgt_srpc_conf *tgtconf;
2401                 int                       found = 0;
2402
2403                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
2404                      tgtconf = tgtconf->mtsc_next) {
2405                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
2406                                 found = 1;
2407                                 break;
2408                         }
2409                 }
2410
2411                 if (!found) {
2412                         int name_len;
2413
2414                         OBD_ALLOC_PTR(tgtconf);
2415                         if (tgtconf == NULL)
2416                                 RETURN(-ENOMEM);
2417
2418                         name_len = strlen(svname);
2419
2420                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
2421                         if (tgtconf->mtsc_tgt == NULL) {
2422                                 OBD_FREE_PTR(tgtconf);
2423                                 RETURN(-ENOMEM);
2424                         }
2425                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
2426
2427                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
2428                         fsdb->fsdb_srpc_tgt = tgtconf;
2429                 }
2430
2431                 rset = &tgtconf->mtsc_rset;
2432         } else {
2433                 rset = &fsdb->fsdb_srpc_gen;
2434         }
2435
2436         /* limit the maximum number of rules, but allow deletion in any case */
2437         if (rset->srs_nrule >= SPTLRPC_CONF_LOG_MAX / 2 &&
2438             rule.sr_flvr.sf_rpc != SPTLRPC_FLVR_INVALID) {
2439                 CERROR("too many (%d) rules already for %s\n",
2440                        rset->srs_nrule, svname);
2441                 RETURN(-E2BIG);
2442         }
2443
2444         rc = sptlrpc_rule_set_merge(rset, &rule, 1);
2445
2446         RETURN(rc);
2447 }
2448
2449 static int mgs_srpc_set_param(struct obd_device *obd,
2450                               struct fs_db *fsdb,
2451                               struct mgs_target_info *mti,
2452                               char *param)
2453 {
2454         char                    *copy;
2455         int                      rc, copy_size;
2456         ENTRY;
2457
2458         /* keep a copy of original param, which could be destroied
2459          * during parsing */
2460         copy_size = strlen(param) + 1;
2461         OBD_ALLOC(copy, copy_size);
2462         if (copy == NULL)
2463                 return -ENOMEM;
2464         memcpy(copy, param, copy_size);
2465
2466         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
2467         if (rc)
2468                 goto out_free;
2469
2470         /* previous steps guaranteed the syntax is correct */
2471         rc = mgs_srpc_set_param_disk(obd, fsdb, mti, copy);
2472         if (rc)
2473                 goto out_free;
2474
2475         /* now apply the new rules to all existing config logs */
2476         rc = mgs_modify_srpc_log_all(obd, fsdb, mti);
2477
2478 out_free:
2479         OBD_FREE(copy, copy_size);
2480         RETURN(rc);
2481 }
2482
/* State handed to mgs_srpc_read_handler() for every record of the sptlrpc
 * config log */
struct mgs_srpc_read_data {
        /* fs database the rules read from the log are applied to */
        struct fs_db   *msrd_fsdb;
        /* set by a CM_START marker that also carries CM_SKIP, cleared by a
         * CM_END marker; records seen while it is set are ignored */
        int             msrd_skip;
};
2487
2488 static int mgs_srpc_read_handler(struct llog_handle *llh,
2489                                  struct llog_rec_hdr *rec, 
2490                                  void *data)
2491 {
2492         struct mgs_srpc_read_data *msrd = (struct mgs_srpc_read_data *) data;
2493         struct cfg_marker         *marker;
2494         struct lustre_cfg         *lcfg = (struct lustre_cfg *)(rec + 1);
2495         char                      *svname, *param;
2496         int                        cfg_len, rc;
2497         ENTRY;
2498
2499         if (rec->lrh_type != OBD_CFG_REC) {
2500                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
2501                 RETURN(-EINVAL);
2502         }
2503
2504         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) - 
2505                   sizeof(struct llog_rec_tail);
2506
2507         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
2508         if (rc) {
2509                 CERROR("Insane cfg\n");
2510                 RETURN(rc);
2511         }
2512
2513         if (lcfg->lcfg_command == LCFG_MARKER) {
2514                 marker = lustre_cfg_buf(lcfg, 1);
2515
2516                 if (marker->cm_flags & CM_START &&
2517                     marker->cm_flags & CM_SKIP)
2518                         msrd->msrd_skip = 1;
2519                 if (marker->cm_flags & CM_END)
2520                         msrd->msrd_skip = 0;
2521
2522                 RETURN(0);
2523         }
2524
2525         if (msrd->msrd_skip)
2526                 RETURN(0);
2527
2528         if (lcfg->lcfg_command != 0) {
2529                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
2530                 RETURN(0);
2531         }
2532
2533         svname = lustre_cfg_string(lcfg, 0);
2534         if (svname == NULL) {
2535                 CERROR("svname is empty\n");
2536                 RETURN(0);
2537         }
2538
2539         param = lustre_cfg_string(lcfg, 1);
2540         if (param == NULL) {
2541                 CERROR("param is empty\n");
2542                 RETURN(0);
2543         }
2544
2545         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
2546         if (rc)
2547                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
2548
2549         RETURN(0);
2550 }
2551
2552 static int mgs_get_fsdb_srpc_from_llog(struct obd_device *obd,
2553                                        struct fs_db *fsdb)
2554 {
2555         struct llog_handle        *llh = NULL;
2556         struct lvfs_run_ctxt       saved;
2557         struct llog_ctxt          *ctxt;
2558         char                      *logname;
2559         struct mgs_srpc_read_data  msrd;
2560         int                        rc;
2561         ENTRY;
2562
2563         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
2564         LASSERT(ctxt != NULL);
2565
2566         /* construct log name */
2567         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
2568         if (rc)
2569                 RETURN(rc);
2570
2571         if (mgs_log_is_empty(obd, logname))
2572                 GOTO(out, rc = 0);
2573
2574         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2575
2576         rc = llog_create(ctxt, &llh, NULL, logname);
2577         if (rc)
2578                 GOTO(out_pop, rc);
2579
2580         rc = llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
2581         if (rc)
2582                 GOTO(out_close, rc);
2583
2584         if (llog_get_size(llh) <= 1)
2585                 GOTO(out_close, rc = 0);
2586
2587         msrd.msrd_fsdb = fsdb;
2588         msrd.msrd_skip = 0;
2589
2590         rc = llog_process(llh, mgs_srpc_read_handler, (void *) &msrd, NULL);
2591
2592 out_close:
2593         llog_close(llh);
2594 out_pop:
2595         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2596 out:
2597         name_destroy(&logname);
2598         llog_ctxt_put(ctxt);
2599
2600         if (rc)
2601                 CERROR("failed to read sptlrpc config database: %d\n", rc);
2602         RETURN(rc);
2603 }
2604
2605 static int mgs_write_log_params(struct obd_device *obd, struct fs_db *fsdb,
2606                                 struct mgs_target_info *mti)
2607 {
2608         struct lustre_cfg_bufs bufs;
2609         struct lustre_cfg *lcfg;
2610         char *logname;
2611         char *ptr = mti->mti_params;
2612         char *endptr, *tmp;
2613         int rc = 0;
2614         ENTRY;
2615
2616         if (!mti->mti_params) 
2617                 RETURN(0);
2618
2619         /* For various parameter settings, we have to figure out which logs
2620            care about them (e.g. both mdt and client for lov settings) */
2621         while (ptr) {
2622                 while (*ptr == ' ') 
2623                         ptr++;
2624                 if (*ptr == '\0')
2625                         break;
2626                 endptr = strchr(ptr, ' ');
2627                 if (endptr) 
2628                         *endptr = '\0';
2629                 CDEBUG(D_MGS, "next param '%s'\n", ptr);
2630
2631                 /* The params are stored in MOUNT_DATA_FILE and modified 
2632                    via tunefs.lustre, or set using lctl conf_param */
2633
2634                 /* Processed in lustre_start_mgc */
2635                 if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0) 
2636                         GOTO(end_while, rc);
2637
2638                 /* Processed in mgs_write_log_ost */
2639                 if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
2640                         if (mti->mti_flags & LDD_F_PARAM) {
2641                                 LCONSOLE_ERROR_MSG(0x169, "%s can only be "
2642                                                    "changed with tunefs.lustre"
2643                                                    "and --writeconf\n", ptr);
2644                                 rc = -EPERM;
2645                         }
2646                         GOTO(end_while, rc);
2647                 }
2648
2649                 if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
2650                         rc = mgs_srpc_set_param(obd, fsdb, mti, ptr);
2651                         GOTO(end_while, rc);
2652                 }
2653
2654                 if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
2655                         /* Add a failover nidlist */
2656                         rc = 0;
2657                         /* We already processed failovers params for new
2658                            targets in mgs_write_log_target */
2659                         if (mti->mti_flags & LDD_F_PARAM) {
2660                                 CDEBUG(D_MGS, "Adding failnode\n");
2661                                 rc = mgs_write_log_add_failnid(obd, fsdb, mti);
2662                         }
2663                         GOTO(end_while, rc);
2664                 }
2665
2666                 if (class_match_param(ptr, PARAM_SYS_TIMEOUT, &tmp) == 0) {
2667                         /* Change obd timeout */
2668                         int timeout;
2669                         timeout = simple_strtoul(tmp, NULL, 0);
2670
2671                         CDEBUG(D_MGS, "obd timeout %d\n", timeout);
2672                         lustre_cfg_bufs_reset(&bufs, NULL);
2673                         lcfg = lustre_cfg_new(LCFG_SET_TIMEOUT, &bufs);
2674                         lcfg->lcfg_num = timeout;
2675                         /* modify all servers and clients */
2676                         rc = mgs_write_log_direct_all(obd, fsdb, mti, lcfg,
2677                                                       mti->mti_fsname,
2678                                                       "timeout"); 
2679                         lustre_cfg_free(lcfg);
2680                         GOTO(end_while, rc);
2681                 }
2682
2683                 if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) { 
2684                         /* active=0 means off, anything else means on */
2685                         char mdt_index[16];
2686                         int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
2687                         int i;
2688
2689                         if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
2690                                 LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
2691                                                    "be (de)activated.\n",
2692                                                    mti->mti_svname);
2693                                 rc = -EINVAL;
2694                                 goto end_while;
2695                         }
2696                         LCONSOLE_WARN("Permanently %sactivating %s\n",
2697                                       flag ? "de": "re", mti->mti_svname);
2698                         /* Modify clilov */
2699                         name_create(&logname, mti->mti_fsname, "-client");
2700                         rc = mgs_modify(obd, fsdb, mti, logname, 
2701                                         mti->mti_svname, "add osc", flag);
2702                         name_destroy(&logname);
2703                         if (rc) 
2704                                 goto active_err;
2705                         /* Modify mdtlov */
2706                         /* FIXME add to all MDT logs for CMD */
2707                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2708                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2709                                         continue;
2710                                 sprintf(mdt_index,"-MDT%04x", i);
2711                                 name_create(&logname, mti->mti_fsname, mdt_index);
2712                                 rc = mgs_modify(obd, fsdb, mti, logname, 
2713                                                 mti->mti_svname, "add osc", flag);
2714                                 name_destroy(&logname);
2715                                 if (rc)
2716                                         goto active_err;
2717                         }
2718 active_err:
2719                         if (rc) {
2720                                 LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
2721                                                    "log (%d). No permanent "
2722                                                    "changes were made to the "
2723                                                    "config log.\n",
2724                                                    mti->mti_svname, rc);
2725                                 if (fsdb->fsdb_flags & FSDB_OLDLOG14) 
2726                                         LCONSOLE_ERROR_MSG(0x146, "This may be"
2727                                                            " because the log "
2728                                                            "is in the old 1.4"
2729                                                            "style. Consider "
2730                                                            " --writeconf to "
2731                                                            "update the logs.\n");
2732                                 goto end_while;
2733                         }
2734                         /* Fall through to osc proc for deactivating 
2735                            live OSC on running MDT / clients. */
2736                 }
2737                 /* Below here, let obd's XXX_process_config methods handle it */
2738  
2739                 /* All lov. in proc */
2740                 if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
2741                         char mdt_index[16];
2742                         char *mdtlovname;
2743                         
2744                         CDEBUG(D_MGS, "lov param %s\n", ptr);
2745                         if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
2746                                 LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
2747                                                    "set on the MDT, not %s. "
2748                                                    "Ignoring.\n", 
2749                                                    mti->mti_svname);
2750                                 rc = 0;
2751                                 goto end_while;
2752                         }
2753
2754                         /* Modify mdtlov */
2755                         if (mgs_log_is_empty(obd, mti->mti_svname))
2756                                 GOTO(end_while, rc = -ENODEV);
2757
2758                         sprintf(mdt_index,"-MDT%04x", mti->mti_stripe_index);
2759                         name_create(&logname, mti->mti_fsname, mdt_index);
2760                         name_create(&mdtlovname, logname, "-mdtlov");
2761                         rc = mgs_wlp_lcfg(obd, fsdb, mti, mti->mti_svname, 
2762                                           &bufs, mdtlovname, ptr);
2763                         name_destroy(&logname);
2764                         name_destroy(&mdtlovname);
2765                         if (rc)
2766                                 GOTO(end_while, rc);
2767
2768                         /* Modify clilov */
2769                         name_create(&logname, mti->mti_fsname, "-client");
2770                         rc = mgs_wlp_lcfg(obd, fsdb, mti, logname, &bufs,
2771                                           fsdb->fsdb_clilov, ptr);
2772                         name_destroy(&logname);
2773                         GOTO(end_while, rc);
2774                 }
2775
2776                 /* All osc., mdc., llite. params in proc */
2777                 if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) || 
2778                     (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
2779                     (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
2780                         char *cname;
2781                         if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
2782                                 name_create(&cname, mti->mti_fsname, "-client");
2783                         /* Add the client type to match the obdname 
2784                            in class_config_llog_handler */
2785                         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2786                                 /* COMPAT_146 */
2787                                 if (fsdb->fsdb_mdc)
2788                                         name_create(&cname, fsdb->fsdb_mdc, "");
2789                                 else
2790                                         name_create(&cname, mti->mti_svname,
2791                                                     "-mdc");
2792                         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2793                                 /* COMPAT_146 */
2794                                 if (fsdb->fsdb_flags & FSDB_OLDLOG14) {
2795                                         LCONSOLE_ERROR_MSG(0x148, "Upgraded "
2796                                                            "client logs for %s"
2797                                                            " cannot be "
2798                                                            "modified. Consider"
2799                                                            " updating the "
2800                                                            "configuration with"
2801                                                            " --writeconf\n",
2802                                                            mti->mti_svname);
2803                                         /* We don't know the names of all the
2804                                            old oscs*/
2805                                         rc = -EINVAL;
2806                                         goto end_while;
2807                                 }
2808                                 name_create(&cname, mti->mti_svname, "-osc");
2809                         } else {       
2810                                 rc = -EINVAL;
2811                                 goto end_while;
2812                         }
2813
2814                         CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2815                         
2816                         /* Modify client */
2817                         name_create(&logname, mti->mti_fsname, "-client");
2818                         rc = mgs_wlp_lcfg(obd, fsdb, mti, logname, &bufs,
2819                                           cname, ptr);
2820
2821                         /* osc params affect the MDT as well */
2822                         if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
2823                                 char mdt_index[16];
2824                                 int i;
2825
2826                                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2827                                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2828                                                 continue;
2829                                         name_destroy(&cname);
2830                                         sprintf(mdt_index, "-osc-MDT%04x", i);
2831                                         name_create(&cname, mti->mti_svname,
2832                                                     mdt_index);
2833                                         name_destroy(&logname);
2834                                         sprintf(mdt_index, "-MDT%04x", i);
2835                                         name_create(&logname, mti->mti_fsname,
2836                                                     mdt_index);
2837                                         if (!mgs_log_is_empty(obd, logname))
2838                                                 rc = mgs_wlp_lcfg(obd, fsdb,
2839                                                                   mti, logname,
2840                                                                   &bufs, cname,
2841                                                                   ptr);
2842                                         if (rc)
2843                                                 break;
2844                                 }
2845                         }
2846                         name_destroy(&logname);
2847                         name_destroy(&cname);
2848                         GOTO(end_while, rc);
2849                 }
2850
2851                 /* All mdt., ost. params in proc */
2852                 if ((class_match_param(ptr, PARAM_MDT, NULL) == 0) || 
2853                     (class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
2854                     (class_match_param(ptr, PARAM_OST, NULL) == 0)) {
2855                         CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2856                         if (mgs_log_is_empty(obd, mti->mti_svname)) {
2857                                 rc = -ENODEV;
2858                                 goto end_while;
2859                         }
2860                         rc = mgs_wlp_lcfg(obd, fsdb, mti, mti->mti_svname,
2861                                           &bufs, mti->mti_svname, ptr);
2862                         GOTO(end_while, rc);
2863                 }
2864
2865                 LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
2866
2867 end_while:
2868                 if (rc) {
2869                         CERROR("err %d on param '%s\n", rc, ptr);
2870                         break;
2871                 }
2872                 
2873                 if (!endptr)
2874                         /* last param */
2875                         break;
2876                  
2877                 *endptr = ' ';
2878                 ptr = endptr + 1;
2879         }
2880
2881         RETURN(rc);
2882 }
2883
/*
 * Automatic failover nid addition is not implemented at this time, so this
 * is a stub that always returns 0.  The draft implementation is kept below,
 * compiled out, for reference.
 * NOTE(review): the draft passes an undeclared "fsname" to
 * mgs_find_or_make_fsdb() and would not build if simply re-enabled.
 */
int mgs_check_failnid(struct obd_device *obd, struct mgs_target_info *mti)
{
#if 0
        struct fs_db *fsdb;
        int rc;
        ENTRY;

        rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
        if (rc)
                RETURN(rc);

        if (mgs_log_is_empty(obd, mti->mti_svname))
                /* should never happen */
                RETURN(-ENOENT);

        CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);

        /* FIXME We can just check mti->params to see if we're already in
           the failover list.  Modify mti->params for rewriting back at
           server_register_target(). */

        down(&fsdb->fsdb_sem);
        rc = mgs_write_log_add_failnid(obd, fsdb, mti);
        up(&fsdb->fsdb_sem);

        RETURN(rc);
#endif
        return 0;
}
2914
2915 int mgs_write_log_target(struct obd_device *obd,
2916                          struct mgs_target_info *mti)
2917 {
2918         struct fs_db *fsdb;
2919         int rc = -EINVAL;
2920         ENTRY;
2921
2922         /* set/check the new target index */
2923         rc = mgs_set_index(obd, mti);
2924         if (rc < 0) {
2925                 CERROR("Can't get index (%d)\n", rc);
2926                 RETURN(rc);
2927         }
2928
2929         /* COMPAT_146 */
2930         if (mti->mti_flags & LDD_F_UPGRADE14) {
2931                 if (rc == EALREADY) {
2932                         LCONSOLE_INFO("Found index %d for %s 1.4 log, "
2933                                       "upgrading\n", mti->mti_stripe_index, 
2934                                       mti->mti_svname);
2935                 } else {
2936                         LCONSOLE_ERROR_MSG(0x149, "Failed to find %s in the old"
2937                                            " client log. Apparently it is not "
2938                                            "part of this filesystem, or the old"
2939                                            " log is wrong.\nUse 'writeconf' on "
2940                                            "the MDT to force log regeneration."
2941                                            "\n", mti->mti_svname);
2942                         /* Not in client log?  Upgrade anyhow...*/
2943                         /* Argument against upgrading: reformat MDT,
2944                            upgrade OST, then OST will start but will be SKIPped
2945                            in client logs.  Maybe error now is better. */
2946                         /* RETURN(-EINVAL); */
2947                 }
2948                 /* end COMPAT_146 */
2949         } else {
2950                 if (rc == EALREADY) {
2951                         LCONSOLE_WARN("Found index %d for %s, updating log\n", 
2952                                       mti->mti_stripe_index, mti->mti_svname);
2953                         /* We would like to mark old log sections as invalid 
2954                            and add new log sections in the client and mdt logs.
2955                            But if we add new sections, then live clients will
2956                            get repeat setup instructions for already running
2957                            osc's. So don't update the client/mdt logs. */
2958                         mti->mti_flags &= ~LDD_F_UPDATE;
2959                 }
2960         }
2961
2962         rc = mgs_find_or_make_fsdb(obd, mti->mti_fsname, &fsdb); 
2963         if (rc) {
2964                 CERROR("Can't get db for %s\n", mti->mti_fsname);
2965                 RETURN(rc);
2966         }
2967
2968         down(&fsdb->fsdb_sem);
2969
2970         if (mti->mti_flags & 
2971             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
2972                 /* Generate a log from scratch */
2973                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2974                         rc = mgs_write_log_mdt(obd, fsdb, mti);
2975                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2976                         rc = mgs_write_log_ost(obd, fsdb, mti);
2977                 } else {
2978                         CERROR("Unknown target type %#x, can't create log for "
2979                                "%s\n", mti->mti_flags, mti->mti_svname);
2980                 }
2981                 if (rc) {
2982                         CERROR("Can't write logs for %s (%d)\n",
2983                                mti->mti_svname, rc);
2984                         GOTO(out_up, rc);
2985                 }
2986         } else {
2987                 /* Just update the params from tunefs in mgs_write_log_params */
2988                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
2989                 mti->mti_flags |= LDD_F_PARAM;
2990         }
2991         
2992         rc = mgs_write_log_params(obd, fsdb, mti);
2993
2994 out_up:
2995         up(&fsdb->fsdb_sem);
2996         RETURN(rc);
2997 }
2998
2999 /* COMPAT_146 */
3000 /* verify that we can handle the old config logs */ 
3001 int mgs_upgrade_sv_14(struct obd_device *obd, struct mgs_target_info *mti)
3002 {
3003         struct fs_db *fsdb;
3004         int rc = 0;
3005         ENTRY;
3006
3007         /* Create ost log normally, as servers register.  Servers 
3008            register with their old uuids (from last_rcvd), so old
3009            (MDT and client) logs should work.
3010          - new MDT won't know about old OSTs, only the ones that have 
3011            registered, so we need the old MDT log to get the LOV right 
3012            in order for old clients to work. 
3013          - Old clients connect to the MDT, not the MGS, for their logs, and 
3014            will therefore receive the old client log from the MDT /LOGS dir. 
3015          - Old clients can continue to use and connect to old or new OSTs
3016          - New clients will contact the MGS for their log 
3017         */
3018
3019         LCONSOLE_INFO("upgrading server %s from pre-1.6\n", mti->mti_svname); 
3020         server_mti_print("upgrade", mti);
3021         
3022         rc = mgs_find_or_make_fsdb(obd, mti->mti_fsname, &fsdb);
3023         if (rc) 
3024                 RETURN(rc);
3025
3026         if (fsdb->fsdb_flags & FSDB_LOG_EMPTY) {
3027                 LCONSOLE_ERROR_MSG(0x14a, "The old client log %s-client is "
3028                                    "missing.  Was tunefs.lustre successful?\n",
3029                                    mti->mti_fsname);
3030                 RETURN(-ENOENT);
3031         }
3032
3033         if (fsdb->fsdb_gen == 0) {
3034                 /* There were no markers in the client log, meaning we have 
3035                    not updated the logs for this fs */
3036                 CDEBUG(D_MGS, "found old, unupdated client log\n");
3037         }
3038
3039         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3040                 if (mgs_log_is_empty(obd, mti->mti_svname)) {
3041                         LCONSOLE_ERROR_MSG(0x14b, "The old MDT log %s is "
3042                                            "missing. Was tunefs.lustre "
3043                                            "successful?\n",
3044                                            mti->mti_svname);
3045                         RETURN(-ENOENT);
3046                 }
3047                 /* We're starting with an old uuid.  Assume old name for lov
3048                    as well since the lov entry already exists in the log. */
3049                 CDEBUG(D_MGS, "old mds uuid %s\n", mti->mti_uuid);
3050                 if (strncmp(mti->mti_uuid, fsdb->fsdb_mdtlov + 4, 
3051                             strlen(fsdb->fsdb_mdtlov) - 4) != 0) {
3052                         CERROR("old mds uuid %s doesn't match log %s (%s)\n",
3053                                mti->mti_uuid, fsdb->fsdb_mdtlov, 
3054                                fsdb->fsdb_mdtlov + 4);
3055                         RETURN(-EINVAL);
3056                 }
3057         }
3058
3059         if (!(fsdb->fsdb_flags & FSDB_OLDLOG14)) {
3060                 LCONSOLE_ERROR_MSG(0x14c, "%s-client is supposedly an old "
3061                                    "log, but no old LOV or MDT was found. "
3062                                    "Consider updating the configuration with"
3063                                    " --writeconf.\n", mti->mti_fsname);
3064         }
3065
3066         RETURN(rc);
3067 }
3068 /* end COMPAT_146 */
3069
3070 int mgs_erase_log(struct obd_device *obd, char *name)
3071 {
3072         struct lvfs_run_ctxt saved;
3073         struct llog_ctxt *ctxt;
3074         struct llog_handle *llh;
3075         int rc = 0;
3076
3077         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
3078         LASSERT(ctxt != NULL);
3079
3080         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3081         rc = llog_create(ctxt, &llh, NULL, name);
3082         if (rc == 0) {
3083                 llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
3084                 rc = llog_destroy(llh);
3085                 llog_free_handle(llh);
3086         }
3087         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3088         llog_ctxt_put(ctxt);
3089
3090         if (rc)
3091                 CERROR("failed to clear log %s: %d\n", name, rc);
3092
3093         return(rc);
3094 }
3095
3096 /* erase all logs for the given fs */
3097 int mgs_erase_logs(struct obd_device *obd, char *fsname)
3098 {
3099         struct mgs_obd *mgs = &obd->u.mgs;
3100         static struct fs_db *fsdb;
3101         struct list_head dentry_list;
3102         struct l_linux_dirent *dirent, *n;
3103         int rc, len = strlen(fsname);
3104         ENTRY;
3105         
3106         /* Find all the logs in the CONFIGS directory */
3107         rc = class_dentry_readdir(obd, mgs->mgs_configs_dir,
3108                                   mgs->mgs_vfsmnt, &dentry_list);
3109         if (rc) {
3110                 CERROR("Can't read %s dir\n", MOUNT_CONFIGS_DIR);
3111                 RETURN(rc);
3112         }
3113                                                                                 
3114         down(&mgs->mgs_sem);
3115         
3116         /* Delete the fs db */
3117         fsdb = mgs_find_fsdb(obd, fsname);
3118         if (fsdb) 
3119                 mgs_free_fsdb(obd, fsdb);
3120
3121         list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
3122                 list_del(&dirent->lld_list);
3123                 if (strncmp(fsname, dirent->lld_name, len) == 0) {
3124                         CDEBUG(D_MGS, "Removing log %s\n", dirent->lld_name);
3125                         mgs_erase_log(obd, dirent->lld_name);
3126                 }
3127                 OBD_FREE(dirent, sizeof(*dirent));
3128         }
3129         
3130         up(&mgs->mgs_sem);
3131
3132         RETURN(rc);
3133 }
3134
3135 /* from llog_swab */
3136 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3137 {
3138         int i;
3139         ENTRY;
3140
3141         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3142         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3143
3144         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3145         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3146         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3147         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3148
3149         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3150         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3151                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3152                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3153                                i, lcfg->lcfg_buflens[i], 
3154                                lustre_cfg_string(lcfg, i));
3155                 }
3156         EXIT;
3157 }
3158
3159 /* Set a permanent (config log) param for a target or fs */
3160 int mgs_setparam(struct obd_device *obd, struct lustre_cfg *lcfg, char *fsname)
3161 {
3162         struct fs_db *fsdb;
3163         struct mgs_target_info *mti;
3164         char *devname, *param;
3165         char *ptr, *tmp;
3166         __u32 index;
3167         int rc = 0;
3168         ENTRY;
3169
3170         print_lustre_cfg(lcfg);
3171         
3172         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3173         devname = lustre_cfg_string(lcfg, 0);
3174         param = lustre_cfg_string(lcfg, 1);
3175         if (!devname) {
3176                 /* Assume device name embedded in param:
3177                    lustre-OST0000.osc.max_dirty_mb=32 */
3178                 ptr = strchr(param, '.');
3179                 if (ptr) {
3180                         devname = param;
3181                         *ptr = 0;
3182                         param = ptr + 1;
3183                 }
3184         }
3185         if (!devname) {
3186                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3187                 RETURN(-ENOSYS);
3188         }
3189
3190         /* Extract fsname */
3191         ptr = strrchr(devname, '-');
3192         memset(fsname, 0, MTI_NAME_MAXLEN);
3193         if (ptr && (server_name2index(ptr, &index, NULL) >= 0)) {
3194                 strncpy(fsname, devname, ptr - devname);
3195         } else {
3196                 /* assume devname is the fsname */
3197                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3198         }
3199         fsname[MTI_NAME_MAXLEN - 1] = 0;
3200         CDEBUG(D_MGS, "setparam on fs %s device %s\n", fsname, devname);
3201
3202         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb); 
3203         if (rc) 
3204                 RETURN(rc);
3205         if (fsdb->fsdb_flags & FSDB_LOG_EMPTY) {
3206                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3207                        "is '%s'\n", fsname, devname);
3208                 mgs_free_fsdb(obd, fsdb);
3209                 RETURN(-EINVAL);
3210         }
3211
3212         /* Create a fake mti to hold everything */
3213         OBD_ALLOC_PTR(mti);
3214         if (!mti) 
3215                 GOTO(out, rc = -ENOMEM);
3216         strncpy(mti->mti_fsname, fsname, MTI_NAME_MAXLEN);
3217         strncpy(mti->mti_svname, devname, MTI_NAME_MAXLEN);
3218         strncpy(mti->mti_params, param, sizeof(mti->mti_params));
3219         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3220         if (rc < 0) 
3221                 /* Not a valid server; may be only fsname */
3222                 rc = 0;
3223         else
3224                 /* Strip -osc or -mdc suffix from svname */
3225                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname, 
3226                                      mti->mti_svname)) 
3227                         GOTO(out, rc = -EINVAL);
3228
3229         mti->mti_flags = rc | LDD_F_PARAM;
3230
3231         down(&fsdb->fsdb_sem);
3232         rc = mgs_write_log_params(obd, fsdb, mti); 
3233         up(&fsdb->fsdb_sem);
3234
3235 out:
3236         OBD_FREE_PTR(mti);
3237         RETURN(rc);
3238 }
3239
3240 static int mgs_write_log_pool(struct obd_device *obd, char *logname,
3241                               struct fs_db *fsdb, char *lovname,
3242                               enum lcfg_command_type cmd,
3243                               char *poolname, char *fsname,
3244                               char *ostname, char *comment)
3245 {
3246         struct llog_handle *llh = NULL;
3247         int rc;
3248
3249         rc = record_start_log(obd, &llh, logname);
3250         if (rc)
3251                 return rc;
3252         rc = record_marker(obd, llh, fsdb, CM_START, lovname, comment);
3253         record_base(obd, llh, lovname, 0, cmd, poolname, fsname, ostname, 0);
3254         rc = record_marker(obd, llh, fsdb, CM_END, lovname, comment);
3255         rc = record_end_log(obd, &llh);
3256
3257         return rc;
3258 }
3259
3260 int mgs_pool_cmd(struct obd_device *obd, enum lcfg_command_type cmd,
3261                  char *fsname, char *poolname, char *ostname)
3262 {
3263         struct fs_db *fsdb;
3264         char mdt_index[16];
3265         char *lovname;
3266         char *logname;
3267         char *label = NULL, *canceled_label = NULL;
3268         int label_sz;
3269         struct mgs_target_info *mti = NULL;
3270         int rc, i;
3271         ENTRY;
3272
3273         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3274         if (rc) {
3275                 CERROR("Can't get db for %s\n", fsname);
3276                 RETURN(rc);
3277         }
3278         if (fsdb->fsdb_flags & FSDB_LOG_EMPTY) {
3279                 CERROR("%s is not defined\n", fsname);
3280                 mgs_free_fsdb(obd, fsdb);
3281                 RETURN(-EINVAL);
3282         }
3283
3284         label_sz = 10 + strlen(fsname) + strlen(poolname);
3285
3286         /* check if ostname match fsname */
3287         if (ostname != NULL) {
3288                 char *ptr;
3289
3290                 ptr = strrchr(ostname, '-');
3291                 if ((ptr == NULL) ||
3292                     (strncmp(fsname, ostname, ptr-ostname) != 0))
3293                         RETURN(-EINVAL);
3294                 label_sz += strlen(ostname);
3295         }
3296
3297         OBD_ALLOC(label, label_sz);
3298         if (label == NULL)
3299                 GOTO(out, rc = -ENOMEM);
3300
3301         switch(cmd) {
3302         case LCFG_POOL_NEW: {
3303                 sprintf(label,
3304                         "new %s.%s", fsname, poolname);
3305                 break;
3306         }
3307         case LCFG_POOL_ADD: {
3308                 sprintf(label,
3309                         "add %s.%s.%s", fsname, poolname, ostname);
3310                 break;
3311         }
3312         case LCFG_POOL_REM: {
3313                 OBD_ALLOC(canceled_label, label_sz);
3314                 if (canceled_label == NULL)
3315                          GOTO(out, rc = -ENOMEM);
3316                 sprintf(label,
3317                         "rem %s.%s.%s", fsname, poolname, ostname);
3318                 sprintf(canceled_label,
3319                         "add %s.%s.%s", fsname, poolname, ostname);
3320                 break;
3321         }
3322         case LCFG_POOL_DEL: {
3323                 OBD_ALLOC(canceled_label, label_sz);
3324                 if (canceled_label == NULL)
3325                          GOTO(out, rc = -ENOMEM);
3326                 sprintf(label,
3327                         "del %s.%s", fsname, poolname);
3328                 sprintf(canceled_label,
3329                         "new %s.%s", fsname, poolname);
3330                 break;
3331         }
3332         default: {
3333                 break;
3334         }
3335         }
3336
3337         down(&fsdb->fsdb_sem);
3338
3339         if (canceled_label != NULL) {
3340                 OBD_ALLOC_PTR(mti);
3341                 if (mti == NULL)
3342                         GOTO(out, rc = -ENOMEM);
3343         }
3344
3345         /* loop on all potential MDT */
3346         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3347                  if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
3348                         sprintf(mdt_index, "-MDT%04x", i);
3349                         name_create(&logname, fsname, mdt_index);
3350                         name_create(&lovname, logname, "-mdtlov");
3351
3352                         if (canceled_label != NULL) {
3353                                 strcpy(mti->mti_svname, "lov pool");
3354                                 mgs_modify(obd, fsdb, mti, logname, lovname,
3355                                            canceled_label, CM_SKIP);
3356                         }
3357
3358                         mgs_write_log_pool(obd, logname, fsdb, lovname,
3359                                            cmd, fsname, poolname, ostname,
3360                                            label);
3361                         name_destroy(&logname);
3362                         name_destroy(&lovname);
3363                 }
3364         }
3365
3366         name_create(&logname, fsname, "-client");
3367         if (canceled_label != NULL)
3368                 mgs_modify(obd, fsdb, mti, logname, fsdb->fsdb_clilov,
3369                            canceled_label, CM_SKIP);
3370
3371         mgs_write_log_pool(obd, logname, fsdb, fsdb->fsdb_clilov,
3372                            cmd, fsname, poolname, ostname, label);
3373         name_destroy(&logname);
3374
3375         up(&fsdb->fsdb_sem);
3376
3377         EXIT;
3378 out:
3379         if (label != NULL)
3380                 OBD_FREE(label, label_sz);
3381
3382         if (canceled_label != NULL)
3383                 OBD_FREE(canceled_label, label_sz);
3384
3385         if (mti != NULL)
3386                 OBD_FREE_PTR(mti);
3387
3388         return rc;
3389 }
3390
#if 0
/******************** unused *********************/
/* Copy the file MOUNT_CONFIGS_DIR/<fsname> to MOUNT_CONFIGS_DIR/<fsname>.bak.
 *
 * The backup file is created (or truncated) first, then the source is copied
 * to it in chunks of "count" bytes inside the device's lvfs run context.
 *
 * Returns 0 on success, -ENOMEM on allocation failure, -ENAMETOOLONG if the
 * backup path does not fit in PATH_MAX, or the error from opening, reading
 * or writing the files.
 *
 * NOTE(review): assumes lustre_fread()/lustre_fwrite() return the number of
 * bytes transferred or a negative errno -- confirm before enabling this.
 */
static int mgs_backup_llog(struct obd_device *obd, char* fsname)
{
        struct file *filp, *bak_filp;
        struct lvfs_run_ctxt saved;
        char *logname, *buf;
        loff_t soff = 0 , doff = 0;
        int count = 4096, len;
        int rc = 0;

        OBD_ALLOC(logname, PATH_MAX);
        if (logname == NULL)
                return -ENOMEM;

        OBD_ALLOC(buf, count);
        if (!buf)
                GOTO(out , rc = -ENOMEM);

        len = snprintf(logname, PATH_MAX, "%s/%s.bak",
                       MOUNT_CONFIGS_DIR, fsname);

        /* GOTO() only logs its second argument; rc has to be assigned */
        if (len >= PATH_MAX - 1)
                GOTO(out, rc = -ENAMETOOLONG);

        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);

        bak_filp = l_filp_open(logname, O_RDWR|O_CREAT|O_TRUNC, 0660);
        if (IS_ERR(bak_filp)) {
                rc = PTR_ERR(bak_filp);
                CERROR("backup logfile open %s: %d\n", logname, rc);
                GOTO(pop, rc);
        }
        /* Shorter than the ".bak" path checked above, so it fits */
        sprintf(logname, "%s/%s", MOUNT_CONFIGS_DIR, fsname);
        filp = l_filp_open(logname, O_RDONLY, 0);
        if (IS_ERR(filp)) {
                rc = PTR_ERR(filp);
                CERROR("logfile open %s: %d\n", logname, rc);
                GOTO(close1f, rc);
        }

        /* Copy until EOF (rc == 0) or error (rc < 0), writing only as many
         * bytes as were actually read. */
        while ((rc = lustre_fread(filp, buf, count, &soff)) > 0) {
                len = rc;
                rc = lustre_fwrite(bak_filp, buf, len, &doff);
                if (rc < 0)
                        break;
        }

        filp_close(filp, 0);
close1f:
        filp_close(bak_filp, 0);
pop:
        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
out:
        if (buf)
                OBD_FREE(buf, count);
        OBD_FREE(logname, PATH_MAX);
        return rc;
}

#endif