Whamcloud - gitweb
LU-7160 mgs: Skip processing .bak files on MGS
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_ioctl.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 /* Find all logs in CONFIG directory and link then into list */
59 int class_dentry_readdir(const struct lu_env *env,
60                          struct mgs_device *mgs, struct list_head *log_list)
61 {
62         struct dt_object    *dir = mgs->mgs_configs_dir;
63         const struct dt_it_ops *iops;
64         struct dt_it        *it;
65         struct mgs_direntry *de;
66         char                *key;
67         int                  rc, key_sz;
68
69         INIT_LIST_HEAD(log_list);
70
71         LASSERT(dir);
72         LASSERT(dir->do_index_ops);
73
74         iops = &dir->do_index_ops->dio_it;
75         it = iops->init(env, dir, LUDA_64BITHASH);
76         if (IS_ERR(it))
77                 RETURN(PTR_ERR(it));
78
79         rc = iops->load(env, it, 0);
80         if (rc <= 0)
81                 GOTO(fini, rc = 0);
82
83         /* main cycle */
84         do {
85                 key = (void *)iops->key(env, it);
86                 if (IS_ERR(key)) {
87                         CERROR("%s: key failed when listing %s: rc = %d\n",
88                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
89                                (int) PTR_ERR(key));
90                         goto next;
91                 }
92                 key_sz = iops->key_size(env, it);
93                 LASSERT(key_sz > 0);
94
95                 /* filter out "." and ".." entries */
96                 if (key[0] == '.') {
97                         if (key_sz == 1)
98                                 goto next;
99                         if (key_sz == 2 && key[1] == '.')
100                                 goto next;
101                 }
102
103                 /* filter out ".bak" files */
104                 /* sizeof(".bak") - 1 == 3 */
105                 if (key_sz >= 3 &&
106                     !memcmp(".bak", key + key_sz - 3, 3)) {
107                         CDEBUG(D_MGS, "Skipping backup file %.*s\n",
108                                key_sz, key);
109                         goto next;
110                 }
111
112                 de = mgs_direntry_alloc(key_sz + 1);
113                 if (de == NULL) {
114                         rc = -ENOMEM;
115                         break;
116                 }
117
118                 memcpy(de->mde_name, key, key_sz);
119                 de->mde_name[key_sz] = 0;
120
121                 list_add(&de->mde_list, log_list);
122
123 next:
124                 rc = iops->next(env, it);
125         } while (rc == 0);
126         if (rc > 0)
127                 rc = 0;
128
129         iops->put(env, it);
130
131 fini:
132         iops->fini(env, it);
133         if (rc)
134                 CERROR("%s: key failed when listing %s: rc = %d\n",
135                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
136         RETURN(rc);
137 }
138
139 /******************** DB functions *********************/
140
141 static inline int name_create(char **newname, char *prefix, char *suffix)
142 {
143         LASSERT(newname);
144         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
145         if (!*newname)
146                 return -ENOMEM;
147         sprintf(*newname, "%s%s", prefix, suffix);
148         return 0;
149 }
150
151 static inline void name_destroy(char **name)
152 {
153         if (*name)
154                 OBD_FREE(*name, strlen(*name) + 1);
155         *name = NULL;
156 }
157
158 struct mgs_fsdb_handler_data
159 {
160         struct fs_db   *fsdb;
161         __u32           ver;
162 };
163
164 /* from the (client) config log, figure out:
165         1. which ost's/mdt's are configured (by index)
166         2. what the last config step is
167         3. COMPAT_18 osc name
168 */
169 /* It might be better to have a separate db file, instead of parsing the info
170    out of the client log.  This is slow and potentially error-prone. */
171 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
172                             struct llog_rec_hdr *rec, void *data)
173 {
174         struct mgs_fsdb_handler_data *d = data;
175         struct fs_db *fsdb = d->fsdb;
176         int cfg_len = rec->lrh_len;
177         char *cfg_buf = (char*) (rec + 1);
178         struct lustre_cfg *lcfg;
179         __u32 index;
180         int rc = 0;
181         ENTRY;
182
183         if (rec->lrh_type != OBD_CFG_REC) {
184                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
185                 RETURN(-EINVAL);
186         }
187
188         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
189         if (rc) {
190                 CERROR("Insane cfg\n");
191                 RETURN(rc);
192         }
193
194         lcfg = (struct lustre_cfg *)cfg_buf;
195
196         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
197                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
198
199         /* Figure out ost indicies */
200         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
201         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
202             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
203                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
204                                        NULL, 10);
205                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
206                        lustre_cfg_string(lcfg, 1), index,
207                        lustre_cfg_string(lcfg, 2));
208                 set_bit(index, fsdb->fsdb_ost_index_map);
209         }
210
211         /* Figure out mdt indicies */
212         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
213         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
214             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
215                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
216                                        &index, NULL);
217                 if (rc != LDD_F_SV_TYPE_MDT) {
218                         CWARN("Unparsable MDC name %s, assuming index 0\n",
219                               lustre_cfg_string(lcfg, 0));
220                         index = 0;
221                 }
222                 rc = 0;
223                 CDEBUG(D_MGS, "MDT index is %u\n", index);
224                 set_bit(index, fsdb->fsdb_mdt_index_map);
225                 fsdb->fsdb_mdt_count ++;
226         }
227
228         /**
229          * figure out the old config. fsdb_gen = 0 means old log
230          * It is obsoleted and not supported anymore
231          */
232         if (fsdb->fsdb_gen == 0) {
233                 CERROR("Old config format is not supported\n");
234                 RETURN(-EINVAL);
235         }
236
237         /*
238          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
239          */
240         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
241             lcfg->lcfg_command == LCFG_ATTACH &&
242             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
243                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
244                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
245                         CWARN("MDT using 1.8 OSC name scheme\n");
246                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
247                 }
248         }
249
250         if (lcfg->lcfg_command == LCFG_MARKER) {
251                 struct cfg_marker *marker;
252                 marker = lustre_cfg_buf(lcfg, 1);
253
254                 d->ver = marker->cm_vers;
255
256                 /* Keep track of the latest marker step */
257                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
258         }
259
260         RETURN(rc);
261 }
262
263 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
264 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
265                                   struct mgs_device *mgs,
266                                   struct fs_db *fsdb)
267 {
268         char                            *logname;
269         struct llog_handle              *loghandle;
270         struct llog_ctxt                *ctxt;
271         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
272         int rc;
273
274         ENTRY;
275
276         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
277         LASSERT(ctxt != NULL);
278         rc = name_create(&logname, fsdb->fsdb_name, "-client");
279         if (rc)
280                 GOTO(out_put, rc);
281         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
282         if (rc)
283                 GOTO(out_pop, rc);
284
285         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
286         if (rc)
287                 GOTO(out_close, rc);
288
289         if (llog_get_size(loghandle) <= 1)
290                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
291
292         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
293         CDEBUG(D_INFO, "get_db = %d\n", rc);
294 out_close:
295         llog_close(env, loghandle);
296 out_pop:
297         name_destroy(&logname);
298 out_put:
299         llog_ctxt_put(ctxt);
300
301         RETURN(rc);
302 }
303
304 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
305 {
306         struct mgs_tgt_srpc_conf *tgtconf;
307
308         /* free target-specific rules */
309         while (fsdb->fsdb_srpc_tgt) {
310                 tgtconf = fsdb->fsdb_srpc_tgt;
311                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
312
313                 LASSERT(tgtconf->mtsc_tgt);
314
315                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
316                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
317                 OBD_FREE_PTR(tgtconf);
318         }
319
320         /* free general rules */
321         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
322 }
323
324 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
325 {
326         struct fs_db *fsdb;
327         struct list_head *tmp;
328
329         list_for_each(tmp, &mgs->mgs_fs_db_list) {
330                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
331                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
332                         return fsdb;
333         }
334         return NULL;
335 }
336
337 /* caller must hold the mgs->mgs_fs_db_lock */
338 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
339                                   struct mgs_device *mgs, char *fsname)
340 {
341         struct fs_db *fsdb;
342         int rc;
343         ENTRY;
344
345         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
346                 CERROR("fsname %s is too long\n", fsname);
347                 RETURN(ERR_PTR(-EINVAL));
348         }
349
350         OBD_ALLOC_PTR(fsdb);
351         if (!fsdb)
352                 RETURN(ERR_PTR(-ENOMEM));
353
354         strcpy(fsdb->fsdb_name, fsname);
355         mutex_init(&fsdb->fsdb_mutex);
356         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
357         fsdb->fsdb_gen = 1;
358
359         if (strcmp(fsname, MGSSELF_NAME) == 0) {
360                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
361                 fsdb->fsdb_mgs = mgs;
362         } else {
363                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
364                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
365                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
366                         CERROR("No memory for index maps\n");
367                         GOTO(err, rc = -ENOMEM);
368                 }
369
370                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
371                 if (rc)
372                         GOTO(err, rc);
373                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
374                 if (rc)
375                         GOTO(err, rc);
376
377                 /* initialise data for NID table */
378                 mgs_ir_init_fs(env, mgs, fsdb);
379
380                 lproc_mgs_add_live(mgs, fsdb);
381         }
382
383         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
384
385         RETURN(fsdb);
386 err:
387         if (fsdb->fsdb_ost_index_map)
388                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
389         if (fsdb->fsdb_mdt_index_map)
390                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
391         name_destroy(&fsdb->fsdb_clilov);
392         name_destroy(&fsdb->fsdb_clilmv);
393         OBD_FREE_PTR(fsdb);
394         RETURN(ERR_PTR(rc));
395 }
396
397 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
398 {
399         /* wait for anyone with the sem */
400         mutex_lock(&fsdb->fsdb_mutex);
401         lproc_mgs_del_live(mgs, fsdb);
402         list_del(&fsdb->fsdb_list);
403
404         /* deinitialize fsr */
405         mgs_ir_fini_fs(mgs, fsdb);
406
407         if (fsdb->fsdb_ost_index_map)
408                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
409         if (fsdb->fsdb_mdt_index_map)
410                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
411         name_destroy(&fsdb->fsdb_clilov);
412         name_destroy(&fsdb->fsdb_clilmv);
413         mgs_free_fsdb_srpc(fsdb);
414         mutex_unlock(&fsdb->fsdb_mutex);
415         OBD_FREE_PTR(fsdb);
416 }
417
418 int mgs_init_fsdb_list(struct mgs_device *mgs)
419 {
420         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
421         return 0;
422 }
423
424 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
425 {
426         struct fs_db *fsdb;
427         struct list_head *tmp, *tmp2;
428
429         mutex_lock(&mgs->mgs_mutex);
430         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
431                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
432                 mgs_free_fsdb(mgs, fsdb);
433         }
434         mutex_unlock(&mgs->mgs_mutex);
435         return 0;
436 }
437
438 int mgs_find_or_make_fsdb(const struct lu_env *env,
439                           struct mgs_device *mgs, char *name,
440                           struct fs_db **dbh)
441 {
442         struct fs_db *fsdb;
443         int rc = 0;
444
445         ENTRY;
446         mutex_lock(&mgs->mgs_mutex);
447         fsdb = mgs_find_fsdb(mgs, name);
448         if (fsdb) {
449                 mutex_unlock(&mgs->mgs_mutex);
450                 *dbh = fsdb;
451                 RETURN(0);
452         }
453
454         CDEBUG(D_MGS, "Creating new db\n");
455         fsdb = mgs_new_fsdb(env, mgs, name);
456         /* lock fsdb_mutex until the db is loaded from llogs */
457         if (!IS_ERR(fsdb))
458                 mutex_lock(&fsdb->fsdb_mutex);
459         mutex_unlock(&mgs->mgs_mutex);
460         if (IS_ERR(fsdb))
461                 RETURN(PTR_ERR(fsdb));
462
463         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
464                 /* populate the db from the client llog */
465                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
466                 if (rc) {
467                         CERROR("Can't get db from client log %d\n", rc);
468                         GOTO(out_free, rc);
469                 }
470         }
471
472         /* populate srpc rules from params llog */
473         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
474         if (rc) {
475                 CERROR("Can't get db from params log %d\n", rc);
476                 GOTO(out_free, rc);
477         }
478
479         mutex_unlock(&fsdb->fsdb_mutex);
480         *dbh = fsdb;
481
482         RETURN(0);
483
484 out_free:
485         mutex_unlock(&fsdb->fsdb_mutex);
486         mgs_free_fsdb(mgs, fsdb);
487         return rc;
488 }
489
490 /* 1 = index in use
491    0 = index unused
492    -1= empty client log */
493 int mgs_check_index(const struct lu_env *env,
494                     struct mgs_device *mgs,
495                     struct mgs_target_info *mti)
496 {
497         struct fs_db *fsdb;
498         void *imap;
499         int rc = 0;
500         ENTRY;
501
502         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
503
504         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
505         if (rc) {
506                 CERROR("Can't get db for %s\n", mti->mti_fsname);
507                 RETURN(rc);
508         }
509
510         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
511                 RETURN(-1);
512
513         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
514                 imap = fsdb->fsdb_ost_index_map;
515         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
516                 imap = fsdb->fsdb_mdt_index_map;
517         else
518                 RETURN(-EINVAL);
519
520         if (test_bit(mti->mti_stripe_index, imap))
521                 RETURN(1);
522         RETURN(0);
523 }
524
525 static __inline__ int next_index(void *index_map, int map_len)
526 {
527         int i;
528         for (i = 0; i < map_len * 8; i++)
529                 if (!test_bit(i, index_map)) {
530                          return i;
531                  }
532         CERROR("max index %d exceeded.\n", i);
533         return -1;
534 }
535
536 /* Return codes:
537         0  newly marked as in use
538         <0 err
539         +EALREADY for update of an old index */
540 static int mgs_set_index(const struct lu_env *env,
541                          struct mgs_device *mgs,
542                          struct mgs_target_info *mti)
543 {
544         struct fs_db *fsdb;
545         void *imap;
546         int rc = 0;
547         ENTRY;
548
549         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
550         if (rc) {
551                 CERROR("Can't get db for %s\n", mti->mti_fsname);
552                 RETURN(rc);
553         }
554
555         mutex_lock(&fsdb->fsdb_mutex);
556         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
557                 imap = fsdb->fsdb_ost_index_map;
558         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
559                 imap = fsdb->fsdb_mdt_index_map;
560         } else {
561                 GOTO(out_up, rc = -EINVAL);
562         }
563
564         if (mti->mti_flags & LDD_F_NEED_INDEX) {
565                 rc = next_index(imap, INDEX_MAP_SIZE);
566                 if (rc == -1)
567                         GOTO(out_up, rc = -ERANGE);
568                 mti->mti_stripe_index = rc;
569                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
570                         fsdb->fsdb_mdt_count ++;
571         }
572
573         /* the last index(0xffff) is reserved for default value. */
574         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
575                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
576                                    "but index must be less than %u.\n",
577                                    mti->mti_svname, mti->mti_stripe_index,
578                                    INDEX_MAP_SIZE * 8 - 1);
579                 GOTO(out_up, rc = -ERANGE);
580         }
581
582         if (test_bit(mti->mti_stripe_index, imap)) {
583                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
584                     !(mti->mti_flags & LDD_F_WRITECONF)) {
585                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
586                                            "%d, but that index is already in "
587                                            "use. Use --writeconf to force\n",
588                                            mti->mti_svname,
589                                            mti->mti_stripe_index);
590                         GOTO(out_up, rc = -EADDRINUSE);
591                 } else {
592                         CDEBUG(D_MGS, "Server %s updating index %d\n",
593                                mti->mti_svname, mti->mti_stripe_index);
594                         GOTO(out_up, rc = EALREADY);
595                 }
596         }
597
598         set_bit(mti->mti_stripe_index, imap);
599         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
600         mutex_unlock(&fsdb->fsdb_mutex);
601         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
602                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
603
604         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
605                mti->mti_stripe_index);
606
607         RETURN(0);
608 out_up:
609         mutex_unlock(&fsdb->fsdb_mutex);
610         return rc;
611 }
612
613 struct mgs_modify_lookup {
614         struct cfg_marker mml_marker;
615         int               mml_modified;
616 };
617
618 static int mgs_modify_handler(const struct lu_env *env,
619                               struct llog_handle *llh,
620                               struct llog_rec_hdr *rec, void *data)
621 {
622         struct mgs_modify_lookup *mml = data;
623         struct cfg_marker *marker;
624         struct lustre_cfg *lcfg = REC_DATA(rec);
625         int cfg_len = REC_DATA_LEN(rec);
626         int rc;
627         ENTRY;
628
629         if (rec->lrh_type != OBD_CFG_REC) {
630                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
631                 RETURN(-EINVAL);
632         }
633
634         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
635         if (rc) {
636                 CERROR("Insane cfg\n");
637                 RETURN(rc);
638         }
639
640         /* We only care about markers */
641         if (lcfg->lcfg_command != LCFG_MARKER)
642                 RETURN(0);
643
644         marker = lustre_cfg_buf(lcfg, 1);
645         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
646             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
647             !(marker->cm_flags & CM_SKIP)) {
648                 /* Found a non-skipped marker match */
649                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
650                        rec->lrh_index, marker->cm_step,
651                        marker->cm_flags, mml->mml_marker.cm_flags,
652                        marker->cm_tgtname, marker->cm_comment);
653                 /* Overwrite the old marker llog entry */
654                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
655                 marker->cm_flags |= mml->mml_marker.cm_flags;
656                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
657                 rc = llog_write(env, llh, rec, rec->lrh_index);
658                 if (!rc)
659                          mml->mml_modified++;
660         }
661
662         RETURN(rc);
663 }
664
665 /**
666  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
667  * Return code:
668  * 0 - modified successfully,
669  * 1 - no modification was done
670  * negative - error
671  */
672 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
673                       struct fs_db *fsdb, struct mgs_target_info *mti,
674                       char *logname, char *devname, char *comment, int flags)
675 {
676         struct llog_handle *loghandle;
677         struct llog_ctxt *ctxt;
678         struct mgs_modify_lookup *mml;
679         int rc;
680
681         ENTRY;
682
683         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
684         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
685                flags);
686
687         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
688         LASSERT(ctxt != NULL);
689         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
690         if (rc < 0) {
691                 if (rc == -ENOENT)
692                         rc = 0;
693                 GOTO(out_pop, rc);
694         }
695
696         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
697         if (rc)
698                 GOTO(out_close, rc);
699
700         if (llog_get_size(loghandle) <= 1)
701                 GOTO(out_close, rc = 0);
702
703         OBD_ALLOC_PTR(mml);
704         if (!mml)
705                 GOTO(out_close, rc = -ENOMEM);
706         if (strlcpy(mml->mml_marker.cm_comment, comment,
707                     sizeof(mml->mml_marker.cm_comment)) >=
708             sizeof(mml->mml_marker.cm_comment))
709                 GOTO(out_free, rc = -E2BIG);
710         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
711                     sizeof(mml->mml_marker.cm_tgtname)) >=
712             sizeof(mml->mml_marker.cm_tgtname))
713                 GOTO(out_free, rc = -E2BIG);
714         /* Modify mostly means cancel */
715         mml->mml_marker.cm_flags = flags;
716         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
717         mml->mml_modified = 0;
718         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
719                           NULL);
720         if (!rc && !mml->mml_modified)
721                 rc = 1;
722
723 out_free:
724         OBD_FREE_PTR(mml);
725
726 out_close:
727         llog_close(env, loghandle);
728 out_pop:
729         if (rc < 0)
730                 CERROR("%s: modify %s/%s failed: rc = %d\n",
731                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
732         llog_ctxt_put(ctxt);
733         RETURN(rc);
734 }
735
736 /** This structure is passed to mgs_replace_handler */
737 struct mgs_replace_uuid_lookup {
738         /* Nids are replaced for this target device */
739         struct mgs_target_info target;
740         /* Temporary modified llog */
741         struct llog_handle *temp_llh;
742         /* Flag is set if in target block*/
743         int in_target_device;
744         /* Nids already added. Just skip (multiple nids) */
745         int device_nids_added;
746         /* Flag is set if this block should not be copied */
747         int skip_it;
748 };
749
750 /**
751  * Check: a) if block should be skipped
752  * b) is it target block
753  *
754  * \param[in] lcfg
755  * \param[in] mrul
756  *
757  * \retval 0 should not to be skipped
758  * \retval 1 should to be skipped
759  */
760 static int check_markers(struct lustre_cfg *lcfg,
761                          struct mgs_replace_uuid_lookup *mrul)
762 {
763          struct cfg_marker *marker;
764
765         /* Track markers. Find given device */
766         if (lcfg->lcfg_command == LCFG_MARKER) {
767                 marker = lustre_cfg_buf(lcfg, 1);
768                 /* Clean llog from records marked as CM_EXCLUDE.
769                    CM_SKIP records are used for "active" command
770                    and can be restored if needed */
771                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
772                     (CM_EXCLUDE | CM_START)) {
773                         mrul->skip_it = 1;
774                         return 1;
775                 }
776
777                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
778                     (CM_EXCLUDE | CM_END)) {
779                         mrul->skip_it = 0;
780                         return 1;
781                 }
782
783                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
784                         LASSERT(!(marker->cm_flags & CM_START) ||
785                                 !(marker->cm_flags & CM_END));
786                         if (marker->cm_flags & CM_START) {
787                                 mrul->in_target_device = 1;
788                                 mrul->device_nids_added = 0;
789                         } else if (marker->cm_flags & CM_END)
790                                 mrul->in_target_device = 0;
791                 }
792         }
793
794         return 0;
795 }
796
797 static int record_base(const struct lu_env *env, struct llog_handle *llh,
798                      char *cfgname, lnet_nid_t nid, int cmd,
799                      char *s1, char *s2, char *s3, char *s4)
800 {
801         struct mgs_thread_info  *mgi = mgs_env_info(env);
802         struct llog_cfg_rec     *lcr;
803         int rc;
804
805         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
806                cmd, s1, s2, s3, s4);
807
808         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
809         if (s1)
810                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
811         if (s2)
812                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
813         if (s3)
814                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
815         if (s4)
816                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
817
818         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
819         if (lcr == NULL)
820                 return -ENOMEM;
821
822         lcr->lcr_cfg.lcfg_nid = nid;
823         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
824
825         lustre_cfg_rec_free(lcr);
826
827         if (rc < 0)
828                 CDEBUG(D_MGS,
829                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
830                        cfgname, cmd, s1, s2, s3, s4, rc);
831         return rc;
832 }
833
834 static inline int record_add_uuid(const struct lu_env *env,
835                                   struct llog_handle *llh,
836                                   uint64_t nid, char *uuid)
837 {
838         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
839                            NULL, NULL, NULL);
840 }
841
842 static inline int record_add_conn(const struct lu_env *env,
843                                   struct llog_handle *llh,
844                                   char *devname, char *uuid)
845 {
846         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
847                            NULL, NULL, NULL);
848 }
849
850 static inline int record_attach(const struct lu_env *env,
851                                 struct llog_handle *llh, char *devname,
852                                 char *type, char *uuid)
853 {
854         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
855                            NULL, NULL);
856 }
857
858 static inline int record_setup(const struct lu_env *env,
859                                struct llog_handle *llh, char *devname,
860                                char *s1, char *s2, char *s3, char *s4)
861 {
862         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
863 }
864
865 /**
866  * \retval <0 record processing error
867  * \retval n record is processed. No need copy original one.
868  * \retval 0 record is not processed.
869  */
870 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
871                            struct mgs_replace_uuid_lookup *mrul)
872 {
873         int nids_added = 0;
874         lnet_nid_t nid;
875         char *ptr;
876         int rc;
877
878         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
879                 /* LCFG_ADD_UUID command found. Let's skip original command
880                    and add passed nids */
881                 ptr = mrul->target.mti_params;
882                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
883                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
884                                "device %s\n", libcfs_nid2str(nid),
885                                 mrul->target.mti_params,
886                                 mrul->target.mti_svname);
887                         rc = record_add_uuid(env,
888                                              mrul->temp_llh, nid,
889                                              mrul->target.mti_params);
890                         if (!rc)
891                                 nids_added++;
892                 }
893
894                 if (nids_added == 0) {
895                         CERROR("No new nids were added, nid %s with uuid %s, "
896                                "device %s\n", libcfs_nid2str(nid),
897                                mrul->target.mti_params,
898                                mrul->target.mti_svname);
899                         RETURN(-ENXIO);
900                 } else {
901                         mrul->device_nids_added = 1;
902                 }
903
904                 return nids_added;
905         }
906
907         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
908                 /* LCFG_SETUP command found. UUID should be changed */
909                 rc = record_setup(env,
910                                   mrul->temp_llh,
911                                   /* devname the same */
912                                   lustre_cfg_string(lcfg, 0),
913                                   /* s1 is not changed */
914                                   lustre_cfg_string(lcfg, 1),
915                                   /* new uuid should be
916                                   the full nidlist */
917                                   mrul->target.mti_params,
918                                   /* s3 is not changed */
919                                   lustre_cfg_string(lcfg, 3),
920                                   /* s4 is not changed */
921                                   lustre_cfg_string(lcfg, 4));
922                 return rc ? rc : 1;
923         }
924
925         /* Another commands in target device block */
926         return 0;
927 }
928
929 /**
930  * Handler that called for every record in llog.
931  * Records are processed in order they placed in llog.
932  *
933  * \param[in] llh       log to be processed
934  * \param[in] rec       current record
935  * \param[in] data      mgs_replace_uuid_lookup structure
936  *
937  * \retval 0    success
938  */
939 static int mgs_replace_handler(const struct lu_env *env,
940                                struct llog_handle *llh,
941                                struct llog_rec_hdr *rec,
942                                void *data)
943 {
944         struct mgs_replace_uuid_lookup *mrul;
945         struct lustre_cfg *lcfg = REC_DATA(rec);
946         int cfg_len = REC_DATA_LEN(rec);
947         int rc;
948         ENTRY;
949
950         mrul = (struct mgs_replace_uuid_lookup *)data;
951
952         if (rec->lrh_type != OBD_CFG_REC) {
953                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
954                        rec->lrh_type, lcfg->lcfg_command,
955                        lustre_cfg_string(lcfg, 0),
956                        lustre_cfg_string(lcfg, 1));
957                 RETURN(-EINVAL);
958         }
959
960         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
961         if (rc) {
962                 /* Do not copy any invalidated records */
963                 GOTO(skip_out, rc = 0);
964         }
965
966         rc = check_markers(lcfg, mrul);
967         if (rc || mrul->skip_it)
968                 GOTO(skip_out, rc = 0);
969
970         /* Write to new log all commands outside target device block */
971         if (!mrul->in_target_device)
972                 GOTO(copy_out, rc = 0);
973
974         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
975            (failover nids) for this target, assuming that if then
976            primary is changing then so is the failover */
977         if (mrul->device_nids_added &&
978             (lcfg->lcfg_command == LCFG_ADD_UUID ||
979              lcfg->lcfg_command == LCFG_ADD_CONN))
980                 GOTO(skip_out, rc = 0);
981
982         rc = process_command(env, lcfg, mrul);
983         if (rc < 0)
984                 RETURN(rc);
985
986         if (rc)
987                 RETURN(0);
988 copy_out:
989         /* Record is placed in temporary llog as is */
990         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
991
992         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
993                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
994                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
995         RETURN(rc);
996
997 skip_out:
998         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
999                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1000                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1001         RETURN(rc);
1002 }
1003
1004 static int mgs_log_is_empty(const struct lu_env *env,
1005                             struct mgs_device *mgs, char *name)
1006 {
1007         struct llog_ctxt        *ctxt;
1008         int                      rc;
1009
1010         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1011         LASSERT(ctxt != NULL);
1012
1013         rc = llog_is_empty(env, ctxt, name);
1014         llog_ctxt_put(ctxt);
1015         return rc;
1016 }
1017
1018 static int mgs_replace_nids_log(const struct lu_env *env,
1019                                 struct obd_device *mgs, struct fs_db *fsdb,
1020                                 char *logname, char *devname, char *nids)
1021 {
1022         struct llog_handle *orig_llh, *backup_llh;
1023         struct llog_ctxt *ctxt;
1024         struct mgs_replace_uuid_lookup *mrul;
1025         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1026         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1027         char *backup;
1028         int rc, rc2;
1029         ENTRY;
1030
1031         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1032
1033         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1034         LASSERT(ctxt != NULL);
1035
1036         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1037                 /* Log is empty. Nothing to replace */
1038                 GOTO(out_put, rc = 0);
1039         }
1040
1041         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1042         if (backup == NULL)
1043                 GOTO(out_put, rc = -ENOMEM);
1044
1045         sprintf(backup, "%s.bak", logname);
1046
1047         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1048         if (rc == 0) {
1049                 /* Now erase original log file. Connections are not allowed.
1050                    Backup is already saved */
1051                 rc = llog_erase(env, ctxt, NULL, logname);
1052                 if (rc < 0)
1053                         GOTO(out_free, rc);
1054         } else if (rc != -ENOENT) {
1055                 CERROR("%s: can't make backup for %s: rc = %d\n",
1056                        mgs->obd_name, logname, rc);
1057                 GOTO(out_free,rc);
1058         }
1059
1060         /* open local log */
1061         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1062         if (rc)
1063                 GOTO(out_restore, rc);
1064
1065         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1066         if (rc)
1067                 GOTO(out_closel, rc);
1068
1069         /* open backup llog */
1070         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1071                        LLOG_OPEN_EXISTS);
1072         if (rc)
1073                 GOTO(out_closel, rc);
1074
1075         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1076         if (rc)
1077                 GOTO(out_close, rc);
1078
1079         if (llog_get_size(backup_llh) <= 1)
1080                 GOTO(out_close, rc = 0);
1081
1082         OBD_ALLOC_PTR(mrul);
1083         if (!mrul)
1084                 GOTO(out_close, rc = -ENOMEM);
1085         /* devname is only needed information to replace UUID records */
1086         strlcpy(mrul->target.mti_svname, devname,
1087                 sizeof(mrul->target.mti_svname));
1088         /* parse nids later */
1089         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1090         /* Copy records to this temporary llog */
1091         mrul->temp_llh = orig_llh;
1092
1093         rc = llog_process(env, backup_llh, mgs_replace_handler,
1094                           (void *)mrul, NULL);
1095         OBD_FREE_PTR(mrul);
1096 out_close:
1097         rc2 = llog_close(NULL, backup_llh);
1098         if (!rc)
1099                 rc = rc2;
1100 out_closel:
1101         rc2 = llog_close(NULL, orig_llh);
1102         if (!rc)
1103                 rc = rc2;
1104
1105 out_restore:
1106         if (rc) {
1107                 CERROR("%s: llog should be restored: rc = %d\n",
1108                        mgs->obd_name, rc);
1109                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1110                                   logname);
1111                 if (rc2 < 0)
1112                         CERROR("%s: can't restore backup %s: rc = %d\n",
1113                                mgs->obd_name, logname, rc2);
1114         }
1115
1116 out_free:
1117         OBD_FREE(backup, strlen(backup) + 1);
1118
1119 out_put:
1120         llog_ctxt_put(ctxt);
1121
1122         if (rc)
1123                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1124                        mgs->obd_name, logname, rc);
1125
1126         RETURN(rc);
1127 }
1128
1129 /**
1130  * Parse device name and get file system name and/or device index
1131  *
1132  * \param[in]   devname device name (ex. lustre-MDT0000)
1133  * \param[out]  fsname  file system name(optional)
1134  * \param[out]  index   device index(optional)
1135  *
1136  * \retval 0    success
1137  */
1138 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1139 {
1140         int rc;
1141         ENTRY;
1142
1143         /* Extract fsname */
1144         if (fsname) {
1145                 rc = server_name2fsname(devname, fsname, NULL);
1146                 if (rc < 0) {
1147                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1148                                devname);
1149                         RETURN(-EINVAL);
1150                 }
1151         }
1152
1153         if (index) {
1154                 rc = server_name2index(devname, index, NULL);
1155                 if (rc < 0) {
1156                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1157                                devname);
1158                         RETURN(-EINVAL);
1159                 }
1160         }
1161
1162         RETURN(0);
1163 }
1164
1165 /* This is only called during replace_nids */
1166 static int only_mgs_is_running(struct obd_device *mgs_obd)
1167 {
1168         /* TDB: Is global variable with devices count exists? */
1169         int num_devices = get_devices_count();
1170         int num_exports = 0;
1171         struct obd_export *exp;
1172
1173         spin_lock(&mgs_obd->obd_dev_lock);
1174         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1175                 /* skip self export */
1176                 if (exp == mgs_obd->obd_self_export)
1177                         continue;
1178                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1179                         continue;
1180
1181                 ++num_exports;
1182
1183                 CERROR("%s: node %s still connected during replace_nids "
1184                        "connect_flags:%llx\n",
1185                        mgs_obd->obd_name,
1186                        libcfs_nid2str(exp->exp_nid_stats->nid),
1187                        exp_connect_flags(exp));
1188
1189         }
1190         spin_unlock(&mgs_obd->obd_dev_lock);
1191
1192         /* osd, MGS and MGC + self_export
1193            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1194         return (num_devices <= 3) && (num_exports == 0);
1195 }
1196
1197 static int name_create_mdt(char **logname, char *fsname, int i)
1198 {
1199         char mdt_index[9];
1200
1201         sprintf(mdt_index, "-MDT%04x", i);
1202         return name_create(logname, fsname, mdt_index);
1203 }
1204
1205 /**
1206  * Replace nids for \a device to \a nids values
1207  *
1208  * \param obd           MGS obd device
1209  * \param devname       nids need to be replaced for this device
1210  * (ex. lustre-OST0000)
1211  * \param nids          nids list (ex. nid1,nid2,nid3)
1212  *
1213  * \retval 0    success
1214  */
1215 int mgs_replace_nids(const struct lu_env *env,
1216                      struct mgs_device *mgs,
1217                      char *devname, char *nids)
1218 {
1219         /* Assume fsname is part of device name */
1220         char fsname[MTI_NAME_MAXLEN];
1221         int rc;
1222         __u32 index;
1223         char *logname;
1224         struct fs_db *fsdb;
1225         unsigned int i;
1226         int conn_state;
1227         struct obd_device *mgs_obd = mgs->mgs_obd;
1228         ENTRY;
1229
1230         /* We can only change NIDs if no other nodes are connected */
1231         spin_lock(&mgs_obd->obd_dev_lock);
1232         conn_state = mgs_obd->obd_no_conn;
1233         mgs_obd->obd_no_conn = 1;
1234         spin_unlock(&mgs_obd->obd_dev_lock);
1235
1236         /* We can not change nids if not only MGS is started */
1237         if (!only_mgs_is_running(mgs_obd)) {
1238                 CERROR("Only MGS is allowed to be started\n");
1239                 GOTO(out, rc = -EINPROGRESS);
1240         }
1241
1242         /* Get fsname and index*/
1243         rc = mgs_parse_devname(devname, fsname, &index);
1244         if (rc)
1245                 GOTO(out, rc);
1246
1247         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1248         if (rc) {
1249                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1250                 GOTO(out, rc);
1251         }
1252
1253         /* Process client llogs */
1254         name_create(&logname, fsname, "-client");
1255         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1256         name_destroy(&logname);
1257         if (rc) {
1258                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1259                        fsname, devname, rc);
1260                 GOTO(out, rc);
1261         }
1262
1263         /* Process MDT llogs */
1264         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1265                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1266                         continue;
1267                 name_create_mdt(&logname, fsname, i);
1268                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1269                 name_destroy(&logname);
1270                 if (rc)
1271                         GOTO(out, rc);
1272         }
1273
1274 out:
1275         spin_lock(&mgs_obd->obd_dev_lock);
1276         mgs_obd->obd_no_conn = conn_state;
1277         spin_unlock(&mgs_obd->obd_dev_lock);
1278
1279         RETURN(rc);
1280 }
1281
1282 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1283                             char *devname, struct lov_desc *desc)
1284 {
1285         struct mgs_thread_info  *mgi = mgs_env_info(env);
1286         struct llog_cfg_rec     *lcr;
1287         int rc;
1288
1289         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1290         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1291         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1292         if (lcr == NULL)
1293                 return -ENOMEM;
1294
1295         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1296         lustre_cfg_rec_free(lcr);
1297         return rc;
1298 }
1299
1300 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1301                             char *devname, struct lmv_desc *desc)
1302 {
1303         struct mgs_thread_info  *mgi = mgs_env_info(env);
1304         struct llog_cfg_rec     *lcr;
1305         int rc;
1306
1307         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1308         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1309         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1310         if (lcr == NULL)
1311                 return -ENOMEM;
1312
1313         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1314         lustre_cfg_rec_free(lcr);
1315         return rc;
1316 }
1317
1318 static inline int record_mdc_add(const struct lu_env *env,
1319                                  struct llog_handle *llh,
1320                                  char *logname, char *mdcuuid,
1321                                  char *mdtuuid, char *index,
1322                                  char *gen)
1323 {
1324         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1325                            mdtuuid,index,gen,mdcuuid);
1326 }
1327
1328 static inline int record_lov_add(const struct lu_env *env,
1329                                  struct llog_handle *llh,
1330                                  char *lov_name, char *ost_uuid,
1331                                  char *index, char *gen)
1332 {
1333         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1334                            ost_uuid, index, gen, NULL);
1335 }
1336
1337 static inline int record_mount_opt(const struct lu_env *env,
1338                                    struct llog_handle *llh,
1339                                    char *profile, char *lov_name,
1340                                    char *mdc_name)
1341 {
1342         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1343                            profile, lov_name, mdc_name, NULL);
1344 }
1345
1346 static int record_marker(const struct lu_env *env,
1347                          struct llog_handle *llh,
1348                          struct fs_db *fsdb, __u32 flags,
1349                          char *tgtname, char *comment)
1350 {
1351         struct mgs_thread_info *mgi = mgs_env_info(env);
1352         struct llog_cfg_rec *lcr;
1353         int rc;
1354         int cplen = 0;
1355
1356         if (flags & CM_START)
1357                 fsdb->fsdb_gen++;
1358         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1359         mgi->mgi_marker.cm_flags = flags;
1360         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1361         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1362                         sizeof(mgi->mgi_marker.cm_tgtname));
1363         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1364                 return -E2BIG;
1365         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1366                         sizeof(mgi->mgi_marker.cm_comment));
1367         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1368                 return -E2BIG;
1369         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1370         mgi->mgi_marker.cm_canceltime = 0;
1371         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1372         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1373                             sizeof(mgi->mgi_marker));
1374         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1375         if (lcr == NULL)
1376                 return -ENOMEM;
1377
1378         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1379         lustre_cfg_rec_free(lcr);
1380         return rc;
1381 }
1382
1383 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1384                             struct llog_handle **llh, char *name)
1385 {
1386         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1387         struct llog_ctxt        *ctxt;
1388         int                      rc = 0;
1389         ENTRY;
1390
1391         if (*llh)
1392                 GOTO(out, rc = -EBUSY);
1393
1394         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1395         if (!ctxt)
1396                 GOTO(out, rc = -ENODEV);
1397         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1398
1399         rc = llog_open_create(env, ctxt, llh, NULL, name);
1400         if (rc)
1401                 GOTO(out_ctxt, rc);
1402         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1403         if (rc)
1404                 llog_close(env, *llh);
1405 out_ctxt:
1406         llog_ctxt_put(ctxt);
1407 out:
1408         if (rc) {
1409                 CERROR("%s: can't start log %s: rc = %d\n",
1410                        mgs->mgs_obd->obd_name, name, rc);
1411                 *llh = NULL;
1412         }
1413         RETURN(rc);
1414 }
1415
1416 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1417 {
1418         int rc;
1419
1420         rc = llog_close(env, *llh);
1421         *llh = NULL;
1422
1423         return rc;
1424 }
1425
1426 /******************** config "macros" *********************/
1427
1428 /* write an lcfg directly into a log (with markers) */
1429 static int mgs_write_log_direct(const struct lu_env *env,
1430                                 struct mgs_device *mgs, struct fs_db *fsdb,
1431                                 char *logname, struct llog_cfg_rec *lcr,
1432                                 char *devname, char *comment)
1433 {
1434         struct llog_handle *llh = NULL;
1435         int rc;
1436
1437         ENTRY;
1438
1439         rc = record_start_log(env, mgs, &llh, logname);
1440         if (rc)
1441                 RETURN(rc);
1442
1443         /* FIXME These should be a single journal transaction */
1444         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1445         if (rc)
1446                 GOTO(out_end, rc);
1447         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1448         if (rc)
1449                 GOTO(out_end, rc);
1450         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1451         if (rc)
1452                 GOTO(out_end, rc);
1453 out_end:
1454         record_end_log(env, &llh);
1455         RETURN(rc);
1456 }
1457
1458 /* write the lcfg in all logs for the given fs */
1459 static int mgs_write_log_direct_all(const struct lu_env *env,
1460                                     struct mgs_device *mgs,
1461                                     struct fs_db *fsdb,
1462                                     struct mgs_target_info *mti,
1463                                     struct llog_cfg_rec *lcr, char *devname,
1464                                     char *comment, int server_only)
1465 {
1466         struct list_head         log_list;
1467         struct mgs_direntry     *dirent, *n;
1468         char                    *fsname = mti->mti_fsname;
1469         int                      rc = 0, len = strlen(fsname);
1470
1471         ENTRY;
1472         /* Find all the logs in the CONFIGS directory */
1473         rc = class_dentry_readdir(env, mgs, &log_list);
1474         if (rc)
1475                 RETURN(rc);
1476
1477         /* Could use fsdb index maps instead of directory listing */
1478         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1479                 list_del_init(&dirent->mde_list);
1480                 /* don't write to sptlrpc rule log */
1481                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1482                         goto next;
1483
1484                 /* caller wants write server logs only */
1485                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1486                         goto next;
1487
1488                 if (strlen(dirent->mde_name) <= len ||
1489                     strncmp(fsname, dirent->mde_name, len) != 0 ||
1490                     dirent->mde_name[len] != '-')
1491                         goto next;
1492
1493                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1494                 /* Erase any old settings of this same parameter */
1495                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1496                                 devname, comment, CM_SKIP);
1497                 if (rc < 0)
1498                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1499                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1500                 if (lcr == NULL)
1501                         goto next;
1502                 /* Write the new one */
1503                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1504                                           lcr, devname, comment);
1505                 if (rc != 0)
1506                         CERROR("%s: writing log %s: rc = %d\n",
1507                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1508 next:
1509                 mgs_direntry_free(dirent);
1510         }
1511
1512         RETURN(rc);
1513 }
1514
1515 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1516                                     struct mgs_device *mgs,
1517                                     struct fs_db *fsdb,
1518                                     struct mgs_target_info *mti,
1519                                     int index, char *logname);
1520 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1521                                     struct mgs_device *mgs,
1522                                     struct fs_db *fsdb,
1523                                     struct mgs_target_info *mti,
1524                                     char *logname, char *suffix, char *lovname,
1525                                     enum lustre_sec_part sec_part, int flags);
1526 static int name_create_mdt_and_lov(char **logname, char **lovname,
1527                                    struct fs_db *fsdb, int i);
1528
1529 static int add_param(char *params, char *key, char *val)
1530 {
1531         char *start = params + strlen(params);
1532         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1533         int keylen = 0;
1534
1535         if (key != NULL)
1536                 keylen = strlen(key);
1537         if (start + 1 + keylen + strlen(val) >= end) {
1538                 CERROR("params are too long: %s %s%s\n",
1539                        params, key != NULL ? key : "", val);
1540                 return -EINVAL;
1541         }
1542
1543         sprintf(start, " %s%s", key != NULL ? key : "", val);
1544         return 0;
1545 }
1546
1547 /**
1548  * Walk through client config log record and convert the related records
1549  * into the target.
1550  **/
1551 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1552                                          struct llog_handle *llh,
1553                                          struct llog_rec_hdr *rec, void *data)
1554 {
1555         struct mgs_device *mgs;
1556         struct obd_device *obd;
1557         struct mgs_target_info *mti, *tmti;
1558         struct fs_db *fsdb;
1559         int cfg_len = rec->lrh_len;
1560         char *cfg_buf = (char*) (rec + 1);
1561         struct lustre_cfg *lcfg;
1562         int rc = 0;
1563         struct llog_handle *mdt_llh = NULL;
1564         static int got_an_osc_or_mdc = 0;
1565         /* 0: not found any osc/mdc;
1566            1: found osc;
1567            2: found mdc;
1568         */
1569         static int last_step = -1;
1570         int cplen = 0;
1571
1572         ENTRY;
1573
1574         mti = ((struct temp_comp*)data)->comp_mti;
1575         tmti = ((struct temp_comp*)data)->comp_tmti;
1576         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1577         obd = ((struct temp_comp *)data)->comp_obd;
1578         mgs = lu2mgs_dev(obd->obd_lu_dev);
1579         LASSERT(mgs);
1580
1581         if (rec->lrh_type != OBD_CFG_REC) {
1582                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1583                 RETURN(-EINVAL);
1584         }
1585
1586         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1587         if (rc) {
1588                 CERROR("Insane cfg\n");
1589                 RETURN(rc);
1590         }
1591
1592         lcfg = (struct lustre_cfg *)cfg_buf;
1593
1594         if (lcfg->lcfg_command == LCFG_MARKER) {
1595                 struct cfg_marker *marker;
1596                 marker = lustre_cfg_buf(lcfg, 1);
1597                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1598                     (marker->cm_flags & CM_START) &&
1599                      !(marker->cm_flags & CM_SKIP)) {
1600                         got_an_osc_or_mdc = 1;
1601                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1602                                         sizeof(tmti->mti_svname));
1603                         if (cplen >= sizeof(tmti->mti_svname))
1604                                 RETURN(-E2BIG);
1605                         rc = record_start_log(env, mgs, &mdt_llh,
1606                                               mti->mti_svname);
1607                         if (rc)
1608                                 RETURN(rc);
1609                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1610                                            mti->mti_svname, "add osc(copied)");
1611                         record_end_log(env, &mdt_llh);
1612                         last_step = marker->cm_step;
1613                         RETURN(rc);
1614                 }
1615                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1616                     (marker->cm_flags & CM_END) &&
1617                      !(marker->cm_flags & CM_SKIP)) {
1618                         LASSERT(last_step == marker->cm_step);
1619                         last_step = -1;
1620                         got_an_osc_or_mdc = 0;
1621                         memset(tmti, 0, sizeof(*tmti));
1622                         rc = record_start_log(env, mgs, &mdt_llh,
1623                                               mti->mti_svname);
1624                         if (rc)
1625                                 RETURN(rc);
1626                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1627                                            mti->mti_svname, "add osc(copied)");
1628                         record_end_log(env, &mdt_llh);
1629                         RETURN(rc);
1630                 }
1631                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1632                     (marker->cm_flags & CM_START) &&
1633                      !(marker->cm_flags & CM_SKIP)) {
1634                         got_an_osc_or_mdc = 2;
1635                         last_step = marker->cm_step;
1636                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1637                                strlen(marker->cm_tgtname));
1638
1639                         RETURN(rc);
1640                 }
1641                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1642                     (marker->cm_flags & CM_END) &&
1643                      !(marker->cm_flags & CM_SKIP)) {
1644                         LASSERT(last_step == marker->cm_step);
1645                         last_step = -1;
1646                         got_an_osc_or_mdc = 0;
1647                         memset(tmti, 0, sizeof(*tmti));
1648                         RETURN(rc);
1649                 }
1650         }
1651
1652         if (got_an_osc_or_mdc == 0 || last_step < 0)
1653                 RETURN(rc);
1654
1655         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1656                 __u64 nodenid = lcfg->lcfg_nid;
1657
1658                 if (strlen(tmti->mti_uuid) == 0) {
1659                         /* target uuid not set, this config record is before
1660                          * LCFG_SETUP, this nid is one of target node nid.
1661                          */
1662                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1663                         tmti->mti_nid_count++;
1664                 } else {
1665                         char nidstr[LNET_NIDSTR_SIZE];
1666
1667                         /* failover node nid */
1668                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
1669                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1670                                         nidstr);
1671                 }
1672
1673                 RETURN(rc);
1674         }
1675
1676         if (lcfg->lcfg_command == LCFG_SETUP) {
1677                 char *target;
1678
1679                 target = lustre_cfg_string(lcfg, 1);
1680                 memcpy(tmti->mti_uuid, target, strlen(target));
1681                 RETURN(rc);
1682         }
1683
1684         /* ignore client side sptlrpc_conf_log */
1685         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1686                 RETURN(rc);
1687
1688         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1689                 int index;
1690
1691                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1692                         RETURN (-EINVAL);
1693
1694                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1695                        strlen(mti->mti_fsname));
1696                 tmti->mti_stripe_index = index;
1697
1698                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1699                                               mti->mti_stripe_index,
1700                                               mti->mti_svname);
1701                 memset(tmti, 0, sizeof(*tmti));
1702                 RETURN(rc);
1703         }
1704
1705         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1706                 int index;
1707                 char mdt_index[9];
1708                 char *logname, *lovname;
1709
1710                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1711                                              mti->mti_stripe_index);
1712                 if (rc)
1713                         RETURN(rc);
1714                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1715
1716                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1717                         name_destroy(&logname);
1718                         name_destroy(&lovname);
1719                         RETURN(-EINVAL);
1720                 }
1721
1722                 tmti->mti_stripe_index = index;
1723                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1724                                          mdt_index, lovname,
1725                                          LUSTRE_SP_MDT, 0);
1726                 name_destroy(&logname);
1727                 name_destroy(&lovname);
1728                 RETURN(rc);
1729         }
1730         RETURN(rc);
1731 }
1732
1733 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1734 /* stealed from mgs_get_fsdb_from_llog*/
1735 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1736                                               struct mgs_device *mgs,
1737                                               char *client_name,
1738                                               struct temp_comp* comp)
1739 {
1740         struct llog_handle *loghandle;
1741         struct mgs_target_info *tmti;
1742         struct llog_ctxt *ctxt;
1743         int rc;
1744
1745         ENTRY;
1746
1747         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1748         LASSERT(ctxt != NULL);
1749
1750         OBD_ALLOC_PTR(tmti);
1751         if (tmti == NULL)
1752                 GOTO(out_ctxt, rc = -ENOMEM);
1753
1754         comp->comp_tmti = tmti;
1755         comp->comp_obd = mgs->mgs_obd;
1756
1757         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1758                        LLOG_OPEN_EXISTS);
1759         if (rc < 0) {
1760                 if (rc == -ENOENT)
1761                         rc = 0;
1762                 GOTO(out_pop, rc);
1763         }
1764
1765         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1766         if (rc)
1767                 GOTO(out_close, rc);
1768
1769         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1770                                   (void *)comp, NULL, false);
1771         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1772 out_close:
1773         llog_close(env, loghandle);
1774 out_pop:
1775         OBD_FREE_PTR(tmti);
1776 out_ctxt:
1777         llog_ctxt_put(ctxt);
1778         RETURN(rc);
1779 }
1780
1781 /* lmv is the second thing for client logs */
1782 /* copied from mgs_write_log_lov. Please refer to that.  */
1783 static int mgs_write_log_lmv(const struct lu_env *env,
1784                              struct mgs_device *mgs,
1785                              struct fs_db *fsdb,
1786                              struct mgs_target_info *mti,
1787                              char *logname, char *lmvname)
1788 {
1789         struct llog_handle *llh = NULL;
1790         struct lmv_desc *lmvdesc;
1791         char *uuid;
1792         int rc = 0;
1793         ENTRY;
1794
1795         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1796
1797         OBD_ALLOC_PTR(lmvdesc);
1798         if (lmvdesc == NULL)
1799                 RETURN(-ENOMEM);
1800         lmvdesc->ld_active_tgt_count = 0;
1801         lmvdesc->ld_tgt_count = 0;
1802         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1803         uuid = (char *)lmvdesc->ld_uuid.uuid;
1804
1805         rc = record_start_log(env, mgs, &llh, logname);
1806         if (rc)
1807                 GOTO(out_free, rc);
1808         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1809         if (rc)
1810                 GOTO(out_end, rc);
1811         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1812         if (rc)
1813                 GOTO(out_end, rc);
1814         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1815         if (rc)
1816                 GOTO(out_end, rc);
1817         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1818         if (rc)
1819                 GOTO(out_end, rc);
1820 out_end:
1821         record_end_log(env, &llh);
1822 out_free:
1823         OBD_FREE_PTR(lmvdesc);
1824         RETURN(rc);
1825 }
1826
1827 /* lov is the first thing in the mdt and client logs */
1828 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1829                              struct fs_db *fsdb, struct mgs_target_info *mti,
1830                              char *logname, char *lovname)
1831 {
1832         struct llog_handle *llh = NULL;
1833         struct lov_desc *lovdesc;
1834         char *uuid;
1835         int rc = 0;
1836         ENTRY;
1837
1838         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1839
1840         /*
1841         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1842         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1843               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1844         */
1845
1846         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1847         OBD_ALLOC_PTR(lovdesc);
1848         if (lovdesc == NULL)
1849                 RETURN(-ENOMEM);
1850         lovdesc->ld_magic = LOV_DESC_MAGIC;
1851         lovdesc->ld_tgt_count = 0;
1852         /* Defaults.  Can be changed later by lcfg config_param */
1853         lovdesc->ld_default_stripe_count = 1;
1854         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1855         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1856         lovdesc->ld_default_stripe_offset = -1;
1857         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1858         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1859         /* can these be the same? */
1860         uuid = (char *)lovdesc->ld_uuid.uuid;
1861
1862         /* This should always be the first entry in a log.
1863         rc = mgs_clear_log(obd, logname); */
1864         rc = record_start_log(env, mgs, &llh, logname);
1865         if (rc)
1866                 GOTO(out_free, rc);
1867         /* FIXME these should be a single journal transaction */
1868         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1869         if (rc)
1870                 GOTO(out_end, rc);
1871         rc = record_attach(env, llh, lovname, "lov", uuid);
1872         if (rc)
1873                 GOTO(out_end, rc);
1874         rc = record_lov_setup(env, llh, lovname, lovdesc);
1875         if (rc)
1876                 GOTO(out_end, rc);
1877         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1878         if (rc)
1879                 GOTO(out_end, rc);
1880         EXIT;
1881 out_end:
1882         record_end_log(env, &llh);
1883 out_free:
1884         OBD_FREE_PTR(lovdesc);
1885         return rc;
1886 }
1887
1888 /* add failnids to open log */
1889 static int mgs_write_log_failnids(const struct lu_env *env,
1890                                   struct mgs_target_info *mti,
1891                                   struct llog_handle *llh,
1892                                   char *cliname)
1893 {
1894         char *failnodeuuid = NULL;
1895         char *ptr = mti->mti_params;
1896         lnet_nid_t nid;
1897         int rc = 0;
1898
1899         /*
1900         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1901         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1902         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1903         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1904         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1905         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1906         */
1907
1908         /*
1909          * Pull failnid info out of params string, which may contain something
1910          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
1911          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
1912          * etc.  However, convert_hostnames() should have caught those.
1913          */
1914         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1915                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1916                         char nidstr[LNET_NIDSTR_SIZE];
1917
1918                         if (failnodeuuid == NULL) {
1919                                 /* We don't know the failover node name,
1920                                  * so just use the first nid as the uuid */
1921                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
1922                                 rc = name_create(&failnodeuuid, nidstr, "");
1923                                 if (rc != 0)
1924                                         return rc;
1925                         }
1926                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1927                                 "client %s\n",
1928                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
1929                                 failnodeuuid, cliname);
1930                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1931                         /*
1932                          * If *ptr is ':', we have added all NIDs for
1933                          * failnodeuuid.
1934                          */
1935                         if (*ptr == ':') {
1936                                 rc = record_add_conn(env, llh, cliname,
1937                                                      failnodeuuid);
1938                                 name_destroy(&failnodeuuid);
1939                                 failnodeuuid = NULL;
1940                         }
1941                 }
1942                 if (failnodeuuid) {
1943                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1944                         name_destroy(&failnodeuuid);
1945                         failnodeuuid = NULL;
1946                 }
1947         }
1948
1949         return rc;
1950 }
1951
1952 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1953                                     struct mgs_device *mgs,
1954                                     struct fs_db *fsdb,
1955                                     struct mgs_target_info *mti,
1956                                     char *logname, char *lmvname)
1957 {
1958         struct llog_handle *llh = NULL;
1959         char *mdcname = NULL;
1960         char *nodeuuid = NULL;
1961         char *mdcuuid = NULL;
1962         char *lmvuuid = NULL;
1963         char index[6];
1964         char nidstr[LNET_NIDSTR_SIZE];
1965         int i, rc;
1966         ENTRY;
1967
1968         if (mgs_log_is_empty(env, mgs, logname)) {
1969                 CERROR("log is empty! Logical error\n");
1970                 RETURN(-EINVAL);
1971         }
1972
1973         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1974                mti->mti_svname, logname, lmvname);
1975
1976         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
1977         rc = name_create(&nodeuuid, nidstr, "");
1978         if (rc)
1979                 RETURN(rc);
1980         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1981         if (rc)
1982                 GOTO(out_free, rc);
1983         rc = name_create(&mdcuuid, mdcname, "_UUID");
1984         if (rc)
1985                 GOTO(out_free, rc);
1986         rc = name_create(&lmvuuid, lmvname, "_UUID");
1987         if (rc)
1988                 GOTO(out_free, rc);
1989
1990         rc = record_start_log(env, mgs, &llh, logname);
1991         if (rc)
1992                 GOTO(out_free, rc);
1993         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1994                            "add mdc");
1995         if (rc)
1996                 GOTO(out_end, rc);
1997         for (i = 0; i < mti->mti_nid_count; i++) {
1998                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1999                         libcfs_nid2str_r(mti->mti_nids[i],
2000                                          nidstr, sizeof(nidstr)));
2001
2002                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2003                 if (rc)
2004                         GOTO(out_end, rc);
2005         }
2006
2007         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2008         if (rc)
2009                 GOTO(out_end, rc);
2010         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2011                           NULL, NULL);
2012         if (rc)
2013                 GOTO(out_end, rc);
2014         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2015         if (rc)
2016                 GOTO(out_end, rc);
2017         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2018         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2019                             index, "1");
2020         if (rc)
2021                 GOTO(out_end, rc);
2022         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2023                            "add mdc");
2024         if (rc)
2025                 GOTO(out_end, rc);
2026 out_end:
2027         record_end_log(env, &llh);
2028 out_free:
2029         name_destroy(&lmvuuid);
2030         name_destroy(&mdcuuid);
2031         name_destroy(&mdcname);
2032         name_destroy(&nodeuuid);
2033         RETURN(rc);
2034 }
2035
2036 static inline int name_create_lov(char **lovname, char *mdtname,
2037                                   struct fs_db *fsdb, int index)
2038 {
2039         /* COMPAT_180 */
2040         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2041                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2042         else
2043                 return name_create(lovname, mdtname, "-mdtlov");
2044 }
2045
2046 static int name_create_mdt_and_lov(char **logname, char **lovname,
2047                                    struct fs_db *fsdb, int i)
2048 {
2049         int rc;
2050
2051         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2052         if (rc)
2053                 return rc;
2054         /* COMPAT_180 */
2055         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2056                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2057         else
2058                 rc = name_create(lovname, *logname, "-mdtlov");
2059         if (rc) {
2060                 name_destroy(logname);
2061                 *logname = NULL;
2062         }
2063         return rc;
2064 }
2065
2066 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2067                                       struct fs_db *fsdb, int i)
2068 {
2069         char suffix[16];
2070
2071         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2072                 sprintf(suffix, "-osc");
2073         else
2074                 sprintf(suffix, "-osc-MDT%04x", i);
2075         return name_create(oscname, ostname, suffix);
2076 }
2077
2078 /* add new mdc to already existent MDS */
2079 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2080                                     struct mgs_device *mgs,
2081                                     struct fs_db *fsdb,
2082                                     struct mgs_target_info *mti,
2083                                     int mdt_index, char *logname)
2084 {
2085         struct llog_handle      *llh = NULL;
2086         char    *nodeuuid = NULL;
2087         char    *ospname = NULL;
2088         char    *lovuuid = NULL;
2089         char    *mdtuuid = NULL;
2090         char    *svname = NULL;
2091         char    *mdtname = NULL;
2092         char    *lovname = NULL;
2093         char    index_str[16];
2094         char    nidstr[LNET_NIDSTR_SIZE];
2095         int     i, rc;
2096
2097         ENTRY;
2098         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2099                 CERROR("log is empty! Logical error\n");
2100                 RETURN (-EINVAL);
2101         }
2102
2103         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2104                logname);
2105
2106         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2107         if (rc)
2108                 RETURN(rc);
2109
2110         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2111         rc = name_create(&nodeuuid, nidstr, "");
2112         if (rc)
2113                 GOTO(out_destory, rc);
2114
2115         rc = name_create(&svname, mdtname, "-osp");
2116         if (rc)
2117                 GOTO(out_destory, rc);
2118
2119         sprintf(index_str, "-MDT%04x", mdt_index);
2120         rc = name_create(&ospname, svname, index_str);
2121         if (rc)
2122                 GOTO(out_destory, rc);
2123
2124         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2125         if (rc)
2126                 GOTO(out_destory, rc);
2127
2128         rc = name_create(&lovuuid, lovname, "_UUID");
2129         if (rc)
2130                 GOTO(out_destory, rc);
2131
2132         rc = name_create(&mdtuuid, mdtname, "_UUID");
2133         if (rc)
2134                 GOTO(out_destory, rc);
2135
2136         rc = record_start_log(env, mgs, &llh, logname);
2137         if (rc)
2138                 GOTO(out_destory, rc);
2139
2140         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2141                            "add osp");
2142         if (rc)
2143                 GOTO(out_destory, rc);
2144
2145         for (i = 0; i < mti->mti_nid_count; i++) {
2146                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2147                         libcfs_nid2str_r(mti->mti_nids[i],
2148                                          nidstr, sizeof(nidstr)));
2149                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2150                 if (rc)
2151                         GOTO(out_end, rc);
2152         }
2153
2154         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2155         if (rc)
2156                 GOTO(out_end, rc);
2157
2158         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2159                           NULL, NULL);
2160         if (rc)
2161                 GOTO(out_end, rc);
2162
2163         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2164         if (rc)
2165                 GOTO(out_end, rc);
2166
2167         /* Add mdc(osp) to lod */
2168         snprintf(index_str, sizeof(index_str), "%d", mti->mti_stripe_index);
2169         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2170                          index_str, "1", NULL);
2171         if (rc)
2172                 GOTO(out_end, rc);
2173
2174         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2175         if (rc)
2176                 GOTO(out_end, rc);
2177
2178 out_end:
2179         record_end_log(env, &llh);
2180
2181 out_destory:
2182         name_destroy(&mdtuuid);
2183         name_destroy(&lovuuid);
2184         name_destroy(&lovname);
2185         name_destroy(&ospname);
2186         name_destroy(&svname);
2187         name_destroy(&nodeuuid);
2188         name_destroy(&mdtname);
2189         RETURN(rc);
2190 }
2191
2192 static int mgs_write_log_mdt0(const struct lu_env *env,
2193                               struct mgs_device *mgs,
2194                               struct fs_db *fsdb,
2195                               struct mgs_target_info *mti)
2196 {
2197         char *log = mti->mti_svname;
2198         struct llog_handle *llh = NULL;
2199         char *uuid, *lovname;
2200         char mdt_index[6];
2201         char *ptr = mti->mti_params;
2202         int rc = 0, failout = 0;
2203         ENTRY;
2204
2205         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2206         if (uuid == NULL)
2207                 RETURN(-ENOMEM);
2208
2209         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2210                 failout = (strncmp(ptr, "failout", 7) == 0);
2211
2212         rc = name_create(&lovname, log, "-mdtlov");
2213         if (rc)
2214                 GOTO(out_free, rc);
2215         if (mgs_log_is_empty(env, mgs, log)) {
2216                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2217                 if (rc)
2218                         GOTO(out_lod, rc);
2219         }
2220
2221         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2222
2223         rc = record_start_log(env, mgs, &llh, log);
2224         if (rc)
2225                 GOTO(out_lod, rc);
2226
2227         /* add MDT itself */
2228
2229         /* FIXME this whole fn should be a single journal transaction */
2230         sprintf(uuid, "%s_UUID", log);
2231         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2232         if (rc)
2233                 GOTO(out_lod, rc);
2234         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2235         if (rc)
2236                 GOTO(out_end, rc);
2237         rc = record_mount_opt(env, llh, log, lovname, NULL);
2238         if (rc)
2239                 GOTO(out_end, rc);
2240         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2241                         failout ? "n" : "f");
2242         if (rc)
2243                 GOTO(out_end, rc);
2244         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2245         if (rc)
2246                 GOTO(out_end, rc);
2247 out_end:
2248         record_end_log(env, &llh);
2249 out_lod:
2250         name_destroy(&lovname);
2251 out_free:
2252         OBD_FREE(uuid, sizeof(struct obd_uuid));
2253         RETURN(rc);
2254 }
2255
2256 /* envelope method for all layers log */
2257 static int mgs_write_log_mdt(const struct lu_env *env,
2258                              struct mgs_device *mgs,
2259                              struct fs_db *fsdb,
2260                              struct mgs_target_info *mti)
2261 {
2262         struct mgs_thread_info *mgi = mgs_env_info(env);
2263         struct llog_handle *llh = NULL;
2264         char *cliname;
2265         int rc, i = 0;
2266         ENTRY;
2267
2268         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2269
2270         if (mti->mti_uuid[0] == '\0') {
2271                 /* Make up our own uuid */
2272                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2273                          "%s_UUID", mti->mti_svname);
2274         }
2275
2276         /* add mdt */
2277         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2278         if (rc)
2279                 RETURN(rc);
2280         /* Append the mdt info to the client log */
2281         rc = name_create(&cliname, mti->mti_fsname, "-client");
2282         if (rc)
2283                 RETURN(rc);
2284
2285         if (mgs_log_is_empty(env, mgs, cliname)) {
2286                 /* Start client log */
2287                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2288                                        fsdb->fsdb_clilov);
2289                 if (rc)
2290                         GOTO(out_free, rc);
2291                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2292                                        fsdb->fsdb_clilmv);
2293                 if (rc)
2294                         GOTO(out_free, rc);
2295         }
2296
2297         /*
2298         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2299         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2300         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2301         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2302         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2303         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2304         */
2305
2306         /* copy client info about lov/lmv */
2307         mgi->mgi_comp.comp_mti = mti;
2308         mgi->mgi_comp.comp_fsdb = fsdb;
2309
2310         rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2311                                                 &mgi->mgi_comp);
2312         if (rc)
2313                 GOTO(out_free, rc);
2314         rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2315                                       fsdb->fsdb_clilmv);
2316         if (rc)
2317                 GOTO(out_free, rc);
2318
2319         /* add mountopts */
2320         rc = record_start_log(env, mgs, &llh, cliname);
2321         if (rc)
2322                 GOTO(out_free, rc);
2323
2324         rc = record_marker(env, llh, fsdb, CM_START, cliname,
2325                            "mount opts");
2326         if (rc)
2327                 GOTO(out_end, rc);
2328         rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2329                               fsdb->fsdb_clilmv);
2330         if (rc)
2331                 GOTO(out_end, rc);
2332         rc = record_marker(env, llh, fsdb, CM_END, cliname,
2333                            "mount opts");
2334
2335         if (rc)
2336                 GOTO(out_end, rc);
2337
2338         /* for_all_existing_mdt except current one */
2339         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2340                 if (i !=  mti->mti_stripe_index &&
2341                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2342                         char *logname;
2343
2344                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2345                         if (rc)
2346                                 GOTO(out_end, rc);
2347
2348                         /* NB: If the log for the MDT is empty, it means
2349                          * the MDT is only added to the index
2350                          * map, and not being process yet, i.e. this
2351                          * is an unregistered MDT, see mgs_write_log_target().
2352                          * so we should skip it. Otherwise
2353                          *
2354                          * 1. MGS get register request for MDT1 and MDT2.
2355                          *
2356                          * 2. Then both MDT1 and MDT2 are added into
2357                          * fsdb_mdt_index_map. (see mgs_set_index()).
2358                          *
2359                          * 3. Then MDT1 get the lock of fsdb_mutex, then
2360                          * generate the config log, here, it will regard MDT2
2361                          * as an existent MDT, and generate "add osp" for
2362                          * lustre-MDT0001-osp-MDT0002. Note: at the moment
2363                          * MDT0002 config log is still empty, so it will
2364                          * add "add osp" even before "lov setup", which
2365                          * will definitly cause trouble.
2366                          *
2367                          * 4. MDT1 registeration finished, fsdb_mutex is
2368                          * released, then MDT2 get in, then in above
2369                          * mgs_steal_llog_for_mdt_from_client(), it will
2370                          * add another osp log for lustre-MDT0001-osp-MDT0002,
2371                          * which will cause another trouble.*/
2372                         if (!mgs_log_is_empty(env, mgs, logname))
2373                                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb,
2374                                                               mti, i, logname);
2375
2376                         name_destroy(&logname);
2377                         if (rc)
2378                                 GOTO(out_end, rc);
2379                 }
2380         }
2381 out_end:
2382         record_end_log(env, &llh);
2383 out_free:
2384         name_destroy(&cliname);
2385         RETURN(rc);
2386 }
2387
2388 /* Add the ost info to the client/mdt lov */
2389 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2390                                     struct mgs_device *mgs, struct fs_db *fsdb,
2391                                     struct mgs_target_info *mti,
2392                                     char *logname, char *suffix, char *lovname,
2393                                     enum lustre_sec_part sec_part, int flags)
2394 {
2395         struct llog_handle *llh = NULL;
2396         char *nodeuuid = NULL;
2397         char *oscname = NULL;
2398         char *oscuuid = NULL;
2399         char *lovuuid = NULL;
2400         char *svname = NULL;
2401         char index[6];
2402         char nidstr[LNET_NIDSTR_SIZE];
2403         int i, rc;
2404         ENTRY;
2405
2406         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2407                 mti->mti_svname, logname);
2408
2409         if (mgs_log_is_empty(env, mgs, logname)) {
2410                 CERROR("log is empty! Logical error\n");
2411                 RETURN(-EINVAL);
2412         }
2413
2414         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2415         rc = name_create(&nodeuuid, nidstr, "");
2416         if (rc)
2417                 RETURN(rc);
2418         rc = name_create(&svname, mti->mti_svname, "-osc");
2419         if (rc)
2420                 GOTO(out_free, rc);
2421
2422         /* for the system upgraded from old 1.8, keep using the old osc naming
2423          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2424         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2425                 rc = name_create(&oscname, svname, "");
2426         else
2427                 rc = name_create(&oscname, svname, suffix);
2428         if (rc)
2429                 GOTO(out_free, rc);
2430
2431         rc = name_create(&oscuuid, oscname, "_UUID");
2432         if (rc)
2433                 GOTO(out_free, rc);
2434         rc = name_create(&lovuuid, lovname, "_UUID");
2435         if (rc)
2436                 GOTO(out_free, rc);
2437
2438
2439         /*
2440         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2441         multihomed (#4)
2442         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2443         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2444         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2445         failover (#6,7)
2446         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2447         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2448         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2449         */
2450
2451         rc = record_start_log(env, mgs, &llh, logname);
2452         if (rc)
2453                 GOTO(out_free, rc);
2454
2455         /* FIXME these should be a single journal transaction */
2456         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2457                            "add osc");
2458         if (rc)
2459                 GOTO(out_end, rc);
2460
2461         /* NB: don't change record order, because upon MDT steal OSC config
2462          * from client, it treats all nids before LCFG_SETUP as target nids
2463          * (multiple interfaces), while nids after as failover node nids.
2464          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2465          */
2466         for (i = 0; i < mti->mti_nid_count; i++) {
2467                 CDEBUG(D_MGS, "add nid %s\n",
2468                         libcfs_nid2str_r(mti->mti_nids[i],
2469                                          nidstr, sizeof(nidstr)));
2470                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2471                 if (rc)
2472                         GOTO(out_end, rc);
2473         }
2474         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2475         if (rc)
2476                 GOTO(out_end, rc);
2477         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
2478                           NULL, NULL);
2479         if (rc)
2480                 GOTO(out_end, rc);
2481         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2482         if (rc)
2483                 GOTO(out_end, rc);
2484
2485         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2486
2487         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2488         if (rc)
2489                 GOTO(out_end, rc);
2490         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2491                            "add osc");
2492         if (rc)
2493                 GOTO(out_end, rc);
2494 out_end:
2495         record_end_log(env, &llh);
2496 out_free:
2497         name_destroy(&lovuuid);
2498         name_destroy(&oscuuid);
2499         name_destroy(&oscname);
2500         name_destroy(&svname);
2501         name_destroy(&nodeuuid);
2502         RETURN(rc);
2503 }
2504
2505 static int mgs_write_log_ost(const struct lu_env *env,
2506                              struct mgs_device *mgs, struct fs_db *fsdb,
2507                              struct mgs_target_info *mti)
2508 {
2509         struct llog_handle *llh = NULL;
2510         char *logname, *lovname;
2511         char *ptr = mti->mti_params;
2512         int rc, flags = 0, failout = 0, i;
2513         ENTRY;
2514
2515         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2516
2517         /* The ost startup log */
2518
2519         /* If the ost log already exists, that means that someone reformatted
2520            the ost and it called target_add again. */
2521         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2522                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2523                                    "exists, yet the server claims it never "
2524                                    "registered. It may have been reformatted, "
2525                                    "or the index changed. writeconf the MDT to "
2526                                    "regenerate all logs.\n", mti->mti_svname);
2527                 RETURN(-EALREADY);
2528         }
2529
2530         /*
2531         attach obdfilter ost1 ost1_UUID
2532         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2533         */
2534         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2535                 failout = (strncmp(ptr, "failout", 7) == 0);
2536         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2537         if (rc)
2538                 RETURN(rc);
2539         /* FIXME these should be a single journal transaction */
2540         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2541         if (rc)
2542                 GOTO(out_end, rc);
2543         if (*mti->mti_uuid == '\0')
2544                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2545                          "%s_UUID", mti->mti_svname);
2546         rc = record_attach(env, llh, mti->mti_svname,
2547                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2548         if (rc)
2549                 GOTO(out_end, rc);
2550         rc = record_setup(env, llh, mti->mti_svname,
2551                           "dev"/*ignored*/, "type"/*ignored*/,
2552                           failout ? "n" : "f", NULL/*options*/);
2553         if (rc)
2554                 GOTO(out_end, rc);
2555         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2556         if (rc)
2557                 GOTO(out_end, rc);
2558 out_end:
2559         record_end_log(env, &llh);
2560         if (rc)
2561                 RETURN(rc);
2562         /* We also have to update the other logs where this osc is part of
2563            the lov */
2564
2565         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2566                 /* If we're upgrading, the old mdt log already has our
2567                    entry. Let's do a fake one for fun. */
2568                 /* Note that we can't add any new failnids, since we don't
2569                    know the old osc names. */
2570                 flags = CM_SKIP | CM_UPGRADE146;
2571
2572         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2573                 /* If the update flag isn't set, don't update client/mdt
2574                    logs. */
2575                 flags |= CM_SKIP;
2576                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2577                               "the MDT first to regenerate it.\n",
2578                               mti->mti_svname);
2579         }
2580
2581         /* Add ost to all MDT lov defs */
2582         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2583                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2584                         char mdt_index[9];
2585
2586                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2587                                                      i);
2588                         if (rc)
2589                                 RETURN(rc);
2590                         sprintf(mdt_index, "-MDT%04x", i);
2591                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2592                                                       logname, mdt_index,
2593                                                       lovname, LUSTRE_SP_MDT,
2594                                                       flags);
2595                         name_destroy(&logname);
2596                         name_destroy(&lovname);
2597                         if (rc)
2598                                 RETURN(rc);
2599                 }
2600         }
2601
2602         /* Append ost info to the client log */
2603         rc = name_create(&logname, mti->mti_fsname, "-client");
2604         if (rc)
2605                 RETURN(rc);
2606         if (mgs_log_is_empty(env, mgs, logname)) {
2607                 /* Start client log */
2608                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2609                                        fsdb->fsdb_clilov);
2610                 if (rc)
2611                         GOTO(out_free, rc);
2612                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2613                                        fsdb->fsdb_clilmv);
2614                 if (rc)
2615                         GOTO(out_free, rc);
2616         }
2617         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2618                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
2619 out_free:
2620         name_destroy(&logname);
2621         RETURN(rc);
2622 }
2623
2624 static __inline__ int mgs_param_empty(char *ptr)
2625 {
2626         char *tmp;
2627
2628         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2629                 return 1;
2630         return 0;
2631 }
2632
2633 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2634                                           struct mgs_device *mgs,
2635                                           struct fs_db *fsdb,
2636                                           struct mgs_target_info *mti,
2637                                           char *logname, char *cliname)
2638 {
2639         int rc;
2640         struct llog_handle *llh = NULL;
2641
2642         if (mgs_param_empty(mti->mti_params)) {
2643                 /* Remove _all_ failnids */
2644                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2645                                 mti->mti_svname, "add failnid", CM_SKIP);
2646                 return rc < 0 ? rc : 0;
2647         }
2648
2649         /* Otherwise failover nids are additive */
2650         rc = record_start_log(env, mgs, &llh, logname);
2651         if (rc)
2652                 return rc;
2653                 /* FIXME this should be a single journal transaction */
2654         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2655                            "add failnid");
2656         if (rc)
2657                 goto out_end;
2658         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2659         if (rc)
2660                 goto out_end;
2661         rc = record_marker(env, llh, fsdb, CM_END,
2662                            mti->mti_svname, "add failnid");
2663 out_end:
2664         record_end_log(env, &llh);
2665         return rc;
2666 }
2667
2668
2669 /* Add additional failnids to an existing log.
2670    The mdc/osc must have been added to logs first */
2671 /* tcp nids must be in dotted-quad ascii -
2672    we can't resolve hostnames from the kernel. */
2673 static int mgs_write_log_add_failnid(const struct lu_env *env,
2674                                      struct mgs_device *mgs,
2675                                      struct fs_db *fsdb,
2676                                      struct mgs_target_info *mti)
2677 {
2678         char *logname, *cliname;
2679         int rc;
2680         ENTRY;
2681
2682         /* FIXME we currently can't erase the failnids
2683          * given when a target first registers, since they aren't part of
2684          * an "add uuid" stanza */
2685
2686         /* Verify that we know about this target */
2687         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2688                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2689                                    "yet. It must be started before failnids "
2690                                    "can be added.\n", mti->mti_svname);
2691                 RETURN(-ENOENT);
2692         }
2693
2694         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2695         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2696                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2697         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2698                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2699         } else {
2700                 RETURN(-EINVAL);
2701         }
2702         if (rc)
2703                 RETURN(rc);
2704         /* Add failover nids to the client log */
2705         rc = name_create(&logname, mti->mti_fsname, "-client");
2706         if (rc) {
2707                 name_destroy(&cliname);
2708                 RETURN(rc);
2709         }
2710         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2711         name_destroy(&logname);
2712         name_destroy(&cliname);
2713         if (rc)
2714                 RETURN(rc);
2715
2716         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2717                 /* Add OST failover nids to the MDT logs as well */
2718                 int i;
2719
2720                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2721                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2722                                 continue;
2723                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2724                         if (rc)
2725                                 RETURN(rc);
2726                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2727                                                  fsdb, i);
2728                         if (rc) {
2729                                 name_destroy(&logname);
2730                                 RETURN(rc);
2731                         }
2732                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2733                                                             mti, logname,
2734                                                             cliname);
2735                         name_destroy(&cliname);
2736                         name_destroy(&logname);
2737                         if (rc)
2738                                 RETURN(rc);
2739                 }
2740         }
2741
2742         RETURN(rc);
2743 }
2744
2745 static int mgs_wlp_lcfg(const struct lu_env *env,
2746                         struct mgs_device *mgs, struct fs_db *fsdb,
2747                         struct mgs_target_info *mti,
2748                         char *logname, struct lustre_cfg_bufs *bufs,
2749                         char *tgtname, char *ptr)
2750 {
2751         char comment[MTI_NAME_MAXLEN];
2752         char *tmp;
2753         struct llog_cfg_rec *lcr;
2754         int rc, del;
2755
2756         /* Erase any old settings of this same parameter */
2757         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2758         comment[MTI_NAME_MAXLEN - 1] = 0;
2759         /* But don't try to match the value. */
2760         tmp = strchr(comment, '=');
2761         if (tmp != NULL)
2762                 *tmp = 0;
2763         /* FIXME we should skip settings that are the same as old values */
2764         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2765         if (rc < 0)
2766                 return rc;
2767         del = mgs_param_empty(ptr);
2768
2769         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2770                       "Setting" : "Modifying", tgtname, comment, logname);
2771         if (del) {
2772                 /* mgs_modify() will return 1 if nothing had to be done */
2773                 if (rc == 1)
2774                         rc = 0;
2775                 return rc;
2776         }
2777
2778         lustre_cfg_bufs_reset(bufs, tgtname);
2779         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2780         if (mti->mti_flags & LDD_F_PARAM2)
2781                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2782
2783         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2784                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2785         if (lcr == NULL)
2786                 return -ENOMEM;
2787
2788         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2789                                   comment);
2790         lustre_cfg_rec_free(lcr);
2791         return rc;
2792 }
2793
2794 static int mgs_write_log_param2(const struct lu_env *env,
2795                                 struct mgs_device *mgs,
2796                                 struct fs_db *fsdb,
2797                                 struct mgs_target_info *mti, char *ptr)
2798 {
2799         struct lustre_cfg_bufs  bufs;
2800         int                     rc = 0;
2801         ENTRY;
2802
2803         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2804         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2805                           mti->mti_svname, ptr);
2806
2807         RETURN(rc);
2808 }
2809
2810 /* write global variable settings into log */
2811 static int mgs_write_log_sys(const struct lu_env *env,
2812                              struct mgs_device *mgs, struct fs_db *fsdb,
2813                              struct mgs_target_info *mti, char *sys, char *ptr)
2814 {
2815         struct mgs_thread_info  *mgi = mgs_env_info(env);
2816         struct lustre_cfg       *lcfg;
2817         struct llog_cfg_rec     *lcr;
2818         char *tmp, sep;
2819         int rc, cmd, convert = 1;
2820
2821         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2822                 cmd = LCFG_SET_TIMEOUT;
2823         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2824                 cmd = LCFG_SET_LDLM_TIMEOUT;
2825         /* Check for known params here so we can return error to lctl */
2826         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2827                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2828                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2829                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2830                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2831                 cmd = LCFG_PARAM;
2832         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2833                 convert = 0; /* Don't convert string value to integer */
2834                 cmd = LCFG_PARAM;
2835         } else {
2836                 return -EINVAL;
2837         }
2838
2839         if (mgs_param_empty(ptr))
2840                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2841         else
2842                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2843
2844         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2845         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2846         if (!convert && *tmp != '\0')
2847                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2848         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2849         if (lcr == NULL)
2850                 return -ENOMEM;
2851
2852         lcfg = &lcr->lcr_cfg;
2853         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2854         /* truncate the comment to the parameter name */
2855         ptr = tmp - 1;
2856         sep = *ptr;
2857         *ptr = '\0';
2858         /* modify all servers and clients */
2859         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2860                                       *tmp == '\0' ? NULL : lcr,
2861                                       mti->mti_fsname, sys, 0);
2862         if (rc == 0 && *tmp != '\0') {
2863                 switch (cmd) {
2864                 case LCFG_SET_TIMEOUT:
2865                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2866                                 class_process_config(lcfg);
2867                         break;
2868                 case LCFG_SET_LDLM_TIMEOUT:
2869                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2870                                 class_process_config(lcfg);
2871                         break;
2872                 default:
2873                         break;
2874                 }
2875         }
2876         *ptr = sep;
2877         lustre_cfg_rec_free(lcr);
2878         return rc;
2879 }
2880
2881 /* write quota settings into log */
2882 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2883                                struct fs_db *fsdb, struct mgs_target_info *mti,
2884                                char *quota, char *ptr)
2885 {
2886         struct mgs_thread_info  *mgi = mgs_env_info(env);
2887         struct llog_cfg_rec     *lcr;
2888         char                    *tmp;
2889         char                     sep;
2890         int                      rc, cmd = LCFG_PARAM;
2891
2892         /* support only 'meta' and 'data' pools so far */
2893         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2894             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2895                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2896                        "& quota.ost are)\n", ptr);
2897                 return -EINVAL;
2898         }
2899
2900         if (*tmp == '\0') {
2901                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2902         } else {
2903                 CDEBUG(D_MGS, "global '%s'\n", quota);
2904
2905                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2906                     strcmp(tmp, "none") != 0) {
2907                         CERROR("enable option(%s) isn't supported\n", tmp);
2908                         return -EINVAL;
2909                 }
2910         }
2911
2912         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2913         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2914         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2915         if (lcr == NULL)
2916                 return -ENOMEM;
2917
2918         /* truncate the comment to the parameter name */
2919         ptr = tmp - 1;
2920         sep = *ptr;
2921         *ptr = '\0';
2922
2923         /* XXX we duplicated quota enable information in all server
2924          *     config logs, it should be moved to a separate config
2925          *     log once we cleanup the config log for global param. */
2926         /* modify all servers */
2927         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2928                                       *tmp == '\0' ? NULL : lcr,
2929                                       mti->mti_fsname, quota, 1);
2930         *ptr = sep;
2931         lustre_cfg_rec_free(lcr);
2932         return rc < 0 ? rc : 0;
2933 }
2934
2935 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2936                                    struct mgs_device *mgs,
2937                                    struct fs_db *fsdb,
2938                                    struct mgs_target_info *mti,
2939                                    char *param)
2940 {
2941         struct mgs_thread_info  *mgi = mgs_env_info(env);
2942         struct llog_cfg_rec     *lcr;
2943         struct llog_handle      *llh = NULL;
2944         char                    *logname;
2945         char                    *comment, *ptr;
2946         int                      rc, len;
2947
2948         ENTRY;
2949
2950         /* get comment */
2951         ptr = strchr(param, '=');
2952         LASSERT(ptr != NULL);
2953         len = ptr - param;
2954
2955         OBD_ALLOC(comment, len + 1);
2956         if (comment == NULL)
2957                 RETURN(-ENOMEM);
2958         strncpy(comment, param, len);
2959         comment[len] = '\0';
2960
2961         /* prepare lcfg */
2962         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2963         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2964         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2965         if (lcr == NULL)
2966                 GOTO(out_comment, rc = -ENOMEM);
2967
2968         /* construct log name */
2969         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2970         if (rc < 0)
2971                 GOTO(out_lcfg, rc);
2972
2973         if (mgs_log_is_empty(env, mgs, logname)) {
2974                 rc = record_start_log(env, mgs, &llh, logname);
2975                 if (rc < 0)
2976                         GOTO(out, rc);
2977                 record_end_log(env, &llh);
2978         }
2979
2980         /* obsolete old one */
2981         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2982                         comment, CM_SKIP);
2983         if (rc < 0)
2984                 GOTO(out, rc);
2985         /* write the new one */
2986         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
2987                                   mti->mti_svname, comment);
2988         if (rc)
2989                 CERROR("%s: error writing log %s: rc = %d\n",
2990                        mgs->mgs_obd->obd_name, logname, rc);
2991 out:
2992         name_destroy(&logname);
2993 out_lcfg:
2994         lustre_cfg_rec_free(lcr);
2995 out_comment:
2996         OBD_FREE(comment, len + 1);
2997         RETURN(rc);
2998 }
2999
3000 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
3001                                         char *param)
3002 {
3003         char    *ptr;
3004
3005         /* disable the adjustable udesc parameter for now, i.e. use default
3006          * setting that client always ship udesc to MDT if possible. to enable
3007          * it simply remove the following line */
3008         goto error_out;
3009
3010         ptr = strchr(param, '=');
3011         if (ptr == NULL)
3012                 goto error_out;
3013         *ptr++ = '\0';
3014
3015         if (strcmp(param, PARAM_SRPC_UDESC))
3016                 goto error_out;
3017
3018         if (strcmp(ptr, "yes") == 0) {
3019                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3020                 CWARN("Enable user descriptor shipping from client to MDT\n");
3021         } else if (strcmp(ptr, "no") == 0) {
3022                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3023                 CWARN("Disable user descriptor shipping from client to MDT\n");
3024         } else {
3025                 *(ptr - 1) = '=';
3026                 goto error_out;
3027         }
3028         return 0;
3029
3030 error_out:
3031         CERROR("Invalid param: %s\n", param);
3032         return -EINVAL;
3033 }
3034
3035 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3036                                   const char *svname,
3037                                   char *param)
3038 {
3039         struct sptlrpc_rule      rule;
3040         struct sptlrpc_rule_set *rset;
3041         int                      rc;
3042         ENTRY;
3043
3044         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3045                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3046                 RETURN(-EINVAL);
3047         }
3048
3049         if (strncmp(param, PARAM_SRPC_UDESC,
3050                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3051                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3052         }
3053
3054         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3055                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3056                 RETURN(-EINVAL);
3057         }
3058
3059         param += sizeof(PARAM_SRPC_FLVR) - 1;
3060
3061         rc = sptlrpc_parse_rule(param, &rule);
3062         if (rc)
3063                 RETURN(rc);
3064
3065         /* mgs rules implies must be mgc->mgs */
3066         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3067                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3068                      rule.sr_from != LUSTRE_SP_ANY) ||
3069                     (rule.sr_to != LUSTRE_SP_MGS &&
3070                      rule.sr_to != LUSTRE_SP_ANY))
3071                         RETURN(-EINVAL);
3072         }
3073
3074         /* preapre room for this coming rule. svcname format should be:
3075          * - fsname: general rule
3076          * - fsname-tgtname: target-specific rule
3077          */
3078         if (strchr(svname, '-')) {
3079                 struct mgs_tgt_srpc_conf *tgtconf;
3080                 int                       found = 0;
3081
3082                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3083                      tgtconf = tgtconf->mtsc_next) {
3084                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3085                                 found = 1;
3086                                 break;
3087                         }
3088                 }
3089
3090                 if (!found) {
3091                         int name_len;
3092
3093                         OBD_ALLOC_PTR(tgtconf);
3094                         if (tgtconf == NULL)
3095                                 RETURN(-ENOMEM);
3096
3097                         name_len = strlen(svname);
3098
3099                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3100                         if (tgtconf->mtsc_tgt == NULL) {
3101                                 OBD_FREE_PTR(tgtconf);
3102                                 RETURN(-ENOMEM);
3103                         }
3104                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3105
3106                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3107                         fsdb->fsdb_srpc_tgt = tgtconf;
3108                 }
3109
3110                 rset = &tgtconf->mtsc_rset;
3111         } else if (strcmp(svname, MGSSELF_NAME) == 0) {
3112                 /* put _mgs related srpc rule directly in mgs ruleset */
3113                 rset = &fsdb->fsdb_mgs->mgs_lut.lut_sptlrpc_rset;
3114         } else {
3115                 rset = &fsdb->fsdb_srpc_gen;
3116         }
3117
3118         rc = sptlrpc_rule_set_merge(rset, &rule);
3119
3120         RETURN(rc);
3121 }
3122
3123 static int mgs_srpc_set_param(const struct lu_env *env,
3124                               struct mgs_device *mgs,
3125                               struct fs_db *fsdb,
3126                               struct mgs_target_info *mti,
3127                               char *param)
3128 {
3129         char                   *copy;
3130         int                     rc, copy_size;
3131         ENTRY;
3132
3133 #ifndef HAVE_GSS
3134         RETURN(-EINVAL);
3135 #endif
3136         /* keep a copy of original param, which could be destroied
3137          * during parsing */
3138         copy_size = strlen(param) + 1;
3139         OBD_ALLOC(copy, copy_size);
3140         if (copy == NULL)
3141                 return -ENOMEM;
3142         memcpy(copy, param, copy_size);
3143
3144         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3145         if (rc)
3146                 goto out_free;
3147
3148         /* previous steps guaranteed the syntax is correct */
3149         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3150         if (rc)
3151                 goto out_free;
3152
3153         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3154                 /*
3155                  * for mgs rules, make them effective immediately.
3156                  */
3157                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3158                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3159                                                  &fsdb->fsdb_srpc_gen);
3160         }
3161
3162 out_free:
3163         OBD_FREE(copy, copy_size);
3164         RETURN(rc);
3165 }
3166
3167 struct mgs_srpc_read_data {
3168         struct fs_db   *msrd_fsdb;
3169         int             msrd_skip;
3170 };
3171
3172 static int mgs_srpc_read_handler(const struct lu_env *env,
3173                                  struct llog_handle *llh,
3174                                  struct llog_rec_hdr *rec, void *data)
3175 {
3176         struct mgs_srpc_read_data *msrd = data;
3177         struct cfg_marker         *marker;
3178         struct lustre_cfg         *lcfg = REC_DATA(rec);
3179         char                      *svname, *param;
3180         int                        cfg_len, rc;
3181         ENTRY;
3182
3183         if (rec->lrh_type != OBD_CFG_REC) {
3184                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3185                 RETURN(-EINVAL);
3186         }
3187
3188         cfg_len = REC_DATA_LEN(rec);
3189
3190         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3191         if (rc) {
3192                 CERROR("Insane cfg\n");
3193                 RETURN(rc);
3194         }
3195
3196         if (lcfg->lcfg_command == LCFG_MARKER) {
3197                 marker = lustre_cfg_buf(lcfg, 1);
3198
3199                 if (marker->cm_flags & CM_START &&
3200                     marker->cm_flags & CM_SKIP)
3201                         msrd->msrd_skip = 1;
3202                 if (marker->cm_flags & CM_END)
3203                         msrd->msrd_skip = 0;
3204
3205                 RETURN(0);
3206         }
3207
3208         if (msrd->msrd_skip)
3209                 RETURN(0);
3210
3211         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3212                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3213                 RETURN(0);
3214         }
3215
3216         svname = lustre_cfg_string(lcfg, 0);
3217         if (svname == NULL) {
3218                 CERROR("svname is empty\n");
3219                 RETURN(0);
3220         }
3221
3222         param = lustre_cfg_string(lcfg, 1);
3223         if (param == NULL) {
3224                 CERROR("param is empty\n");
3225                 RETURN(0);
3226         }
3227
3228         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3229         if (rc)
3230                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3231
3232         RETURN(0);
3233 }
3234
3235 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3236                                 struct mgs_device *mgs,
3237                                 struct fs_db *fsdb)
3238 {
3239         struct llog_handle