Whamcloud - gitweb
Land b1_8_gate onto b1_8 (20081218_1708)
[fs/lustre-release.git] / lustre / obdclass / obd_config.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/obd_config.c
37  *
38  * Config API
39  */
40
41 #define DEBUG_SUBSYSTEM S_CLASS
42 #ifdef __KERNEL__
43 #include <obd_class.h>
44 #include <linux/string.h>
45 #else
46 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <obd.h>
49 #endif
50 #include <lustre_disk.h>
51 #include <lustre_log.h>
52 #include <lprocfs_status.h>
53 #include <libcfs/list.h>
54 #include <lustre_param.h>
55 #include <class_hash.h>
56
/* Operation tables handed to lustre_hash_init() by class_setup() for the
 * per-device uuid, nid and nid-stats hashes.  Only declared at this point;
 * NOTE(review): the initialised definitions are presumably further down in
 * this file -- confirm. */
static lustre_hash_ops_t uuid_hash_ops;
static lustre_hash_ops_t nid_hash_ops;
static lustre_hash_ops_t nid_stat_hash_ops;
60
61 /*********** string parsing utils *********/
62
/* Look for key anywhere inside buf.
 * Returns 0 when key occurs in buf; if valp is non-NULL, *valp is then set
 * to the first character following the match.
 * Returns 1 (and leaves *valp alone) when buf is NULL or key is absent. */
int class_find_param(char *buf, char *key, char **valp)
{
        char *match;

        match = (buf != NULL) ? strstr(buf, key) : NULL;
        if (match == NULL)
                return 1;

        if (valp != NULL)
                *valp = match + strlen(key);

        return 0;
}
79
/* Check whether buf starts with key.
 * Returns 0 when key is the leading text of buf; if valp is non-NULL, *valp
 * is then set to the first character after the key.
 * Returns 1 (and leaves *valp alone) when buf is NULL or does not start
 * with key. */
int class_match_param(char *buf, char *key, char **valp)
{
        size_t keylen;

        if (!buf)
                return 1;

        keylen = strlen(key);
        /* strncmp() stops at buf's terminating NUL, so a buf shorter than
         * key is never read past its end (memcmp() gives no such
         * guarantee). */
        if (strncmp(buf, key, keylen) != 0)
                return 1;

        if (valp)
                *valp = buf + keylen;

        return 0;
}
95
96 /* 0 is good nid,
97    1 not found
98    < 0 error
99    endh is set to next separator */
100 int class_parse_nid(char *buf, lnet_nid_t *nid, char **endh)
101 {
102         char tmp, *endp;
103
104         if (!buf)
105                 return 1;
106         while (*buf == ',' || *buf == ':')
107                 buf++;
108         if (*buf == ' ' || *buf == '/' || *buf == '\0')
109                 return 1;
110
111         /* nid separators or end of nids */
112         endp = strpbrk(buf, ",: /");
113         if (endp == NULL)
114                 endp = buf + strlen(buf);
115
116         tmp = *endp;
117         *endp = '\0';
118         *nid = libcfs_str2nid(buf);
119         if (*nid == LNET_NID_ANY) {
120                 LCONSOLE_ERROR_MSG(0x159, "Can't parse NID '%s'\n", buf);
121                 *endp = tmp;
122                 return -EINVAL;
123         }
124         *endp = tmp;
125
126         if (endh)
127                 *endh = endp;
128         CDEBUG(D_INFO, "Nid %s\n", libcfs_nid2str(*nid));
129         return 0;
130 }
131
/* Export the string parsing helpers for use outside this file */
EXPORT_SYMBOL(class_find_param);
EXPORT_SYMBOL(class_match_param);
EXPORT_SYMBOL(class_parse_nid);
135
136 /********************** class fns **********************/
137
/* Create a new device and set the type, name and uuid.  If
 * successful, the new device can be accessed by either name or uuid.
 *
 * The config record supplies the device name in buffer 0, the type name in
 * buffer 1 and the uuid in buffer 2.
 *
 * Returns 0 on success, leaving the device with a refcount of 1 and
 * obd_attached set.  Returns -EINVAL when any of the three buffers is
 * empty, when the uuid does not fit in obd_uuid, or when the type's attach
 * method fails; returns the class_newdev() error when the device cannot be
 * created.  A device that was created before the failure is handed back to
 * class_release_dev().
 */
int class_attach(struct lustre_cfg *lcfg)
{
        struct obd_device *obd = NULL;
        char *typename, *name, *uuid;
        int rc, len;
        ENTRY;

        if (!LUSTRE_CFG_BUFLEN(lcfg, 1)) {
                CERROR("No type passed!\n");
                RETURN(-EINVAL);
        }
        typename = lustre_cfg_string(lcfg, 1);

        if (!LUSTRE_CFG_BUFLEN(lcfg, 0)) {
                CERROR("No name passed!\n");
                RETURN(-EINVAL);
        }
        name = lustre_cfg_string(lcfg, 0);

        if (!LUSTRE_CFG_BUFLEN(lcfg, 2)) {
                CERROR("No UUID passed!\n");
                RETURN(-EINVAL);
        }
        uuid = lustre_cfg_string(lcfg, 2);

        CDEBUG(D_IOCTL, "attach type %s name: %s uuid: %s\n",
               MKSTR(typename), MKSTR(name), MKSTR(uuid));

        /* Mountconf transitional hack, should go away after 1.6.
           1.4.7 uses the old names, so translate back if the
           mountconf flag is set.
           1.6 should set this flag, and translate the other way here
           if not set. */
        if (lcfg->lcfg_flags & LCFG_FLG_MOUNTCONF){
                char *tmp = NULL;
                /* every test below looks at the untranslated typename, so
                   "mds" and "mdt" are swapped with each other */
                if (strcmp(typename, "mds") == 0)
                        tmp = "mdt";
                if (strcmp(typename, "mdt") == 0)
                        tmp = "mds";
                if (strcmp(typename, "osd") == 0)
                        tmp = "obdfilter";
                if (tmp) {
                        LCONSOLE_WARN("Using type %s for %s %s\n", tmp,
                                      MKSTR(typename), MKSTR(name));
                        typename = tmp;
                }
        }

        obd = class_newdev(typename, name);
        if (IS_ERR(obd)) {
                /* Already exists or out of obds */
                rc = PTR_ERR(obd);
                /* reset so the out: path does not try to release an
                   error pointer */
                obd = NULL;
                CERROR("Cannot create device %s of type %s : %d\n",
                       name, typename, rc);
                GOTO(out, rc);
        }
        /* sanity-check what class_newdev() handed back */
        LASSERTF(obd != NULL, "Cannot get obd device %s of type %s\n",
                 name, typename);
        LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
                 "obd %p obd_magic %08X != %08X\n",
                 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
        LASSERTF(strncmp(obd->obd_name, name, strlen(name)) == 0,
                 "%p obd_name %s != %s\n", obd, obd->obd_name, name);

        rwlock_init(&obd->obd_pool_lock);
        obd->obd_pool_limit = 0;
        obd->obd_pool_slv = 0;

        /* export lists and the locks used with the device */
        CFS_INIT_LIST_HEAD(&obd->obd_exports);
        CFS_INIT_LIST_HEAD(&obd->obd_delayed_exports);
        CFS_INIT_LIST_HEAD(&obd->obd_exports_timed);
        CFS_INIT_LIST_HEAD(&obd->obd_nid_stats);
        spin_lock_init(&obd->obd_nid_lock);
        spin_lock_init(&obd->obd_dev_lock);
        sema_init(&obd->obd_dev_sem, 1);
        spin_lock_init(&obd->obd_osfs_lock);
        /* obd->obd_osfs_age must be set to a value in the distant
         * past to guarantee a fresh statfs is fetched on mount. */
        obd->obd_osfs_age = cfs_time_shift_64(-1000);

        /* XXX belongs in setup not attach  */
        /* recovery data */
        cfs_init_timer(&obd->obd_recovery_timer);
        spin_lock_init(&obd->obd_processing_task_lock);
        cfs_waitq_init(&obd->obd_next_transno_waitq);
        cfs_waitq_init(&obd->obd_evict_inprogress_waitq);
        cfs_waitq_init(&obd->obd_llog_waitq);
        init_mutex(&obd->obd_llog_alloc);
        init_mutex(&obd->obd_llog_cat_process);
        CFS_INIT_LIST_HEAD(&obd->obd_recovery_queue);
        CFS_INIT_LIST_HEAD(&obd->obd_delayed_reply_queue);

        len = strlen(uuid);
        if (len >= sizeof(obd->obd_uuid)) {
                CERROR("uuid must be < %d bytes long\n",
                       (int)sizeof(obd->obd_uuid));
                GOTO(out, rc = -EINVAL);
        }
        /* only len bytes are copied, no terminating NUL is written here;
           NOTE(review): presumably relies on class_newdev() returning a
           zeroed device -- confirm */
        memcpy(obd->obd_uuid.uuid, uuid, len);

        /* do the attach */
        if (OBP(obd, attach)) {
                rc = OBP(obd,attach)(obd, sizeof *lcfg, lcfg);
                /* whatever the type's attach method reported is replaced
                   by -EINVAL for the caller */
                if (rc)
                        GOTO(out, rc = -EINVAL);
        }

        /* Detach drops this */
        spin_lock(&obd->obd_dev_lock);
        atomic_set(&obd->obd_refcount, 1);
        spin_unlock(&obd->obd_dev_lock);

        obd->obd_attached = 1;
        CDEBUG(D_IOCTL, "OBD: dev %d attached type %s with refcount %d\n",
               obd->obd_minor, typename, atomic_read(&obd->obd_refcount));
        RETURN(0);
 out:
        /* obd is NULL here when class_newdev() itself failed */
        if (obd != NULL) {
                class_release_dev(obd);
        }
        return rc;
}
264
/* Set up an attached device: create its uuid, nid and nid-stats hashes,
 * create the self export and call the type's setup through obd_setup().
 *
 * Returns 0 on success, with obd_set_up set and an extra device reference
 * taken through class_incref().  Returns -ENODEV when the device is not
 * attached, -EEXIST when it is already set up or a setup is in progress,
 * -ENOMEM when a hash cannot be created, and otherwise the error from
 * class_new_export() or obd_setup().  On a failure after obd_starting was
 * set, any hashes created so far are destroyed and obd_starting is cleared
 * again.
 */
