Whamcloud - gitweb
f5422b0a0602872e11047f22a570013e4bdd836c
[fs/lustre-release.git] / lustre / cmm / cmm_device.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/cmm_device.c
37  *
38  * Lustre Cluster Metadata Manager (cmm)
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  */
42 /**
43  * \addtogroup cmm
44  * @{
45  */
46
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <linux/module.h>
50
51 #include <obd.h>
52 #include <obd_class.h>
53 #include <lprocfs_status.h>
54 #include <lustre_ver.h>
55 #include "cmm_internal.h"
56 #include "mdc_internal.h"
57 #ifdef HAVE_QUOTA_SUPPORT
58 # include <lustre_quota.h>
59 #endif
60
61 struct obd_ops cmm_obd_device_ops = {
62         .o_owner           = THIS_MODULE
63 };
64
65 static const struct lu_device_operations cmm_lu_ops;
66
67 static inline int lu_device_is_cmm(struct lu_device *d)
68 {
69         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &cmm_lu_ops);
70 }
71
72 int cmm_root_get(const struct lu_env *env, struct md_device *md,
73                  struct lu_fid *fid)
74 {
75         struct cmm_device *cmm_dev = md2cmm_dev(md);
76         /* valid only on master MDS */
77         if (cmm_dev->cmm_local_num == 0)
78                 return cmm_child_ops(cmm_dev)->mdo_root_get(env,
79                                      cmm_dev->cmm_child, fid);
80         else
81                 return -EINVAL;
82 }
83
84 static int cmm_statfs(const struct lu_env *env, struct md_device *md,
85                       struct obd_statfs *sfs)
86 {
87         struct cmm_device *cmm_dev = md2cmm_dev(md);
88         int rc;
89
90         ENTRY;
91         rc = cmm_child_ops(cmm_dev)->mdo_statfs(env,
92                                                 cmm_dev->cmm_child, sfs);
93         RETURN (rc);
94 }
95
96 static int cmm_maxsize_get(const struct lu_env *env, struct md_device *md,
97                            int *md_size, int *cookie_size)
98 {
99         struct cmm_device *cmm_dev = md2cmm_dev(md);
100         int rc;
101         ENTRY;
102         rc = cmm_child_ops(cmm_dev)->mdo_maxsize_get(env, cmm_dev->cmm_child,
103                                                      md_size, cookie_size);
104         RETURN(rc);
105 }
106
107 static int cmm_init_capa_ctxt(const struct lu_env *env, struct md_device *md,
108                               int mode , unsigned long timeout, __u32 alg,
109                               struct lustre_capa_key *keys)
110 {
111         struct cmm_device *cmm_dev = md2cmm_dev(md);
112         int rc;
113         ENTRY;
114         LASSERT(cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt);
115         rc = cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt(env, cmm_dev->cmm_child,
116                                                         mode, timeout, alg,
117                                                         keys);
118         RETURN(rc);
119 }
120
121 static int cmm_update_capa_key(const struct lu_env *env,
122                                struct md_device *md,
123                                struct lustre_capa_key *key)
124 {
125         struct cmm_device *cmm_dev = md2cmm_dev(md);
126         int rc;
127         ENTRY;
128         rc = cmm_child_ops(cmm_dev)->mdo_update_capa_key(env,
129                                                          cmm_dev->cmm_child,
130                                                          key);
131         RETURN(rc);
132 }
133
134 static int cmm_llog_ctxt_get(const struct lu_env *env, struct md_device *m,
135                              int idx, void **h)
136 {
137         struct cmm_device *cmm_dev = md2cmm_dev(m);
138         int rc;
139         ENTRY;
140
141         rc = cmm_child_ops(cmm_dev)->mdo_llog_ctxt_get(env, cmm_dev->cmm_child,
142                                                        idx, h);
143         RETURN(rc);
144 }
145
146 #ifdef HAVE_QUOTA_SUPPORT
147 /**
148  * \name Quota functions
149  * @{
150  */
151 static int cmm_quota_notify(const struct lu_env *env, struct md_device *m)
152 {
153         struct cmm_device *cmm_dev = md2cmm_dev(m);
154         int rc;
155         ENTRY;
156
157         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_notify(env,
158                                                           cmm_dev->cmm_child);
159         RETURN(rc);
160 }
161
162 static int cmm_quota_setup(const struct lu_env *env, struct md_device *m,
163                            void *data)
164 {
165         struct cmm_device *cmm_dev = md2cmm_dev(m);
166         int rc;
167         ENTRY;
168
169         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setup(env,
170                                                          cmm_dev->cmm_child,
171                                                          data);
172         RETURN(rc);
173 }
174
175 static int cmm_quota_cleanup(const struct lu_env *env, struct md_device *m)
176 {
177         struct cmm_device *cmm_dev = md2cmm_dev(m);
178         int rc;
179         ENTRY;
180
181         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_cleanup(env,
182                                                            cmm_dev->cmm_child);
183         RETURN(rc);
184 }
185
186 static int cmm_quota_recovery(const struct lu_env *env, struct md_device *m)
187 {
188         struct cmm_device *cmm_dev = md2cmm_dev(m);
189         int rc;
190         ENTRY;
191
192         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_recovery(env,
193                                                             cmm_dev->cmm_child);
194         RETURN(rc);
195 }
196
197 static int cmm_quota_check(const struct lu_env *env, struct md_device *m,
198                            __u32 type)
199 {
200         struct cmm_device *cmm_dev = md2cmm_dev(m);
201         int rc;
202         ENTRY;
203
204         /* disable quota for CMD case temporary. */
205         if (cmm_dev->cmm_tgt_count)
206                 RETURN(-EOPNOTSUPP);
207
208         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_check(env,
209                                                          cmm_dev->cmm_child,
210                                                          type);
211         RETURN(rc);
212 }
213
214 static int cmm_quota_on(const struct lu_env *env, struct md_device *m,
215                         __u32 type)
216 {
217         struct cmm_device *cmm_dev = md2cmm_dev(m);
218         int rc;
219         ENTRY;
220
221         /* disable quota for CMD case temporary. */
222         if (cmm_dev->cmm_tgt_count)
223                 RETURN(-EOPNOTSUPP);
224
225         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_on(env,
226                                                       cmm_dev->cmm_child,
227                                                       type);
228         RETURN(rc);
229 }
230
231 static int cmm_quota_off(const struct lu_env *env, struct md_device *m,
232                          __u32 type)
233 {
234         struct cmm_device *cmm_dev = md2cmm_dev(m);
235         int rc;
236         ENTRY;
237
238         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_off(env,
239                                                        cmm_dev->cmm_child,
240                                                        type);
241         RETURN(rc);
242 }
243
244 static int cmm_quota_setinfo(const struct lu_env *env, struct md_device *m,
245                              __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
246 {
247         struct cmm_device *cmm_dev = md2cmm_dev(m);
248         int rc;
249         ENTRY;
250
251         /* disable quota for CMD case temporary. */
252         if (cmm_dev->cmm_tgt_count)
253                 RETURN(-EOPNOTSUPP);
254
255         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setinfo(env,
256                                                            cmm_dev->cmm_child,
257                                                            type, id, dqinfo);
258         RETURN(rc);
259 }
260
261 static int cmm_quota_getinfo(const struct lu_env *env,
262                              const struct md_device *m,
263                              __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
264 {
265         struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
266         int rc;
267         ENTRY;
268
269         /* disable quota for CMD case temporary. */
270         if (cmm_dev->cmm_tgt_count)
271                 RETURN(-EOPNOTSUPP);
272
273         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getinfo(env,
274                                                            cmm_dev->cmm_child,
275                                                            type, id, dqinfo);
276         RETURN(rc);
277 }
278
279 static int cmm_quota_setquota(const struct lu_env *env, struct md_device *m,
280                               __u32 type, __u32 id, struct obd_dqblk *dqblk)
281 {
282         struct cmm_device *cmm_dev = md2cmm_dev(m);
283         int rc;
284         ENTRY;
285
286         /* disable quota for CMD case temporary. */
287         if (cmm_dev->cmm_tgt_count)
288                 RETURN(-EOPNOTSUPP);
289
290         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setquota(env,
291                                                             cmm_dev->cmm_child,
292                                                             type, id, dqblk);
293         RETURN(rc);
294 }
295
296 static int cmm_quota_getquota(const struct lu_env *env,
297                               const struct md_device *m,
298                               __u32 type, __u32 id, struct obd_dqblk *dqblk)
299 {
300         struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
301         int rc;
302         ENTRY;
303
304         /* disable quota for CMD case temporary. */
305         if (cmm_dev->cmm_tgt_count)
306                 RETURN(-EOPNOTSUPP);
307
308         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getquota(env,
309                                                             cmm_dev->cmm_child,
310                                                             type, id, dqblk);
311         RETURN(rc);
312 }
313
314 static int cmm_quota_getoinfo(const struct lu_env *env,
315                               const struct md_device *m,
316                               __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
317 {
318         struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
319         int rc;
320         ENTRY;
321
322         /* disable quota for CMD case temporary. */
323         if (cmm_dev->cmm_tgt_count)
324                 RETURN(-EOPNOTSUPP);
325
326         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getoinfo(env,
327                                                             cmm_dev->cmm_child,
328                                                             type, id, dqinfo);
329         RETURN(rc);
330 }
331
332 static int cmm_quota_getoquota(const struct lu_env *env,
333                                const struct md_device *m,
334                                __u32 type, __u32 id, struct obd_dqblk *dqblk)
335 {
336         struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
337         int rc;
338         ENTRY;
339
340         /* disable quota for CMD case temporary. */
341         if (cmm_dev->cmm_tgt_count)
342                 RETURN(-EOPNOTSUPP);
343
344         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getoquota(env,
345                                                              cmm_dev->cmm_child,
346                                                              type, id, dqblk);
347         RETURN(rc);
348 }
349
350 static int cmm_quota_invalidate(const struct lu_env *env, struct md_device *m,
351                                 __u32 type)
352 {
353         struct cmm_device *cmm_dev = md2cmm_dev(m);
354         int rc;
355         ENTRY;
356
357         /* disable quota for CMD case temporary. */
358         if (cmm_dev->cmm_tgt_count)
359                 RETURN(-EOPNOTSUPP);
360
361         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_invalidate(env,
362                                                               cmm_dev->cmm_child,
363                                                               type);
364         RETURN(rc);
365 }
366
367 static int cmm_quota_finvalidate(const struct lu_env *env, struct md_device *m,
368                                  __u32 type)
369 {
370         struct cmm_device *cmm_dev = md2cmm_dev(m);
371         int rc;
372         ENTRY;
373
374         /* disable quota for CMD case temporary. */
375         if (cmm_dev->cmm_tgt_count)
376                 RETURN(-EOPNOTSUPP);
377
378         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_finvalidate(env,
379                                                                cmm_dev->cmm_child,
380                                                                type);
381         RETURN(rc);
382 }
383 /** @} */
384 #endif
385
386 int cmm_iocontrol(const struct lu_env *env, struct md_device *m,
387                   unsigned int cmd, int len, void *data)
388 {
389         struct md_device *next = md2cmm_dev(m)->cmm_child;
390         int rc;
391
392         ENTRY;
393         rc = next->md_ops->mdo_iocontrol(env, next, cmd, len, data);
394         RETURN(rc);
395 }
396
397
398 static const struct md_device_operations cmm_md_ops = {
399         .mdo_statfs          = cmm_statfs,
400         .mdo_root_get        = cmm_root_get,
401         .mdo_maxsize_get     = cmm_maxsize_get,
402         .mdo_init_capa_ctxt  = cmm_init_capa_ctxt,
403         .mdo_update_capa_key = cmm_update_capa_key,
404         .mdo_llog_ctxt_get   = cmm_llog_ctxt_get,
405         .mdo_iocontrol       = cmm_iocontrol,
406 #ifdef HAVE_QUOTA_SUPPORT
407         .mdo_quota           = {
408                 .mqo_notify      = cmm_quota_notify,
409                 .mqo_setup       = cmm_quota_setup,
410                 .mqo_cleanup     = cmm_quota_cleanup,
411                 .mqo_recovery    = cmm_quota_recovery,
412                 .mqo_check       = cmm_quota_check,
413                 .mqo_on          = cmm_quota_on,
414                 .mqo_off         = cmm_quota_off,
415                 .mqo_setinfo     = cmm_quota_setinfo,
416                 .mqo_getinfo     = cmm_quota_getinfo,
417                 .mqo_setquota    = cmm_quota_setquota,
418                 .mqo_getquota    = cmm_quota_getquota,
419                 .mqo_getoinfo    = cmm_quota_getoinfo,
420                 .mqo_getoquota   = cmm_quota_getoquota,
421                 .mqo_invalidate  = cmm_quota_invalidate,
422                 .mqo_finvalidate = cmm_quota_finvalidate
423         }
424 #endif
425 };
426
427 extern struct lu_device_type mdc_device_type;
428 /**
429  * Init MDC.
430  */
431 static int cmm_post_init_mdc(const struct lu_env *env,
432                              struct cmm_device *cmm)
433 {
434         int max_mdsize, max_cookiesize, rc;
435         struct mdc_device *mc, *tmp;
436
437         /* get the max mdsize and cookiesize from lower layer */
438         rc = cmm_maxsize_get(env, &cmm->cmm_md_dev, &max_mdsize,
439                              &max_cookiesize);
440         if (rc)
441                 RETURN(rc);
442
443         cfs_spin_lock(&cmm->cmm_tgt_guard);
444         cfs_list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
445                                      mc_linkage) {
446                 cmm_mdc_init_ea_size(env, mc, max_mdsize, max_cookiesize);
447         }
448         cfs_spin_unlock(&cmm->cmm_tgt_guard);
449         RETURN(rc);
450 }
451
452 /* --- cmm_lu_operations --- */
453 /* add new MDC to the CMM, create MDC lu_device and connect it to mdc_obd */
454 static int cmm_add_mdc(const struct lu_env *env,
455                        struct cmm_device *cm, struct lustre_cfg *cfg)
456 {
457         struct lu_device_type *ldt = &mdc_device_type;
458         char *p, *num = lustre_cfg_string(cfg, 2);
459         struct mdc_device *mc, *tmp;
460         struct lu_fld_target target;
461         struct lu_device *ld;
462         struct lu_device *cmm_lu = cmm2lu_dev(cm);
463         mdsno_t mdc_num;
464         struct lu_site *site = cmm2lu_dev(cm)->ld_site;
465         int rc;
466 #ifdef HAVE_QUOTA_SUPPORT
467         int first;
468 #endif
469         ENTRY;
470
471         /* find out that there is no such mdc */
472         LASSERT(num);
473         mdc_num = simple_strtol(num, &p, 10);
474         if (*p) {
475                 CERROR("Invalid index in lustre_cgf, offset 2\n");
476                 RETURN(-EINVAL);
477         }
478
479         cfs_spin_lock(&cm->cmm_tgt_guard);
480         cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
481                                      mc_linkage) {
482                 if (mc->mc_num == mdc_num) {
483                         cfs_spin_unlock(&cm->cmm_tgt_guard);
484                         RETURN(-EEXIST);
485                 }
486         }
487         cfs_spin_unlock(&cm->cmm_tgt_guard);
488         ld = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg);
489         if (IS_ERR(ld))
490                 RETURN(PTR_ERR(ld));
491
492         ld->ld_site = site;
493
494         rc = ldt->ldt_ops->ldto_device_init(env, ld, NULL, NULL);
495         if (rc) {
496                 ldt->ldt_ops->ldto_device_free(env, ld);
497                 RETURN(rc);
498         }
499         /* pass config to the just created MDC */
500         rc = ld->ld_ops->ldo_process_config(env, ld, cfg);
501         if (rc) {
502                 ldt->ldt_ops->ldto_device_fini(env, ld);
503                 ldt->ldt_ops->ldto_device_free(env, ld);
504                 RETURN(rc);
505         }
506
507         cfs_spin_lock(&cm->cmm_tgt_guard);
508         cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
509                                      mc_linkage) {
510                 if (mc->mc_num == mdc_num) {
511                         cfs_spin_unlock(&cm->cmm_tgt_guard);
512                         ldt->ldt_ops->ldto_device_fini(env, ld);
513                         ldt->ldt_ops->ldto_device_free(env, ld);
514                         RETURN(-EEXIST);
515                 }
516         }
517         mc = lu2mdc_dev(ld);
518         cfs_list_add_tail(&mc->mc_linkage, &cm->cmm_targets);
519         cm->cmm_tgt_count++;
520 #ifdef HAVE_QUOTA_SUPPORT
521         first = cm->cmm_tgt_count;
522 #endif
523         cfs_spin_unlock(&cm->cmm_tgt_guard);
524
525         lu_device_get(cmm_lu);
526         lu_ref_add(&cmm_lu->ld_reference, "mdc-child", ld);
527
528         target.ft_srv = NULL;
529         target.ft_idx = mc->mc_num;
530         target.ft_exp = mc->mc_desc.cl_exp;
531         fld_client_add_target(cm->cmm_fld, &target);
532
533         if (mc->mc_num == 0) {
534                 /* this is mdt0 -> mc export, fld lookup need this export
535                    to forward fld lookup request. */
536                 LASSERT(!lu_site2md(site)->ms_server_fld->lsf_control_exp);
537                 lu_site2md(site)->ms_server_fld->lsf_control_exp =
538                                           mc->mc_desc.cl_exp;
539         }
540 #ifdef HAVE_QUOTA_SUPPORT
541         /* XXX: Disable quota for CMD case temporary. */
542         if (first == 1) {
543                 CWARN("Disable quota for CMD case temporary!\n");
544                 cmm_child_ops(cm)->mdo_quota.mqo_off(env, cm->cmm_child, UGQUOTA);
545         }
546 #endif
547         /* Set max md size for the mdc. */
548         rc = cmm_post_init_mdc(env, cm);
549         RETURN(rc);
550 }
551
552 static void cmm_device_shutdown(const struct lu_env *env,
553                                 struct cmm_device *cm,
554                                 struct lustre_cfg *cfg)
555 {
556         struct mdc_device *mc, *tmp;
557         ENTRY;
558
559         /* Remove local target from FLD. */
560         fld_client_del_target(cm->cmm_fld, cm->cmm_local_num);
561
562         /* Finish all mdc devices. */
563         cfs_spin_lock(&cm->cmm_tgt_guard);
564         cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
565                 struct lu_device *ld_m = mdc2lu_dev(mc);
566                 fld_client_del_target(cm->cmm_fld, mc->mc_num);
567                 ld_m->ld_ops->ldo_process_config(env, ld_m, cfg);
568         }
569         cfs_spin_unlock(&cm->cmm_tgt_guard);
570
571         /* remove upcall device*/
572         md_upcall_fini(&cm->cmm_md_dev);
573
574         EXIT;
575 }
576
577 static int cmm_device_mount(const struct lu_env *env,
578                             struct cmm_device *m, struct lustre_cfg *cfg)
579 {
580         const char *index = lustre_cfg_string(cfg, 2);
581         char *p;
582
583         LASSERT(index != NULL);
584
585         m->cmm_local_num = simple_strtol(index, &p, 10);
586         if (*p) {
587                 CERROR("Invalid index in lustre_cgf\n");
588                 RETURN(-EINVAL);
589         }
590
591         RETURN(0);
592 }
593
594 static int cmm_process_config(const struct lu_env *env,
595                               struct lu_device *d, struct lustre_cfg *cfg)
596 {
597         struct cmm_device *m = lu2cmm_dev(d);
598         struct lu_device *next = md2lu_dev(m->cmm_child);
599         int err;
600         ENTRY;
601
602         switch(cfg->lcfg_command) {
603         case LCFG_ADD_MDC:
604                 /* On first ADD_MDC add also local target. */
605                 if (!(m->cmm_flags & CMM_INITIALIZED)) {
606                         struct lu_site *ls = cmm2lu_dev(m)->ld_site;
607                         struct lu_fld_target target;
608
609                         target.ft_srv = lu_site2md(ls)->ms_server_fld;
610                         target.ft_idx = m->cmm_local_num;
611                         target.ft_exp = NULL;
612
613                         fld_client_add_target(m->cmm_fld, &target);
614                 }
615                 err = cmm_add_mdc(env, m, cfg);
616
617                 /* The first ADD_MDC can be counted as setup is finished. */
618                 if (!(m->cmm_flags & CMM_INITIALIZED))
619                         m->cmm_flags |= CMM_INITIALIZED;
620
621                 break;
622         case LCFG_SETUP:
623         {
624                 /* lower layers should be set up at first */
625                 err = next->ld_ops->ldo_process_config(env, next, cfg);
626                 if (err == 0)
627                         err = cmm_device_mount(env, m, cfg);
628                 break;
629         }
630         case LCFG_CLEANUP:
631         {
632                 cmm_device_shutdown(env, m, cfg);
633         }
634         default:
635                 err = next->ld_ops->ldo_process_config(env, next, cfg);
636         }
637         RETURN(err);
638 }
639
640 static int cmm_recovery_complete(const struct lu_env *env,
641                                  struct lu_device *d)
642 {
643         struct cmm_device *m = lu2cmm_dev(d);
644         struct lu_device *next = md2lu_dev(m->cmm_child);
645         int rc;
646         ENTRY;
647         rc = next->ld_ops->ldo_recovery_complete(env, next);
648         RETURN(rc);
649 }
650
651 static int cmm_prepare(const struct lu_env *env,
652                        struct lu_device *pdev,
653                        struct lu_device *dev)
654 {
655         struct cmm_device *cmm = lu2cmm_dev(dev);
656         struct lu_device *next = md2lu_dev(cmm->cmm_child);
657         int rc;
658
659         ENTRY;
660         rc = next->ld_ops->ldo_prepare(env, dev, next);
661         RETURN(rc);
662 }
663
664 static const struct lu_device_operations cmm_lu_ops = {
665         .ldo_object_alloc      = cmm_object_alloc,
666         .ldo_process_config    = cmm_process_config,
667         .ldo_recovery_complete = cmm_recovery_complete,
668         .ldo_prepare           = cmm_prepare,
669 };
670
671 /* --- lu_device_type operations --- */
672 int cmm_upcall(const struct lu_env *env, struct md_device *md,
673                enum md_upcall_event ev, void *data)
674 {
675         int rc;
676         ENTRY;
677
678         switch (ev) {
679                 case MD_LOV_SYNC:
680                         rc = cmm_post_init_mdc(env, md2cmm_dev(md));
681                         if (rc)
682                                 CERROR("can not init md size %d\n", rc);
683                         /* fall through */
684                 default:
685                         rc = md_do_upcall(env, md, ev, data);
686         }
687         RETURN(rc);
688 }
689
690 static struct lu_device *cmm_device_free(const struct lu_env *env,
691                                          struct lu_device *d)
692 {
693         struct cmm_device *m = lu2cmm_dev(d);
694         struct lu_device  *next = md2lu_dev(m->cmm_child);
695         ENTRY;
696
697         LASSERT(m->cmm_tgt_count == 0);
698         LASSERT(cfs_list_empty(&m->cmm_targets));
699         if (m->cmm_fld != NULL) {
700                 OBD_FREE_PTR(m->cmm_fld);
701                 m->cmm_fld = NULL;
702         }
703         md_device_fini(&m->cmm_md_dev);
704         OBD_FREE_PTR(m);
705         RETURN(next);
706 }
707
708 static struct lu_device *cmm_device_alloc(const struct lu_env *env,
709                                           struct lu_device_type *t,
710                                           struct lustre_cfg *cfg)
711 {
712         struct lu_device  *l;
713         struct cmm_device *m;
714         ENTRY;
715
716         OBD_ALLOC_PTR(m);
717         if (m == NULL) {
718                 l = ERR_PTR(-ENOMEM);
719         } else {
720                 md_device_init(&m->cmm_md_dev, t);
721                 m->cmm_md_dev.md_ops = &cmm_md_ops;
722                 md_upcall_init(&m->cmm_md_dev, cmm_upcall);
723                 l = cmm2lu_dev(m);
724                 l->ld_ops = &cmm_lu_ops;
725
726                 OBD_ALLOC_PTR(m->cmm_fld);
727                 if (!m->cmm_fld) {
728                         cmm_device_free(env, l);
729                         l = ERR_PTR(-ENOMEM);
730                 }
731         }
732         RETURN(l);
733 }
734
735 /* context key constructor/destructor: cmm_key_init, cmm_key_fini */
736 LU_KEY_INIT_FINI(cmm, struct cmm_thread_info);
737
738 /* context key: cmm_thread_key */
739 LU_CONTEXT_KEY_DEFINE(cmm, LCT_MD_THREAD);
740
741 struct cmm_thread_info *cmm_env_info(const struct lu_env *env)
742 {
743         struct cmm_thread_info *info;
744
745         info = lu_context_key_get(&env->le_ctx, &cmm_thread_key);
746         LASSERT(info != NULL);
747         return info;
748 }
749
750 /* type constructor/destructor: cmm_type_init/cmm_type_fini */
751 LU_TYPE_INIT_FINI(cmm, &cmm_thread_key);
752
753 /* 
754  * Kludge code : it should be moved mdc_device.c if mdc_(mds)_device
755  * is really stacked.
756  */
757 static int __cmm_type_init(struct lu_device_type *t)
758 {
759         int rc;
760         rc = lu_device_type_init(&mdc_device_type);
761         if (rc == 0) {
762                 rc = cmm_type_init(t);
763                 if (rc)
764                         lu_device_type_fini(&mdc_device_type);
765         }
766         return rc;
767 }
768
769 static void __cmm_type_fini(struct lu_device_type *t)
770 {
771         lu_device_type_fini(&mdc_device_type);
772         cmm_type_fini(t);
773 }
774
775 static void __cmm_type_start(struct lu_device_type *t)
776 {
777         mdc_device_type.ldt_ops->ldto_start(&mdc_device_type);
778         cmm_type_start(t);
779 }
780
781 static void __cmm_type_stop(struct lu_device_type *t)
782 {
783         mdc_device_type.ldt_ops->ldto_stop(&mdc_device_type);
784         cmm_type_stop(t);
785 }
786
787 static int cmm_device_init(const struct lu_env *env, struct lu_device *d,
788                            const char *name, struct lu_device *next)
789 {
790         struct cmm_device *m = lu2cmm_dev(d);
791         struct lu_site *ls;
792         int err = 0;
793         ENTRY;
794
795         cfs_spin_lock_init(&m->cmm_tgt_guard);
796         CFS_INIT_LIST_HEAD(&m->cmm_targets);
797         m->cmm_tgt_count = 0;
798         m->cmm_child = lu2md_dev(next);
799
800         err = fld_client_init(m->cmm_fld, name,
801                               LUSTRE_CLI_FLD_HASH_DHT);
802         if (err) {
803                 CERROR("Can't init FLD, err %d\n", err);
804                 RETURN(err);
805         }
806
807         /* Assign site's fld client ref, needed for asserts in osd. */
808         ls = cmm2lu_dev(m)->ld_site;
809         lu_site2md(ls)->ms_client_fld = m->cmm_fld;
810         err = cmm_procfs_init(m, name);
811
812         RETURN(err);
813 }
814
815 static struct lu_device *cmm_device_fini(const struct lu_env *env,
816                                          struct lu_device *ld)
817 {
818         struct cmm_device *cm = lu2cmm_dev(ld);
819         struct mdc_device *mc, *tmp;
820         struct lu_site *ls;
821         ENTRY;
822
823         /* Finish all mdc devices */
824         cfs_spin_lock(&cm->cmm_tgt_guard);
825         cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
826                 struct lu_device *ld_m = mdc2lu_dev(mc);
827                 struct lu_device *ld_c = cmm2lu_dev(cm);
828
829                 cfs_list_del_init(&mc->mc_linkage);
830                 lu_ref_del(&ld_c->ld_reference, "mdc-child", ld_m);
831                 lu_device_put(ld_c);
832                 ld_m->ld_type->ldt_ops->ldto_device_fini(env, ld_m);
833                 ld_m->ld_type->ldt_ops->ldto_device_free(env, ld_m);
834                 cm->cmm_tgt_count--;
835         }
836         cfs_spin_unlock(&cm->cmm_tgt_guard);
837
838         fld_client_proc_fini(cm->cmm_fld);
839         fld_client_fini(cm->cmm_fld);
840         ls = cmm2lu_dev(cm)->ld_site;
841         lu_site2md(ls)->ms_client_fld = NULL;
842         cmm_procfs_fini(cm);
843
844         RETURN (md2lu_dev(cm->cmm_child));
845 }
846
847 static struct lu_device_type_operations cmm_device_type_ops = {
848         .ldto_init = __cmm_type_init,
849         .ldto_fini = __cmm_type_fini,
850
851         .ldto_start = __cmm_type_start,
852         .ldto_stop  = __cmm_type_stop,
853
854         .ldto_device_alloc = cmm_device_alloc,
855         .ldto_device_free  = cmm_device_free,
856
857         .ldto_device_init = cmm_device_init,
858         .ldto_device_fini = cmm_device_fini
859 };
860
861 static struct lu_device_type cmm_device_type = {
862         .ldt_tags     = LU_DEVICE_MD,
863         .ldt_name     = LUSTRE_CMM_NAME,
864         .ldt_ops      = &cmm_device_type_ops,
865         .ldt_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD
866 };
867
868 struct lprocfs_vars lprocfs_cmm_obd_vars[] = {
869         { 0 }
870 };
871
872 struct lprocfs_vars lprocfs_cmm_module_vars[] = {
873         { 0 }
874 };
875
876 static void lprocfs_cmm_init_vars(struct lprocfs_static_vars *lvars)
877 {
878     lvars->module_vars  = lprocfs_cmm_module_vars;
879     lvars->obd_vars     = lprocfs_cmm_obd_vars;
880 }
881 /** @} */
882
883 static int __init cmm_mod_init(void)
884 {
885         struct lprocfs_static_vars lvars;
886
887         lprocfs_cmm_init_vars(&lvars);
888         return class_register_type(&cmm_obd_device_ops, NULL, lvars.module_vars,
889                                    LUSTRE_CMM_NAME, &cmm_device_type);
890 }
891
892 static void __exit cmm_mod_exit(void)
893 {
894         class_unregister_type(LUSTRE_CMM_NAME);
895 }
896
897 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
898 MODULE_DESCRIPTION("Lustre Clustered Metadata Manager ("LUSTRE_CMM_NAME")");
899 MODULE_LICENSE("GPL");
900
901 cfs_module(cmm, "0.1.0", cmm_mod_init, cmm_mod_exit);