Whamcloud - gitweb
LU-1625 test: reduce test duration for nfs mode
[fs/lustre-release.git] / lustre / cmm / cmm_device.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/cmm_device.c
37  *
38  * Lustre Cluster Metadata Manager (cmm)
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  */
42 /**
43  * \addtogroup cmm
44  * @{
45  */
46
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <linux/module.h>
50
51 #include <obd.h>
52 #include <obd_class.h>
53 #include <lprocfs_status.h>
54 #include <lustre_ver.h>
55 #include "cmm_internal.h"
56 #include "mdc_internal.h"
57 #ifdef HAVE_QUOTA_SUPPORT
58 # include <lustre_quota.h>
59 #endif
60
61 struct obd_ops cmm_obd_device_ops = {
62         .o_owner           = THIS_MODULE
63 };
64
65 static const struct lu_device_operations cmm_lu_ops;
66
67 static inline int lu_device_is_cmm(struct lu_device *d)
68 {
69         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &cmm_lu_ops);
70 }
71
72 int cmm_root_get(const struct lu_env *env, struct md_device *md,
73                  struct lu_fid *fid)
74 {
75         struct cmm_device *cmm_dev = md2cmm_dev(md);
76         /* valid only on master MDS */
77         if (cmm_dev->cmm_local_num == 0)
78                 return cmm_child_ops(cmm_dev)->mdo_root_get(env,
79                                      cmm_dev->cmm_child, fid);
80         else
81                 return -EINVAL;
82 }
83
84 static int cmm_statfs(const struct lu_env *env, struct md_device *md,
85                       struct obd_statfs *sfs)
86 {
87         struct cmm_device *cmm_dev = md2cmm_dev(md);
88         int rc;
89
90         ENTRY;
91         rc = cmm_child_ops(cmm_dev)->mdo_statfs(env,
92                                                 cmm_dev->cmm_child, sfs);
93         RETURN (rc);
94 }
95
96 static int cmm_maxsize_get(const struct lu_env *env, struct md_device *md,
97                            int *md_size, int *cookie_size)
98 {
99         struct cmm_device *cmm_dev = md2cmm_dev(md);
100         int rc;
101         ENTRY;
102         rc = cmm_child_ops(cmm_dev)->mdo_maxsize_get(env, cmm_dev->cmm_child,
103                                                      md_size, cookie_size);
104         RETURN(rc);
105 }
106
107 static int cmm_init_capa_ctxt(const struct lu_env *env, struct md_device *md,
108                               int mode , unsigned long timeout, __u32 alg,
109                               struct lustre_capa_key *keys)
110 {
111         struct cmm_device *cmm_dev = md2cmm_dev(md);
112         int rc;
113         ENTRY;
114         LASSERT(cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt);
115         rc = cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt(env, cmm_dev->cmm_child,
116                                                         mode, timeout, alg,
117                                                         keys);
118         RETURN(rc);
119 }
120
121 static int cmm_update_capa_key(const struct lu_env *env,
122                                struct md_device *md,
123                                struct lustre_capa_key *key)
124 {
125         struct cmm_device *cmm_dev = md2cmm_dev(md);
126         int rc;
127         ENTRY;
128         rc = cmm_child_ops(cmm_dev)->mdo_update_capa_key(env,
129                                                          cmm_dev->cmm_child,
130                                                          key);
131         RETURN(rc);
132 }
133
134 static int cmm_llog_ctxt_get(const struct lu_env *env, struct md_device *m,
135                              int idx, void **h)
136 {
137         struct cmm_device *cmm_dev = md2cmm_dev(m);
138         int rc;
139         ENTRY;
140
141         rc = cmm_child_ops(cmm_dev)->mdo_llog_ctxt_get(env, cmm_dev->cmm_child,
142                                                        idx, h);
143         RETURN(rc);
144 }
145
146 #ifdef HAVE_QUOTA_SUPPORT
147 /**
148  * \name Quota functions
149  * @{
150  */
151 static int cmm_quota_notify(const struct lu_env *env, struct md_device *m)
152 {
153         struct cmm_device *cmm_dev = md2cmm_dev(m);
154         int rc;
155         ENTRY;
156
157         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_notify(env,
158                                                           cmm_dev->cmm_child);
159         RETURN(rc);
160 }
161
162 static int cmm_quota_setup(const struct lu_env *env, struct md_device *m,
163                            void *data)
164 {
165         struct cmm_device *cmm_dev = md2cmm_dev(m);
166         int rc;
167         ENTRY;
168
169         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setup(env,
170                                                          cmm_dev->cmm_child,
171                                                          data);
172         RETURN(rc);
173 }
174
175 static int cmm_quota_cleanup(const struct lu_env *env, struct md_device *m)
176 {
177         struct cmm_device *cmm_dev = md2cmm_dev(m);
178         int rc;
179         ENTRY;
180
181         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_cleanup(env,
182                                                            cmm_dev->cmm_child);
183         RETURN(rc);
184 }
185
186 static int cmm_quota_recovery(const struct lu_env *env, struct md_device *m)
187 {
188         struct cmm_device *cmm_dev = md2cmm_dev(m);
189         int rc;
190         ENTRY;
191
192         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_recovery(env,
193                                                             cmm_dev->cmm_child);
194         RETURN(rc);
195 }
196
197 static int cmm_quota_check(const struct lu_env *env, struct md_device *m,
198                            __u32 type)
199 {
200         struct cmm_device *cmm_dev = md2cmm_dev(m);
201         int rc;
202         ENTRY;
203
204         /* disable quota for CMD case temporary. */
205         if (cmm_dev->cmm_tgt_count)
206                 RETURN(-EOPNOTSUPP);
207
208         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_check(env,
209                                                          cmm_dev->cmm_child,
210                                                          type);
211         RETURN(rc);
212 }
213
214 static int cmm_quota_on(const struct lu_env *env, struct md_device *m,
215                         __u32 type)
216 {
217         struct cmm_device *cmm_dev = md2cmm_dev(m);
218         int rc;
219         ENTRY;
220
221         /* disable quota for CMD case temporary. */
222         if (cmm_dev->cmm_tgt_count)
223                 RETURN(-EOPNOTSUPP);
224
225         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_on(env,
226                                                       cmm_dev->cmm_child,
227                                                       type);
228         RETURN(rc);
229 }
230
231 static int cmm_quota_off(const struct lu_env *env, struct md_device *m,
232                          __u32 type)
233 {
234         struct cmm_device *cmm_dev = md2cmm_dev(m);
235         int rc;
236         ENTRY;
237
238         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_off(env,
239                                                        cmm_dev->cmm_child,
240                                                        type);
241         RETURN(rc);
242 }
243
244 static int cmm_quota_setinfo(const struct lu_env *env, struct md_device *m,
245                              __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
246 {
247         struct cmm_device *cmm_dev = md2cmm_dev(m);
248         int rc;
249         ENTRY;
250
251         /* disable quota for CMD case temporary. */
252         if (cmm_dev->cmm_tgt_count)
253                 RETURN(-EOPNOTSUPP);
254
255         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setinfo(env,
256                                                            cmm_dev->cmm_child,
257                                                            type, id, dqinfo);
258         RETURN(rc);
259 }
260
261 static int cmm_quota_getinfo(const struct lu_env *env,
262                              const struct md_device *m,
263                              __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
264 {
265         struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
266         int rc;
267         ENTRY;
268
269         /* disable quota for CMD case temporary. */
270         if (cmm_dev->cmm_tgt_count)
271                 RETURN(-EOPNOTSUPP);
272
273         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getinfo(env,
274                                                            cmm_dev->cmm_child,
275                                                            type, id, dqinfo);
276         RETURN(rc);
277 }
278
279 static int cmm_quota_setquota(const struct lu_env *env, struct md_device *m,
280                               __u32 type, __u32 id, struct obd_dqblk *dqblk)
281 {
282         struct cmm_device *cmm_dev = md2cmm_dev(m);
283         int rc;
284         ENTRY;
285
286         /* disable quota for CMD case temporary. */
287         if (cmm_dev->cmm_tgt_count)
288                 RETURN(-EOPNOTSUPP);
289
290         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setquota(env,
291                                                             cmm_dev->cmm_child,
292                                                             type, id, dqblk);
293         RETURN(rc);
294 }
295
296 static int cmm_quota_getquota(const struct lu_env *env,
297                               const struct md_device *m,
298                               __u32 type, __u32 id, struct obd_dqblk *dqblk)
299 {
300         struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
301         int rc;
302         ENTRY;
303
304         /* disable quota for CMD case temporary. */
305         if (cmm_dev->cmm_tgt_count)
306                 RETURN(-EOPNOTSUPP);
307
308         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getquota(env,
309                                                             cmm_dev->cmm_child,
310                                                             type, id, dqblk);
311         RETURN(rc);
312 }
313
314 static int cmm_quota_getoinfo(const struct lu_env *env,
315                               const struct md_device *m,
316                               __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
317 {
318         struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
319         int rc;
320         ENTRY;
321
322         /* disable quota for CMD case temporary. */
323         if (cmm_dev->cmm_tgt_count)
324                 RETURN(-EOPNOTSUPP);
325
326         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getoinfo(env,
327                                                             cmm_dev->cmm_child,
328                                                             type, id, dqinfo);
329         RETURN(rc);
330 }
331
332 static int cmm_quota_getoquota(const struct lu_env *env,
333                                const struct md_device *m,
334                                __u32 type, __u32 id, struct obd_dqblk *dqblk)
335 {
336         struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
337         int rc;
338         ENTRY;
339
340         /* disable quota for CMD case temporary. */
341         if (cmm_dev->cmm_tgt_count)
342                 RETURN(-EOPNOTSUPP);
343
344         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getoquota(env,
345                                                              cmm_dev->cmm_child,
346                                                              type, id, dqblk);
347         RETURN(rc);
348 }
349
350 static int cmm_quota_invalidate(const struct lu_env *env, struct md_device *m,
351                                 __u32 type)
352 {
353         struct cmm_device *cmm_dev = md2cmm_dev(m);
354         int rc;
355         ENTRY;
356
357         /* disable quota for CMD case temporary. */
358         if (cmm_dev->cmm_tgt_count)
359                 RETURN(-EOPNOTSUPP);
360
361         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_invalidate(env,
362                                                               cmm_dev->cmm_child,
363                                                               type);
364         RETURN(rc);
365 }
366
367 static int cmm_quota_finvalidate(const struct lu_env *env, struct md_device *m,
368                                  __u32 type)
369 {
370         struct cmm_device *cmm_dev = md2cmm_dev(m);
371         int rc;
372         ENTRY;
373
374         /* disable quota for CMD case temporary. */
375         if (cmm_dev->cmm_tgt_count)
376                 RETURN(-EOPNOTSUPP);
377
378         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_finvalidate(env,
379                                                                cmm_dev->cmm_child,
380                                                                type);
381         RETURN(rc);
382 }
383 /** @} */
384 #endif
385
386 int cmm_iocontrol(const struct lu_env *env, struct md_device *m,
387                   unsigned int cmd, int len, void *data)
388 {
389         struct md_device *next = md2cmm_dev(m)->cmm_child;
390         int rc;
391
392         ENTRY;
393         rc = next->md_ops->mdo_iocontrol(env, next, cmd, len, data);
394         RETURN(rc);
395 }
396
397
398 static const struct md_device_operations cmm_md_ops = {
399         .mdo_statfs          = cmm_statfs,
400         .mdo_root_get        = cmm_root_get,
401         .mdo_maxsize_get     = cmm_maxsize_get,
402         .mdo_init_capa_ctxt  = cmm_init_capa_ctxt,
403         .mdo_update_capa_key = cmm_update_capa_key,
404         .mdo_llog_ctxt_get   = cmm_llog_ctxt_get,
405         .mdo_iocontrol       = cmm_iocontrol,
406 #ifdef HAVE_QUOTA_SUPPORT
407         .mdo_quota           = {
408                 .mqo_notify      = cmm_quota_notify,
409                 .mqo_setup       = cmm_quota_setup,
410                 .mqo_cleanup     = cmm_quota_cleanup,
411                 .mqo_recovery    = cmm_quota_recovery,
412                 .mqo_check       = cmm_quota_check,
413                 .mqo_on          = cmm_quota_on,
414                 .mqo_off         = cmm_quota_off,
415                 .mqo_setinfo     = cmm_quota_setinfo,
416                 .mqo_getinfo     = cmm_quota_getinfo,
417                 .mqo_setquota    = cmm_quota_setquota,
418                 .mqo_getquota    = cmm_quota_getquota,
419                 .mqo_getoinfo    = cmm_quota_getoinfo,
420                 .mqo_getoquota   = cmm_quota_getoquota,
421                 .mqo_invalidate  = cmm_quota_invalidate,
422                 .mqo_finvalidate = cmm_quota_finvalidate
423         }
424 #endif
425 };
426
427 extern struct lu_device_type mdc_device_type;
428 /**
429  * Init MDC.
430  */
431 static int cmm_post_init_mdc(const struct lu_env *env,
432                              struct cmm_device *cmm)
433 {
434         int max_mdsize, max_cookiesize, rc;
435         struct mdc_device *mc, *tmp;
436
437         /* get the max mdsize and cookiesize from lower layer */
438         rc = cmm_maxsize_get(env, &cmm->cmm_md_dev, &max_mdsize,
439                              &max_cookiesize);
440         if (rc)
441                 RETURN(rc);
442
443         cfs_spin_lock(&cmm->cmm_tgt_guard);
444         cfs_list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
445                                      mc_linkage) {
446                 cmm_mdc_init_ea_size(env, mc, max_mdsize, max_cookiesize);
447         }
448         cfs_spin_unlock(&cmm->cmm_tgt_guard);
449         RETURN(rc);
450 }
451
452 /* --- cmm_lu_operations --- */
453 /* add new MDC to the CMM, create MDC lu_device and connect it to mdc_obd */
454 static int cmm_add_mdc(const struct lu_env *env,
455                        struct cmm_device *cm, struct lustre_cfg *cfg)
456 {
457         struct lu_device_type *ldt = &mdc_device_type;
458         char *p, *num = lustre_cfg_string(cfg, 2);
459         struct mdc_device *mc, *tmp;
460         struct lu_fld_target target;
461         struct lu_device *ld;
462         struct lu_device *cmm_lu = cmm2lu_dev(cm);
463         mdsno_t mdc_num;
464         struct lu_site *site = cmm2lu_dev(cm)->ld_site;
465         int rc;
466 #ifdef HAVE_QUOTA_SUPPORT
467         int first;
468 #endif
469         ENTRY;
470
471         /* find out that there is no such mdc */
472         LASSERT(num);
473         mdc_num = simple_strtol(num, &p, 10);
474         if (*p) {
475                 CERROR("Invalid index in lustre_cgf, offset 2\n");
476                 RETURN(-EINVAL);
477         }
478
479         cfs_spin_lock(&cm->cmm_tgt_guard);
480         cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
481                                      mc_linkage) {
482                 if (mc->mc_num == mdc_num) {
483                         cfs_spin_unlock(&cm->cmm_tgt_guard);
484                         RETURN(-EEXIST);
485                 }
486         }
487         cfs_spin_unlock(&cm->cmm_tgt_guard);
488         ld = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg);
489         if (IS_ERR(ld))
490                 RETURN(PTR_ERR(ld));
491
492         ld->ld_site = site;
493
494         rc = ldt->ldt_ops->ldto_device_init(env, ld, NULL, NULL);
495         if (rc) {
496                 ldt->ldt_ops->ldto_device_free(env, ld);
497                 RETURN(rc);
498         }
499         /* pass config to the just created MDC */
500         rc = ld->ld_ops->ldo_process_config(env, ld, cfg);
501         if (rc) {
502                 ldt->ldt_ops->ldto_device_fini(env, ld);
503                 ldt->ldt_ops->ldto_device_free(env, ld);
504                 RETURN(rc);
505         }
506
507         cfs_spin_lock(&cm->cmm_tgt_guard);
508         cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
509                                      mc_linkage) {
510                 if (mc->mc_num == mdc_num) {
511                         cfs_spin_unlock(&cm->cmm_tgt_guard);
512                         ldt->ldt_ops->ldto_device_fini(env, ld);
513                         ldt->ldt_ops->ldto_device_free(env, ld);
514                         RETURN(-EEXIST);
515                 }
516         }
517         mc = lu2mdc_dev(ld);
518         cfs_list_add_tail(&mc->mc_linkage, &cm->cmm_targets);
519         cm->cmm_tgt_count++;
520 #ifdef HAVE_QUOTA_SUPPORT
521         first = cm->cmm_tgt_count;
522 #endif
523         cfs_spin_unlock(&cm->cmm_tgt_guard);
524
525         lu_device_get(cmm_lu);
526         lu_ref_add(&cmm_lu->ld_reference, "mdc-child", ld);
527
528         target.ft_srv = NULL;
529         target.ft_idx = mc->mc_num;
530         target.ft_exp = mc->mc_desc.cl_exp;
531         fld_client_add_target(cm->cmm_fld, &target);
532
533         if (mc->mc_num == 0) {
534                 /* this is mdt0 -> mc export, fld lookup need this export
535                    to forward fld lookup request. */
536                 LASSERT(!lu_site2md(site)->ms_server_fld->lsf_control_exp);
537                 lu_site2md(site)->ms_server_fld->lsf_control_exp =
538                                           mc->mc_desc.cl_exp;
539         }
540 #ifdef HAVE_QUOTA_SUPPORT
541         /* XXX: Disable quota for CMD case temporary. */
542         if (first == 1) {
543                 CWARN("Disable quota for CMD case temporary!\n");
544                 cmm_child_ops(cm)->mdo_quota.mqo_off(env, cm->cmm_child, UGQUOTA);
545         }
546 #endif
547         /* Set max md size for the mdc. */
548         rc = cmm_post_init_mdc(env, cm);
549         RETURN(rc);
550 }
551
552 static void cmm_device_shutdown(const struct lu_env *env,
553                                 struct cmm_device *cm,
554                                 struct lustre_cfg *cfg)
555 {
556         struct mdc_device *mc, *tmp;
557         ENTRY;
558
559         /* Remove local target from FLD. */
560         fld_client_del_target(cm->cmm_fld, cm->cmm_local_num);
561
562         /* Finish all mdc devices. */
563         cfs_spin_lock(&cm->cmm_tgt_guard);
564         cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
565                 struct lu_device *ld_m = mdc2lu_dev(mc);
566                 fld_client_del_target(cm->cmm_fld, mc->mc_num);
567                 ld_m->ld_ops->ldo_process_config(env, ld_m, cfg);
568         }
569         cfs_spin_unlock(&cm->cmm_tgt_guard);
570
571         /* remove upcall device*/
572         md_upcall_fini(&cm->cmm_md_dev);
573
574         EXIT;
575 }
576
577 static int cmm_device_mount(const struct lu_env *env,
578                             struct cmm_device *m, struct lustre_cfg *cfg)
579 {
580         const char *index = lustre_cfg_string(cfg, 2);
581         char *p;
582
583         LASSERT(index != NULL);
584
585         m->cmm_local_num = simple_strtol(index, &p, 10);
586         if (*p) {
587                 CERROR("Invalid index in lustre_cgf\n");
588                 RETURN(-EINVAL);
589         }
590
591         RETURN(0);
592 }
593
594 static int cmm_process_config(const struct lu_env *env,
595                               struct lu_device *d, struct lustre_cfg *cfg)
596 {
597         struct cmm_device *m = lu2cmm_dev(d);
598         struct lu_device *next = md2lu_dev(m->cmm_child);
599         int err;
600         ENTRY;
601
602         switch(cfg->lcfg_command) {
603         case LCFG_ADD_MDC:
604                 /* On first ADD_MDC add also local target. */
605                 if (!(m->cmm_flags & CMM_INITIALIZED)) {
606                         struct lu_site *ls = cmm2lu_dev(m)->ld_site;
607                         struct lu_fld_target target;
608
609                         target.ft_srv = lu_site2md(ls)->ms_server_fld;
610                         target.ft_idx = m->cmm_local_num;
611                         target.ft_exp = NULL;
612
613                         fld_client_add_target(m->cmm_fld, &target);
614                 }
615                 err = cmm_add_mdc(env, m, cfg);
616
617                 /* The first ADD_MDC can be counted as setup is finished. */
618                 if (!(m->cmm_flags & CMM_INITIALIZED))
619                         m->cmm_flags |= CMM_INITIALIZED;
620
621                 break;
622         case LCFG_SETUP:
623         {
624                 /* lower layers should be set up at first */
625                 err = next->ld_ops->ldo_process_config(env, next, cfg);
626                 if (err == 0)
627                         err = cmm_device_mount(env, m, cfg);
628                 break;
629         }
630         case LCFG_CLEANUP:
631         {
632                 lu_dev_del_linkage(d->ld_site, d);
633                 cmm_device_shutdown(env, m, cfg);
634         }
635         default:
636                 err = next->ld_ops->ldo_process_config(env, next, cfg);
637         }
638         RETURN(err);
639 }
640
641 static int cmm_recovery_complete(const struct lu_env *env,
642                                  struct lu_device *d)
643 {
644         struct cmm_device *m = lu2cmm_dev(d);
645         struct lu_device *next = md2lu_dev(m->cmm_child);
646         int rc;
647         ENTRY;
648         rc = next->ld_ops->ldo_recovery_complete(env, next);
649         RETURN(rc);
650 }
651
652 static int cmm_prepare(const struct lu_env *env,
653                        struct lu_device *pdev,
654                        struct lu_device *dev)
655 {
656         struct cmm_device *cmm = lu2cmm_dev(dev);
657         struct lu_device *next = md2lu_dev(cmm->cmm_child);
658         int rc;
659
660         ENTRY;
661         rc = next->ld_ops->ldo_prepare(env, dev, next);
662         RETURN(rc);
663 }
664
665 static const struct lu_device_operations cmm_lu_ops = {
666         .ldo_object_alloc      = cmm_object_alloc,
667         .ldo_process_config    = cmm_process_config,
668         .ldo_recovery_complete = cmm_recovery_complete,
669         .ldo_prepare           = cmm_prepare,
670 };
671
672 /* --- lu_device_type operations --- */
673 int cmm_upcall(const struct lu_env *env, struct md_device *md,
674                enum md_upcall_event ev, void *data)
675 {
676         int rc;
677         ENTRY;
678
679         switch (ev) {
680                 case MD_LOV_SYNC:
681                         rc = cmm_post_init_mdc(env, md2cmm_dev(md));
682                         if (rc)
683                                 CERROR("can not init md size %d\n", rc);
684                         /* fall through */
685                 default:
686                         rc = md_do_upcall(env, md, ev, data);
687         }
688         RETURN(rc);
689 }
690
691 static struct lu_device *cmm_device_free(const struct lu_env *env,
692                                          struct lu_device *d)
693 {
694         struct cmm_device *m = lu2cmm_dev(d);
695         struct lu_device  *next = md2lu_dev(m->cmm_child);
696         ENTRY;
697
698         LASSERT(m->cmm_tgt_count == 0);
699         LASSERT(cfs_list_empty(&m->cmm_targets));
700         if (m->cmm_fld != NULL) {
701                 OBD_FREE_PTR(m->cmm_fld);
702                 m->cmm_fld = NULL;
703         }
704         md_device_fini(&m->cmm_md_dev);
705         OBD_FREE_PTR(m);
706         RETURN(next);
707 }
708
709 static struct lu_device *cmm_device_alloc(const struct lu_env *env,
710                                           struct lu_device_type *t,
711                                           struct lustre_cfg *cfg)
712 {
713         struct lu_device  *l;
714         struct cmm_device *m;
715         ENTRY;
716
717         OBD_ALLOC_PTR(m);
718         if (m == NULL) {
719                 l = ERR_PTR(-ENOMEM);
720         } else {
721                 md_device_init(&m->cmm_md_dev, t);
722                 m->cmm_md_dev.md_ops = &cmm_md_ops;
723                 md_upcall_init(&m->cmm_md_dev, cmm_upcall);
724                 l = cmm2lu_dev(m);
725                 l->ld_ops = &cmm_lu_ops;
726
727                 OBD_ALLOC_PTR(m->cmm_fld);
728                 if (!m->cmm_fld) {
729                         cmm_device_free(env, l);
730                         l = ERR_PTR(-ENOMEM);
731                 }
732         }
733         RETURN(l);
734 }
735
736 /* context key constructor/destructor: cmm_key_init, cmm_key_fini */
737 LU_KEY_INIT_FINI(cmm, struct cmm_thread_info);
738
739 /* context key: cmm_thread_key */
740 LU_CONTEXT_KEY_DEFINE(cmm, LCT_MD_THREAD);
741
742 struct cmm_thread_info *cmm_env_info(const struct lu_env *env)
743 {
744         struct cmm_thread_info *info;
745
746         info = lu_context_key_get(&env->le_ctx, &cmm_thread_key);
747         LASSERT(info != NULL);
748         return info;
749 }
750
751 /* type constructor/destructor: cmm_type_init/cmm_type_fini */
752 LU_TYPE_INIT_FINI(cmm, &cmm_thread_key);
753
754 /* 
755  * Kludge code : it should be moved mdc_device.c if mdc_(mds)_device
756  * is really stacked.
757  */
758 static int __cmm_type_init(struct lu_device_type *t)
759 {
760         int rc;
761         rc = lu_device_type_init(&mdc_device_type);
762         if (rc == 0) {
763                 rc = cmm_type_init(t);
764                 if (rc)
765                         lu_device_type_fini(&mdc_device_type);
766         }
767         return rc;
768 }
769
770 static void __cmm_type_fini(struct lu_device_type *t)
771 {
772         lu_device_type_fini(&mdc_device_type);
773         cmm_type_fini(t);
774 }
775
776 static void __cmm_type_start(struct lu_device_type *t)
777 {
778         mdc_device_type.ldt_ops->ldto_start(&mdc_device_type);
779         cmm_type_start(t);
780 }
781
782 static void __cmm_type_stop(struct lu_device_type *t)
783 {
784         mdc_device_type.ldt_ops->ldto_stop(&mdc_device_type);
785         cmm_type_stop(t);
786 }
787
788 static int cmm_device_init(const struct lu_env *env, struct lu_device *d,
789                            const char *name, struct lu_device *next)
790 {
791         struct cmm_device *m = lu2cmm_dev(d);
792         struct lu_site *ls;
793         int err = 0;
794         ENTRY;
795
796         cfs_spin_lock_init(&m->cmm_tgt_guard);
797         CFS_INIT_LIST_HEAD(&m->cmm_targets);
798         m->cmm_tgt_count = 0;
799         m->cmm_child = lu2md_dev(next);
800
801         err = fld_client_init(m->cmm_fld, name,
802                               LUSTRE_CLI_FLD_HASH_DHT);
803         if (err) {
804                 CERROR("Can't init FLD, err %d\n", err);
805                 RETURN(err);
806         }
807
808         /* Assign site's fld client ref, needed for asserts in osd. */
809         ls = cmm2lu_dev(m)->ld_site;
810         lu_site2md(ls)->ms_client_fld = m->cmm_fld;
811         err = cmm_procfs_init(m, name);
812
813         RETURN(err);
814 }
815
816 static struct lu_device *cmm_device_fini(const struct lu_env *env,
817                                          struct lu_device *ld)
818 {
819         struct cmm_device *cm = lu2cmm_dev(ld);
820         struct mdc_device *mc, *tmp;
821         struct lu_site *ls;
822         ENTRY;
823
824         /* Finish all mdc devices */
825         cfs_spin_lock(&cm->cmm_tgt_guard);
826         cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
827                 struct lu_device *ld_m = mdc2lu_dev(mc);
828                 struct lu_device *ld_c = cmm2lu_dev(cm);
829
830                 cfs_list_del_init(&mc->mc_linkage);
831                 lu_ref_del(&ld_c->ld_reference, "mdc-child", ld_m);
832                 lu_device_put(ld_c);
833                 ld_m->ld_type->ldt_ops->ldto_device_fini(env, ld_m);
834                 ld_m->ld_type->ldt_ops->ldto_device_free(env, ld_m);
835                 cm->cmm_tgt_count--;
836         }
837         cfs_spin_unlock(&cm->cmm_tgt_guard);
838
839         fld_client_proc_fini(cm->cmm_fld);
840         fld_client_fini(cm->cmm_fld);
841         ls = cmm2lu_dev(cm)->ld_site;
842         lu_site2md(ls)->ms_client_fld = NULL;
843         cmm_procfs_fini(cm);
844
845         RETURN (md2lu_dev(cm->cmm_child));
846 }
847
848 static struct lu_device_type_operations cmm_device_type_ops = {
849         .ldto_init = __cmm_type_init,
850         .ldto_fini = __cmm_type_fini,
851
852         .ldto_start = __cmm_type_start,
853         .ldto_stop  = __cmm_type_stop,
854
855         .ldto_device_alloc = cmm_device_alloc,
856         .ldto_device_free  = cmm_device_free,
857
858         .ldto_device_init = cmm_device_init,
859         .ldto_device_fini = cmm_device_fini
860 };
861
862 static struct lu_device_type cmm_device_type = {
863         .ldt_tags     = LU_DEVICE_MD,
864         .ldt_name     = LUSTRE_CMM_NAME,
865         .ldt_ops      = &cmm_device_type_ops,
866         .ldt_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD
867 };
868
869 struct lprocfs_vars lprocfs_cmm_obd_vars[] = {
870         { 0 }
871 };
872
873 struct lprocfs_vars lprocfs_cmm_module_vars[] = {
874         { 0 }
875 };
876
877 static void lprocfs_cmm_init_vars(struct lprocfs_static_vars *lvars)
878 {
879     lvars->module_vars  = lprocfs_cmm_module_vars;
880     lvars->obd_vars     = lprocfs_cmm_obd_vars;
881 }
882 /** @} */
883
884 static int __init cmm_mod_init(void)
885 {
886         struct lprocfs_static_vars lvars;
887
888         lprocfs_cmm_init_vars(&lvars);
889         return class_register_type(&cmm_obd_device_ops, NULL, lvars.module_vars,
890                                    LUSTRE_CMM_NAME, &cmm_device_type);
891 }
892
893 static void __exit cmm_mod_exit(void)
894 {
895         class_unregister_type(LUSTRE_CMM_NAME);
896 }
897
898 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
899 MODULE_DESCRIPTION("Lustre Clustered Metadata Manager ("LUSTRE_CMM_NAME")");
900 MODULE_LICENSE("GPL");
901
902 cfs_module(cmm, "0.1.0", cmm_mod_init, cmm_mod_exit);