int class_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        int err = 0;
        struct obd_export *exp;
        ENTRY;

        LASSERT(obd != NULL);
        LASSERTF(obd == class_num2obd(obd->obd_minor),
                 "obd %p != obd_devs[%d] %p\n",
                 obd, obd->obd_minor, class_num2obd(obd->obd_minor));
        LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
                 "obd %p obd_magic %08x != %08x\n",
                 obd, obd->obd_magic, OBD_DEVICE_MAGIC);

        /* have we attached a type to this device? */
        if (!obd->obd_attached) {
                CERROR("Device %d not attached\n", obd->obd_minor);
                RETURN(-ENODEV);
        }

        if (obd->obd_set_up) {
                CERROR("Device %d already setup (type %s)\n",
                       obd->obd_minor, obd->obd_type->typ_name);
                RETURN(-EEXIST);
        }

        /* is someone else setting us up right now? (attach inits spinlock) */
        spin_lock(&obd->obd_dev_lock);
        if (obd->obd_starting) {
                spin_unlock(&obd->obd_dev_lock);
                CERROR("Device %d setup in progress (type %s)\n",
                       obd->obd_minor, obd->obd_type->typ_name);
                RETURN(-EEXIST);
        }
        /* just leave this on forever.  I can't use obd_set_up here because
           other fns check that status, and we're not actually set up yet. */
        obd->obd_starting = 1;
        /* start from NULL so the err_hash path can tell which hashes
           were actually created */
        obd->obd_uuid_hash = NULL;
        obd->obd_nid_hash = NULL;
        obd->obd_nid_stats_hash = NULL;
        spin_unlock(&obd->obd_dev_lock);

        /* create an uuid-export lustre hash */
        obd->obd_uuid_hash = lustre_hash_init("UUID_HASH", 7, 7,
                                              &uuid_hash_ops, 0);
        if (!obd->obd_uuid_hash)
                GOTO(err_hash, err = -ENOMEM);

        /* create a nid-export lustre hash */
        obd->obd_nid_hash = lustre_hash_init("NID_HASH", 7, 7,
                                             &nid_hash_ops, 0);
        if (!obd->obd_nid_hash)
                GOTO(err_hash, err = -ENOMEM);

        /* create a nid-stats lustre hash */
        obd->obd_nid_stats_hash = lustre_hash_init("NID_STATS", 7, 7,
                                                   &nid_stat_hash_ops, 0);
        if (!obd->obd_nid_stats_hash)
                GOTO(err_hash, err = -ENOMEM);

        /* the self export is created under the device's own uuid */
        exp = class_new_export(obd, &obd->obd_uuid);
        if (IS_ERR(exp))
                GOTO(err_hash, err = PTR_ERR(exp));

        obd->obd_self_export = exp;
        /* unlink the self export's exp_obd_chain_timed entry */
        list_del_init(&exp->exp_obd_chain_timed);
        /* NOTE(review): presumably drops the caller's reference from
           class_new_export() while the export stays alive -- confirm */
        class_export_put(exp);

        err = obd_setup(obd, sizeof(*lcfg), lcfg);
        if (err)
                GOTO(err_exp, err);

        obd->obd_set_up = 1;
        spin_lock(&obd->obd_dev_lock);
        /* cleanup drops this */
        class_incref(obd);
        spin_unlock(&obd->obd_dev_lock);

        CDEBUG(D_IOCTL, "finished setup of obd %s (uuid %s)\n",
               obd->obd_name, obd->obd_uuid.uuid);

        RETURN(0);
err_exp:
        /* obd_setup() failed: get rid of the self export, then fall
           through to the hash teardown */
        class_unlink_export(obd->obd_self_export);
        obd->obd_self_export = NULL;
