Whamcloud - gitweb
LU-1842 quota: remove leftovers from old code
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         if (!dt_try_as_dir(env, dir))
71                 GOTO(out, rc = -ENOTDIR);
72
73         LASSERT(dir);
74         LASSERT(dir->do_index_ops);
75
76         iops = &dir->do_index_ops->dio_it;
77         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
78         if (IS_ERR(it))
79                 RETURN(PTR_ERR(it));
80
81         rc = iops->load(env, it, 0);
82         if (rc <= 0)
83                 GOTO(fini, rc = 0);
84
85         /* main cycle */
86         do {
87                 key = (void *)iops->key(env, it);
88                 if (IS_ERR(key)) {
89                         CERROR("%s: key failed when listing %s: rc = %d\n",
90                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
91                                (int) PTR_ERR(key));
92                         goto next;
93                 }
94                 key_sz = iops->key_size(env, it);
95                 LASSERT(key_sz > 0);
96
97                 /* filter out "." and ".." entries */
98                 if (key[0] == '.') {
99                         if (key_sz == 1)
100                                 goto next;
101                         if (key_sz == 2 && key[1] == '.')
102                                 goto next;
103                 }
104
105                 de = mgs_direntry_alloc(key_sz + 1);
106                 if (de == NULL) {
107                         rc = -ENOMEM;
108                         break;
109                 }
110
111                 memcpy(de->name, key, key_sz);
112                 de->name[key_sz] = 0;
113
114                 cfs_list_add(&de->list, list);
115
116 next:
117                 rc = iops->next(env, it);
118         } while (rc == 0);
119         rc = 0;
120
121         iops->put(env, it);
122
123 fini:
124         iops->fini(env, it);
125 out:
126         if (rc)
127                 CERROR("%s: key failed when listing %s: rc = %d\n",
128                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
129         RETURN(rc);
130 }
131
132 /******************** DB functions *********************/
133
134 static inline int name_create(char **newname, char *prefix, char *suffix)
135 {
136         LASSERT(newname);
137         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
138         if (!*newname)
139                 return -ENOMEM;
140         sprintf(*newname, "%s%s", prefix, suffix);
141         return 0;
142 }
143
144 static inline void name_destroy(char **name)
145 {
146         if (*name)
147                 OBD_FREE(*name, strlen(*name) + 1);
148         *name = NULL;
149 }
150
151 struct mgs_fsdb_handler_data
152 {
153         struct fs_db   *fsdb;
154         __u32           ver;
155 };
156
157 /* from the (client) config log, figure out:
158         1. which ost's/mdt's are configured (by index)
159         2. what the last config step is
160         3. COMPAT_18 osc name
161 */
162 /* It might be better to have a separate db file, instead of parsing the info
163    out of the client log.  This is slow and potentially error-prone. */
164 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
165                             struct llog_rec_hdr *rec, void *data)
166 {
167         struct mgs_fsdb_handler_data *d = data;
168         struct fs_db *fsdb = d->fsdb;
169         int cfg_len = rec->lrh_len;
170         char *cfg_buf = (char*) (rec + 1);
171         struct lustre_cfg *lcfg;
172         __u32 index;
173         int rc = 0;
174         ENTRY;
175
176         if (rec->lrh_type != OBD_CFG_REC) {
177                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
178                 RETURN(-EINVAL);
179         }
180
181         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
182         if (rc) {
183                 CERROR("Insane cfg\n");
184                 RETURN(rc);
185         }
186
187         lcfg = (struct lustre_cfg *)cfg_buf;
188
189         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
190                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
191
192         /* Figure out ost indicies */
193         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
194         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
195             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
196                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
197                                        NULL, 10);
198                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
199                        lustre_cfg_string(lcfg, 1), index,
200                        lustre_cfg_string(lcfg, 2));
201                 cfs_set_bit(index, fsdb->fsdb_ost_index_map);
202         }
203
204         /* Figure out mdt indicies */
205         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
206         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
207             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
208                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
209                                        &index, NULL);
210                 if (rc != LDD_F_SV_TYPE_MDT) {
211                         CWARN("Unparsable MDC name %s, assuming index 0\n",
212                               lustre_cfg_string(lcfg, 0));
213                         index = 0;
214                 }
215                 rc = 0;
216                 CDEBUG(D_MGS, "MDT index is %u\n", index);
217                 cfs_set_bit(index, fsdb->fsdb_mdt_index_map);
218                 fsdb->fsdb_mdt_count ++;
219         }
220
221         /**
222          * figure out the old config. fsdb_gen = 0 means old log
223          * It is obsoleted and not supported anymore
224          */
225         if (fsdb->fsdb_gen == 0) {
226                 CERROR("Old config format is not supported\n");
227                 RETURN(-EINVAL);
228         }
229
230         /*
231          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
232          */
233         if (!cfs_test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
234             lcfg->lcfg_command == LCFG_ATTACH &&
235             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
236                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
237                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
238                         CWARN("MDT using 1.8 OSC name scheme\n");
239                         cfs_set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
240                 }
241         }
242
243         if (lcfg->lcfg_command == LCFG_MARKER) {
244                 struct cfg_marker *marker;
245                 marker = lustre_cfg_buf(lcfg, 1);
246
247                 d->ver = marker->cm_vers;
248
249                 /* Keep track of the latest marker step */
250                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
251         }
252
253         RETURN(rc);
254 }
255
256 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
257 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
258                                   struct mgs_device *mgs,
259                                   struct fs_db *fsdb)
260 {
261         char                            *logname;
262         struct llog_handle              *loghandle;
263         struct llog_ctxt                *ctxt;
264         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
265         int rc;
266
267         ENTRY;
268
269         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
270         LASSERT(ctxt != NULL);
271         rc = name_create(&logname, fsdb->fsdb_name, "-client");
272         if (rc)
273                 GOTO(out_put, rc);
274         cfs_mutex_lock(&fsdb->fsdb_mutex);
275         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
276         if (rc)
277                 GOTO(out_pop, rc);
278
279         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
280         if (rc)
281                 GOTO(out_close, rc);
282
283         if (llog_get_size(loghandle) <= 1)
284                 cfs_set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
285
286         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
287         CDEBUG(D_INFO, "get_db = %d\n", rc);
288 out_close:
289         llog_close(env, loghandle);
290 out_pop:
291         cfs_mutex_unlock(&fsdb->fsdb_mutex);
292         name_destroy(&logname);
293 out_put:
294         llog_ctxt_put(ctxt);
295
296         RETURN(rc);
297 }
298
299 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
300 {
301         struct mgs_tgt_srpc_conf *tgtconf;
302
303         /* free target-specific rules */
304         while (fsdb->fsdb_srpc_tgt) {
305                 tgtconf = fsdb->fsdb_srpc_tgt;
306                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
307
308                 LASSERT(tgtconf->mtsc_tgt);
309
310                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
311                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
312                 OBD_FREE_PTR(tgtconf);
313         }
314
315         /* free general rules */
316         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
317 }
318
319 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
320 {
321         struct fs_db *fsdb;
322         cfs_list_t *tmp;
323
324         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
325                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
326                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
327                         return fsdb;
328         }
329         return NULL;
330 }
331
332 /* caller must hold the mgs->mgs_fs_db_lock */
333 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
334                                   struct mgs_device *mgs, char *fsname)
335 {
336         struct fs_db *fsdb;
337         int rc;
338         ENTRY;
339
340         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
341                 CERROR("fsname %s is too long\n", fsname);
342                 RETURN(NULL);
343         }
344
345         OBD_ALLOC_PTR(fsdb);
346         if (!fsdb)
347                 RETURN(NULL);
348
349         strcpy(fsdb->fsdb_name, fsname);
350         cfs_mutex_init(&fsdb->fsdb_mutex);
351         cfs_set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
352         fsdb->fsdb_gen = 1;
353
354         if (strcmp(fsname, MGSSELF_NAME) == 0) {
355                 cfs_set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
356         } else {
357                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
358                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
359                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
360                         CERROR("No memory for index maps\n");
361                         GOTO(err, rc = -ENOMEM);
362                 }
363
364                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
365                 if (rc)
366                         GOTO(err, rc);
367                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
368                 if (rc)
369                         GOTO(err, rc);
370
371                 /* initialise data for NID table */
372                 mgs_ir_init_fs(env, mgs, fsdb);
373
374                 lproc_mgs_add_live(mgs, fsdb);
375         }
376
377         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
378
379         RETURN(fsdb);
380 err:
381         if (fsdb->fsdb_ost_index_map)
382                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
383         if (fsdb->fsdb_mdt_index_map)
384                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
385         name_destroy(&fsdb->fsdb_clilov);
386         name_destroy(&fsdb->fsdb_clilmv);
387         OBD_FREE_PTR(fsdb);
388         RETURN(NULL);
389 }
390
391 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
392 {
393         /* wait for anyone with the sem */
394         cfs_mutex_lock(&fsdb->fsdb_mutex);
395         lproc_mgs_del_live(mgs, fsdb);
396         cfs_list_del(&fsdb->fsdb_list);
397
398         /* deinitialize fsr */
399         mgs_ir_fini_fs(mgs, fsdb);
400
401         if (fsdb->fsdb_ost_index_map)
402                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
403         if (fsdb->fsdb_mdt_index_map)
404                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
405         name_destroy(&fsdb->fsdb_clilov);
406         name_destroy(&fsdb->fsdb_clilmv);
407         mgs_free_fsdb_srpc(fsdb);
408         cfs_mutex_unlock(&fsdb->fsdb_mutex);
409         OBD_FREE_PTR(fsdb);
410 }
411
412 int mgs_init_fsdb_list(struct mgs_device *mgs)
413 {
414         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
415         return 0;
416 }
417
418 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
419 {
420         struct fs_db *fsdb;
421         cfs_list_t *tmp, *tmp2;
422         cfs_mutex_lock(&mgs->mgs_mutex);
423         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
424                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
425                 mgs_free_fsdb(mgs, fsdb);
426         }
427         cfs_mutex_unlock(&mgs->mgs_mutex);
428         return 0;
429 }
430
431 int mgs_find_or_make_fsdb(const struct lu_env *env,
432                           struct mgs_device *mgs, char *name,
433                           struct fs_db **dbh)
434 {
435         struct fs_db *fsdb;
436         int rc = 0;
437
438         cfs_mutex_lock(&mgs->mgs_mutex);
439         fsdb = mgs_find_fsdb(mgs, name);
440         if (fsdb) {
441                 cfs_mutex_unlock(&mgs->mgs_mutex);
442                 *dbh = fsdb;
443                 return 0;
444         }
445
446         CDEBUG(D_MGS, "Creating new db\n");
447         fsdb = mgs_new_fsdb(env, mgs, name);
448         cfs_mutex_unlock(&mgs->mgs_mutex);
449         if (!fsdb)
450                 return -ENOMEM;
451
452         if (!cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
453                 /* populate the db from the client llog */
454                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
455                 if (rc) {
456                         CERROR("Can't get db from client log %d\n", rc);
457                         mgs_free_fsdb(mgs, fsdb);
458                         return rc;
459                 }
460         }
461
462         /* populate srpc rules from params llog */
463         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
464         if (rc) {
465                 CERROR("Can't get db from params log %d\n", rc);
466                 mgs_free_fsdb(mgs, fsdb);
467                 return rc;
468         }
469
470         *dbh = fsdb;
471
472         return 0;
473 }
474
475 /* 1 = index in use
476    0 = index unused
477    -1= empty client log */
478 int mgs_check_index(const struct lu_env *env,
479                     struct mgs_device *mgs,
480                     struct mgs_target_info *mti)
481 {
482         struct fs_db *fsdb;
483         void *imap;
484         int rc = 0;
485         ENTRY;
486
487         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
488
489         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
490         if (rc) {
491                 CERROR("Can't get db for %s\n", mti->mti_fsname);
492                 RETURN(rc);
493         }
494
495         if (cfs_test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
496                 RETURN(-1);
497
498         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
499                 imap = fsdb->fsdb_ost_index_map;
500         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
501                 imap = fsdb->fsdb_mdt_index_map;
502         else
503                 RETURN(-EINVAL);
504
505         if (cfs_test_bit(mti->mti_stripe_index, imap))
506                 RETURN(1);
507         RETURN(0);
508 }
509
510 static __inline__ int next_index(void *index_map, int map_len)
511 {
512         int i;
513         for (i = 0; i < map_len * 8; i++)
514                  if (!cfs_test_bit(i, index_map)) {
515                          return i;
516                  }
517         CERROR("max index %d exceeded.\n", i);
518         return -1;
519 }
520
521 /* Return codes:
522         0  newly marked as in use
523         <0 err
524         +EALREADY for update of an old index */
525 static int mgs_set_index(const struct lu_env *env,
526                          struct mgs_device *mgs,
527                          struct mgs_target_info *mti)
528 {
529         struct fs_db *fsdb;
530         void *imap;
531         int rc = 0;
532         ENTRY;
533
534         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
535         if (rc) {
536                 CERROR("Can't get db for %s\n", mti->mti_fsname);
537                 RETURN(rc);
538         }
539
540         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
541                 imap = fsdb->fsdb_ost_index_map;
542         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
543                 imap = fsdb->fsdb_mdt_index_map;
544                 if (fsdb->fsdb_mdt_count >= MAX_MDT_COUNT) {
545                         LCONSOLE_ERROR_MSG(0x13f, "The max mdt count"
546                                            "is %d\n", (int)MAX_MDT_COUNT);
547                         RETURN(-ERANGE);
548                 }
549         } else {
550                 RETURN(-EINVAL);
551         }
552
553         if (mti->mti_flags & LDD_F_NEED_INDEX) {
554                 rc = next_index(imap, INDEX_MAP_SIZE);
555                 if (rc == -1)
556                         RETURN(-ERANGE);
557                 mti->mti_stripe_index = rc;
558                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
559                         fsdb->fsdb_mdt_count ++;
560         }
561
562         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
563                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
564                                    "but the max index is %d.\n",
565                                    mti->mti_svname, mti->mti_stripe_index,
566                                    INDEX_MAP_SIZE * 8);
567                 RETURN(-ERANGE);
568         }
569
570         if (cfs_test_bit(mti->mti_stripe_index, imap)) {
571                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
572                     !(mti->mti_flags & LDD_F_WRITECONF)) {
573                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
574                                            "%d, but that index is already in "
575                                            "use. Use --writeconf to force\n",
576                                            mti->mti_svname,
577                                            mti->mti_stripe_index);
578                         RETURN(-EADDRINUSE);
579                 } else {
580                         CDEBUG(D_MGS, "Server %s updating index %d\n",
581                                mti->mti_svname, mti->mti_stripe_index);
582                         RETURN(EALREADY);
583                 }
584         }
585
586         cfs_set_bit(mti->mti_stripe_index, imap);
587         cfs_clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
588         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
589                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
590
591         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
592                mti->mti_stripe_index);
593
594         RETURN(0);
595 }
596
597 struct mgs_modify_lookup {
598         struct cfg_marker mml_marker;
599         int               mml_modified;
600 };
601
602 static int mgs_modify_handler(const struct lu_env *env,
603                               struct llog_handle *llh,
604                               struct llog_rec_hdr *rec, void *data)
605 {
606         struct mgs_modify_lookup *mml = data;
607         struct cfg_marker *marker;
608         struct lustre_cfg *lcfg = (struct lustre_cfg *)(rec + 1);
609         int cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
610                 sizeof(struct llog_rec_tail);
611         int rc;
612         ENTRY;
613
614         if (rec->lrh_type != OBD_CFG_REC) {
615                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
616                 RETURN(-EINVAL);
617         }
618
619         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
620         if (rc) {
621                 CERROR("Insane cfg\n");
622                 RETURN(rc);
623         }
624
625         /* We only care about markers */
626         if (lcfg->lcfg_command != LCFG_MARKER)
627                 RETURN(0);
628
629         marker = lustre_cfg_buf(lcfg, 1);
630         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
631             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
632             !(marker->cm_flags & CM_SKIP)) {
633                 /* Found a non-skipped marker match */
634                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
635                        rec->lrh_index, marker->cm_step,
636                        marker->cm_flags, mml->mml_marker.cm_flags,
637                        marker->cm_tgtname, marker->cm_comment);
638                 /* Overwrite the old marker llog entry */
639                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
640                 marker->cm_flags |= mml->mml_marker.cm_flags;
641                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
642                 /* Header and tail are added back to lrh_len in
643                    llog_lvfs_write_rec */
644                 rec->lrh_len = cfg_len;
645                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
646                                 rec->lrh_index);
647                 if (!rc)
648                          mml->mml_modified++;
649         }
650
651         RETURN(rc);
652 }
653
654 /**
655  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
656  * Return code:
657  * 0 - modified successfully,
658  * 1 - no modification was done
659  * negative - error
660  */
661 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
662                       struct fs_db *fsdb, struct mgs_target_info *mti,
663                       char *logname, char *devname, char *comment, int flags)
664 {
665         struct llog_handle *loghandle;
666         struct llog_ctxt *ctxt;
667         struct mgs_modify_lookup *mml;
668         int rc;
669
670         ENTRY;
671
672         LASSERT(cfs_mutex_is_locked(&fsdb->fsdb_mutex));
673         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
674                flags);
675
676         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
677         LASSERT(ctxt != NULL);
678         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
679         if (rc < 0) {
680                 if (rc == -ENOENT)
681                         rc = 0;
682                 GOTO(out_pop, rc);
683         }
684
685         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
686         if (rc)
687                 GOTO(out_close, rc);
688
689         if (llog_get_size(loghandle) <= 1)
690                 GOTO(out_close, rc = 0);
691
692         OBD_ALLOC_PTR(mml);
693         if (!mml)
694                 GOTO(out_close, rc = -ENOMEM);
695         strcpy(mml->mml_marker.cm_comment, comment);
696         strcpy(mml->mml_marker.cm_tgtname, devname);
697         /* Modify mostly means cancel */
698         mml->mml_marker.cm_flags = flags;
699         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
700         mml->mml_modified = 0;
701         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
702                           NULL);
703         if (!rc && !mml->mml_modified)
704                 rc = 1;
705         OBD_FREE_PTR(mml);
706
707 out_close:
708         llog_close(env, loghandle);
709 out_pop:
710         if (rc < 0)
711                 CERROR("%s: modify %s/%s failed: rc = %d\n",
712                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
713         llog_ctxt_put(ctxt);
714         RETURN(rc);
715 }
716
717 /******************** config log recording functions *********************/
718
719 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
720                        struct lustre_cfg *lcfg)
721 {
722         struct llog_rec_hdr      rec;
723         int                      buflen, rc;
724
725         if (!lcfg || !llh)
726                 return -ENOMEM;
727
728         LASSERT(llh->lgh_ctxt);
729
730         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
731                                 lcfg->lcfg_buflens);
732         rec.lrh_len = llog_data_len(buflen);
733         rec.lrh_type = OBD_CFG_REC;
734
735         /* idx = -1 means append */
736         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
737         if (rc)
738                 CERROR("failed %d\n", rc);
739         return rc;
740 }
741
742 static int record_base(const struct lu_env *env, struct llog_handle *llh,
743                      char *cfgname, lnet_nid_t nid, int cmd,
744                      char *s1, char *s2, char *s3, char *s4)
745 {
746         struct mgs_thread_info *mgi = mgs_env_info(env);
747         struct lustre_cfg     *lcfg;
748         int rc;
749
750         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
751                cmd, s1, s2, s3, s4);
752
753         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
754         if (s1)
755                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
756         if (s2)
757                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
758         if (s3)
759                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
760         if (s4)
761                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
762
763         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
764         if (!lcfg)
765                 return -ENOMEM;
766         lcfg->lcfg_nid = nid;
767
768         rc = record_lcfg(env, llh, lcfg);
769
770         lustre_cfg_free(lcfg);
771
772         if (rc) {
773                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
774                        cmd, s1, s2, s3, s4);
775         }
776         return rc;
777 }
778
779
780 static inline int record_add_uuid(const struct lu_env *env,
781                                   struct llog_handle *llh,
782                                   uint64_t nid, char *uuid)
783 {
784         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
785
786 }
787
788 static inline int record_add_conn(const struct lu_env *env,
789                                   struct llog_handle *llh,
790                                   char *devname, char *uuid)
791 {
792         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
793 }
794
795 static inline int record_attach(const struct lu_env *env,
796                                 struct llog_handle *llh, char *devname,
797                                 char *type, char *uuid)
798 {
799         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
800 }
801
802 static inline int record_setup(const struct lu_env *env,
803                                struct llog_handle *llh, char *devname,
804                                char *s1, char *s2, char *s3, char *s4)
805 {
806         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
807 }
808
809 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
810                             char *devname, struct lov_desc *desc)
811 {
812         struct mgs_thread_info *mgi = mgs_env_info(env);
813         struct lustre_cfg *lcfg;
814         int rc;
815
816         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
817         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
818         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
819         if (!lcfg)
820                 return -ENOMEM;
821         rc = record_lcfg(env, llh, lcfg);
822
823         lustre_cfg_free(lcfg);
824         return rc;
825 }
826
827 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
828                             char *devname, struct lmv_desc *desc)
829 {
830         struct mgs_thread_info *mgi = mgs_env_info(env);
831         struct lustre_cfg *lcfg;
832         int rc;
833
834         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
835         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
836         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
837
838         rc = record_lcfg(env, llh, lcfg);
839
840         lustre_cfg_free(lcfg);
841         return rc;
842 }
843
844 static inline int record_mdc_add(const struct lu_env *env,
845                                  struct llog_handle *llh,
846                                  char *logname, char *mdcuuid,
847                                  char *mdtuuid, char *index,
848                                  char *gen)
849 {
850         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
851                            mdtuuid,index,gen,mdcuuid);
852 }
853
854 static inline int record_lov_add(const struct lu_env *env,
855                                  struct llog_handle *llh,
856                                  char *lov_name, char *ost_uuid,
857                                  char *index, char *gen)
858 {
859         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
860                            ost_uuid,index,gen,0);
861 }
862
863 static inline int record_mount_opt(const struct lu_env *env,
864                                    struct llog_handle *llh,
865                                    char *profile, char *lov_name,
866                                    char *mdc_name)
867 {
868         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
869                            profile,lov_name,mdc_name,0);
870 }
871
872 static int record_marker(const struct lu_env *env,
873                          struct llog_handle *llh,
874                          struct fs_db *fsdb, __u32 flags,
875                          char *tgtname, char *comment)
876 {
877         struct mgs_thread_info *mgi = mgs_env_info(env);
878         struct lustre_cfg *lcfg;
879         int rc;
880
881         if (flags & CM_START)
882                 fsdb->fsdb_gen++;
883         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
884         mgi->mgi_marker.cm_flags = flags;
885         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
886         strncpy(mgi->mgi_marker.cm_tgtname, tgtname,
887                 sizeof(mgi->mgi_marker.cm_tgtname));
888         strncpy(mgi->mgi_marker.cm_comment, comment,
889                 sizeof(mgi->mgi_marker.cm_comment));
890         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
891         mgi->mgi_marker.cm_canceltime = 0;
892         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
893         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
894                             sizeof(mgi->mgi_marker));
895         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
896         if (!lcfg)
897                 return -ENOMEM;
898         rc = record_lcfg(env, llh, lcfg);
899
900         lustre_cfg_free(lcfg);
901         return rc;
902 }
903
904 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
905                             struct llog_handle **llh, char *name)
906 {
907         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
908         struct llog_ctxt        *ctxt;
909         int                      rc = 0;
910
911         if (*llh)
912                 GOTO(out, rc = -EBUSY);
913
914         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
915         if (!ctxt)
916                 GOTO(out, rc = -ENODEV);
917         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
918
919         rc = llog_open_create(env, ctxt, llh, NULL, name);
920         if (rc)
921                 GOTO(out_ctxt, rc);
922         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
923         if (rc)
924                 llog_close(env, *llh);
925 out_ctxt:
926         llog_ctxt_put(ctxt);
927 out:
928         if (rc) {
929                 CERROR("%s: can't start log %s: rc = %d\n",
930                        mgs->mgs_obd->obd_name, name, rc);
931                 *llh = NULL;
932         }
933         RETURN(rc);
934 }
935
936 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
937 {
938         int rc;
939
940         rc = llog_close(env, *llh);
941         *llh = NULL;
942
943         return rc;
944 }
945
946 static int mgs_log_is_empty(const struct lu_env *env,
947                             struct mgs_device *mgs, char *name)
948 {
949         struct llog_handle *llh;
950         struct llog_ctxt *ctxt;
951         int rc = 0;
952
953         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
954         LASSERT(ctxt != NULL);
955         rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
956         if (rc < 0) {
957                 if (rc == -ENOENT)
958                         rc = 0;
959                 GOTO(out_ctxt, rc);
960         }
961
962         llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
963         if (rc)
964                 GOTO(out_close, rc);
965         rc = llog_get_size(llh);
966
967 out_close:
968         llog_close(env, llh);
969 out_ctxt:
970         llog_ctxt_put(ctxt);
971         /* header is record 1 */
972         return (rc <= 1);
973 }
974
975 /******************** config "macros" *********************/
976
977 /* write an lcfg directly into a log (with markers) */
978 static int mgs_write_log_direct(const struct lu_env *env,
979                                 struct mgs_device *mgs, struct fs_db *fsdb,
980                                 char *logname, struct lustre_cfg *lcfg,
981                                 char *devname, char *comment)
982 {
983         struct llog_handle *llh = NULL;
984         int rc;
985         ENTRY;
986
987         if (!lcfg)
988                 RETURN(-ENOMEM);
989
990         rc = record_start_log(env, mgs, &llh, logname);
991         if (rc)
992                 RETURN(rc);
993
994         /* FIXME These should be a single journal transaction */
995         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
996         if (rc)
997                 GOTO(out_end, rc);
998         rc = record_lcfg(env, llh, lcfg);
999         if (rc)
1000                 GOTO(out_end, rc);
1001         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1002         if (rc)
1003                 GOTO(out_end, rc);
1004 out_end:
1005         record_end_log(env, &llh);
1006         RETURN(rc);
1007 }
1008
1009 /* write the lcfg in all logs for the given fs */
1010 int mgs_write_log_direct_all(const struct lu_env *env,
1011                              struct mgs_device *mgs,
1012                              struct fs_db *fsdb,
1013                              struct mgs_target_info *mti,
1014                              struct lustre_cfg *lcfg,
1015                              char *devname, char *comment,
1016                              int server_only)
1017 {
1018         cfs_list_t list;
1019         struct mgs_direntry *dirent, *n;
1020         char *fsname = mti->mti_fsname;
1021         char *logname;
1022         int rc = 0, len = strlen(fsname);
1023         ENTRY;
1024
1025         /* We need to set params for any future logs
1026            as well. FIXME Append this file to every new log.
1027            Actually, we should store as params (text), not llogs.  Or
1028            in a database. */
1029         rc = name_create(&logname, fsname, "-params");
1030         if (rc)
1031                 RETURN(rc);
1032         if (mgs_log_is_empty(env, mgs, logname)) {
1033                 struct llog_handle *llh = NULL;
1034                 rc = record_start_log(env, mgs, &llh, logname);
1035                 record_end_log(env, &llh);
1036         }
1037         name_destroy(&logname);
1038         if (rc)
1039                 RETURN(rc);
1040
1041         /* Find all the logs in the CONFIGS directory */
1042         rc = class_dentry_readdir(env, mgs, &list);
1043         if (rc)
1044                 RETURN(rc);
1045
1046         /* Could use fsdb index maps instead of directory listing */
1047         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1048                 cfs_list_del(&dirent->list);
1049                 /* don't write to sptlrpc rule log */
1050                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1051                         goto next;
1052
1053                 /* caller wants write server logs only */
1054                 if (server_only && strstr(dirent->name, "-client") != NULL)
1055                         goto next;
1056
1057                 if (strncmp(fsname, dirent->name, len) == 0) {
1058                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1059                         /* Erase any old settings of this same parameter */
1060                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1061                                         devname, comment, CM_SKIP);
1062                         if (rc < 0)
1063                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1064                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1065                         /* Write the new one */
1066                         if (lcfg) {
1067                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1068                                                           dirent->name,
1069                                                           lcfg, devname,
1070                                                           comment);
1071                                 if (rc)
1072                                         CERROR("%s: writing log %s: rc = %d\n",
1073                                                mgs->mgs_obd->obd_name,
1074                                                dirent->name, rc);
1075                         }
1076                 }
1077 next:
1078                 mgs_direntry_free(dirent);
1079         }
1080
1081         RETURN(rc);
1082 }
1083
1084 static int mgs_write_log_mdc_to_mdt(const struct lu_env *env,
1085                                     struct mgs_device *mgs,
1086                                     struct fs_db *fsdb,
1087                                     struct mgs_target_info *mti,
1088                                     char *logname);
1089 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1090                                     struct mgs_device *mgs,
1091                                     struct fs_db *fsdb,
1092                                     struct mgs_target_info *mti,
1093                                     char *logname, char *suffix, char *lovname,
1094                                     enum lustre_sec_part sec_part, int flags);
1095 static int name_create_mdt_and_lov(char **logname, char **lovname,
1096                                    struct fs_db *fsdb, int i);
1097
1098 static int mgs_steal_llog_handler(const struct lu_env *env,
1099                                   struct llog_handle *llh,
1100                                   struct llog_rec_hdr *rec, void *data)
1101 {
1102         struct mgs_device *mgs;
1103         struct obd_device *obd;
1104         struct mgs_target_info *mti, *tmti;
1105         struct fs_db *fsdb;
1106         int cfg_len = rec->lrh_len;
1107         char *cfg_buf = (char*) (rec + 1);
1108         struct lustre_cfg *lcfg;
1109         int rc = 0;
1110         struct llog_handle *mdt_llh = NULL;
1111         static int got_an_osc_or_mdc = 0;
1112         /* 0: not found any osc/mdc;
1113            1: found osc;
1114            2: found mdc;
1115         */
1116         static int last_step = -1;
1117
1118         ENTRY;
1119
1120         mti = ((struct temp_comp*)data)->comp_mti;
1121         tmti = ((struct temp_comp*)data)->comp_tmti;
1122         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1123         obd = ((struct temp_comp *)data)->comp_obd;
1124         mgs = lu2mgs_dev(obd->obd_lu_dev);
1125         LASSERT(mgs);
1126
1127         if (rec->lrh_type != OBD_CFG_REC) {
1128                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1129                 RETURN(-EINVAL);
1130         }
1131
1132         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1133         if (rc) {
1134                 CERROR("Insane cfg\n");
1135                 RETURN(rc);
1136         }
1137
1138         lcfg = (struct lustre_cfg *)cfg_buf;
1139
1140         if (lcfg->lcfg_command == LCFG_MARKER) {
1141                 struct cfg_marker *marker;
1142                 marker = lustre_cfg_buf(lcfg, 1);
1143                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1144                     (marker->cm_flags & CM_START)){
1145                         got_an_osc_or_mdc = 1;
1146                         strncpy(tmti->mti_svname, marker->cm_tgtname,
1147                                 sizeof(tmti->mti_svname));
1148                         rc = record_start_log(env, mgs, &mdt_llh,
1149                                               mti->mti_svname);
1150                         if (rc)
1151                                 RETURN(rc);
1152                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1153                                            mti->mti_svname,"add osc(copied)");
1154                         record_end_log(env, &mdt_llh);
1155                         last_step = marker->cm_step;
1156                         RETURN(rc);
1157                 }
1158                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1159                     (marker->cm_flags & CM_END)){
1160                         LASSERT(last_step == marker->cm_step);
1161                         last_step = -1;
1162                         got_an_osc_or_mdc = 0;
1163                         rc = record_start_log(env, mgs, &mdt_llh,
1164                                               mti->mti_svname);
1165                         if (rc)
1166                                 RETURN(rc);
1167                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1168                                            mti->mti_svname,"add osc(copied)");
1169                         record_end_log(env, &mdt_llh);
1170                         RETURN(rc);
1171                 }
1172                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1173                     (marker->cm_flags & CM_START)){
1174                         got_an_osc_or_mdc = 2;
1175                         last_step = marker->cm_step;
1176                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1177                                strlen(marker->cm_tgtname));
1178
1179                         RETURN(rc);
1180                 }
1181                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1182                     (marker->cm_flags & CM_END)){
1183                         LASSERT(last_step == marker->cm_step);
1184                         last_step = -1;
1185                         got_an_osc_or_mdc = 0;
1186                         RETURN(rc);
1187                 }
1188         }
1189
1190         if (got_an_osc_or_mdc == 0 || last_step < 0)
1191                 RETURN(rc);
1192
1193         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1194                 uint64_t nodenid;
1195                 nodenid = lcfg->lcfg_nid;
1196
1197                 tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1198                 tmti->mti_nid_count++;
1199
1200                 RETURN(rc);
1201         }
1202
1203         if (lcfg->lcfg_command == LCFG_SETUP) {
1204                 char *target;
1205
1206                 target = lustre_cfg_string(lcfg, 1);
1207                 memcpy(tmti->mti_uuid, target, strlen(target));
1208                 RETURN(rc);
1209         }
1210
1211         /* ignore client side sptlrpc_conf_log */
1212         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1213                 RETURN(rc);
1214
1215         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1216                 int index;
1217
1218                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1219                         RETURN (-EINVAL);
1220
1221                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1222                        strlen(mti->mti_fsname));
1223                 tmti->mti_stripe_index = index;
1224
1225                 rc = mgs_write_log_mdc_to_mdt(env, mgs, fsdb, tmti,
1226                                               mti->mti_svname);
1227                 memset(tmti, 0, sizeof(*tmti));
1228                 RETURN(rc);
1229         }
1230
1231         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1232                 int index;
1233                 char mdt_index[9];
1234                 char *logname, *lovname;
1235
1236                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1237                                              mti->mti_stripe_index);
1238                 if (rc)
1239                         RETURN(rc);
1240                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1241
1242                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1243                         name_destroy(&logname);
1244                         name_destroy(&lovname);
1245                         RETURN(-EINVAL);
1246                 }
1247
1248                 tmti->mti_stripe_index = index;
1249                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1250                                          mdt_index, lovname,
1251                                          LUSTRE_SP_MDT, 0);
1252                 name_destroy(&logname);
1253                 name_destroy(&lovname);
1254                 RETURN(rc);
1255         }
1256         RETURN(rc);
1257 }
1258
1259 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1260 /* stealed from mgs_get_fsdb_from_llog*/
1261 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1262                                               struct mgs_device *mgs,
1263                                               char *client_name,
1264                                               struct temp_comp* comp)
1265 {
1266         struct llog_handle *loghandle;
1267         struct mgs_target_info *tmti;
1268         struct llog_ctxt *ctxt;
1269         int rc;
1270
1271         ENTRY;
1272
1273         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1274         LASSERT(ctxt != NULL);
1275
1276         OBD_ALLOC_PTR(tmti);
1277         if (tmti == NULL)
1278                 GOTO(out_ctxt, rc = -ENOMEM);
1279
1280         comp->comp_tmti = tmti;
1281         comp->comp_obd = mgs->mgs_obd;
1282
1283         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1284                        LLOG_OPEN_EXISTS);
1285         if (rc < 0) {
1286                 if (rc == -ENOENT)
1287                         rc = 0;
1288                 GOTO(out_pop, rc);
1289         }
1290
1291         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1292         if (rc)
1293                 GOTO(out_close, rc);
1294
1295         rc = llog_process_or_fork(env, loghandle, mgs_steal_llog_handler,
1296                                   (void *)comp, NULL, false);
1297         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1298 out_close:
1299         llog_close(env, loghandle);
1300 out_pop:
1301         OBD_FREE_PTR(tmti);
1302 out_ctxt:
1303         llog_ctxt_put(ctxt);
1304         RETURN(rc);
1305 }
1306
1307 /* lmv is the second thing for client logs */
1308 /* copied from mgs_write_log_lov. Please refer to that.  */
1309 static int mgs_write_log_lmv(const struct lu_env *env,
1310                              struct mgs_device *mgs,
1311                              struct fs_db *fsdb,
1312                              struct mgs_target_info *mti,
1313                              char *logname, char *lmvname)
1314 {
1315         struct llog_handle *llh = NULL;
1316         struct lmv_desc *lmvdesc;
1317         char *uuid;
1318         int rc = 0;
1319         ENTRY;
1320
1321         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1322
1323         OBD_ALLOC_PTR(lmvdesc);
1324         if (lmvdesc == NULL)
1325                 RETURN(-ENOMEM);
1326         lmvdesc->ld_active_tgt_count = 0;
1327         lmvdesc->ld_tgt_count = 0;
1328         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1329         uuid = (char *)lmvdesc->ld_uuid.uuid;
1330
1331         rc = record_start_log(env, mgs, &llh, logname);
1332         if (rc)
1333                 GOTO(out_free, rc);
1334         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1335         if (rc)
1336                 GOTO(out_end, rc);
1337         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1338         if (rc)
1339                 GOTO(out_end, rc);
1340         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1341         if (rc)
1342                 GOTO(out_end, rc);
1343         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1344         if (rc)
1345                 GOTO(out_end, rc);
1346 out_end:
1347         record_end_log(env, &llh);
1348 out_free:
1349         OBD_FREE_PTR(lmvdesc);
1350         RETURN(rc);
1351 }
1352
1353 /* lov is the first thing in the mdt and client logs */
1354 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1355                              struct fs_db *fsdb, struct mgs_target_info *mti,
1356                              char *logname, char *lovname)
1357 {
1358         struct llog_handle *llh = NULL;
1359         struct lov_desc *lovdesc;
1360         char *uuid;
1361         int rc = 0;
1362         ENTRY;
1363
1364         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1365
1366         /*
1367         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1368         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1369               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1370         */
1371
1372         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1373         OBD_ALLOC_PTR(lovdesc);
1374         if (lovdesc == NULL)
1375                 RETURN(-ENOMEM);
1376         lovdesc->ld_magic = LOV_DESC_MAGIC;
1377         lovdesc->ld_tgt_count = 0;
1378         /* Defaults.  Can be changed later by lcfg config_param */
1379         lovdesc->ld_default_stripe_count = 1;
1380         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1381         lovdesc->ld_default_stripe_size = 1024 * 1024;
1382         lovdesc->ld_default_stripe_offset = -1;
1383         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1384         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1385         /* can these be the same? */
1386         uuid = (char *)lovdesc->ld_uuid.uuid;
1387
1388         /* This should always be the first entry in a log.
1389         rc = mgs_clear_log(obd, logname); */
1390         rc = record_start_log(env, mgs, &llh, logname);
1391         if (rc)
1392                 GOTO(out_free, rc);
1393         /* FIXME these should be a single journal transaction */
1394         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1395         if (rc)
1396                 GOTO(out_end, rc);
1397         rc = record_attach(env, llh, lovname, "lov", uuid);
1398         if (rc)
1399                 GOTO(out_end, rc);
1400         rc = record_lov_setup(env, llh, lovname, lovdesc);
1401         if (rc)
1402                 GOTO(out_end, rc);
1403         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1404         if (rc)
1405                 GOTO(out_end, rc);
1406         EXIT;
1407 out_end:
1408         record_end_log(env, &llh);
1409 out_free:
1410         OBD_FREE_PTR(lovdesc);
1411         return rc;
1412 }
1413
1414 /* add failnids to open log */
1415 static int mgs_write_log_failnids(const struct lu_env *env,
1416                                   struct mgs_target_info *mti,
1417                                   struct llog_handle *llh,
1418                                   char *cliname)
1419 {
1420         char *failnodeuuid = NULL;
1421         char *ptr = mti->mti_params;
1422         lnet_nid_t nid;
1423         int rc = 0;
1424
1425         /*
1426         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1427         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1428         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1429         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1430         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1431         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1432         */
1433
1434         /* Pull failnid info out of params string */
1435         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1436                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1437                         if (failnodeuuid == NULL) {
1438                                 /* We don't know the failover node name,
1439                                    so just use the first nid as the uuid */
1440                                 rc = name_create(&failnodeuuid,
1441                                                  libcfs_nid2str(nid), "");
1442                                 if (rc)
1443                                         return rc;
1444                         }
1445                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1446                                "client %s\n", libcfs_nid2str(nid),
1447                                failnodeuuid, cliname);
1448                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1449                 }
1450                 if (failnodeuuid)
1451                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1452         }
1453
1454         name_destroy(&failnodeuuid);
1455         return rc;
1456 }
1457
1458 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1459                                     struct mgs_device *mgs,
1460                                     struct fs_db *fsdb,
1461                                     struct mgs_target_info *mti,
1462                                     char *logname, char *lmvname)
1463 {
1464         struct llog_handle *llh = NULL;
1465         char *mdcname = NULL;
1466         char *nodeuuid = NULL;
1467         char *mdcuuid = NULL;
1468         char *lmvuuid = NULL;
1469         char index[6];
1470         int i, rc;
1471         ENTRY;
1472
1473         if (mgs_log_is_empty(env, mgs, logname)) {
1474                 CERROR("log is empty! Logical error\n");
1475                 RETURN(-EINVAL);
1476         }
1477
1478         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1479                mti->mti_svname, logname, lmvname);
1480
1481         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1482         if (rc)
1483                 RETURN(rc);
1484         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1485         if (rc)
1486                 GOTO(out_free, rc);
1487         rc = name_create(&mdcuuid, mdcname, "_UUID");
1488         if (rc)
1489                 GOTO(out_free, rc);
1490         rc = name_create(&lmvuuid, lmvname, "_UUID");
1491         if (rc)
1492                 GOTO(out_free, rc);
1493
1494         rc = record_start_log(env, mgs, &llh, logname);
1495         if (rc)
1496                 GOTO(out_free, rc);
1497         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1498                            "add mdc");
1499         if (rc)
1500                 GOTO(out_end, rc);
1501         for (i = 0; i < mti->mti_nid_count; i++) {
1502                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1503                        libcfs_nid2str(mti->mti_nids[i]));
1504
1505                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1506                 if (rc)
1507                         GOTO(out_end, rc);
1508         }
1509
1510         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1511         if (rc)
1512                 GOTO(out_end, rc);
1513         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1514         if (rc)
1515                 GOTO(out_end, rc);
1516         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1517         if (rc)
1518                 GOTO(out_end, rc);
1519         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1520         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1521                             index, "1");
1522         if (rc)
1523                 GOTO(out_end, rc);
1524         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1525                            "add mdc");
1526         if (rc)
1527                 GOTO(out_end, rc);
1528 out_end:
1529         record_end_log(env, &llh);
1530 out_free:
1531         name_destroy(&lmvuuid);
1532         name_destroy(&mdcuuid);
1533         name_destroy(&mdcname);
1534         name_destroy(&nodeuuid);
1535         RETURN(rc);
1536 }
1537
1538 /* add new mdc to already existent MDS */
1539 static int mgs_write_log_mdc_to_mdt(const struct lu_env *env,
1540                                     struct mgs_device *mgs,
1541                                     struct fs_db *fsdb,
1542                                     struct mgs_target_info *mti,
1543                                     char *logname)
1544 {
1545         struct llog_handle *llh = NULL;
1546         char *nodeuuid = NULL;
1547         char *mdcname = NULL;
1548         char *mdcuuid = NULL;
1549         char *mdtuuid = NULL;
1550         int idx = mti->mti_stripe_index;
1551         char index[9];
1552         int i, rc;
1553
1554         ENTRY;
1555         if (mgs_log_is_empty(env, mgs, logname)) {
1556                 CERROR("log is empty! Logical error\n");
1557                 RETURN (-EINVAL);
1558         }
1559
1560         CDEBUG(D_MGS, "adding mdc index %d to %s\n", idx, logname);
1561
1562         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1563         if (rc)
1564                 RETURN(rc);
1565         snprintf(index, sizeof(index), "-mdc%04x", idx);
1566         rc = name_create(&mdcname, logname, index);
1567         if (rc)
1568                 GOTO(out_free, rc);
1569         rc = name_create(&mdcuuid, mdcname, "_UUID");
1570         if (rc)
1571                 GOTO(out_free, rc);
1572         rc = name_create(&mdtuuid, logname, "_UUID");
1573         if (rc)
1574                 GOTO(out_free, rc);
1575
1576         rc = record_start_log(env, mgs, &llh, logname);
1577         if (rc)
1578                 GOTO(out_free, rc);
1579         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname, "add mdc");
1580         if (rc)
1581                 GOTO(out_end, rc);
1582         for (i = 0; i < mti->mti_nid_count; i++) {
1583                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1584                        libcfs_nid2str(mti->mti_nids[i]));
1585                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1586                 if (rc)
1587                         GOTO(out_end, rc);
1588         }
1589         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, mdcuuid);
1590         if (rc)
1591                 GOTO(out_end, rc);
1592         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1593         if (rc)
1594                 GOTO(out_end, rc);
1595         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1596         if (rc)
1597                 GOTO(out_end, rc);
1598         snprintf(index, sizeof(index), "%d", idx);
1599
1600         rc = record_mdc_add(env, llh, logname, mdcuuid, mti->mti_uuid,
1601                             index, "1");
1602         if (rc)
1603                 GOTO(out_end, rc);
1604         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add mdc");
1605         if (rc)
1606                 GOTO(out_end, rc);
1607 out_end:
1608         record_end_log(env, &llh);
1609 out_free:
1610         name_destroy(&mdtuuid);
1611         name_destroy(&mdcuuid);
1612         name_destroy(&mdcname);
1613         name_destroy(&nodeuuid);
1614         RETURN(rc);
1615 }
1616
1617 static int mgs_write_log_mdt0(const struct lu_env *env,
1618                               struct mgs_device *mgs,
1619                               struct fs_db *fsdb,
1620                               struct mgs_target_info *mti)
1621 {
1622         char *log = mti->mti_svname;
1623         struct llog_handle *llh = NULL;
1624         char *uuid, *lovname;
1625         char mdt_index[6];
1626         char *ptr = mti->mti_params;
1627         int rc = 0, failout = 0;
1628         ENTRY;
1629
1630         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
1631         if (uuid == NULL)
1632                 RETURN(-ENOMEM);
1633
1634         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
1635                 failout = (strncmp(ptr, "failout", 7) == 0);
1636
1637         rc = name_create(&lovname, log, "-mdtlov");
1638         if (rc)
1639                 GOTO(out_free, rc);
1640         if (mgs_log_is_empty(env, mgs, log)) {
1641                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
1642                 if (rc)
1643                         GOTO(out_lod, rc);
1644         }
1645
1646         sprintf(mdt_index, "%d", mti->mti_stripe_index);
1647
1648         rc = record_start_log(env, mgs, &llh, log);
1649         if (rc)
1650                 GOTO(out_lod, rc);
1651
1652         /* add MDT itself */
1653
1654         /* FIXME this whole fn should be a single journal transaction */
1655         sprintf(uuid, "%s_UUID", log);
1656         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
1657         if (rc)
1658                 GOTO(out_lod, rc);
1659         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
1660         if (rc)
1661                 GOTO(out_end, rc);
1662         rc = record_mount_opt(env, llh, log, lovname, NULL);
1663         if (rc)
1664                 GOTO(out_end, rc);
1665         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
1666                         failout ? "n" : "f");
1667         if (rc)
1668                 GOTO(out_end, rc);
1669         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
1670         if (rc)
1671                 GOTO(out_end, rc);
1672 out_end:
1673         record_end_log(env, &llh);
1674 out_lod:
1675         name_destroy(&lovname);
1676 out_free:
1677         OBD_FREE(uuid, sizeof(struct obd_uuid));
1678         RETURN(rc);
1679 }
1680
1681 static inline int name_create_mdt(char **logname, char *fsname, int i)
1682 {
1683         char mdt_index[9];
1684
1685         sprintf(mdt_index, "-MDT%04x", i);
1686         return name_create(logname, fsname, mdt_index);
1687 }
1688
1689 static int name_create_mdt_and_lov(char **logname, char **lovname,
1690                                     struct fs_db *fsdb, int i)
1691 {
1692         int rc;
1693
1694         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
1695         if (rc)
1696                 return rc;
1697         /* COMPAT_180 */
1698         if (i == 0 && cfs_test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
1699                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
1700         else
1701                 rc = name_create(lovname, *logname, "-mdtlov");
1702         if (rc) {
1703                 name_destroy(logname);
1704                 *logname = NULL;
1705         }
1706         return rc;
1707 }
1708
1709 static inline int name_create_mdt_osc(char **oscname, char *ostname,
1710                                        struct fs_db *fsdb, int i)
1711 {
1712         char suffix[16];
1713
1714         if (i == 0 && cfs_test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
1715                 sprintf(suffix, "-osc");
1716         else
1717                 sprintf(suffix, "-osc-MDT%04x", i);
1718         return name_create(oscname, ostname, suffix);
1719 }
1720
1721 /* envelope method for all layers log */
1722 static int mgs_write_log_mdt(const struct lu_env *env,
1723                              struct mgs_device *mgs,
1724                              struct fs_db *fsdb,
1725                              struct mgs_target_info *mti)
1726 {
1727         struct mgs_thread_info *mgi = mgs_env_info(env);
1728         struct llog_handle *llh = NULL;
1729         char *cliname;
1730         int rc, i = 0;
1731         ENTRY;
1732
1733         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
1734
1735         if (mti->mti_uuid[0] == '\0') {
1736                 /* Make up our own uuid */
1737                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
1738                          "%s_UUID", mti->mti_svname);
1739         }
1740
1741         /* add mdt */
1742         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
1743         if (rc)
1744                 RETURN(rc);
1745         /* Append the mdt info to the client log */
1746         rc = name_create(&cliname, mti->mti_fsname, "-client");
1747         if (rc)
1748                 RETURN(rc);
1749
1750         if (mgs_log_is_empty(env, mgs, cliname)) {
1751                 /* Start client log */
1752                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
1753                                        fsdb->fsdb_clilov);
1754                 if (rc)
1755                         GOTO(out_free, rc);
1756                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
1757                                        fsdb->fsdb_clilmv);
1758                 if (rc)
1759                         GOTO(out_free, rc);
1760         }
1761
1762         /*
1763         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
1764         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
1765         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
1766         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
1767         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
1768         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
1769         */
1770
1771                 /* copy client info about lov/lmv */
1772                 mgi->mgi_comp.comp_mti = mti;
1773                 mgi->mgi_comp.comp_fsdb = fsdb;
1774
1775                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
1776                                                         &mgi->mgi_comp);
1777                 if (rc)
1778                         GOTO(out_free, rc);
1779                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
1780                                               fsdb->fsdb_clilmv);
1781                 if (rc)
1782                         GOTO(out_free, rc);
1783
1784                 /* add mountopts */
1785                 rc = record_start_log(env, mgs, &llh, cliname);
1786                 if (rc)
1787                         GOTO(out_free, rc);
1788
1789                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
1790                                    "mount opts");
1791                 if (rc)
1792                         GOTO(out_end, rc);
1793                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
1794                                       fsdb->fsdb_clilmv);
1795                 if (rc)
1796                         GOTO(out_end, rc);
1797                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
1798                                    "mount opts");
1799
1800         if (rc)
1801                 GOTO(out_end, rc);
1802         /* for_all_existing_mdt except current one */
1803         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
1804                 char *mdtname;
1805                 if (i !=  mti->mti_stripe_index &&
1806                     cfs_test_bit(i,  fsdb->fsdb_mdt_index_map)) {
1807                         rc = name_create_mdt(&mdtname, mti->mti_fsname, i);
1808                         if (rc)
1809                                 GOTO(out_end, rc);
1810                         rc = mgs_write_log_mdc_to_mdt(env, mgs, fsdb, mti, mdtname);
1811                         name_destroy(&mdtname);
1812                         if (rc)
1813                                 GOTO(out_end, rc);
1814                 }
1815         }
1816 out_end:
1817         record_end_log(env, &llh);
1818 out_free:
1819         name_destroy(&cliname);
1820         RETURN(rc);
1821 }
1822
1823 /* Add the ost info to the client/mdt lov */
1824 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1825                                     struct mgs_device *mgs, struct fs_db *fsdb,
1826                                     struct mgs_target_info *mti,
1827                                     char *logname, char *suffix, char *lovname,
1828                                     enum lustre_sec_part sec_part, int flags)
1829 {
1830         struct llog_handle *llh = NULL;
1831         char *nodeuuid = NULL;
1832         char *oscname = NULL;
1833         char *oscuuid = NULL;
1834         char *lovuuid = NULL;
1835         char *svname = NULL;
1836         char index[6];
1837         int i, rc;
1838
1839         ENTRY;
1840         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
1841                mti->mti_svname, logname);
1842
1843         if (mgs_log_is_empty(env, mgs, logname)) {
1844                 CERROR("log is empty! Logical error\n");
1845                 RETURN (-EINVAL);
1846         }
1847
1848         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1849         if (rc)
1850                 RETURN(rc);
1851         rc = name_create(&svname, mti->mti_svname, "-osc");
1852         if (rc)
1853                 GOTO(out_free, rc);
1854         rc = name_create(&oscname, svname, suffix);
1855         if (rc)
1856                 GOTO(out_free, rc);
1857         rc = name_create(&oscuuid, oscname, "_UUID");
1858         if (rc)
1859                 GOTO(out_free, rc);
1860         rc = name_create(&lovuuid, lovname, "_UUID");
1861         if (rc)
1862                 GOTO(out_free, rc);
1863
1864         /*
1865         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
1866         multihomed (#4)
1867         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
1868         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
1869         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
1870         failover (#6,7)
1871         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
1872         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
1873         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
1874         */
1875
1876         rc = record_start_log(env, mgs, &llh, logname);
1877         if (rc)
1878                 GOTO(out_free, rc);
1879         /* FIXME these should be a single journal transaction */
1880         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
1881                            "add osc");
1882         if (rc)
1883                 GOTO(out_end, rc);
1884         for (i = 0; i < mti->mti_nid_count; i++) {
1885                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
1886                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1887                 if (rc)
1888                         GOTO(out_end, rc);
1889         }
1890         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
1891         if (rc)
1892                 GOTO(out_end, rc);
1893         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
1894         if (rc)
1895                 GOTO(out_end, rc);
1896         rc = mgs_write_log_failnids(env, mti, llh, oscname);
1897         if (rc)
1898                 GOTO(out_end, rc);
1899         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1900         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
1901         if (rc)
1902                 GOTO(out_end, rc);
1903         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
1904                            "add osc");
1905         if (rc)
1906                 GOTO(out_end, rc);
1907 out_end:
1908         record_end_log(env, &llh);
1909 out_free:
1910         name_destroy(&lovuuid);
1911         name_destroy(&oscuuid);
1912         name_destroy(&oscname);
1913         name_destroy(&svname);
1914         name_destroy(&nodeuuid);
1915         RETURN(rc);
1916 }
1917
1918 static int mgs_write_log_ost(const struct lu_env *env,
1919                              struct mgs_device *mgs, struct fs_db *fsdb,
1920                              struct mgs_target_info *mti)
1921 {
1922         struct llog_handle *llh = NULL;
1923         char *logname, *lovname;
1924         char *ptr = mti->mti_params;
1925         int rc, flags = 0, failout = 0, i;
1926         ENTRY;
1927
1928         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
1929
1930         /* The ost startup log */
1931
1932         /* If the ost log already exists, that means that someone reformatted
1933            the ost and it called target_add again. */
1934         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
1935                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
1936                                    "exists, yet the server claims it never "
1937                                    "registered. It may have been reformatted, "
1938                                    "or the index changed. writeconf the MDT to "
1939                                    "regenerate all logs.\n", mti->mti_svname);
1940                 RETURN(-EALREADY);
1941         }
1942
1943         /*
1944         attach obdfilter ost1 ost1_UUID
1945         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
1946         */
1947         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
1948                 failout = (strncmp(ptr, "failout", 7) == 0);
1949         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
1950         if (rc)
1951                 RETURN(rc);
1952         /* FIXME these should be a single journal transaction */
1953         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
1954         if (rc)
1955                 GOTO(out_end, rc);
1956         if (*mti->mti_uuid == '\0')
1957                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
1958                          "%s_UUID", mti->mti_svname);
1959         rc = record_attach(env, llh, mti->mti_svname,
1960                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
1961         if (rc)
1962                 GOTO(out_end, rc);
1963         rc = record_setup(env, llh, mti->mti_svname,
1964                           "dev"/*ignored*/, "type"/*ignored*/,
1965                           failout ? "n" : "f", 0/*options*/);
1966         if (rc)
1967                 GOTO(out_end, rc);
1968         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
1969         if (rc)
1970                 GOTO(out_end, rc);
1971 out_end:
1972         record_end_log(env, &llh);
1973         if (rc)
1974                 RETURN(rc);
1975         /* We also have to update the other logs where this osc is part of
1976            the lov */
1977
1978         if (cfs_test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
1979                 /* If we're upgrading, the old mdt log already has our
1980                    entry. Let's do a fake one for fun. */
1981                 /* Note that we can't add any new failnids, since we don't
1982                    know the old osc names. */
1983                 flags = CM_SKIP | CM_UPGRADE146;
1984
1985         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
1986                 /* If the update flag isn't set, don't update client/mdt
1987                    logs. */
1988                 flags |= CM_SKIP;
1989                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
1990                               "the MDT first to regenerate it.\n",
1991                               mti->mti_svname);
1992         }
1993
1994         /* Add ost to all MDT lov defs */
1995         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
1996                 if (cfs_test_bit(i, fsdb->fsdb_mdt_index_map)) {
1997                         char mdt_index[9];
1998
1999                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2000                                                      i);
2001                         if (rc)
2002                                 RETURN(rc);
2003                         sprintf(mdt_index, "-MDT%04x", i);
2004                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2005                                                       logname, mdt_index,
2006                                                       lovname, LUSTRE_SP_MDT,
2007                                                       flags);
2008                         name_destroy(&logname);
2009                         name_destroy(&lovname);
2010                         if (rc)
2011                                 RETURN(rc);
2012                 }
2013         }
2014
2015         /* Append ost info to the client log */
2016         rc = name_create(&logname, mti->mti_fsname, "-client");
2017         if (rc)
2018                 RETURN(rc);
2019         if (mgs_log_is_empty(env, mgs, logname)) {
2020                 /* Start client log */
2021                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2022                                        fsdb->fsdb_clilov);
2023                 if (rc)
2024                         GOTO(out_free, rc);
2025                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2026                                        fsdb->fsdb_clilmv);
2027                 if (rc)
2028                         GOTO(out_free, rc);
2029         }
2030         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2031                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2032 out_free:
2033         name_destroy(&logname);
2034         RETURN(rc);
2035 }
2036
2037 static __inline__ int mgs_param_empty(char *ptr)
2038 {
2039         char *tmp;
2040
2041         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2042                 return 1;
2043         return 0;
2044 }
2045
2046 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2047                                           struct mgs_device *mgs,
2048                                           struct fs_db *fsdb,
2049                                           struct mgs_target_info *mti,
2050                                           char *logname, char *cliname)
2051 {
2052         int rc;
2053         struct llog_handle *llh = NULL;
2054
2055         if (mgs_param_empty(mti->mti_params)) {
2056                 /* Remove _all_ failnids */
2057                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2058                                 mti->mti_svname, "add failnid", CM_SKIP);
2059                 return rc < 0 ? rc : 0;
2060         }
2061
2062         /* Otherwise failover nids are additive */
2063         rc = record_start_log(env, mgs, &llh, logname);
2064         if (rc)
2065                 return rc;
2066                 /* FIXME this should be a single journal transaction */
2067         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2068                            "add failnid");
2069         if (rc)
2070                 goto out_end;
2071         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2072         if (rc)
2073                 goto out_end;
2074         rc = record_marker(env, llh, fsdb, CM_END,
2075                            mti->mti_svname, "add failnid");
2076 out_end:
2077         record_end_log(env, &llh);
2078         return rc;
2079 }
2080
2081
2082 /* Add additional failnids to an existing log.
2083    The mdc/osc must have been added to logs first */
2084 /* tcp nids must be in dotted-quad ascii -
2085    we can't resolve hostnames from the kernel. */
2086 static int mgs_write_log_add_failnid(const struct lu_env *env,
2087                                      struct mgs_device *mgs,
2088                                      struct fs_db *fsdb,
2089                                      struct mgs_target_info *mti)
2090 {
2091         char *logname, *cliname;
2092         int rc;
2093         ENTRY;
2094
2095         /* FIXME we currently can't erase the failnids
2096          * given when a target first registers, since they aren't part of
2097          * an "add uuid" stanza */
2098
2099         /* Verify that we know about this target */
2100         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2101                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2102                                    "yet. It must be started before failnids "
2103                                    "can be added.\n", mti->mti_svname);
2104                 RETURN(-ENOENT);
2105         }
2106
2107         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2108         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2109                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2110         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2111                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2112         } else {
2113                 RETURN(-EINVAL);
2114         }
2115         if (rc)
2116                 RETURN(rc);
2117         /* Add failover nids to the client log */
2118         rc = name_create(&logname, mti->mti_fsname, "-client");
2119         if (rc) {
2120                 name_destroy(&cliname);
2121                 RETURN(rc);
2122         }
2123         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2124         name_destroy(&logname);
2125         name_destroy(&cliname);
2126         if (rc)
2127                 RETURN(rc);
2128
2129         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2130                 /* Add OST failover nids to the MDT logs as well */
2131                 int i;
2132
2133                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2134                         if (!cfs_test_bit(i, fsdb->fsdb_mdt_index_map))
2135                                 continue;
2136                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2137                         if (rc)
2138                                 RETURN(rc);
2139                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2140                                                  fsdb, i);
2141                         if (rc) {
2142                                 name_destroy(&logname);
2143                                 RETURN(rc);
2144                         }
2145                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2146                                                             mti, logname,
2147                                                             cliname);
2148                         name_destroy(&cliname);
2149                         name_destroy(&logname);
2150                         if (rc)
2151                                 RETURN(rc);
2152                 }
2153         }
2154
2155         RETURN(rc);
2156 }
2157
2158 static int mgs_wlp_lcfg(const struct lu_env *env,
2159                         struct mgs_device *mgs, struct fs_db *fsdb,
2160                         struct mgs_target_info *mti,
2161                         char *logname, struct lustre_cfg_bufs *bufs,
2162                         char *tgtname, char *ptr)
2163 {
2164         char comment[MTI_NAME_MAXLEN];
2165         char *tmp;
2166         struct lustre_cfg *lcfg;
2167         int rc, del;
2168
2169         /* Erase any old settings of this same parameter */
2170         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2171         comment[MTI_NAME_MAXLEN - 1] = 0;
2172         /* But don't try to match the value. */
2173         if ((tmp = strchr(comment, '=')))
2174             *tmp = 0;
2175         /* FIXME we should skip settings that are the same as old values */
2176         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2177         if (rc < 0)
2178                 return rc;
2179         del = mgs_param_empty(ptr);
2180
2181         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2182                       "Sett" : "Modify", tgtname, comment, logname);
2183         if (del)
2184                 return rc;
2185
2186         lustre_cfg_bufs_reset(bufs, tgtname);
2187         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2188         lcfg = lustre_cfg_new(LCFG_PARAM, bufs);
2189         if (!lcfg)
2190                 return -ENOMEM;
2191         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2192         lustre_cfg_free(lcfg);
2193         return rc;
2194 }
2195
2196 /* write global variable settings into log */
2197 static int mgs_write_log_sys(const struct lu_env *env,
2198                              struct mgs_device *mgs, struct fs_db *fsdb,
2199                              struct mgs_target_info *mti, char *sys, char *ptr)
2200 {
2201         struct mgs_thread_info *mgi = mgs_env_info(env);
2202         struct lustre_cfg *lcfg;
2203         char *tmp, sep;
2204         int rc, cmd, convert = 1;
2205
2206         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2207                 cmd = LCFG_SET_TIMEOUT;
2208         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2209                 cmd = LCFG_SET_LDLM_TIMEOUT;
2210         /* Check for known params here so we can return error to lctl */
2211         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2212                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2213                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2214                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2215                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2216                 cmd = LCFG_PARAM;
2217         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2218                 convert = 0; /* Don't convert string value to integer */
2219                 cmd = LCFG_PARAM;
2220         } else {
2221                 return -EINVAL;
2222         }
2223
2224         if (mgs_param_empty(ptr))
2225                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2226         else
2227                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2228
2229         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2230         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2231         if (!convert && *tmp != '\0')
2232                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2233         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2234         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2235         /* truncate the comment to the parameter name */
2236         ptr = tmp - 1;
2237         sep = *ptr;
2238         *ptr = '\0';
2239         /* modify all servers and clients */
2240         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2241                                       *tmp == '\0' ? NULL : lcfg,
2242                                       mti->mti_fsname, sys, 0);
2243         if (rc == 0 && *tmp != '\0') {
2244                 switch (cmd) {
2245                 case LCFG_SET_TIMEOUT:
2246                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2247                                 class_process_config(lcfg);
2248                         break;
2249                 case LCFG_SET_LDLM_TIMEOUT:
2250                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2251                                 class_process_config(lcfg);
2252                         break;
2253                 default:
2254                         break;
2255                 }
2256         }
2257         *ptr = sep;
2258         lustre_cfg_free(lcfg);
2259         return rc;
2260 }
2261
2262 /* write quota settings into log */
2263 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2264                                struct fs_db *fsdb, struct mgs_target_info *mti,
2265                                char *quota, char *ptr)
2266 {
2267         struct mgs_thread_info  *mgi = mgs_env_info(env);
2268         struct lustre_cfg       *lcfg;
2269         char                    *tmp;
2270         char                     sep;
2271         int                      rc, cmd = LCFG_PARAM;
2272
2273         /* support only 'meta' and 'data' pools so far */
2274         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2275             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2276                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2277                        "& quota.ost are)\n", ptr);
2278                 return -EINVAL;
2279         }
2280
2281         if (*tmp == '\0') {
2282                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2283         } else {
2284                 CDEBUG(D_MGS, "global '%s'\n", quota);
2285
2286                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2287                     strcmp(tmp, "none") != 0) {
2288                         CERROR("enable option(%s) isn't supported\n", tmp);
2289                         return -EINVAL;
2290                 }
2291         }
2292
2293         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2294         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2295         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2296         /* truncate the comment to the parameter name */
2297         ptr = tmp - 1;
2298         sep = *ptr;
2299         *ptr = '\0';
2300
2301         /* XXX we duplicated quota enable information in all server
2302          *     config logs, it should be moved to a separate config
2303          *     log once we cleanup the config log for global param. */
2304         /* modify all servers */
2305         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2306                                       *tmp == '\0' ? NULL : lcfg,
2307                                       mti->mti_fsname, quota, 1);
2308         *ptr = sep;
2309         lustre_cfg_free(lcfg);
2310         return rc;
2311 }
2312
2313 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2314                                    struct mgs_device *mgs,
2315                                    struct fs_db *fsdb,
2316                                    struct mgs_target_info *mti,
2317                                    char *param)
2318 {
2319         struct mgs_thread_info *mgi = mgs_env_info(env);
2320         struct llog_handle     *llh = NULL;
2321         char                   *logname;
2322         char                   *comment, *ptr;
2323         struct lustre_cfg      *lcfg;
2324         int                     rc, len;
2325         ENTRY;
2326
2327         /* get comment */
2328         ptr = strchr(param, '=');
2329         LASSERT(ptr);
2330         len = ptr - param;
2331
2332         OBD_ALLOC(comment, len + 1);
2333         if (comment == NULL)
2334                 RETURN(-ENOMEM);
2335         strncpy(comment, param, len);
2336         comment[len] = '\0';
2337
2338         /* prepare lcfg */
2339         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2340         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2341         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2342         if (lcfg == NULL)
2343                 GOTO(out_comment, rc = -ENOMEM);
2344
2345         /* construct log name */
2346         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2347         if (rc)
2348                 GOTO(out_lcfg, rc);
2349
2350         if (mgs_log_is_empty(env, mgs, logname)) {
2351                 rc = record_start_log(env, mgs, &llh, logname);
2352                 if (rc)
2353                         GOTO(out, rc);
2354                 record_end_log(env, &llh);
2355         }
2356
2357         /* obsolete old one */
2358         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2359                         comment, CM_SKIP);
2360         if (rc < 0)
2361                 GOTO(out, rc);
2362         /* write the new one */
2363         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2364                                   mti->mti_svname, comment);
2365         if (rc)
2366                 CERROR("err %d writing log %s\n", rc, logname);
2367 out:
2368         name_destroy(&logname);
2369 out_lcfg:
2370         lustre_cfg_free(lcfg);
2371 out_comment:
2372         OBD_FREE(comment, len + 1);
2373         RETURN(rc);
2374 }
2375
2376 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2377                                         char *param)
2378 {
2379         char    *ptr;
2380
2381         /* disable the adjustable udesc parameter for now, i.e. use default
2382          * setting that client always ship udesc to MDT if possible. to enable
2383          * it simply remove the following line */
2384         goto error_out;
2385
2386         ptr = strchr(param, '=');
2387         if (ptr == NULL)
2388                 goto error_out;
2389         *ptr++ = '\0';
2390
2391         if (strcmp(param, PARAM_SRPC_UDESC))
2392                 goto error_out;
2393
2394         if (strcmp(ptr, "yes") == 0) {
2395                 cfs_set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2396                 CWARN("Enable user descriptor shipping from client to MDT\n");
2397         } else if (strcmp(ptr, "no") == 0) {
2398                 cfs_clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2399                 CWARN("Disable user descriptor shipping from client to MDT\n");
2400         } else {
2401                 *(ptr - 1) = '=';
2402                 goto error_out;
2403         }
2404         return 0;
2405
2406 error_out:
2407         CERROR("Invalid param: %s\n", param);
2408         return -EINVAL;
2409 }
2410
2411 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2412                                   const char *svname,
2413                                   char *param)
2414 {
2415         struct sptlrpc_rule      rule;
2416         struct sptlrpc_rule_set *rset;
2417         int                      rc;
2418         ENTRY;
2419
2420         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2421                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2422                 RETURN(-EINVAL);
2423         }
2424
2425         if (strncmp(param, PARAM_SRPC_UDESC,
2426                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2427                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2428         }
2429
2430         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2431                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2432                 RETURN(-EINVAL);
2433         }
2434
2435         param += sizeof(PARAM_SRPC_FLVR) - 1;
2436
2437         rc = sptlrpc_parse_rule(param, &rule);
2438         if (rc)
2439                 RETURN(rc);
2440
2441         /* mgs rules implies must be mgc->mgs */
2442         if (cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2443                 if ((rule.sr_from != LUSTRE_SP_MGC &&
2444                      rule.sr_from != LUSTRE_SP_ANY) ||
2445                     (rule.sr_to != LUSTRE_SP_MGS &&
2446                      rule.sr_to != LUSTRE_SP_ANY))
2447                         RETURN(-EINVAL);
2448         }
2449
2450         /* preapre room for this coming rule. svcname format should be:
2451          * - fsname: general rule
2452          * - fsname-tgtname: target-specific rule
2453          */
2454         if (strchr(svname, '-')) {
2455                 struct mgs_tgt_srpc_conf *tgtconf;
2456                 int                       found = 0;
2457
2458                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
2459                      tgtconf = tgtconf->mtsc_next) {
2460                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
2461                                 found = 1;
2462                                 break;
2463                         }
2464                 }
2465
2466                 if (!found) {
2467                         int name_len;
2468
2469                         OBD_ALLOC_PTR(tgtconf);
2470                         if (tgtconf == NULL)
2471                                 RETURN(-ENOMEM);
2472
2473                         name_len = strlen(svname);
2474
2475                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
2476                         if (tgtconf->mtsc_tgt == NULL) {
2477                                 OBD_FREE_PTR(tgtconf);
2478                                 RETURN(-ENOMEM);
2479                         }
2480                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
2481
2482                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
2483                         fsdb->fsdb_srpc_tgt = tgtconf;
2484                 }
2485
2486                 rset = &tgtconf->mtsc_rset;
2487         } else {
2488                 rset = &fsdb->fsdb_srpc_gen;
2489         }
2490
2491         rc = sptlrpc_rule_set_merge(rset, &rule);
2492
2493         RETURN(rc);
2494 }
2495
2496 static int mgs_srpc_set_param(const struct lu_env *env,
2497                               struct mgs_device *mgs,
2498                               struct fs_db *fsdb,
2499                               struct mgs_target_info *mti,
2500                               char *param)
2501 {
2502         char                   *copy;
2503         int                     rc, copy_size;
2504         ENTRY;
2505
2506 #ifndef HAVE_GSS
2507         RETURN(-EINVAL);
2508 #endif
2509         /* keep a copy of original param, which could be destroied
2510          * during parsing */
2511         copy_size = strlen(param) + 1;
2512         OBD_ALLOC(copy, copy_size);
2513         if (copy == NULL)
2514                 return -ENOMEM;
2515         memcpy(copy, param, copy_size);
2516
2517         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
2518         if (rc)
2519                 goto out_free;
2520
2521         /* previous steps guaranteed the syntax is correct */
2522         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
2523         if (rc)
2524                 goto out_free;
2525
2526         if (cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2527                 /*
2528                  * for mgs rules, make them effective immediately.
2529                  */
2530                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
2531                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
2532                                                  &fsdb->fsdb_srpc_gen);
2533         }
2534
2535 out_free:
2536         OBD_FREE(copy, copy_size);
2537         RETURN(rc);
2538 }
2539
2540 struct mgs_srpc_read_data {
2541         struct fs_db   *msrd_fsdb;
2542         int             msrd_skip;
2543 };
2544
2545 static int mgs_srpc_read_handler(const struct lu_env *env,
2546                                  struct llog_handle *llh,
2547                                  struct llog_rec_hdr *rec, void *data)
2548 {
2549         struct mgs_srpc_read_data *msrd = data;
2550         struct cfg_marker         *marker;
2551         struct lustre_cfg         *lcfg = (struct lustre_cfg *)(rec + 1);
2552         char                      *svname, *param;
2553         int                        cfg_len, rc;
2554         ENTRY;
2555
2556         if (rec->lrh_type != OBD_CFG_REC) {
2557                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
2558                 RETURN(-EINVAL);
2559         }
2560
2561         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
2562                   sizeof(struct llog_rec_tail);
2563
2564         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
2565         if (rc) {
2566                 CERROR("Insane cfg\n");
2567                 RETURN(rc);
2568         }
2569
2570         if (lcfg->lcfg_command == LCFG_MARKER) {
2571                 marker = lustre_cfg_buf(lcfg, 1);
2572
2573                 if (marker->cm_flags & CM_START &&
2574                     marker->cm_flags & CM_SKIP)
2575                         msrd->msrd_skip = 1;
2576                 if (marker->cm_flags & CM_END)
2577                         msrd->msrd_skip = 0;
2578
2579                 RETURN(0);
2580         }
2581
2582         if (msrd->msrd_skip)
2583                 RETURN(0);
2584
2585         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
2586                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
2587                 RETURN(0);
2588         }
2589
2590         svname = lustre_cfg_string(lcfg, 0);
2591         if (svname == NULL) {
2592                 CERROR("svname is empty\n");
2593                 RETURN(0);
2594         }
2595
2596         param = lustre_cfg_string(lcfg, 1);
2597         if (param == NULL) {
2598                 CERROR("param is empty\n");
2599                 RETURN(0);
2600         }
2601
2602         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
2603         if (rc)
2604                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
2605
2606         RETURN(0);
2607 }
2608
2609 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
2610                                 struct mgs_device *mgs,
2611                                 struct fs_db *fsdb)
2612 {
2613         struct llog_handle        *llh = NULL;
2614         struct llog_ctxt          *ctxt;
2615         char                      *logname;
2616         struct mgs_srpc_read_data  msrd;
2617         int                        rc;
2618         ENTRY;
2619
2620         /* construct log name */
2621         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
2622         if (rc)
2623                 RETURN(rc);
2624
2625         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
2626         LASSERT(ctxt != NULL);
2627
2628         if (mgs_log_is_empty(env, mgs, logname))
2629                 GOTO(out, rc = 0);
2630
2631         rc = llog_open(env, ctxt, &llh, NULL, logname,
2632                        LLOG_OPEN_EXISTS);
2633         if (rc < 0) {
2634                 if (rc == -ENOENT)
2635                         rc = 0;
2636                 GOTO(out, rc);
2637         }
2638
2639         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
2640         if (rc)
2641                 GOTO(out_close, rc);
2642
2643         if (llog_get_size(llh) <= 1)
2644                 GOTO(out_close, rc = 0);
2645
2646         msrd.msrd_fsdb = fsdb;
2647         msrd.msrd_skip = 0;
2648
2649         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
2650                           NULL);
2651
2652 out_close:
2653         llog_close(env, llh);
2654 out:
2655         llog_ctxt_put(ctxt);
2656         name_destroy(&logname);
2657
2658         if (rc)
2659                 CERROR("failed to read sptlrpc config database: %d\n", rc);
2660         RETURN(rc);
2661 }
2662
2663 /* Permanent settings of all parameters by writing into the appropriate
2664  * configuration logs.
2665  * A parameter with null value ("<param>='\0'") means to erase it out of
2666  * the logs.
2667  */
2668 static int mgs_write_log_param(const struct lu_env *env,
2669                                struct mgs_device *mgs, struct fs_db *fsdb,
2670                                struct mgs_target_info *mti, char *ptr)
2671 {
2672         struct mgs_thread_info *mgi = mgs_env_info(env);
2673         char *logname;
2674         char *tmp;
2675         int rc = 0, rc2 = 0;
2676         ENTRY;
2677
2678         /* For various parameter settings, we have to figure out which logs
2679            care about them (e.g. both mdt and client for lov settings) */
2680         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2681
2682         /* The params are stored in MOUNT_DATA_FILE and modified via
2683            tunefs.lustre, or set using lctl conf_param */
2684
2685         /* Processed in lustre_start_mgc */
2686         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
2687                 GOTO(end, rc);
2688
2689         /* Processed in ost/mdt */
2690         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
2691                 GOTO(end, rc);
2692
2693         /* Processed in mgs_write_log_ost */
2694         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
2695                 if (mti->mti_flags & LDD_F_PARAM) {
2696                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
2697                                            "changed with tunefs.lustre"
2698                                            "and --writeconf\n", ptr);
2699                         rc = -EPERM;
2700                 }
2701                 GOTO(end, rc);
2702         }
2703
2704         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
2705                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
2706                 GOTO(end, rc);
2707         }
2708
2709         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
2710                 /* Add a failover nidlist */
2711                 rc = 0;
2712                 /* We already processed failovers params for new
2713                    targets in mgs_write_log_target */
2714                 if (mti->mti_flags & LDD_F_PARAM) {
2715                         CDEBUG(D_MGS, "Adding failnode\n");
2716                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
2717                 }
2718                 GOTO(end, rc);
2719         }
2720
2721         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
2722                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
2723                 GOTO(end, rc);
2724         }
2725
2726         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
2727                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
2728                 GOTO(end, rc);
2729         }
2730
2731         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
2732                 /* active=0 means off, anything else means on */
2733                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
2734                 int i;
2735
2736                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
2737                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
2738                                            "be (de)activated.\n",
2739                                            mti->mti_svname);
2740                         GOTO(end, rc = -EINVAL);
2741                 }
2742                 LCONSOLE_WARN("Permanently %sactivating %s\n",
2743                               flag ? "de": "re", mti->mti_svname);
2744                 /* Modify clilov */
2745                 rc = name_create(&logname, mti->mti_fsname, "-client");
2746                 if (rc)
2747                         GOTO(end, rc);
2748                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2749                                 mti->mti_svname, "add osc", flag);
2750                 name_destroy(&logname);
2751                 if (rc)
2752                         goto active_err;
2753                 /* Modify mdtlov */
2754                 /* Add to all MDT logs for CMD */
2755                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2756                         if (!cfs_test_bit(i, fsdb->fsdb_mdt_index_map))
2757                                 continue;
2758                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2759                         if (rc)
2760                                 GOTO(end, rc);
2761                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
2762                                         mti->mti_svname, "add osc", flag);
2763                         name_destroy(&logname);
2764                         if (rc)
2765                                 goto active_err;
2766                 }
2767         active_err:
2768                 if (rc) {
2769                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
2770                                            "log (%d). No permanent "
2771                                            "changes were made to the "
2772                                            "config log.\n",
2773                                            mti->mti_svname, rc);
2774                         if (cfs_test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
2775                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
2776                                                    " because the log"
2777                                                    "is in the old 1.4"
2778                                                    "style. Consider "
2779                                                    " --writeconf to "
2780                                                    "update the logs.\n");
2781                         GOTO(end, rc);
2782                 }
2783                 /* Fall through to osc proc for deactivating live OSC
2784                    on running MDT / clients. */
2785         }
2786         /* Below here, let obd's XXX_process_config methods handle it */
2787
2788         /* All lov. in proc */
2789         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
2790                 char *mdtlovname;
2791
2792                 CDEBUG(D_MGS, "lov param %s\n", ptr);
2793                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
2794                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
2795                                            "set on the MDT, not %s. "
2796                                            "Ignoring.\n",
2797                                            mti->mti_svname);
2798                         GOTO(end, rc = 0);
2799                 }
2800
2801                 /* Modify mdtlov */
2802                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
2803                         GOTO(end, rc = -ENODEV);
2804
2805                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
2806                                              mti->mti_stripe_index);
2807                 if (rc)
2808                         GOTO(end, rc);
2809                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
2810                                   &mgi->mgi_bufs, mdtlovname, ptr);
2811                 name_destroy(&logname);
2812                 name_destroy(&mdtlovname);
2813                 if (rc)
2814                         GOTO(end, rc);
2815
2816                 /* Modify clilov */
2817                 rc = name_create(&logname, mti->mti_fsname, "-client");
2818                 if (rc)
2819                         GOTO(end, rc);
2820                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
2821                                   fsdb->fsdb_clilov, ptr);
2822                 name_destroy(&logname);
2823                 GOTO(end, rc);
2824         }
2825
2826         /* All osc., mdc., llite. params in proc */
2827         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
2828             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
2829             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
2830                 char *cname;
2831
2832                 if (cfs_test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2833                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
2834                                            " cannot be modified. Consider"
2835                                            " updating the configuration with"
2836                                            " --writeconf\n",
2837                                            mti->mti_svname);
2838                         GOTO(end, rc = -EINVAL);
2839                 }
2840                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
2841                         rc = name_create(&cname, mti->mti_fsname, "-client");
2842                         /* Add the client type to match the obdname in
2843                            class_config_llog_handler */
2844                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2845                         rc = name_create(&cname, mti->mti_svname, "-mdc");
2846                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2847                         rc = name_create(&cname, mti->mti_svname, "-osc");
2848                 } else {
2849                         GOTO(end, rc = -EINVAL);
2850                 }
2851                 if (rc)
2852                         GOTO(end, rc);
2853
2854                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2855
2856                 /* Modify client */
2857                 rc = name_create(&logname, mti->mti_fsname, "-client");
2858                 if (rc) {
2859                         name_destroy(&cname);
2860                         GOTO(end, rc);
2861                 }
2862                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
2863                                   cname, ptr);
2864
2865                 /* osc params affect the MDT as well */
2866                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
2867                         int i;
2868
2869                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2870                                 if (!cfs_test_bit(i, fsdb->fsdb_mdt_index_map))
2871                                         continue;
2872                                 name_destroy(&cname);
2873                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
2874                                                          fsdb, i);
2875                                 name_destroy(&logname);
2876                                 if (rc)
2877                                         break;
2878                                 rc = name_create_mdt(&logname,
2879                                                      mti->mti_fsname, i);
2880                                 if (rc)
2881                                         break;
2882                                 if (!mgs_log_is_empty(env, mgs, logname)) {
2883                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
2884                                                           mti, logname,
2885                                                           &mgi->mgi_bufs,
2886                                                           cname, ptr);
2887                                         if (rc)
2888                                                 break;
2889                                 }
2890                         }
2891                 }
2892                 name_destroy(&logname);
2893                 name_destroy(&cname);
2894                 GOTO(end, rc);
2895         }
2896
2897         /* All mdt. params in proc */
2898         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
2899                 int i;
2900                 __u32 idx;
2901
2902                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2903                 if (strncmp(mti->mti_svname, mti->mti_fsname,
2904                             MTI_NAME_MAXLEN) == 0)
2905                         /* device is unspecified completely? */
2906                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
2907                 else
2908                         rc = server_name2index(mti->mti_svname, &idx, NULL);
2909                 if (rc < 0)
2910                         goto active_err;
2911                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
2912                         goto active_err;
2913                 if (rc & LDD_F_SV_ALL) {
2914                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2915                                 if (!cfs_test_bit(i,
2916                                                   fsdb->fsdb_mdt_index_map))
2917                                         continue;
2918                                 rc = name_create_mdt(&logname,
2919                                                 mti->mti_fsname, i);
2920                                 if (rc)
2921                                         goto active_err;
2922                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
2923                                                   logname, &mgi->mgi_bufs,
2924                                                   logname, ptr);
2925                                 name_destroy(&logname);
2926                                 if (rc)
2927                                         goto active_err;
2928                         }
2929                 } else {
2930                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
2931                                           mti->mti_svname, &mgi->mgi_bufs,
2932                                           mti->mti_svname, ptr);
2933                         if (rc)
2934                                 goto active_err;
2935                 }
2936                 GOTO(end, rc);
2937         }
2938
2939         /* All mdd., ost. params in proc */
2940         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
2941             (class_match_param(ptr, PARAM_OST, NULL) == 0)) {
2942                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2943                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
2944                         GOTO(end, rc = -ENODEV);
2945
2946                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
2947                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
2948                 GOTO(end, rc);
2949         }
2950
2951         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
2952         rc2 = -ENOSYS;
2953
2954 end:
2955         if (rc)
2956                 CERROR("err %d on param '%s'\n", rc, ptr);
2957
2958         RETURN(rc ?: rc2);
2959 }
2960
2961 /* Not implementing automatic failover nid addition at this time. */
2962 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
2963                       struct mgs_target_info *mti)
2964 {
2965 #if 0
2966         struct fs_db *fsdb;
2967         int rc;
2968         ENTRY;
2969
2970         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
2971         if (rc)
2972                 RETURN(rc);
2973
2974         if (mgs_log_is_empty(obd, mti->mti_svname))
2975                 /* should never happen */
2976                 RETURN(-ENOENT);
2977
2978         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
2979
2980         /* FIXME We can just check mti->params to see if we're already in
2981            the failover list.  Modify mti->params for rewriting back at
2982            server_register_target(). */
2983
2984         cfs_mutex_lock(&fsdb->fsdb_mutex);
2985         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
2986         cfs_mutex_unlock(&fsdb->fsdb_mutex);
2987
2988         RETURN(rc);
2989 #endif
2990         return 0;
2991 }
2992
2993 int mgs_write_log_target(const struct lu_env *env,
2994                          struct mgs_device *mgs,
2995                          struct mgs_target_info *mti,
2996                          struct fs_db *fsdb)
2997 {
2998         int rc = -EINVAL;
2999         char *buf, *params;
3000         ENTRY;
3001
3002         /* set/check the new target index */
3003         rc = mgs_set_index(env, mgs, mti);
3004         if (rc < 0) {
3005                 CERROR("Can't get index (%d)\n", rc);
3006                 RETURN(rc);
3007         }
3008
3009         if (rc == EALREADY) {
3010                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3011                               mti->mti_stripe_index, mti->mti_svname);
3012                 /* We would like to mark old log sections as invalid
3013                    and add new log sections in the client and mdt logs.
3014                    But if we add new sections, then live clients will
3015                    get repeat setup instructions for already running
3016                    osc's. So don't update the client/mdt logs. */
3017                 mti->mti_flags &= ~LDD_F_UPDATE;
3018         }
3019
3020         cfs_mutex_lock(&fsdb->fsdb_mutex);
3021
3022         if (mti->mti_flags &
3023             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3024                 /* Generate a log from scratch */
3025                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3026                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3027                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3028                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3029                 } else {
3030                         CERROR("Unknown target type %#x, can't create log for "
3031                                "%s\n", mti->mti_flags, mti->mti_svname);
3032                 }
3033                 if (rc) {
3034                         CERROR("Can't write logs for %s (%d)\n",
3035                                mti->mti_svname, rc);
3036                         GOTO(out_up, rc);
3037                 }
3038         } else {
3039                 /* Just update the params from tunefs in mgs_write_log_params */
3040                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3041                 mti->mti_flags |= LDD_F_PARAM;
3042         }
3043
3044         /* allocate temporary buffer, where class_get_next_param will
3045            make copy of a current  parameter */
3046         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3047         if (buf == NULL)
3048                 GOTO(out_up, rc = -ENOMEM);
3049         params = mti->mti_params;
3050         while (params != NULL) {
3051                 rc = class_get_next_param(&params, buf);
3052                 if (rc) {
3053                         if (rc == 1)
3054                                 /* there is no next parameter, that is
3055                                    not an error */
3056                                 rc = 0;
3057                         break;
3058                 }
3059                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3060                        params, buf);
3061                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3062                 if (rc)
3063                         break;
3064         }
3065
3066         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3067
3068 out_up:
3069         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3070         RETURN(rc);
3071 }
3072
3073 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3074 {
3075         struct llog_ctxt        *ctxt;
3076         int                      rc = 0;
3077
3078         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3079         if (ctxt == NULL) {
3080                 CERROR("%s: MGS config context doesn't exist\n",
3081                        mgs->mgs_obd->obd_name);
3082                 rc = -ENODEV;
3083         } else {
3084                 rc = llog_erase(env, ctxt, NULL, name);
3085                 /* llog may not exist */
3086                 if (rc == -ENOENT)
3087                         rc = 0;
3088                 llog_ctxt_put(ctxt);
3089         }
3090
3091         if (rc)
3092                 CERROR("%s: failed to clear log %s: %d\n",
3093                        mgs->mgs_obd->obd_name, name, rc);
3094
3095         return rc;
3096 }
3097
3098 /* erase all logs for the given fs */
3099 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3100 {
3101         struct fs_db *fsdb;
3102         cfs_list_t list;
3103         struct mgs_direntry *dirent, *n;
3104         int rc, len = strlen(fsname);
3105         char *suffix;
3106         ENTRY;
3107
3108         /* Find all the logs in the CONFIGS directory */
3109         rc = class_dentry_readdir(env, mgs, &list);
3110         if (rc)
3111                 RETURN(rc);
3112
3113         cfs_mutex_lock(&mgs->mgs_mutex);
3114
3115         /* Delete the fs db */
3116         fsdb = mgs_find_fsdb(mgs, fsname);
3117         if (fsdb)
3118                 mgs_free_fsdb(mgs, fsdb);
3119
3120         cfs_mutex_unlock(&mgs->mgs_mutex);
3121
3122         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3123                 cfs_list_del(&dirent->list);
3124                 suffix = strrchr(dirent->name, '-');
3125                 if (suffix != NULL) {
3126                         if ((len == suffix - dirent->name) &&
3127                             (strncmp(fsname, dirent->name, len) == 0)) {
3128                                 CDEBUG(D_MGS, "Removing log %s\n",
3129                                        dirent->name);
3130                                 mgs_erase_log(env, mgs, dirent->name);
3131                         }
3132                 }
3133                 mgs_direntry_free(dirent);
3134         }
3135
3136         RETURN(rc);
3137 }
3138
3139 /* from llog_swab */
3140 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3141 {
3142         int i;
3143         ENTRY;
3144
3145         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3146         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3147
3148         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3149         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3150         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3151         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3152
3153         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3154         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3155                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3156                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3157                                i, lcfg->lcfg_buflens[i],
3158                                lustre_cfg_string(lcfg, i));
3159                 }
3160         EXIT;
3161 }
3162
3163 /* Set a permanent (config log) param for a target or fs
3164  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3165  *             buf1 contains the single parameter
3166  */
3167 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3168                  struct lustre_cfg *lcfg, char *fsname)
3169 {
3170         struct fs_db *fsdb;
3171         struct mgs_target_info *mti;
3172         char *devname, *param;
3173         char *ptr, *tmp;
3174         __u32 index;
3175         int rc = 0;
3176         ENTRY;
3177
3178         print_lustre_cfg(lcfg);
3179
3180         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3181         devname = lustre_cfg_string(lcfg, 0);
3182         param = lustre_cfg_string(lcfg, 1);
3183         if (!devname) {
3184                 /* Assume device name embedded in param:
3185                    lustre-OST0000.osc.max_dirty_mb=32 */
3186                 ptr = strchr(param, '.');
3187                 if (ptr) {
3188                         devname = param;
3189                         *ptr = 0;
3190                         param = ptr + 1;
3191                 }
3192         }
3193         if (!devname) {
3194                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3195                 RETURN(-ENOSYS);
3196         }
3197
3198         /* Extract fsname */
3199         ptr = strrchr(devname, '-');
3200         memset(fsname, 0, MTI_NAME_MAXLEN);
3201         if (ptr && (server_name2index(ptr, &index, NULL) >= 0)) {
3202                 /* param related to llite isn't allowed to set by OST or MDT */
3203                 if (strncmp(param, PARAM_LLITE, sizeof(PARAM_LLITE)) == 0)
3204                         RETURN(-EINVAL);
3205
3206                 strncpy(fsname, devname, ptr - devname);
3207         } else {
3208                 /* assume devname is the fsname */
3209                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3210         }
3211         fsname[MTI_NAME_MAXLEN - 1] = 0;
3212         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3213
3214         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3215         if (rc)
3216                 RETURN(rc);
3217         if (!cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3218             cfs_test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3219                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3220                        "is '%s'\n", fsname, devname);
3221                 mgs_free_fsdb(mgs, fsdb);
3222                 RETURN(-EINVAL);
3223         }
3224
3225         /* Create a fake mti to hold everything */
3226         OBD_ALLOC_PTR(mti);
3227         if (!mti)
3228                 GOTO(out, rc = -ENOMEM);
3229         strncpy(mti->mti_fsname, fsname, MTI_NAME_MAXLEN);
3230         strncpy(mti->mti_svname, devname, MTI_NAME_MAXLEN);
3231         strncpy(mti->mti_params, param, sizeof(mti->mti_params));
3232         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3233         if (rc < 0)
3234                 /* Not a valid server; may be only fsname */
3235                 rc = 0;
3236         else
3237                 /* Strip -osc or -mdc suffix from svname */
3238                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3239                                      mti->mti_svname))
3240                         GOTO(out, rc = -EINVAL);
3241
3242         mti->mti_flags = rc | LDD_F_PARAM;
3243
3244         cfs_mutex_lock(&fsdb->fsdb_mutex);
3245         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3246         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3247
3248         /*
3249          * Revoke lock so everyone updates.  Should be alright if
3250          * someone was already reading while we were updating the logs,
3251          * so we don't really need to hold the lock while we're
3252          * writing (above).
3253          */
3254         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3255 out:
3256         OBD_FREE_PTR(mti);
3257         RETURN(rc);
3258 }
3259
3260 static int mgs_write_log_pool(const struct lu_env *env,
3261                               struct mgs_device *mgs, char *logname,
3262                               struct fs_db *fsdb, char *lovname,
3263                               enum lcfg_command_type cmd,
3264                               char *poolname, char *fsname,
3265                               char *ostname, char *comment)
3266 {
3267         struct llog_handle *llh = NULL;
3268         int rc;
3269
3270         rc = record_start_log(env, mgs, &llh, logname);
3271         if (rc)
3272                 return rc;
3273         rc = record_marker(env, llh, fsdb, CM_START, lovname, comment);
3274         if (rc)
3275                 goto out;
3276         rc = record_base(env, llh, lovname, 0, cmd, poolname, fsname, ostname, 0);
3277         if (rc)
3278                 goto out;
3279         rc = record_marker(env, llh, fsdb, CM_END, lovname, comment);
3280 out:
3281         record_end_log(env, &llh);
3282         return rc;
3283 }
3284
3285 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3286                  enum lcfg_command_type cmd, char *fsname,
3287                  char *poolname, char *ostname)
3288 {
3289         struct fs_db *fsdb;
3290         char *lovname;
3291         char *logname;
3292         char *label = NULL, *canceled_label = NULL;
3293         int label_sz;
3294         struct mgs_target_info *mti = NULL;
3295         int rc, i;
3296         ENTRY;
3297
3298         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3299         if (rc) {
3300                 CERROR("Can't get db for %s\n", fsname);
3301                 RETURN(rc);
3302         }
3303         if (cfs_test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3304                 CERROR("%s is not defined\n", fsname);
3305                 mgs_free_fsdb(mgs, fsdb);
3306                 RETURN(-EINVAL);
3307         }
3308
3309         label_sz = 10 + strlen(fsname) + strlen(poolname);
3310
3311         /* check if ostname match fsname */
3312         if (ostname != NULL) {
3313                 char *ptr;
3314
3315                 ptr = strrchr(ostname, '-');
3316                 if ((ptr == NULL) ||
3317                     (strncmp(fsname, ostname, ptr-ostname) != 0))
3318                         RETURN(-EINVAL);
3319                 label_sz += strlen(ostname);
3320         }
3321
3322         OBD_ALLOC(label, label_sz);
3323         if (label == NULL)
3324                 RETURN(-ENOMEM);
3325
3326         switch(cmd) {
3327         case LCFG_POOL_NEW:
3328                 sprintf(label,
3329                         "new %s.%s", fsname, poolname);
3330                 break;
3331         case LCFG_POOL_ADD:
3332                 sprintf(label,
3333                         "add %s.%s.%s", fsname, poolname, ostname);
3334                 break;
3335         case LCFG_POOL_REM:
3336                 OBD_ALLOC(canceled_label, label_sz);
3337                 if (canceled_label == NULL)
3338                         GOTO(out_label, rc = -ENOMEM);
3339                 sprintf(label,
3340                         "rem %s.%s.%s", fsname, poolname, ostname);
3341                 sprintf(canceled_label,
3342                         "add %s.%s.%s", fsname, poolname, ostname);
3343                 break;
3344         case LCFG_POOL_DEL:
3345                 OBD_ALLOC(canceled_label, label_sz);
3346                 if (canceled_label == NULL)
3347                         GOTO(out_label, rc = -ENOMEM);
3348                 sprintf(label,
3349                         "del %s.%s", fsname, poolname);
3350                 sprintf(canceled_label,
3351                         "new %s.%s", fsname, poolname);
3352                 break;
3353         default:
3354                 break;
3355         }
3356
3357         cfs_mutex_lock(&fsdb->fsdb_mutex);
3358
3359         if (canceled_label != NULL) {
3360                 OBD_ALLOC_PTR(mti);
3361                 if (mti == NULL)
3362                         GOTO(out_cancel, rc = -ENOMEM);
3363         }
3364
3365         /* write pool def to all MDT logs */
3366         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3367                  if (cfs_test_bit(i,  fsdb->fsdb_mdt_index_map)) {
3368                         rc = name_create_mdt_and_lov(&logname, &lovname,
3369                                                      fsdb, i);
3370                         if (rc) {
3371                                 cfs_mutex_unlock(&fsdb->fsdb_mutex);
3372                                 GOTO(out_mti, rc);
3373                         }
3374                         if (canceled_label != NULL) {
3375                                 strcpy(mti->mti_svname, "lov pool");
3376                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3377                                                 lovname, canceled_label,
3378                                                 CM_SKIP);
3379                         }
3380
3381                         if (rc >= 0)
3382                                 rc = mgs_write_log_pool(env, mgs, logname,
3383                                                         fsdb, lovname, cmd,
3384                                                         fsname, poolname,
3385                                                         ostname, label);
3386                         name_destroy(&logname);
3387                         name_destroy(&lovname);
3388                         if (rc) {
3389                                 cfs_mutex_unlock(&fsdb->fsdb_mutex);
3390                                 GOTO(out_mti, rc);
3391                         }
3392                 }
3393         }
3394
3395         rc = name_create(&logname, fsname, "-client");
3396         if (rc) {
3397                 cfs_mutex_unlock(&fsdb->fsdb_mutex);
3398                 GOTO(out_mti, rc);
3399         }
3400         if (canceled_label != NULL) {
3401                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3402                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
3403                 if (rc < 0) {
3404                         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3405                         name_destroy(&logname);
3406                         GOTO(out_mti, rc);
3407                 }
3408         }
3409
3410         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
3411                                 cmd, fsname, poolname, ostname, label);
3412         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3413         name_destroy(&logname);
3414         /* request for update */
3415         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3416
3417         EXIT;
3418 out_mti:
3419         if (mti != NULL)
3420                 OBD_FREE_PTR(mti);
3421 out_cancel:
3422         if (canceled_label != NULL)
3423                 OBD_FREE(canceled_label, label_sz);
3424 out_label:
3425         OBD_FREE(label, label_sz);
3426         return rc;
3427 }
3428
3429 #if 0
3430 /******************** unused *********************/
3431 static int mgs_backup_llog(struct obd_device *obd, char* fsname)
3432 {
3433         struct file *filp, *bak_filp;
3434         struct lvfs_run_ctxt saved;
3435         char *logname, *buf;
3436         loff_t soff = 0 , doff = 0;
3437         int count = 4096, len;
3438         int rc = 0;
3439
3440         OBD_ALLOC(logname, PATH_MAX);
3441         if (logname == NULL)
3442                 return -ENOMEM;
3443
3444         OBD_ALLOC(buf, count);
3445         if (!buf)
3446                 GOTO(out , rc = -ENOMEM);
3447
3448         len = snprintf(logname, PATH_MAX, "%s/%s.bak",
3449                        MOUNT_CONFIGS_DIR, fsname);
3450
3451         if (len >= PATH_MAX - 1) {
3452                 GOTO(out, -ENAMETOOLONG);
3453         }
3454
3455         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3456
3457         bak_filp = l_filp_open(logname, O_RDWR|O_CREAT|O_TRUNC, 0660);
3458         if (IS_ERR(bak_filp)) {
3459                 rc = PTR_ERR(bak_filp);
3460                 CERROR("backup logfile open %s: %d\n", logname, rc);
3461                 GOTO(pop, rc);
3462         }
3463         sprintf(logname, "%s/%s", MOUNT_CONFIGS_DIR, fsname);
3464         filp = l_filp_open(logname, O_RDONLY, 0);
3465         if (IS_ERR(filp)) {
3466                 rc = PTR_ERR(filp);
3467                 CERROR("logfile open %s: %d\n", logname, rc);
3468                 GOTO(close1f, rc);
3469         }
3470
3471         while ((rc = lustre_fread(filp, buf, count, &soff)) > 0) {
3472                 rc = lustre_fwrite(bak_filp, buf, count, &doff);
3473                 break;
3474         }
3475
3476         filp_close(filp, 0);
3477 close1f:
3478         filp_close(bak_filp, 0);
3479 pop:
3480         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3481 out:
3482         if (buf)
3483                 OBD_FREE(buf, count);
3484         OBD_FREE(logname, PATH_MAX);
3485         return rc;
3486 }
3487
3488 #endif