Whamcloud - gitweb
f51049c7babd5c039e9cc4dd1dfd89bb3012bb1a
[fs/lustre-release.git] / lustre / cmm / cmm_device.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/cmm_device.c
37  *
38  * Lustre Cluster Metadata Manager (cmm)
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  */
42 /**
43  * \addtogroup cmm
44  * @{
45  */
46 #ifndef EXPORT_SYMTAB
47 # define EXPORT_SYMTAB
48 #endif
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <linux/module.h>
52
53 #include <obd.h>
54 #include <obd_class.h>
55 #include <lprocfs_status.h>
56 #include <lustre_ver.h>
57 #include "cmm_internal.h"
58 #include "mdc_internal.h"
59 #ifdef HAVE_QUOTA_SUPPORT
60 # include <lustre_quota.h>
61 #endif
62
63 struct obd_ops cmm_obd_device_ops = {
64         .o_owner           = THIS_MODULE
65 };
66
67 static const struct lu_device_operations cmm_lu_ops;
68
69 static inline int lu_device_is_cmm(struct lu_device *d)
70 {
71         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &cmm_lu_ops);
72 }
73
74 int cmm_root_get(const struct lu_env *env, struct md_device *md,
75                  struct lu_fid *fid)
76 {
77         struct cmm_device *cmm_dev = md2cmm_dev(md);
78         /* valid only on master MDS */
79         if (cmm_dev->cmm_local_num == 0)
80                 return cmm_child_ops(cmm_dev)->mdo_root_get(env,
81                                      cmm_dev->cmm_child, fid);
82         else
83                 return -EINVAL;
84 }
85
86 static int cmm_statfs(const struct lu_env *env, struct md_device *md,
87                       struct obd_statfs *sfs)
88 {
89         struct cmm_device *cmm_dev = md2cmm_dev(md);
90         int rc;
91
92         ENTRY;
93         rc = cmm_child_ops(cmm_dev)->mdo_statfs(env,
94                                                 cmm_dev->cmm_child, sfs);
95         RETURN (rc);
96 }
97
98 static int cmm_maxsize_get(const struct lu_env *env, struct md_device *md,
99                            int *md_size, int *cookie_size)
100 {
101         struct cmm_device *cmm_dev = md2cmm_dev(md);
102         int rc;
103         ENTRY;
104         rc = cmm_child_ops(cmm_dev)->mdo_maxsize_get(env, cmm_dev->cmm_child,
105                                                      md_size, cookie_size);
106         RETURN(rc);
107 }
108
109 static int cmm_init_capa_ctxt(const struct lu_env *env, struct md_device *md,
110                               int mode , unsigned long timeout, __u32 alg,
111                               struct lustre_capa_key *keys)
112 {
113         struct cmm_device *cmm_dev = md2cmm_dev(md);
114         int rc;
115         ENTRY;
116         LASSERT(cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt);
117         rc = cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt(env, cmm_dev->cmm_child,
118                                                         mode, timeout, alg,
119                                                         keys);
120         RETURN(rc);
121 }
122
123 static int cmm_update_capa_key(const struct lu_env *env,
124                                struct md_device *md,
125                                struct lustre_capa_key *key)
126 {
127         struct cmm_device *cmm_dev = md2cmm_dev(md);
128         int rc;
129         ENTRY;
130         rc = cmm_child_ops(cmm_dev)->mdo_update_capa_key(env,
131                                                          cmm_dev->cmm_child,
132                                                          key);
133         RETURN(rc);
134 }
135
136 static int cmm_llog_ctxt_get(const struct lu_env *env, struct md_device *m,
137                              int idx, void **h)
138 {
139         struct cmm_device *cmm_dev = md2cmm_dev(m);
140         int rc;
141         ENTRY;
142
143         rc = cmm_child_ops(cmm_dev)->mdo_llog_ctxt_get(env, cmm_dev->cmm_child,
144                                                        idx, h);
145         RETURN(rc);
146 }
147
148 #ifdef HAVE_QUOTA_SUPPORT
149 /**
150  * \name Quota functions
151  * @{
152  */
153 static int cmm_quota_notify(const struct lu_env *env, struct md_device *m)
154 {
155         struct cmm_device *cmm_dev = md2cmm_dev(m);
156         int rc;
157         ENTRY;
158
159         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_notify(env,
160                                                           cmm_dev->cmm_child);
161         RETURN(rc);
162 }
163
164 static int cmm_quota_setup(const struct lu_env *env, struct md_device *m,
165                            void *data)
166 {
167         struct cmm_device *cmm_dev = md2cmm_dev(m);
168         int rc;
169         ENTRY;
170
171         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setup(env,
172                                                          cmm_dev->cmm_child,
173                                                          data);
174         RETURN(rc);
175 }
176
177 static int cmm_quota_cleanup(const struct lu_env *env, struct md_device *m)
178 {
179         struct cmm_device *cmm_dev = md2cmm_dev(m);
180         int rc;
181         ENTRY;
182
183         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_cleanup(env,
184                                                            cmm_dev->cmm_child);
185         RETURN(rc);
186 }
187
188 static int cmm_quota_recovery(const struct lu_env *env, struct md_device *m)
189 {
190         struct cmm_device *cmm_dev = md2cmm_dev(m);
191         int rc;
192         ENTRY;
193
194         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_recovery(env,
195                                                             cmm_dev->cmm_child);
196         RETURN(rc);
197 }
198
199 static int cmm_quota_check(const struct lu_env *env, struct md_device *m,
200                            __u32 type)
201 {
202         struct cmm_device *cmm_dev = md2cmm_dev(m);
203         int rc;
204         ENTRY;
205
206         /* disable quota for CMD case temporary. */
207         if (cmm_dev->cmm_tgt_count)
208                 RETURN(-EOPNOTSUPP);
209
210         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_check(env,
211                                                          cmm_dev->cmm_child,
212                                                          type);
213         RETURN(rc);
214 }
215
216 static int cmm_quota_on(const struct lu_env *env, struct md_device *m,
217                         __u32 type)
218 {
219         struct cmm_device *cmm_dev = md2cmm_dev(m);
220         int rc;
221         ENTRY;
222
223         /* disable quota for CMD case temporary. */
224         if (cmm_dev->cmm_tgt_count)
225                 RETURN(-EOPNOTSUPP);
226
227         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_on(env,
228                                                       cmm_dev->cmm_child,
229                                                       type);
230         RETURN(rc);
231 }
232
233 static int cmm_quota_off(const struct lu_env *env, struct md_device *m,
234                          __u32 type)
235 {
236         struct cmm_device *cmm_dev = md2cmm_dev(m);
237         int rc;
238         ENTRY;
239
240         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_off(env,
241                                                        cmm_dev->cmm_child,
242                                                        type);
243         RETURN(rc);
244 }
245
246 static int cmm_quota_setinfo(const struct lu_env *env, struct md_device *m,
247                              __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
248 {
249         struct cmm_device *cmm_dev = md2cmm_dev(m);
250         int rc;
251         ENTRY;
252
253         /* disable quota for CMD case temporary. */
254         if (cmm_dev->cmm_tgt_count)
255                 RETURN(-EOPNOTSUPP);
256
257         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setinfo(env,
258                                                            cmm_dev->cmm_child,
259                                                            type, id, dqinfo);
260         RETURN(rc);
261 }
262
263 static int cmm_quota_getinfo(const struct lu_env *env,
264                              const struct md_device *m,
265                              __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
266 {
267         struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
268         int rc;
269         ENTRY;
270
271         /* disable quota for CMD case temporary. */
272         if (cmm_dev->cmm_tgt_count)
273                 RETURN(-EOPNOTSUPP);
274
275         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getinfo(env,
276                                                            cmm_dev->cmm_child,
277                                                            type, id, dqinfo);
278         RETURN(rc);
279 }
280
281 static int cmm_quota_setquota(const struct lu_env *env, struct md_device *m,
282                               __u32 type, __u32 id, struct obd_dqblk *dqblk)
283 {
284         struct cmm_device *cmm_dev = md2cmm_dev(m);
285         int rc;
286         ENTRY;
287
288         /* disable quota for CMD case temporary. */
289         if (cmm_dev->cmm_tgt_count)
290                 RETURN(-EOPNOTSUPP);
291
292         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setquota(env,
293                                                             cmm_dev->cmm_child,
294                                                             type, id, dqblk);
295         RETURN(rc);
296 }
297
298 static int cmm_quota_getquota(const struct lu_env *env,
299                               const struct md_device *m,
300                               __u32 type, __u32 id, struct obd_dqblk *dqblk)
301 {
302         struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
303         int rc;
304         ENTRY;
305
306         /* disable quota for CMD case temporary. */
307         if (cmm_dev->cmm_tgt_count)
308                 RETURN(-EOPNOTSUPP);
309
310         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getquota(env,
311                                                             cmm_dev->cmm_child,
312                                                             type, id, dqblk);
313         RETURN(rc);
314 }
315
316 static int cmm_quota_getoinfo(const struct lu_env *env,
317                               const struct md_device *m,
318                               __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
319 {
320         struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
321         int rc;
322         ENTRY;
323
324         /* disable quota for CMD case temporary. */
325         if (cmm_dev->cmm_tgt_count)
326                 RETURN(-EOPNOTSUPP);
327
328         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getoinfo(env,
329                                                             cmm_dev->cmm_child,
330                                                             type, id, dqinfo);
331         RETURN(rc);
332 }
333
334 static int cmm_quota_getoquota(const struct lu_env *env,
335                                const struct md_device *m,
336                                __u32 type, __u32 id, struct obd_dqblk *dqblk)
337 {
338         struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
339         int rc;
340         ENTRY;
341
342         /* disable quota for CMD case temporary. */
343         if (cmm_dev->cmm_tgt_count)
344                 RETURN(-EOPNOTSUPP);
345
346         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getoquota(env,
347                                                              cmm_dev->cmm_child,
348                                                              type, id, dqblk);
349         RETURN(rc);
350 }
351
352 static int cmm_quota_invalidate(const struct lu_env *env, struct md_device *m,
353                                 __u32 type)
354 {
355         struct cmm_device *cmm_dev = md2cmm_dev(m);
356         int rc;
357         ENTRY;
358
359         /* disable quota for CMD case temporary. */
360         if (cmm_dev->cmm_tgt_count)
361                 RETURN(-EOPNOTSUPP);
362
363         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_invalidate(env,
364                                                               cmm_dev->cmm_child,
365                                                               type);
366         RETURN(rc);
367 }
368
369 static int cmm_quota_finvalidate(const struct lu_env *env, struct md_device *m,
370                                  __u32 type)
371 {
372         struct cmm_device *cmm_dev = md2cmm_dev(m);
373         int rc;
374         ENTRY;
375
376         /* disable quota for CMD case temporary. */
377         if (cmm_dev->cmm_tgt_count)
378                 RETURN(-EOPNOTSUPP);
379
380         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_finvalidate(env,
381                                                                cmm_dev->cmm_child,
382                                                                type);
383         RETURN(rc);
384 }
385 /** @} */
386 #endif
387
388 int cmm_iocontrol(const struct lu_env *env, struct md_device *m,
389                   unsigned int cmd, int len, void *data)
390 {
391         struct md_device *next = md2cmm_dev(m)->cmm_child;
392         int rc;
393
394         ENTRY;
395         rc = next->md_ops->mdo_iocontrol(env, next, cmd, len, data);
396         RETURN(rc);
397 }
398
399
400 static const struct md_device_operations cmm_md_ops = {
401         .mdo_statfs          = cmm_statfs,
402         .mdo_root_get        = cmm_root_get,
403         .mdo_maxsize_get     = cmm_maxsize_get,
404         .mdo_init_capa_ctxt  = cmm_init_capa_ctxt,
405         .mdo_update_capa_key = cmm_update_capa_key,
406         .mdo_llog_ctxt_get   = cmm_llog_ctxt_get,
407         .mdo_iocontrol       = cmm_iocontrol,
408 #ifdef HAVE_QUOTA_SUPPORT
409         .mdo_quota           = {
410                 .mqo_notify      = cmm_quota_notify,
411                 .mqo_setup       = cmm_quota_setup,
412                 .mqo_cleanup     = cmm_quota_cleanup,
413                 .mqo_recovery    = cmm_quota_recovery,
414                 .mqo_check       = cmm_quota_check,
415                 .mqo_on          = cmm_quota_on,
416                 .mqo_off         = cmm_quota_off,
417                 .mqo_setinfo     = cmm_quota_setinfo,
418                 .mqo_getinfo     = cmm_quota_getinfo,
419                 .mqo_setquota    = cmm_quota_setquota,
420                 .mqo_getquota    = cmm_quota_getquota,
421                 .mqo_getoinfo    = cmm_quota_getoinfo,
422                 .mqo_getoquota   = cmm_quota_getoquota,
423                 .mqo_invalidate  = cmm_quota_invalidate,
424                 .mqo_finvalidate = cmm_quota_finvalidate
425         }
426 #endif
427 };
428
429 extern struct lu_device_type mdc_device_type;
430 /**
431  * Init MDC.
432  */
433 static int cmm_post_init_mdc(const struct lu_env *env,
434                              struct cmm_device *cmm)
435 {
436         int max_mdsize, max_cookiesize, rc;
437         struct mdc_device *mc, *tmp;
438
439         /* get the max mdsize and cookiesize from lower layer */
440         rc = cmm_maxsize_get(env, &cmm->cmm_md_dev, &max_mdsize,
441                              &max_cookiesize);
442         if (rc)
443                 RETURN(rc);
444
445         cfs_spin_lock(&cmm->cmm_tgt_guard);
446         cfs_list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
447                                      mc_linkage) {
448                 cmm_mdc_init_ea_size(env, mc, max_mdsize, max_cookiesize);
449         }
450         cfs_spin_unlock(&cmm->cmm_tgt_guard);
451         RETURN(rc);
452 }
453
454 /* --- cmm_lu_operations --- */
455 /* add new MDC to the CMM, create MDC lu_device and connect it to mdc_obd */
456 static int cmm_add_mdc(const struct lu_env *env,
457                        struct cmm_device *cm, struct lustre_cfg *cfg)
458 {
459         struct lu_device_type *ldt = &mdc_device_type;
460         char *p, *num = lustre_cfg_string(cfg, 2);
461         struct mdc_device *mc, *tmp;
462         struct lu_fld_target target;
463         struct lu_device *ld;
464         struct lu_device *cmm_lu = cmm2lu_dev(cm);
465         mdsno_t mdc_num;
466         struct lu_site *site = cmm2lu_dev(cm)->ld_site;
467         int rc;
468 #ifdef HAVE_QUOTA_SUPPORT
469         int first;
470 #endif
471         ENTRY;
472
473         /* find out that there is no such mdc */
474         LASSERT(num);
475         mdc_num = simple_strtol(num, &p, 10);
476         if (*p) {
477                 CERROR("Invalid index in lustre_cgf, offset 2\n");
478                 RETURN(-EINVAL);
479         }
480
481         cfs_spin_lock(&cm->cmm_tgt_guard);
482         cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
483                                      mc_linkage) {
484                 if (mc->mc_num == mdc_num) {
485                         cfs_spin_unlock(&cm->cmm_tgt_guard);
486                         RETURN(-EEXIST);
487                 }
488         }
489         cfs_spin_unlock(&cm->cmm_tgt_guard);
490         ld = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg);
491         if (IS_ERR(ld))
492                 RETURN(PTR_ERR(ld));
493
494         ld->ld_site = site;
495
496         rc = ldt->ldt_ops->ldto_device_init(env, ld, NULL, NULL);
497         if (rc) {
498                 ldt->ldt_ops->ldto_device_free(env, ld);
499                 RETURN(rc);
500         }
501         /* pass config to the just created MDC */
502         rc = ld->ld_ops->ldo_process_config(env, ld, cfg);
503         if (rc) {
504                 ldt->ldt_ops->ldto_device_fini(env, ld);
505                 ldt->ldt_ops->ldto_device_free(env, ld);
506                 RETURN(rc);
507         }
508
509         cfs_spin_lock(&cm->cmm_tgt_guard);
510         cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
511                                      mc_linkage) {
512                 if (mc->mc_num == mdc_num) {
513                         cfs_spin_unlock(&cm->cmm_tgt_guard);
514                         ldt->ldt_ops->ldto_device_fini(env, ld);
515                         ldt->ldt_ops->ldto_device_free(env, ld);
516                         RETURN(-EEXIST);
517                 }
518         }
519         mc = lu2mdc_dev(ld);
520         cfs_list_add_tail(&mc->mc_linkage, &cm->cmm_targets);
521         cm->cmm_tgt_count++;
522 #ifdef HAVE_QUOTA_SUPPORT
523         first = cm->cmm_tgt_count;
524 #endif
525         cfs_spin_unlock(&cm->cmm_tgt_guard);
526
527         lu_device_get(cmm_lu);
528         lu_ref_add(&cmm_lu->ld_reference, "mdc-child", ld);
529
530         target.ft_srv = NULL;
531         target.ft_idx = mc->mc_num;
532         target.ft_exp = mc->mc_desc.cl_exp;
533         fld_client_add_target(cm->cmm_fld, &target);
534
535         if (mc->mc_num == 0) {
536                 /* this is mdt0 -> mc export, fld lookup need this export
537                    to forward fld lookup request. */
538                 LASSERT(!lu_site2md(site)->ms_server_fld->lsf_control_exp);
539                 lu_site2md(site)->ms_server_fld->lsf_control_exp =
540                                           mc->mc_desc.cl_exp;
541         }
542 #ifdef HAVE_QUOTA_SUPPORT
543         /* XXX: Disable quota for CMD case temporary. */
544         if (first == 1) {
545                 CWARN("Disable quota for CMD case temporary!\n");
546                 cmm_child_ops(cm)->mdo_quota.mqo_off(env, cm->cmm_child, UGQUOTA);
547         }
548 #endif
549         /* Set max md size for the mdc. */
550         rc = cmm_post_init_mdc(env, cm);
551         RETURN(rc);
552 }
553
554 static void cmm_device_shutdown(const struct lu_env *env,
555                                 struct cmm_device *cm,
556                                 struct lustre_cfg *cfg)
557 {
558         struct mdc_device *mc, *tmp;
559         ENTRY;
560
561         /* Remove local target from FLD. */
562         fld_client_del_target(cm->cmm_fld, cm->cmm_local_num);
563
564         /* Finish all mdc devices. */
565         cfs_spin_lock(&cm->cmm_tgt_guard);
566         cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
567                 struct lu_device *ld_m = mdc2lu_dev(mc);
568                 fld_client_del_target(cm->cmm_fld, mc->mc_num);
569                 ld_m->ld_ops->ldo_process_config(env, ld_m, cfg);
570         }
571         cfs_spin_unlock(&cm->cmm_tgt_guard);
572
573         /* remove upcall device*/
574         md_upcall_fini(&cm->cmm_md_dev);
575
576         EXIT;
577 }
578
579 static int cmm_device_mount(const struct lu_env *env,
580                             struct cmm_device *m, struct lustre_cfg *cfg)
581 {
582         const char *index = lustre_cfg_string(cfg, 2);
583         char *p;
584
585         LASSERT(index != NULL);
586
587         m->cmm_local_num = simple_strtol(index, &p, 10);
588         if (*p) {
589                 CERROR("Invalid index in lustre_cgf\n");
590                 RETURN(-EINVAL);
591         }
592
593         RETURN(0);
594 }
595
596 static int cmm_process_config(const struct lu_env *env,
597                               struct lu_device *d, struct lustre_cfg *cfg)
598 {
599         struct cmm_device *m = lu2cmm_dev(d);
600         struct lu_device *next = md2lu_dev(m->cmm_child);
601         int err;
602         ENTRY;
603
604         switch(cfg->lcfg_command) {
605         case LCFG_ADD_MDC:
606                 /* On first ADD_MDC add also local target. */
607                 if (!(m->cmm_flags & CMM_INITIALIZED)) {
608                         struct lu_site *ls = cmm2lu_dev(m)->ld_site;
609                         struct lu_fld_target target;
610
611                         target.ft_srv = lu_site2md(ls)->ms_server_fld;
612                         target.ft_idx = m->cmm_local_num;
613                         target.ft_exp = NULL;
614
615                         fld_client_add_target(m->cmm_fld, &target);
616                 }
617                 err = cmm_add_mdc(env, m, cfg);
618
619                 /* The first ADD_MDC can be counted as setup is finished. */
620                 if (!(m->cmm_flags & CMM_INITIALIZED))
621                         m->cmm_flags |= CMM_INITIALIZED;
622
623                 break;
624         case LCFG_SETUP:
625         {
626                 /* lower layers should be set up at first */
627                 err = next->ld_ops->ldo_process_config(env, next, cfg);
628                 if (err == 0)
629                         err = cmm_device_mount(env, m, cfg);
630                 break;
631         }
632         case LCFG_CLEANUP:
633         {
634                 cmm_device_shutdown(env, m, cfg);
635         }
636         default:
637                 err = next->ld_ops->ldo_process_config(env, next, cfg);
638         }
639         RETURN(err);
640 }
641
642 static int cmm_recovery_complete(const struct lu_env *env,
643                                  struct lu_device *d)
644 {
645         struct cmm_device *m = lu2cmm_dev(d);
646         struct lu_device *next = md2lu_dev(m->cmm_child);
647         int rc;
648         ENTRY;
649         rc = next->ld_ops->ldo_recovery_complete(env, next);
650         RETURN(rc);
651 }
652
653 static int cmm_prepare(const struct lu_env *env,
654                        struct lu_device *pdev,
655                        struct lu_device *dev)
656 {
657         struct cmm_device *cmm = lu2cmm_dev(dev);
658         struct lu_device *next = md2lu_dev(cmm->cmm_child);
659         int rc;
660
661         ENTRY;
662         rc = next->ld_ops->ldo_prepare(env, dev, next);
663         RETURN(rc);
664 }
665
666 static const struct lu_device_operations cmm_lu_ops = {
667         .ldo_object_alloc      = cmm_object_alloc,
668         .ldo_process_config    = cmm_process_config,
669         .ldo_recovery_complete = cmm_recovery_complete,
670         .ldo_prepare           = cmm_prepare,
671 };
672
673 /* --- lu_device_type operations --- */
674 int cmm_upcall(const struct lu_env *env, struct md_device *md,
675                enum md_upcall_event ev, void *data)
676 {
677         int rc;
678         ENTRY;
679
680         switch (ev) {
681                 case MD_LOV_SYNC:
682                         rc = cmm_post_init_mdc(env, md2cmm_dev(md));
683                         if (rc)
684                                 CERROR("can not init md size %d\n", rc);
685                         /* fall through */
686                 default:
687                         rc = md_do_upcall(env, md, ev, data);
688         }
689         RETURN(rc);
690 }
691
692 static struct lu_device *cmm_device_free(const struct lu_env *env,
693                                          struct lu_device *d)
694 {
695         struct cmm_device *m = lu2cmm_dev(d);
696         struct lu_device  *next = md2lu_dev(m->cmm_child);
697         ENTRY;
698
699         LASSERT(m->cmm_tgt_count == 0);
700         LASSERT(cfs_list_empty(&m->cmm_targets));
701         if (m->cmm_fld != NULL) {
702                 OBD_FREE_PTR(m->cmm_fld);
703                 m->cmm_fld = NULL;
704         }
705         md_device_fini(&m->cmm_md_dev);
706         OBD_FREE_PTR(m);
707         RETURN(next);
708 }
709
710 static struct lu_device *cmm_device_alloc(const struct lu_env *env,
711                                           struct lu_device_type *t,
712                                           struct lustre_cfg *cfg)
713 {
714         struct lu_device  *l;
715         struct cmm_device *m;
716         ENTRY;
717
718         OBD_ALLOC_PTR(m);
719         if (m == NULL) {
720                 l = ERR_PTR(-ENOMEM);
721         } else {
722                 md_device_init(&m->cmm_md_dev, t);
723                 m->cmm_md_dev.md_ops = &cmm_md_ops;
724                 md_upcall_init(&m->cmm_md_dev, cmm_upcall);
725                 l = cmm2lu_dev(m);
726                 l->ld_ops = &cmm_lu_ops;
727
728                 OBD_ALLOC_PTR(m->cmm_fld);
729                 if (!m->cmm_fld) {
730                         cmm_device_free(env, l);
731                         l = ERR_PTR(-ENOMEM);
732                 }
733         }
734         RETURN(l);
735 }
736
737 /* context key constructor/destructor: cmm_key_init, cmm_key_fini */
738 LU_KEY_INIT_FINI(cmm, struct cmm_thread_info);
739
740 /* context key: cmm_thread_key */
741 LU_CONTEXT_KEY_DEFINE(cmm, LCT_MD_THREAD);
742
743 struct cmm_thread_info *cmm_env_info(const struct lu_env *env)
744 {
745         struct cmm_thread_info *info;
746
747         info = lu_context_key_get(&env->le_ctx, &cmm_thread_key);
748         LASSERT(info != NULL);
749         return info;
750 }
751
752 /* type constructor/destructor: cmm_type_init/cmm_type_fini */
753 LU_TYPE_INIT_FINI(cmm, &cmm_thread_key);
754
755 /* 
756  * Kludge code : it should be moved mdc_device.c if mdc_(mds)_device
757  * is really stacked.
758  */
759 static int __cmm_type_init(struct lu_device_type *t)
760 {
761         int rc;
762         rc = lu_device_type_init(&mdc_device_type);
763         if (rc == 0) {
764                 rc = cmm_type_init(t);
765                 if (rc)
766                         lu_device_type_fini(&mdc_device_type);
767         }
768         return rc;
769 }
770
771 static void __cmm_type_fini(struct lu_device_type *t)
772 {
773         lu_device_type_fini(&mdc_device_type);
774         cmm_type_fini(t);
775 }
776
777 static void __cmm_type_start(struct lu_device_type *t)
778 {
779         mdc_device_type.ldt_ops->ldto_start(&mdc_device_type);
780         cmm_type_start(t);
781 }
782
783 static void __cmm_type_stop(struct lu_device_type *t)
784 {
785         mdc_device_type.ldt_ops->ldto_stop(&mdc_device_type);
786         cmm_type_stop(t);
787 }
788
789 static int cmm_device_init(const struct lu_env *env, struct lu_device *d,
790                            const char *name, struct lu_device *next)
791 {
792         struct cmm_device *m = lu2cmm_dev(d);
793         struct lu_site *ls;
794         int err = 0;
795         ENTRY;
796
797         cfs_spin_lock_init(&m->cmm_tgt_guard);
798         CFS_INIT_LIST_HEAD(&m->cmm_targets);
799         m->cmm_tgt_count = 0;
800         m->cmm_child = lu2md_dev(next);
801
802         err = fld_client_init(m->cmm_fld, name,
803                               LUSTRE_CLI_FLD_HASH_DHT);
804         if (err) {
805                 CERROR("Can't init FLD, err %d\n", err);
806                 RETURN(err);
807         }
808
809         /* Assign site's fld client ref, needed for asserts in osd. */
810         ls = cmm2lu_dev(m)->ld_site;
811         lu_site2md(ls)->ms_client_fld = m->cmm_fld;
812         err = cmm_procfs_init(m, name);
813
814         RETURN(err);
815 }
816
817 static struct lu_device *cmm_device_fini(const struct lu_env *env,
818                                          struct lu_device *ld)
819 {
820         struct cmm_device *cm = lu2cmm_dev(ld);
821         struct mdc_device *mc, *tmp;
822         struct lu_site *ls;
823         ENTRY;
824
825         /* Finish all mdc devices */
826         cfs_spin_lock(&cm->cmm_tgt_guard);
827         cfs_list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
828                 struct lu_device *ld_m = mdc2lu_dev(mc);
829                 struct lu_device *ld_c = cmm2lu_dev(cm);
830
831                 cfs_list_del_init(&mc->mc_linkage);
832                 lu_ref_del(&ld_c->ld_reference, "mdc-child", ld_m);
833                 lu_device_put(ld_c);
834                 ld_m->ld_type->ldt_ops->ldto_device_fini(env, ld_m);
835                 ld_m->ld_type->ldt_ops->ldto_device_free(env, ld_m);
836                 cm->cmm_tgt_count--;
837         }
838         cfs_spin_unlock(&cm->cmm_tgt_guard);
839
840         fld_client_proc_fini(cm->cmm_fld);
841         fld_client_fini(cm->cmm_fld);
842         ls = cmm2lu_dev(cm)->ld_site;
843         lu_site2md(ls)->ms_client_fld = NULL;
844         cmm_procfs_fini(cm);
845
846         RETURN (md2lu_dev(cm->cmm_child));
847 }
848
849 static struct lu_device_type_operations cmm_device_type_ops = {
850         .ldto_init = __cmm_type_init,
851         .ldto_fini = __cmm_type_fini,
852
853         .ldto_start = __cmm_type_start,
854         .ldto_stop  = __cmm_type_stop,
855
856         .ldto_device_alloc = cmm_device_alloc,
857         .ldto_device_free  = cmm_device_free,
858
859         .ldto_device_init = cmm_device_init,
860         .ldto_device_fini = cmm_device_fini
861 };
862
863 static struct lu_device_type cmm_device_type = {
864         .ldt_tags     = LU_DEVICE_MD,
865         .ldt_name     = LUSTRE_CMM_NAME,
866         .ldt_ops      = &cmm_device_type_ops,
867         .ldt_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD
868 };
869
870 struct lprocfs_vars lprocfs_cmm_obd_vars[] = {
871         { 0 }
872 };
873
874 struct lprocfs_vars lprocfs_cmm_module_vars[] = {
875         { 0 }
876 };
877
878 static void lprocfs_cmm_init_vars(struct lprocfs_static_vars *lvars)
879 {
880     lvars->module_vars  = lprocfs_cmm_module_vars;
881     lvars->obd_vars     = lprocfs_cmm_obd_vars;
882 }
883 /** @} */
884
885 static int __init cmm_mod_init(void)
886 {
887         struct lprocfs_static_vars lvars;
888
889         lprocfs_cmm_init_vars(&lvars);
890         return class_register_type(&cmm_obd_device_ops, NULL, lvars.module_vars,
891                                    LUSTRE_CMM_NAME, &cmm_device_type);
892 }
893
894 static void __exit cmm_mod_exit(void)
895 {
896         class_unregister_type(LUSTRE_CMM_NAME);
897 }
898
899 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
900 MODULE_DESCRIPTION("Lustre Clustered Metadata Manager ("LUSTRE_CMM_NAME")");
901 MODULE_LICENSE("GPL");
902
903 cfs_module(cmm, "0.1.0", cmm_mod_init, cmm_mod_exit);