err_hash:
        /* destroy whichever hashes were created before the failure */
        if (obd->obd_uuid_hash) {
                lustre_hash_exit(obd->obd_uuid_hash);
                obd->obd_uuid_hash = NULL;
        }
        if (obd->obd_nid_hash) {
                lustre_hash_exit(obd->obd_nid_hash);
                obd->obd_nid_hash = NULL;
        }
        if (obd->obd_nid_stats_hash) {
                lustre_hash_exit(obd->obd_nid_stats_hash);
                obd->obd_nid_stats_hash = NULL;
        }
        /* clear the in-progress marker so that setup can be tried again */
        obd->obd_starting = 0;
        CERROR("setup %s failed (%d)\n", obd->obd_name, err);
        return err;
}
367
368 int class_detach(struct obd_device *obd, struct lustre_cfg *lcfg)
369 {
370         ENTRY;
371
372         if (obd->obd_set_up) {
373                 CERROR("OBD device %d still set up\n", obd->obd_minor);
374                 RETURN(-EBUSY);
375         }
376
377         spin_lock(&obd->obd_dev_lock);
378         if (!obd->obd_attached) {
379                 spin_unlock(&obd->obd_dev_lock);
380                 CERROR("OBD device %d not attached\n", obd->obd_minor);
381                 RETURN(-ENODEV);
382         }
383         obd->obd_attached = 0;
384         spin_unlock(&obd->obd_dev_lock);
385
386         CDEBUG(D_IOCTL, "detach on obd %s (uuid %s)\n",
387                obd->obd_name, obd->obd_uuid.uuid);
388
389         class_decref(obd);
390
391         /* not strictly necessary, but cleans up eagerly */
392         obd_zombie_impexp_cull();
393
394         RETURN(0);
395 }
396
397 static void dump_exports(struct obd_device *obd)
398 {
399         struct obd_export *exp;
400
401         spin_lock(&obd->obd_dev_lock);
402         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
403                 struct ptlrpc_reply_state *rs;
404                 struct ptlrpc_reply_state *first_reply = NULL;
405                 int                        nreplies = 0;
406
407                 spin_lock(&exp->exp_lock);
408                 list_for_each_entry (rs, &exp->exp_outstanding_replies,
409                                      rs_exp_list) {
410                         if (nreplies == 0)
411                                 first_reply = rs;
412                         nreplies++;
413                 }
414                 spin_unlock(&exp->exp_lock);
415
416                 CDEBUG(D_IOCTL, "%s: %p %s %s %d %d %d: %p %s\n",
417                        obd->obd_name, exp, exp->exp_client_uuid.uuid,
418                        obd_export_nid2str(exp),
419                        atomic_read(&exp->exp_refcount),
420                        exp->exp_failed, nreplies, first_reply,
421                        nreplies > 3 ? "..." : "");
422         }
423         spin_unlock(&obd->obd_dev_lock);
424 }
425
/* Start tearing down a set-up device.
 *
 * Buffer 1 of the config record, when present and non-empty, is a string of
 * flag characters: 'F' sets obd_force; 'A' marks a failover (sets obd_fail,
 * obd_no_transno and obd_no_recov, and issues OBD_IOC_SET_READONLY when the
 * type has an iocontrol method).  Any other character is only logged.
 *
 * Returns -ENODEV when the device is not set up or is already stopping,
 * otherwise 0; a failing obd_precleanup() is logged but not returned.
 */
int class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        int err = 0;
        char *flag;
        ENTRY;

        OBD_RACE(OBD_FAIL_LDLM_RECOV_CLIENTS);

        if (!obd->obd_set_up) {
                CERROR("Device %d not setup\n", obd->obd_minor);
                RETURN(-ENODEV);
        }

        /* only one caller may start the teardown */
        spin_lock(&obd->obd_dev_lock);
        if (obd->obd_stopping) {
                spin_unlock(&obd->obd_dev_lock);
                CERROR("OBD %d already stopping\n", obd->obd_minor);
                RETURN(-ENODEV);
        }
        /* Leave this on forever */
        obd->obd_stopping = 1;
        spin_unlock(&obd->obd_dev_lock);

        /* walk the optional flag string one character at a time */
        if (lcfg->lcfg_bufcount >= 2 && LUSTRE_CFG_BUFLEN(lcfg, 1) > 0) {
                for (flag = lustre_cfg_string(lcfg, 1); *flag != 0; flag++)
                        switch (*flag) {
                        case 'F':
                                obd->obd_force = 1;
                                break;
                        case 'A':
                                LCONSOLE_WARN("Failing over %s\n",
                                              obd->obd_name);
                                obd->obd_fail = 1;
                                obd->obd_no_transno = 1;
                                obd->obd_no_recov = 1;
                                /* Set the obd readonly if we can */
                                if (OBP(obd, iocontrol))
                                        obd_iocontrol(OBD_IOC_SET_READONLY,
                                                      obd->obd_self_export,
                                                      0, NULL, NULL);
                                break;
                        default:
                                CERROR("unrecognised flag '%c'\n",
                                       *flag);
                        }
        }

        /* The three references that should be remaining are the
         * obd_self_export and the attach and setup references. */
        if (atomic_read(&obd->obd_refcount) > 3) {
#if 0           /* We should never fail to cleanup with mountconf */
                if (!(obd->obd_fail || obd->obd_force)) {
                        CERROR("OBD %s is still busy with %d references\n"
                               "You should stop active file system users,"
                               " or use the --force option to cleanup.\n",
                               obd->obd_name, atomic_read(&obd->obd_refcount));
                        dump_exports(obd);
                        /* Allow a failed cleanup to try again. */
                        obd->obd_stopping = 0;
                        RETURN(-EBUSY);
                }
#endif
                /* refcount - 3 might be the number of real exports
                   (excluding self export). But class_incref is called
                   by other things as well, so don't count on it. */
                CDEBUG(D_IOCTL, "%s: forcing exports to disconnect: %d\n",
                       obd->obd_name, atomic_read(&obd->obd_refcount) - 3);
                dump_exports(obd);
                class_disconnect_exports(obd);
        }

        LASSERT(obd->obd_self_export);

        /* destroy an uuid-export hash body */
        lustre_hash_exit(obd->obd_uuid_hash);

        /* destroy a nid-export hash body */
        lustre_hash_exit(obd->obd_nid_hash);

        /* destroy a nid-stats hash body */
        lustre_hash_exit(obd->obd_nid_stats_hash);

        /* Precleanup stage 1, we must make sure all exports (other than the
           self-export) get destroyed. */
        err = obd_precleanup(obd, OBD_CLEANUP_EXPORTS);
        if (err)
                CERROR("Precleanup %s returned %d\n",
                       obd->obd_name, err);
        /* drop the reference class_setup() took with class_incref() */
        class_decref(obd);
        obd->obd_set_up = 0;

        RETURN(0);
}
519
520 struct obd_device *class_incref(struct obd_device *obd)
521 {
522         atomic_inc(&obd->obd_refcount);
523         CDEBUG(D_INFO, "incref %s (%p) now %d\n", obd->obd_name, obd,
524                atomic_read(&obd->obd_refcount));
525
526         return obd;
527 }
528
/* Drop one reference on obd and run whichever teardown stage the new count
 * calls for:
 *   - count reaches 1 on a stopping device: run the second precleanup
 *     stage and unlink the self export, which ends up back in this
 *     function;
 *   - count reaches 0: run obd_cleanup() (stopping devices only), the
 *     type's detach method if it has one, and release the device.
 * Errors from the cleanup calls are logged, not returned.
 */
void class_decref(struct obd_device *obd)
{
        int err;
        int refs;

        /* decrement and sample the count inside one locked section */
        spin_lock(&obd->obd_dev_lock);
        atomic_dec(&obd->obd_refcount);
        refs = atomic_read(&obd->obd_refcount);
        spin_unlock(&obd->obd_dev_lock);

        CDEBUG(D_INFO, "Decref %s (%p) now %d\n", obd->obd_name, obd, refs);

        if ((refs == 1) && obd->obd_stopping) {
                /* All exports (other than the self-export) have been
                   destroyed; there should be no more in-progress ops
                   by this point.*/
                /* if we're not stopping, we didn't finish setup */
                /* Precleanup stage 2,  do other type-specific
                   cleanup requiring the self-export. */
                err = obd_precleanup(obd, OBD_CLEANUP_SELF_EXP);
                if (err)
                        CERROR("Precleanup %s returned %d\n",
                               obd->obd_name, err);

                /* fold the device-derived flags into the self export */
                spin_lock(&obd->obd_self_export->exp_lock);
                obd->obd_self_export->exp_flags |= exp_flags_from_obd(obd);
                spin_unlock(&obd->obd_self_export->exp_lock);

                /* note that we'll recurse into class_decref again */
                class_unlink_export(obd->obd_self_export);
                return;
        }

        if (refs == 0) {
                CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
                       obd->obd_name, obd->obd_uuid.uuid);
                /* the last reference must not go away while the device is
                   still marked attached */
                LASSERT(!obd->obd_attached);
                if (obd->obd_stopping) {
                        /* If we're not stopping, we were never set up */
                        err = obd_cleanup(obd);
                        if (err)
                                CERROR("Cleanup %s returned %d\n",
                                       obd->obd_name, err);
                }
                if (OBP(obd, detach)) {
                        err = OBP(obd,detach)(obd);
                        if (err)
                                CERROR("Detach returned %d\n", err);
                }
                class_release_dev(obd);
        }
}
581
582 int class_add_conn(struct obd_device *obd, struct lustre_cfg *lcfg)
583 {
584         struct obd_import *imp;
585         struct obd_uuid uuid;
586         int rc;
587         ENTRY;
588
589         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1 ||
590             LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(struct obd_uuid)) {
591                 CERROR("invalid conn_uuid\n");
592                 RETURN(-EINVAL);
593         }
594         if (strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) &&
595             strcmp(obd->obd_type->typ_name, LUSTRE_OSC_NAME) &&
596             strcmp(obd->obd_type->typ_name, LUSTRE_MGC_NAME)) {
597                 CERROR("can't add connection on non-client dev\n");
598                 RETURN(-EINVAL);
599         }
600
601         imp = obd->u.cli.cl_import;
602         if (!imp) {
603                 CERROR("try to add conn on immature client dev\n");
604                 RETURN(-EINVAL);
605         }
606
607         obd_str2uuid(&uuid, lustre_cfg_string(lcfg, 1));
608         rc = obd_add_conn(imp, &uuid, lcfg->lcfg_num);
609
610         RETURN(rc);
611 }
612
613 int class_del_conn(struct obd_device *obd, struct lustre_cfg *lcfg)
614 {
615         struct obd_import *imp;
616         struct obd_uuid uuid;
617         int rc;
618         ENTRY;
619
620         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1 ||
621             LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(struct obd_uuid)) {
622                 CERROR("invalid conn_uuid\n");
623                 RETURN(-EINVAL);
624         }
625         if (strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) &&
626             strcmp(obd->obd_type->typ_name, LUSTRE_OSC_NAME)) {
627                 CERROR("can't del connection on non-client dev\n");
628                 RETURN(-EINVAL);
629         }
630
631         imp = obd->u.cli.cl_import;
632         if (!imp) {
633                 CERROR("try to del conn on immature client dev\n");
634                 RETURN(-EINVAL);
635         }
636
637         obd_str2uuid(&uuid, lustre_cfg_string(lcfg, 1));
638         rc = obd_del_conn(imp, &uuid);
639
640         RETURN(rc);
641 }
642
643 CFS_LIST_HEAD(lustre_profile_list);
644
645 struct lustre_profile *class_get_profile(char * prof)
646 {
647         struct lustre_profile *lprof;
648
649         ENTRY;
650         list_for_each_entry(lprof, &lustre_profile_list, lp_list) {
651                 if (!strcmp(lprof->lp_profile, prof)) {
652                         RETURN(lprof);
653                 }
654         }
655         RETURN(NULL);
656 }
657
/* Record a new profile (profile name, osc name and optional mdc name) at
 * the head of lustre_profile_list.
 *
 * Each *len argument is the size of the matching string including its
 * terminating NUL; this is asserted for prof and osc, and for mdc whenever
 * mdclen > 0.  A mdclen of 0 or less means no mdc name is stored.
 *
 * Returns 0 on success or -ENOMEM when an allocation fails, in which case
 * everything allocated so far is freed again.
 */
int class_add_profile(int proflen, char *prof, int osclen, char *osc,
                      int mdclen, char *mdc)
{
        struct lustre_profile *lprof;
        int err = 0;
        ENTRY;

        CDEBUG(D_CONFIG, "Add profile %s\n", prof);

        OBD_ALLOC(lprof, sizeof(*lprof));
        if (lprof == NULL)
                RETURN(-ENOMEM);
        CFS_INIT_LIST_HEAD(&lprof->lp_list);

        /* the copies below take the terminating NUL along, since each
           length is asserted to be strlen() + 1 */
        LASSERT(proflen == (strlen(prof) + 1));
        OBD_ALLOC(lprof->lp_profile, proflen);
        if (lprof->lp_profile == NULL)
                GOTO(out, err = -ENOMEM);
        memcpy(lprof->lp_profile, prof, proflen);

        LASSERT(osclen == (strlen(osc) + 1));
        OBD_ALLOC(lprof->lp_osc, osclen);
        if (lprof->lp_osc == NULL)
                GOTO(out, err = -ENOMEM);
        memcpy(lprof->lp_osc, osc, osclen);

        /* the mdc name is optional */
        if (mdclen > 0) {
                LASSERT(mdclen == (strlen(mdc) + 1));
                OBD_ALLOC(lprof->lp_mdc, mdclen);
                if (lprof->lp_mdc == NULL)
                        GOTO(out, err = -ENOMEM);
                memcpy(lprof->lp_mdc, mdc, mdclen);
        }

        list_add(&lprof->lp_list, &lustre_profile_list);
        RETURN(err);

out:
        /* NOTE(review): the NULL tests below presumably rely on
           OBD_ALLOC() handing back zeroed memory -- confirm */
        if (lprof->lp_mdc)
                OBD_FREE(lprof->lp_mdc, mdclen);
        if (lprof->lp_osc)
                OBD_FREE(lprof->lp_osc, osclen);
        if (lprof->lp_profile)
                OBD_FREE(lprof->lp_profile, proflen);
        OBD_FREE(lprof, sizeof(*lprof));
        RETURN(err);
}
705
706 void class_del_profile(char *prof)
707 {
708         struct lustre_profile *lprof;
709         ENTRY;
710
711         CDEBUG(D_CONFIG, "Del profile %s\n", prof);
712
713         lprof = class_get_profile(prof);
714         if (lprof) {
715                 list_del(&lprof->lp_list);
716                 OBD_FREE(lprof->lp_profile, strlen(lprof->lp_profile) + 1);
717                 OBD_FREE(lprof->lp_osc, strlen(lprof->lp_osc) + 1);
718                 if (lprof->lp_mdc)
719                         OBD_FREE(lprof->lp_mdc, strlen(lprof->lp_mdc) + 1);
720                 OBD_FREE(lprof, sizeof *lprof);
721         }
722         EXIT;
723 }
724
725 /* COMPAT_146 */
726 void class_del_profiles(void)
727 {
728         struct lustre_profile *lprof, *n;
729         ENTRY;
730
731         list_for_each_entry_safe(lprof, n, &lustre_profile_list, lp_list) {
732                 list_del(&lprof->lp_list);
733                 OBD_FREE(lprof->lp_profile, strlen(lprof->lp_profile) + 1);
734                 OBD_FREE(lprof->lp_osc, strlen(lprof->lp_osc) + 1);
735                 if (lprof->lp_mdc)
736                         OBD_FREE(lprof->lp_mdc, strlen(lprof->lp_mdc) + 1);
737                 OBD_FREE(lprof, sizeof *lprof);
738         }
739         EXIT;
740 }
741
/* We can't call ll_process_config directly because it lives in a module that
   must be loaded after this one.  The handler is stored in this pointer
   instead; it stays NULL until one is registered, and
   class_process_config() tests it before calling through it. */
static int (*client_process_config)(struct lustre_cfg *lcfg) = NULL;

/* Install cpc as the client config handler, replacing any previous one */
void lustre_register_client_process_config(int (*cpc)(struct lustre_cfg *lcfg))
{
        client_process_config = cpc;
}
EXPORT_SYMBOL(lustre_register_client_process_config);
751
752 int class_process_config(struct lustre_cfg *lcfg)
753 {
754         struct obd_device *obd;
755         int err;
756
757         LASSERT(lcfg && !IS_ERR(lcfg));
758         CDEBUG(D_IOCTL, "processing cmd: %x\n", lcfg->lcfg_command);
759
760         /* Commands that don't need a device */
761         switch(lcfg->lcfg_command) {
762         case LCFG_ATTACH: {
763                 err = class_attach(lcfg);
764                 GOTO(out, err);
765         }
766         case LCFG_ADD_UUID: {
767                 CDEBUG(D_IOCTL, "adding mapping from uuid %s to nid "LPX64
768                        " (%s)\n", lustre_cfg_string(lcfg, 1),
769                        lcfg->lcfg_nid, libcfs_nid2str(lcfg->lcfg_nid));
770
771                 err = class_add_uuid(lustre_cfg_string(lcfg, 1), lcfg->lcfg_nid);
772                 GOTO(out, err);
773         }
774         case LCFG_DEL_UUID: {
775                 CDEBUG(D_IOCTL, "removing mappings for uuid %s\n",
776                        (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) == 0)
777                        ? "<all uuids>" : lustre_cfg_string(lcfg, 1));
778
779                 err = class_del_uuid(lustre_cfg_string(lcfg, 1));
780                 GOTO(out, err);
781         }
782         case LCFG_MOUNTOPT: {
783                 CDEBUG(D_IOCTL, "mountopt: profile %s osc %s mdc %s\n",
784                        lustre_cfg_string(lcfg, 1),
785                        lustre_cfg_string(lcfg, 2),
786                        lustre_cfg_string(lcfg, 3));
787                 /* set these mount options somewhere, so ll_fill_super
788                  * can find them. */
789                 err = class_add_profile(LUSTRE_CFG_BUFLEN(lcfg, 1),
790                                         lustre_cfg_string(lcfg, 1),
791                                         LUSTRE_CFG_BUFLEN(lcfg, 2),
792                                         lustre_cfg_string(lcfg, 2),
793                                         LUSTRE_CFG_BUFLEN(lcfg, 3),
794                                         lustre_cfg_string(lcfg, 3));
795                 GOTO(out, err);
796         }
797         case LCFG_DEL_MOUNTOPT: {
798                 CDEBUG(D_IOCTL, "mountopt: profile %s\n",
799                        lustre_cfg_string(lcfg, 1));
800                 class_del_profile(lustre_cfg_string(lcfg, 1));
801                 GOTO(out, err = 0);
802         }
803         case LCFG_SET_TIMEOUT: {
804                 CDEBUG(D_IOCTL, "changing lustre timeout from %d to %d\n",
805                        obd_timeout, lcfg->lcfg_num);
806                 obd_timeout = max(lcfg->lcfg_num, 1U);
807                 GOTO(out, err = 0);
808         }
809         case LCFG_SET_UPCALL: {
810                 LCONSOLE_ERROR_MSG(0x15a, "recovery upcall is deprecated\n");
811                 /* COMPAT_146 Don't fail on old configs */
812                 GOTO(out, err = 0);
813         }
814         case LCFG_MARKER: {
815                 struct cfg_marker *marker;
816                 marker = lustre_cfg_buf(lcfg, 1);
817                 CDEBUG(D_IOCTL, "marker %d (%#x) %.16s %s\n", marker->cm_step,
818                       marker->cm_flags, marker->cm_tgtname, marker->cm_comment);
819                 GOTO(out, err = 0);
820         }
821         case LCFG_PARAM: {
822                 /* llite has no obd */
823                 if ((class_match_param(lustre_cfg_string(lcfg, 1),
824                                        PARAM_LLITE, 0) == 0) &&
825                     client_process_config) {
826                         err = (*client_process_config)(lcfg);
827                         GOTO(out, err);
828                 }
829                 /* Fall through */
830                 break;
831         }
832         }
833
834         /* Commands that require a device */
835         obd = class_name2obd(lustre_cfg_string(lcfg, 0));
836         if (obd == NULL) {
837                 if (!LUSTRE_CFG_BUFLEN(lcfg, 0))
838                         CERROR("this lcfg command requires a device name\n");
839                 else
840                         CERROR("no device for: %s\n",
841                                lustre_cfg_string(lcfg, 0));
842
843                 GOTO(out, err = -EINVAL);
844         }
845
846         switch(lcfg->lcfg_command) {
847         case LCFG_SETUP: {
848                 err = class_setup(obd, lcfg);
849                 GOTO(out, err);
850         }
851         case LCFG_DETACH: {
852                 err = class_detach(obd, lcfg);
853                 GOTO(out, err = 0);
854         }
855         case LCFG_CLEANUP: {
856                 err = class_cleanup(obd, lcfg);
857                 GOTO(out, err = 0);
858         }
859         case LCFG_ADD_CONN: {
860                 err = class_add_conn(obd, lcfg);
861                 GOTO(out, err = 0);
862         }
863         case LCFG_DEL_CONN: {
864                 err = class_del_conn(obd, lcfg);
865                 GOTO(out, err = 0);
866         }
867         case LCFG_POOL_NEW: {
868                 err = obd_pool_new(obd, lustre_cfg_string(lcfg, 2));
869                 GOTO(out, err = 0);
870                 break;
871         }
872         case LCFG_POOL_ADD: {
873                 err = obd_pool_add(obd, lustre_cfg_string(lcfg, 2),
874                                    lustre_cfg_string(lcfg, 3));
875                 GOTO(out, err = 0);
876                 break;
877         }
878         case LCFG_POOL_REM: {
879                 err = obd_pool_rem(obd, lustre_cfg_string(lcfg, 2),
880                                    lustre_cfg_string(lcfg, 3));
881                 GOTO(out, err = 0);
882                 break;
883         }
884         case LCFG_POOL_DEL: {
885                 err = obd_pool_del(obd, lustre_cfg_string(lcfg, 2));
886                 GOTO(out, err = 0);
887                 break;
888         }
889         default: {
890                 err = obd_process_config(obd, sizeof(*lcfg), lcfg);
891                 GOTO(out, err);
892
893         }
894         }
895 out:
896         if ((err < 0) && !(lcfg->lcfg_command & LCFG_REQUIRED)) {
897                 CWARN("Ignoring error %d on optional command %#x\n", err,
898                       lcfg->lcfg_command);
899                 err = 0;
900         }
901         return err;
902 }
903
904 int class_process_proc_param(char *prefix, struct lprocfs_vars *lvars,
905                              struct lustre_cfg *lcfg, void *data)
906 {
907 #ifdef __KERNEL__
908         struct lprocfs_vars *var;
909         char *key, *sval;
910         int i, vallen;
911         int matched = 0, j = 0;
912         int rc = 0;
913         ENTRY;
914
915         if (lcfg->lcfg_command != LCFG_PARAM) {
916                 CERROR("Unknown command: %x\n", lcfg->lcfg_command);
917                 RETURN(-EINVAL);
918         }
919
920         /* e.g. tunefs.lustre --param mdt.group_upcall=foo /r/tmp/lustre-mdt
921            or   lctl conf_param lustre-MDT0000.mdt.group_upcall=bar
922            or   lctl conf_param lustre-OST0000.osc.max_dirty_mb=36 */
923         for (i = 1; i < lcfg->lcfg_bufcount; i++) {
924                 key = lustre_cfg_buf(lcfg, i);
925                 /* Strip off prefix */
926                 class_match_param(key, prefix, &key);
927                 sval = strchr(key, '=');
928                 if (!sval || (*(sval + 1) == 0)) {
929                         CERROR("Can't parse param %s\n", key);
930                         rc = -EINVAL;
931                         /* continue parsing other params */
932                         continue;
933                 }
934                 sval++;
935                 vallen = strlen(sval);
936                 matched = 0;
937                 j = 0;
938                 /* Search proc entries */
939                 while (lvars[j].name) {
940                         var = &lvars[j];
941                         if (class_match_param(key, (char *)var->name, 0) == 0) {
942                                 matched++;
943                                 rc = -EROFS;
944                                 if (var->write_fptr) {
945                                         mm_segment_t oldfs;
946                                         oldfs = get_fs();
947                                         set_fs(KERNEL_DS);
948                                         rc = (var->write_fptr)(NULL, sval,
949                                                                vallen, data);
950                                         set_fs(oldfs);
951                                 }
952                                 if (rc < 0)
953                                         CERROR("writing proc entry %s err %d\n",
954                                                var->name, rc);
955                                 break;
956                         }
957                         j++;
958                 }
959                 if (!matched) {
960                         CERROR("%s: unknown param %s\n",
961                                (char *)lustre_cfg_string(lcfg, 0), key);
962                         /* rc = -EINVAL;        continue parsing other params */
963                 } else {
964                         LCONSOLE_INFO("%s.%.*s: set parameter %.*s=%s\n",
965                                       lustre_cfg_string(lcfg, 0),
966                                       (int)strlen(prefix) - 1, prefix,
967                                       (int)(sval - key - 1), key, sval);
968                 }
969         }
970
971         if (rc > 0)
972                 rc = 0;
973         RETURN(rc);
974 #else
975         CDEBUG(D_CONFIG, "liblustre can't process params.\n");
976         /* Don't throw config error */
977         RETURN(0);
978 #endif
979 }
980
981 int class_config_dump_handler(struct llog_handle * handle,
982                               struct llog_rec_hdr *rec, void *data);
983
984 #ifdef __KERNEL__
985 extern int lustre_check_exclusion(struct super_block *sb, char *svname);
986 #else
987 #define lustre_check_exclusion(a,b)  0
988 #endif
989
990 static int class_config_llog_handler(struct llog_handle * handle,
991                                      struct llog_rec_hdr *rec, void *data)
992 {
993         struct config_llog_instance *clli = data;
994         int cfg_len = rec->lrh_len;
995         char *cfg_buf = (char*) (rec + 1);
996         int rc = 0;
997         ENTRY;
998
999         //class_config_dump_handler(handle, rec, data);
1000
1001         switch (rec->lrh_type) {
1002         case OBD_CFG_REC: {
1003                 struct lustre_cfg *lcfg, *lcfg_new;
1004                 struct lustre_cfg_bufs bufs;
1005                 char *inst_name = NULL;
1006                 int inst_len = 0;
1007                 int inst = 0, swab = 0;
1008
1009                 lcfg = (struct lustre_cfg *)cfg_buf;
1010                 if (lcfg->lcfg_version == __swab32(LUSTRE_CFG_VERSION)) {
1011                         lustre_swab_lustre_cfg(lcfg);
1012                         swab = 1;
1013                 }
1014
1015                 rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1016                 if (rc)
1017                         GOTO(out, rc);
1018
1019                 /* Figure out config state info */
1020                 if (lcfg->lcfg_command == LCFG_MARKER) {
1021                         struct cfg_marker *marker = lustre_cfg_buf(lcfg, 1);
1022                         lustre_swab_cfg_marker(marker, swab,
1023                                                LUSTRE_CFG_BUFLEN(lcfg, 1));
1024                         CDEBUG(D_CONFIG, "Marker, inst_flg=%#x mark_flg=%#x\n",
1025                                clli->cfg_flags, marker->cm_flags);
1026                         if (marker->cm_flags & CM_START) {
1027                                 /* all previous flags off */
1028                                 clli->cfg_flags = CFG_F_MARKER;
1029                                 if (marker->cm_flags & CM_SKIP) {
1030                                         clli->cfg_flags |= CFG_F_SKIP;
1031                                         CDEBUG(D_CONFIG, "SKIP #%d\n",
1032                                                marker->cm_step);
1033                                 } else if ((marker->cm_flags & CM_EXCLUDE) ||
1034                                            lustre_check_exclusion(clli->cfg_sb,
1035                                                           marker->cm_tgtname)) {
1036                                         clli->cfg_flags |= CFG_F_EXCLUDE;
1037                                         CDEBUG(D_CONFIG, "EXCLUDE %d\n",
1038                                                marker->cm_step);
1039                                 }
1040                         } else if (marker->cm_flags & CM_END) {
1041                                 clli->cfg_flags = 0;
1042                         }
1043                 }
1044                 /* A config command without a start marker before it is
1045                    illegal (post 146) */
1046                 if (!(clli->cfg_flags & CFG_F_COMPAT146) &&
1047                     !(clli->cfg_flags & CFG_F_MARKER) &&
1048                     (lcfg->lcfg_command != LCFG_MARKER)) {
1049                         CWARN("Config not inside markers, ignoring! (%#x)\n",
1050                               clli->cfg_flags);
1051                         clli->cfg_flags |= CFG_F_SKIP;
1052                 }
1053
1054                 if (clli->cfg_flags & CFG_F_SKIP) {
1055                         CDEBUG(D_CONFIG, "skipping %#x\n",
1056                                clli->cfg_flags);
1057                         rc = 0;
1058                         /* No processing! */
1059                         break;
1060                 }
1061
1062                 /**
1063                  * For interop mode between 1.8 and 2.0:
1064                  * skip "lmv" configuration which exists since 2.0.
1065                  */
1066                 {
1067                         char *devname = lustre_cfg_string(lcfg, 0);
1068                         char *typename = lustre_cfg_string(lcfg, 1);
1069
1070                         if (devname)
1071                                 devname += strlen(devname) - strlen("clilmv");
1072
1073                         if ((lcfg->lcfg_command == LCFG_ATTACH && typename &&
1074                              strcmp(typename, "lmv") == 0) ||
1075                             (devname && strcmp(devname, "clilmv") == 0)) {
1076                                 CWARN("skipping 'lmv' config: cmd=%x,%s:%s\n",
1077                                        lcfg->lcfg_command, devname, typename);
1078                                 GOTO(out, rc = 0);
1079                         }
1080                 }
1081
1082                 if ((clli->cfg_flags & CFG_F_EXCLUDE) &&
1083                     (lcfg->lcfg_command == LCFG_LOV_ADD_OBD))
1084                         /* Add inactive instead */
1085                         lcfg->lcfg_command = LCFG_LOV_ADD_INA;
1086
1087                 lustre_cfg_bufs_init(&bufs, lcfg);
1088
1089                 if (clli && clli->cfg_instance &&
1090                     LUSTRE_CFG_BUFLEN(lcfg, 0) > 0){
1091                         inst = 1;
1092                         inst_len = LUSTRE_CFG_BUFLEN(lcfg, 0) +
1093                                 strlen(clli->cfg_instance) + 1;
1094                         OBD_ALLOC(inst_name, inst_len);
1095                         if (inst_name == NULL)
1096                                 GOTO(out, rc = -ENOMEM);
1097                         sprintf(inst_name, "%s-%s",
1098                                 lustre_cfg_string(lcfg, 0),
1099                                 clli->cfg_instance);
1100                         lustre_cfg_bufs_set_string(&bufs, 0, inst_name);
1101                         CDEBUG(D_CONFIG, "cmd %x, instance name: %s\n",
1102                                lcfg->lcfg_command, inst_name);
1103                 }
1104
1105                 /* we override the llog's uuid for clients, to insure they
1106                 are unique */
1107                 if (clli && clli->cfg_instance &&
1108                     lcfg->lcfg_command == LCFG_ATTACH) {
1109                         lustre_cfg_bufs_set_string(&bufs, 2,
1110                                                    clli->cfg_uuid.uuid);
1111                 }
1112
1113                 lcfg_new = lustre_cfg_new(lcfg->lcfg_command, &bufs);
1114
1115                 lcfg_new->lcfg_num   = lcfg->lcfg_num;
1116                 lcfg_new->lcfg_flags = lcfg->lcfg_flags;
1117
1118                 /* XXX Hack to try to remain binary compatible with
1119                  * pre-newconfig logs */
1120                 if (lcfg->lcfg_nal != 0 &&      /* pre-newconfig log? */
1121                     (lcfg->lcfg_nid >> 32) == 0) {
1122                         __u32 addr = (__u32)(lcfg->lcfg_nid & 0xffffffff);
1123
1124                         lcfg_new->lcfg_nid =
1125                                 LNET_MKNID(LNET_MKNET(lcfg->lcfg_nal, 0), addr);
1126                         CWARN("Converted pre-newconfig NAL %d NID %x to %s\n",
1127                               lcfg->lcfg_nal, addr,
1128                               libcfs_nid2str(lcfg_new->lcfg_nid));
1129                 } else {
1130                         lcfg_new->lcfg_nid = lcfg->lcfg_nid;
1131                 }
1132
1133                 lcfg_new->lcfg_nal = 0; /* illegal value for obsolete field */
1134
1135                 rc = class_process_config(lcfg_new);
1136                 lustre_cfg_free(lcfg_new);
1137
1138                 if (inst)
1139                         OBD_FREE(inst_name, inst_len);
1140                 break;
1141         }
1142         default:
1143                 CERROR("Unknown llog record type %#x encountered\n",
1144                        rec->lrh_type);
1145                 break;
1146         }
1147 out:
1148         if (rc) {
1149                 CERROR("Err %d on cfg command:\n", rc);
1150                 class_config_dump_handler(handle, rec, data);
1151         }
1152         RETURN(rc);
1153 }
1154
1155 int class_config_parse_llog(struct llog_ctxt *ctxt, char *name,
1156                             struct config_llog_instance *cfg)
1157 {
1158         struct llog_process_cat_data cd = {0, 0};
1159         struct llog_handle *llh;
1160         int rc, rc2;
1161         ENTRY;
1162
1163         CDEBUG(D_INFO, "looking up llog %s\n", name);
1164         rc = llog_create(ctxt, &llh, NULL, name);
1165         if (rc)
1166                 RETURN(rc);
1167
1168         rc = llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
1169         if (rc)
1170                 GOTO(parse_out, rc);
1171
1172         /* continue processing from where we last stopped to end-of-log */
1173         if (cfg)
1174                 cd.lpcd_first_idx = cfg->cfg_last_idx;
1175         cd.lpcd_last_idx = 0;
1176
1177         rc = llog_process(llh, class_config_llog_handler, cfg, &cd);
1178
1179         CDEBUG(D_CONFIG, "Processed log %s gen %d-%d (rc=%d)\n", name,
1180                cd.lpcd_first_idx + 1, cd.lpcd_last_idx, rc);
1181         if (cfg)
1182                 cfg->cfg_last_idx = cd.lpcd_last_idx;
1183
1184 parse_out:
1185         rc2 = llog_close(llh);
1186         if (rc == 0)
1187                 rc = rc2;
1188
1189         RETURN(rc);
1190 }
1191
1192 int class_config_dump_handler(struct llog_handle * handle,
1193                               struct llog_rec_hdr *rec, void *data)
1194 {
1195         int cfg_len = rec->lrh_len;
1196         char *cfg_buf = (char*) (rec + 1);
1197         char *outstr, *ptr, *end;
1198         int rc = 0;
1199         ENTRY;
1200
1201         OBD_ALLOC(outstr, 256);
1202         end = outstr + 256;
1203         ptr = outstr;
1204         if (!outstr) {
1205                 RETURN(-ENOMEM);
1206         }
1207         if (rec->lrh_type == OBD_CFG_REC) {
1208                 struct lustre_cfg *lcfg;
1209                 int i;
1210
1211                 rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1212                 if (rc)
1213                         GOTO(out, rc);
1214                 lcfg = (struct lustre_cfg *)cfg_buf;
1215
1216                 ptr += snprintf(ptr, end-ptr, "cmd=%05x ",
1217                                 lcfg->lcfg_command);
1218                 if (lcfg->lcfg_flags) {
1219                         ptr += snprintf(ptr, end-ptr, "flags=%#08x ",
1220                                         lcfg->lcfg_flags);
1221                 }
1222                 if (lcfg->lcfg_num) {
1223                         ptr += snprintf(ptr, end-ptr, "num=%#08x ",
1224                                         lcfg->lcfg_num);
1225                 }
1226                 if (lcfg->lcfg_nid) {
1227                         ptr += snprintf(ptr, end-ptr, "nid=%s("LPX64")\n     ",
1228                                         libcfs_nid2str(lcfg->lcfg_nid),
1229                                         lcfg->lcfg_nid);
1230                 }
1231                 if (lcfg->lcfg_command == LCFG_MARKER) {
1232                         struct cfg_marker *marker = lustre_cfg_buf(lcfg, 1);
1233                         ptr += snprintf(ptr, end-ptr, "marker=%d(%#x)%s '%s'",
1234                                         marker->cm_step, marker->cm_flags,
1235                                         marker->cm_tgtname, marker->cm_comment);
1236                 } else {
1237                         for (i = 0; i <  lcfg->lcfg_bufcount; i++) {
1238                                 ptr += snprintf(ptr, end-ptr, "%d:%s  ", i,
1239                                                 lustre_cfg_string(lcfg, i));
1240                         }
1241                 }
1242                 LCONSOLE(D_WARNING, "   %s\n", outstr);
1243         } else {
1244                 LCONSOLE(D_WARNING, "unhandled lrh_type: %#x\n", rec->lrh_type);
1245                 rc = -EINVAL;
1246         }
1247 out:
1248         OBD_FREE(outstr, 256);
1249         RETURN(rc);
1250 }
1251
1252 int class_config_dump_llog(struct llog_ctxt *ctxt, char *name,
1253                            struct config_llog_instance *cfg)
1254 {
1255         struct llog_handle *llh;
1256         int rc, rc2;
1257         ENTRY;
1258
1259         LCONSOLE_INFO("Dumping config log %s\n", name);
1260
1261         rc = llog_create(ctxt, &llh, NULL, name);
1262         if (rc)
1263                 RETURN(rc);
1264
1265         rc = llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
1266         if (rc)
1267                 GOTO(parse_out, rc);
1268
1269         rc = llog_process(llh, class_config_dump_handler, cfg, NULL);
1270 parse_out:
1271         rc2 = llog_close(llh);
1272         if (rc == 0)
1273                 rc = rc2;
1274
1275         LCONSOLE_INFO("End config log %s\n", name);
1276         RETURN(rc);
1277
1278 }
1279
1280 /* Cleanup and detach */
1281 int class_manual_cleanup(struct obd_device *obd)
1282 {
1283         struct lustre_cfg *lcfg;
1284         struct lustre_cfg_bufs bufs;
1285         int rc;
1286         char flags[3]="";
1287         ENTRY;
1288
1289         if (!obd) {
1290                 CERROR("empty cleanup\n");
1291                 RETURN(-EALREADY);
1292         }
1293
1294         if (obd->obd_force)
1295                 strcat(flags, "F");
1296         if (obd->obd_fail)
1297                 strcat(flags, "A");
1298
1299         CDEBUG(D_CONFIG, "Manual cleanup of %s (flags='%s')\n",
1300                obd->obd_name, flags);
1301
1302         lustre_cfg_bufs_reset(&bufs, obd->obd_name);
1303         lustre_cfg_bufs_set_string(&bufs, 1, flags);
1304         lcfg = lustre_cfg_new(LCFG_CLEANUP, &bufs);
1305
1306         rc = class_process_config(lcfg);
1307         if (rc) {
1308                 CERROR("cleanup failed %d: %s\n", rc, obd->obd_name);
1309                 GOTO(out, rc);
1310         }
1311
1312         /* the lcfg is almost the same for both ops */
1313         lcfg->lcfg_command = LCFG_DETACH;
1314         rc = class_process_config(lcfg);
1315         if (rc)
1316                 CERROR("detach failed %d: %s\n", rc, obd->obd_name);
1317 out:
1318         lustre_cfg_free(lcfg);
1319         RETURN(rc);
1320 }
1321
1322 /*
1323  * uuid<->export lustre hash operations
1324  */
1325
1326 static unsigned
1327 uuid_hash(lustre_hash_t *lh,  void *key, unsigned mask)
1328 {
1329         return lh_djb2_hash(((struct obd_uuid *)key)->uuid,
1330                             sizeof(((struct obd_uuid *)key)->uuid), mask);
1331 }
1332
1333 static void *
1334 uuid_key(struct hlist_node *hnode)
1335 {
1336         struct obd_export *exp;
1337
1338         exp = hlist_entry(hnode, struct obd_export, exp_uuid_hash);
1339
1340         RETURN(&exp->exp_client_uuid);
1341 }
1342
1343 /*
1344  * NOTE: It is impossible to find an export that is in failed
1345  *       state with this function
1346  */
1347 static int
1348 uuid_compare(void *key, struct hlist_node *hnode)
1349 {
1350         struct obd_export *exp;
1351
1352         LASSERT(key);
1353         exp = hlist_entry(hnode, struct obd_export, exp_uuid_hash);
1354
1355         RETURN(obd_uuid_equals((struct obd_uuid *)key,&exp->exp_client_uuid) &&
1356                !exp->exp_failed);
1357 }
1358
1359 static void *
1360 uuid_export_get(struct hlist_node *hnode)
1361 {
1362         struct obd_export *exp;
1363
1364         exp = hlist_entry(hnode, struct obd_export, exp_uuid_hash);
1365         class_export_get(exp);
1366
1367         RETURN(exp);
1368 }
1369
1370 static void *
1371 uuid_export_put(struct hlist_node *hnode)
1372 {
1373         struct obd_export *exp;
1374
1375         exp = hlist_entry(hnode, struct obd_export, exp_uuid_hash);
1376         class_export_put(exp);
1377
1378         RETURN(exp);
1379 }
1380
1381 static lustre_hash_ops_t uuid_hash_ops = {
1382         .lh_hash    = uuid_hash,
1383         .lh_key     = uuid_key,
1384         .lh_compare = uuid_compare,
1385         .lh_get     = uuid_export_get,
1386         .lh_put     = uuid_export_put,
1387 };
1388
1389
1390 /*
1391  * nid<->export hash operations
1392  */
1393
1394 static unsigned
1395 nid_hash(lustre_hash_t *lh,  void *key, unsigned mask)
1396 {
1397         return lh_djb2_hash(key, sizeof(lnet_nid_t), mask);
1398 }
1399
1400 static void *
1401 nid_key(struct hlist_node *hnode)
1402 {
1403         struct obd_export *exp;
1404
1405         exp = hlist_entry(hnode, struct obd_export, exp_nid_hash);
1406
1407         RETURN(&exp->exp_connection->c_peer.nid);
1408 }
1409
1410 /*
1411  * NOTE: It is impossible to find an export that is in failed
1412  *       state with this function
1413  */
1414 static int
1415 nid_compare(void *key, struct hlist_node *hnode)
1416 {
1417         struct obd_export *exp;
1418
1419         LASSERT(key);
1420         exp = hlist_entry(hnode, struct obd_export, exp_nid_hash);
1421
1422         RETURN(exp->exp_connection->c_peer.nid == *(lnet_nid_t *)key &&
1423                !exp->exp_failed);
1424 }
1425
1426 static void *
1427 nid_export_get(struct hlist_node *hnode)
1428 {
1429         struct obd_export *exp;
1430
1431         exp = hlist_entry(hnode, struct obd_export, exp_nid_hash);
1432         class_export_get(exp);
1433
1434         RETURN(exp);
1435 }
1436
1437 static void *
1438 nid_export_put(struct hlist_node *hnode)
1439 {
1440         struct obd_export *exp;
1441
1442         exp = hlist_entry(hnode, struct obd_export, exp_nid_hash);
1443         class_export_put(exp);
1444
1445         RETURN(exp);
1446 }
1447
1448 static lustre_hash_ops_t nid_hash_ops = {
1449         .lh_hash    = nid_hash,
1450         .lh_key     = nid_key,
1451         .lh_compare = nid_compare,
1452         .lh_get     = nid_export_get,
1453         .lh_put     = nid_export_put,
1454 };
1455
1456
1457 /*
1458  * nid<->nidstats hash operations
1459  */
1460
1461 static void *
1462 nidstats_key(struct hlist_node *hnode)
1463 {
1464         struct nid_stat *ns;
1465
1466         ns = hlist_entry(hnode, struct nid_stat, nid_hash);
1467
1468         RETURN(&ns->nid);
1469 }
1470
1471 static int
1472 nidstats_compare(void *key, struct hlist_node *hnode)
1473 {
1474         RETURN(*(lnet_nid_t *)nidstats_key(hnode) == *(lnet_nid_t *)key);
1475 }
1476
1477 static void *
1478 nidstats_get(struct hlist_node *hnode)
1479 {
1480         struct nid_stat *ns;
1481
1482         ns = hlist_entry(hnode, struct nid_stat, nid_hash);
1483         ns->nid_exp_ref_count++;
1484
1485         RETURN(ns);
1486 }
1487
1488 static void *
1489 nidstats_put(struct hlist_node *hnode)
1490 {
1491         struct nid_stat *ns;
1492
1493         ns = hlist_entry(hnode, struct nid_stat, nid_hash);
1494         ns->nid_exp_ref_count--;
1495
1496         RETURN(ns);
1497 }
1498
1499 static lustre_hash_ops_t nid_stat_hash_ops = {
1500         .lh_hash    = nid_hash,
1501         .lh_key     = nidstats_key,
1502         .lh_compare = nidstats_compare,
1503         .lh_get     = nidstats_get,
1504         .lh_put     = nidstats_put,
1505 };