/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/cmm/cmm_device.c
 *
 * Lustre Cluster Metadata Manager (cmm)
 *
 * Author: Mike Pershin <tappro@clusterfs.com>
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_MDS

#include <linux/module.h>

#include <obd.h>
#include <obd_class.h>
#include <lprocfs_status.h>
#include <lustre_ver.h>
#include "cmm_internal.h"
#include "mdc_internal.h"
#ifdef HAVE_QUOTA_SUPPORT
# include <lustre_quota.h>
#endif

static struct obd_ops cmm_obd_device_ops = {
        .o_owner           = THIS_MODULE
};

static const struct lu_device_operations cmm_lu_ops;

static inline int lu_device_is_cmm(struct lu_device *d)
{
        return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &cmm_lu_ops);
}

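/*
 * md_device_operations methods.  CMM is a stacking layer, so each of the
 * methods below simply forwards the call to the child MD device
 * (cmm_dev->cmm_child) obtained through cmm_child_ops().
 */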
int cmm_root_get(const struct lu_env *env, struct md_device *md,
                 struct lu_fid *fid)
{
        struct cmm_device *cmm_dev = md2cmm_dev(md);
        /* valid only on master MDS */
        if (cmm_dev->cmm_local_num == 0)
                return cmm_child_ops(cmm_dev)->mdo_root_get(env,
                                     cmm_dev->cmm_child, fid);
        else
                return -EINVAL;
}

static int cmm_statfs(const struct lu_env *env, struct md_device *md,
                      struct kstatfs *sfs)
{
        struct cmm_device *cmm_dev = md2cmm_dev(md);
        int rc;

        ENTRY;
        rc = cmm_child_ops(cmm_dev)->mdo_statfs(env,
                                                cmm_dev->cmm_child, sfs);
        RETURN(rc);
}

static int cmm_maxsize_get(const struct lu_env *env, struct md_device *md,
                           int *md_size, int *cookie_size)
{
        struct cmm_device *cmm_dev = md2cmm_dev(md);
        int rc;
        ENTRY;
        rc = cmm_child_ops(cmm_dev)->mdo_maxsize_get(env, cmm_dev->cmm_child,
                                                     md_size, cookie_size);
        RETURN(rc);
}

static int cmm_init_capa_ctxt(const struct lu_env *env, struct md_device *md,
                              int mode, unsigned long timeout, __u32 alg,
                              struct lustre_capa_key *keys)
{
        struct cmm_device *cmm_dev = md2cmm_dev(md);
        int rc;
        ENTRY;
        LASSERT(cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt);
        rc = cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt(env, cmm_dev->cmm_child,
                                                        mode, timeout, alg,
                                                        keys);
        RETURN(rc);
}

static int cmm_update_capa_key(const struct lu_env *env,
                               struct md_device *md,
                               struct lustre_capa_key *key)
{
        struct cmm_device *cmm_dev = md2cmm_dev(md);
        int rc;
        ENTRY;
        rc = cmm_child_ops(cmm_dev)->mdo_update_capa_key(env,
                                                         cmm_dev->cmm_child,
                                                         key);
        RETURN(rc);
}

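/*
 * Quota methods: thin wrappers around the child device's mdo_quota
 * operations.  Most of them return -EOPNOTSUPP while remote MDC targets are
 * configured, because quota is temporarily disabled for the CMD case.
 */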
#ifdef HAVE_QUOTA_SUPPORT
static int cmm_quota_notify(const struct lu_env *env, struct md_device *m)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_notify(env,
                                                          cmm_dev->cmm_child);
        RETURN(rc);
}

static int cmm_quota_setup(const struct lu_env *env, struct md_device *m,
                           void *data)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setup(env,
                                                         cmm_dev->cmm_child,
                                                         data);
        RETURN(rc);
}

static int cmm_quota_cleanup(const struct lu_env *env, struct md_device *m)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_cleanup(env,
                                                           cmm_dev->cmm_child);
        RETURN(rc);
}

static int cmm_quota_recovery(const struct lu_env *env, struct md_device *m)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_recovery(env,
                                                            cmm_dev->cmm_child);
        RETURN(rc);
}

static int cmm_quota_check(const struct lu_env *env, struct md_device *m,
                           struct obd_export *exp, __u32 type)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        /* Disable quota for the CMD case temporarily. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_check(env,
                                                         cmm_dev->cmm_child,
                                                         exp, type);
        RETURN(rc);
}

static int cmm_quota_on(const struct lu_env *env, struct md_device *m,
                        __u32 type)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        /* Disable quota for the CMD case temporarily. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_on(env,
                                                      cmm_dev->cmm_child,
                                                      type);
        RETURN(rc);
}

static int cmm_quota_off(const struct lu_env *env, struct md_device *m,
                         __u32 type)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        /* Disable quota for the CMD case temporarily. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_off(env,
                                                       cmm_dev->cmm_child,
                                                       type);
        RETURN(rc);
}

static int cmm_quota_setinfo(const struct lu_env *env, struct md_device *m,
                             __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        /* Disable quota for the CMD case temporarily. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setinfo(env,
                                                           cmm_dev->cmm_child,
                                                           type, id, dqinfo);
        RETURN(rc);
}

static int cmm_quota_getinfo(const struct lu_env *env,
                             const struct md_device *m,
                             __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
{
        struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
        int rc;
        ENTRY;

        /* Disable quota for the CMD case temporarily. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getinfo(env,
                                                           cmm_dev->cmm_child,
                                                           type, id, dqinfo);
        RETURN(rc);
}

static int cmm_quota_setquota(const struct lu_env *env, struct md_device *m,
                              __u32 type, __u32 id, struct obd_dqblk *dqblk)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        /* Disable quota for the CMD case temporarily. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setquota(env,
                                                            cmm_dev->cmm_child,
                                                            type, id, dqblk);
        RETURN(rc);
}

static int cmm_quota_getquota(const struct lu_env *env,
                              const struct md_device *m,
                              __u32 type, __u32 id, struct obd_dqblk *dqblk)
{
        struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
        int rc;
        ENTRY;

        /* Disable quota for the CMD case temporarily. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getquota(env,
                                                            cmm_dev->cmm_child,
                                                            type, id, dqblk);
        RETURN(rc);
}

static int cmm_quota_getoinfo(const struct lu_env *env,
                              const struct md_device *m,
                              __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
{
        struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
        int rc;
        ENTRY;

        /* Disable quota for the CMD case temporarily. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getoinfo(env,
                                                            cmm_dev->cmm_child,
                                                            type, id, dqinfo);
        RETURN(rc);
}

static int cmm_quota_getoquota(const struct lu_env *env,
                               const struct md_device *m,
                               __u32 type, __u32 id, struct obd_dqblk *dqblk)
{
        struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
        int rc;
        ENTRY;

        /* Disable quota for the CMD case temporarily. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getoquota(env,
                                                             cmm_dev->cmm_child,
                                                             type, id, dqblk);
        RETURN(rc);
}

static int cmm_quota_invalidate(const struct lu_env *env, struct md_device *m,
                                __u32 type)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        /* Disable quota for the CMD case temporarily. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_invalidate(env,
                                                              cmm_dev->cmm_child,
                                                              type);
        RETURN(rc);
}

static int cmm_quota_finvalidate(const struct lu_env *env, struct md_device *m,
                                 __u32 type)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        /* Disable quota for the CMD case temporarily. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_finvalidate(env,
                                                               cmm_dev->cmm_child,
                                                               type);
        RETURN(rc);
}
#endif

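/*
 * Vector of md_device_operations installed in cmm_device_alloc(); every entry
 * points at one of the pass-through wrappers defined above.
 */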
static const struct md_device_operations cmm_md_ops = {
        .mdo_statfs          = cmm_statfs,
        .mdo_root_get        = cmm_root_get,
        .mdo_maxsize_get     = cmm_maxsize_get,
        .mdo_init_capa_ctxt  = cmm_init_capa_ctxt,
        .mdo_update_capa_key = cmm_update_capa_key,
#ifdef HAVE_QUOTA_SUPPORT
        .mdo_quota           = {
                .mqo_notify      = cmm_quota_notify,
                .mqo_setup       = cmm_quota_setup,
                .mqo_cleanup     = cmm_quota_cleanup,
                .mqo_recovery    = cmm_quota_recovery,
                .mqo_check       = cmm_quota_check,
                .mqo_on          = cmm_quota_on,
                .mqo_off         = cmm_quota_off,
                .mqo_setinfo     = cmm_quota_setinfo,
                .mqo_getinfo     = cmm_quota_getinfo,
                .mqo_setquota    = cmm_quota_setquota,
                .mqo_getquota    = cmm_quota_getquota,
                .mqo_getoinfo    = cmm_quota_getoinfo,
                .mqo_getoquota   = cmm_quota_getoquota,
                .mqo_invalidate  = cmm_quota_invalidate,
                .mqo_finvalidate = cmm_quota_finvalidate
        }
#endif
};

extern struct lu_device_type mdc_device_type;

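/*
 * Propagate the maximum MD and cookie sizes obtained from the child device to
 * every MDC target, so that their EA buffers are sized correctly.
 */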
static int cmm_post_init_mdc(const struct lu_env *env,
                             struct cmm_device *cmm)
{
        int max_mdsize, max_cookiesize, rc;
        struct mdc_device *mc, *tmp;

        /* get the max mdsize and cookiesize from lower layer */
        rc = cmm_maxsize_get(env, &cmm->cmm_md_dev, &max_mdsize,
                             &max_cookiesize);
        if (rc)
                RETURN(rc);

        spin_lock(&cmm->cmm_tgt_guard);
        list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
                                 mc_linkage) {
                cmm_mdc_init_ea_size(env, mc, max_mdsize, max_cookiesize);
        }
        spin_unlock(&cmm->cmm_tgt_guard);
        RETURN(rc);
}

/* --- cmm_lu_operations --- */
/* add new MDC to the CMM, create MDC lu_device and connect it to mdc_obd */
static int cmm_add_mdc(const struct lu_env *env,
                       struct cmm_device *cm, struct lustre_cfg *cfg)
{
        struct lu_device_type *ldt = &mdc_device_type;
        char *p, *num = lustre_cfg_string(cfg, 2);
        struct mdc_device *mc, *tmp;
        struct lu_fld_target target;
        struct lu_device *ld;
        struct lu_device *cmm_lu = cmm2lu_dev(cm);
        mdsno_t mdc_num;
        struct lu_site *site = cmm2lu_dev(cm)->ld_site;
        int rc;
#ifdef HAVE_QUOTA_SUPPORT
        int first;
#endif
        ENTRY;

        /* make sure there is no such mdc yet */
        LASSERT(num);
        mdc_num = simple_strtol(num, &p, 10);
        if (*p) {
                CERROR("Invalid index in lustre_cfg, offset 2\n");
                RETURN(-EINVAL);
        }

        spin_lock(&cm->cmm_tgt_guard);
        list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
                                 mc_linkage) {
                if (mc->mc_num == mdc_num) {
                        spin_unlock(&cm->cmm_tgt_guard);
                        RETURN(-EEXIST);
                }
        }
        spin_unlock(&cm->cmm_tgt_guard);
        ld = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg);
        if (IS_ERR(ld))
                RETURN(PTR_ERR(ld));

        ld->ld_site = site;

        rc = ldt->ldt_ops->ldto_device_init(env, ld, NULL, NULL);
        if (rc) {
                ldt->ldt_ops->ldto_device_free(env, ld);
                RETURN(rc);
        }
        /* pass config to the just created MDC */
        rc = ld->ld_ops->ldo_process_config(env, ld, cfg);
        if (rc) {
                ldt->ldt_ops->ldto_device_fini(env, ld);
                ldt->ldt_ops->ldto_device_free(env, ld);
                RETURN(rc);
        }

        spin_lock(&cm->cmm_tgt_guard);
        list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
                                 mc_linkage) {
                if (mc->mc_num == mdc_num) {
                        spin_unlock(&cm->cmm_tgt_guard);
                        ldt->ldt_ops->ldto_device_fini(env, ld);
                        ldt->ldt_ops->ldto_device_free(env, ld);
                        RETURN(-EEXIST);
                }
        }
        mc = lu2mdc_dev(ld);
        list_add_tail(&mc->mc_linkage, &cm->cmm_targets);
        cm->cmm_tgt_count++;
#ifdef HAVE_QUOTA_SUPPORT
        first = cm->cmm_tgt_count;
#endif
        spin_unlock(&cm->cmm_tgt_guard);

        lu_device_get(cmm_lu);
        lu_ref_add(&cmm_lu->ld_reference, "mdc-child", ld);

        target.ft_srv = NULL;
        target.ft_idx = mc->mc_num;
        target.ft_exp = mc->mc_desc.cl_exp;
        fld_client_add_target(cm->cmm_fld, &target);

        if (mc->mc_num == 0) {
                /* This is the mdt0 -> mc export; FLD lookup needs this export
                 * to forward FLD lookup requests. */
                LASSERT(!lu_site2md(site)->ms_server_fld->lsf_control_exp);
                lu_site2md(site)->ms_server_fld->lsf_control_exp =
                                          mc->mc_desc.cl_exp;
        }
#ifdef HAVE_QUOTA_SUPPORT
        /* XXX: Disable quota for the CMD case temporarily. */
        if (first == 1) {
                CWARN("Disable quota for the CMD case temporarily!\n");
                cmm_child_ops(cm)->mdo_quota.mqo_off(env, cm->cmm_child, UGQUOTA);
        }
#endif
        /* Set max md size for the mdc. */
        rc = cmm_post_init_mdc(env, cm);
        RETURN(rc);
}

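/*
 * Shut the device down: drop the local target and every MDC target from the
 * FLD client, pass the cleanup config on to each MDC device and detach the
 * upcall from the md device.
 */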
static void cmm_device_shutdown(const struct lu_env *env,
                                struct cmm_device *cm,
                                struct lustre_cfg *cfg)
{
        struct mdc_device *mc, *tmp;
        ENTRY;

        /* Remove local target from FLD. */
        fld_client_del_target(cm->cmm_fld, cm->cmm_local_num);

        /* Finish all mdc devices. */
        spin_lock(&cm->cmm_tgt_guard);
        list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
                struct lu_device *ld_m = mdc2lu_dev(mc);
                fld_client_del_target(cm->cmm_fld, mc->mc_num);
                ld_m->ld_ops->ldo_process_config(env, ld_m, cfg);
        }
        spin_unlock(&cm->cmm_tgt_guard);

        /* Remove upcall device. */
        md_upcall_fini(&cm->cmm_md_dev);

        EXIT;
}

static int cmm_device_mount(const struct lu_env *env,
                            struct cmm_device *m, struct lustre_cfg *cfg)
{
        const char *index = lustre_cfg_string(cfg, 2);
        char *p;

        LASSERT(index != NULL);

        m->cmm_local_num = simple_strtol(index, &p, 10);
        if (*p) {
                CERROR("Invalid index in lustre_cfg\n");
                RETURN(-EINVAL);
        }

        RETURN(0);
}

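/*
 * Handle configuration commands: LCFG_ADD_MDC registers the local FLD target
 * on the first call and then adds a new MDC device; LCFG_SETUP is passed to
 * the lower layer first and then mounts the CMM device; LCFG_CLEANUP shuts
 * the device down and, like any other command, is then passed to the child
 * device.
 */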
static int cmm_process_config(const struct lu_env *env,
                              struct lu_device *d, struct lustre_cfg *cfg)
{
        struct cmm_device *m = lu2cmm_dev(d);
        struct lu_device *next = md2lu_dev(m->cmm_child);
        int err;
        ENTRY;

        switch (cfg->lcfg_command) {
        case LCFG_ADD_MDC:
                /* On the first ADD_MDC, also add the local target. */
                if (!(m->cmm_flags & CMM_INITIALIZED)) {
                        struct lu_site *ls = cmm2lu_dev(m)->ld_site;
                        struct lu_fld_target target;

                        target.ft_srv = lu_site2md(ls)->ms_server_fld;
                        target.ft_idx = m->cmm_local_num;
                        target.ft_exp = NULL;

                        fld_client_add_target(m->cmm_fld, &target);
                }
                err = cmm_add_mdc(env, m, cfg);

                /* The first ADD_MDC marks setup as finished. */
                if (!(m->cmm_flags & CMM_INITIALIZED))
                        m->cmm_flags |= CMM_INITIALIZED;

                break;
        case LCFG_SETUP:
        {
                /* lower layers should be set up first */
                err = next->ld_ops->ldo_process_config(env, next, cfg);
                if (err == 0)
                        err = cmm_device_mount(env, m, cfg);
                break;
        }
        case LCFG_CLEANUP:
        {
                cmm_device_shutdown(env, m, cfg);
        }
        /* fall through: the config is also passed to the child device */
        default:
                err = next->ld_ops->ldo_process_config(env, next, cfg);
        }
        RETURN(err);
}

static int cmm_recovery_complete(const struct lu_env *env,
                                 struct lu_device *d)
{
        struct cmm_device *m = lu2cmm_dev(d);
        struct lu_device *next = md2lu_dev(m->cmm_child);
        int rc;
        ENTRY;
        rc = next->ld_ops->ldo_recovery_complete(env, next);
        RETURN(rc);
}

static int cmm_prepare(const struct lu_env *env,
                       struct lu_device *pdev,
                       struct lu_device *dev)
{
        struct cmm_device *cmm = lu2cmm_dev(dev);
        struct lu_device *next = md2lu_dev(cmm->cmm_child);
        int rc;

        ENTRY;
        rc = next->ld_ops->ldo_prepare(env, dev, next);
        RETURN(rc);
}

static const struct lu_device_operations cmm_lu_ops = {
        .ldo_object_alloc      = cmm_object_alloc,
        .ldo_process_config    = cmm_process_config,
        .ldo_recovery_complete = cmm_recovery_complete,
        .ldo_prepare           = cmm_prepare,
};

/* --- lu_device_type operations --- */
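/*
 * Upcall handler installed via md_upcall_init(): on MD_LOV_SYNC the MDC
 * EA/cookie sizes are refreshed from the child device before the event is
 * forwarded to the upper device through md_do_upcall().
 */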
int cmm_upcall(const struct lu_env *env, struct md_device *md,
               enum md_upcall_event ev)
{
        int rc;
        ENTRY;

        switch (ev) {
                case MD_LOV_SYNC:
                        rc = cmm_post_init_mdc(env, md2cmm_dev(md));
                        if (rc)
                                CERROR("cannot init md size %d\n", rc);
                        /* fall through */
                default:
                        rc = md_do_upcall(env, md, ev);
        }
        RETURN(rc);
}

static struct lu_device *cmm_device_free(const struct lu_env *env,
                                         struct lu_device *d)
{
        struct cmm_device *m = lu2cmm_dev(d);
        struct lu_device  *next = md2lu_dev(m->cmm_child);
        ENTRY;

        LASSERT(m->cmm_tgt_count == 0);
        LASSERT(list_empty(&m->cmm_targets));
        if (m->cmm_fld != NULL) {
                OBD_FREE_PTR(m->cmm_fld);
                m->cmm_fld = NULL;
        }
        md_device_fini(&m->cmm_md_dev);
        OBD_FREE_PTR(m);
        RETURN(next);
}

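/*
 * Allocate a CMM device: install its md/lu operation vectors and the upcall
 * handler, and allocate the FLD client structure that is released in
 * cmm_device_free().
 */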
static struct lu_device *cmm_device_alloc(const struct lu_env *env,
                                          struct lu_device_type *t,
                                          struct lustre_cfg *cfg)
{
        struct lu_device  *l;
        struct cmm_device *m;
        ENTRY;

        OBD_ALLOC_PTR(m);
        if (m == NULL) {
                l = ERR_PTR(-ENOMEM);
        } else {
                md_device_init(&m->cmm_md_dev, t);
                m->cmm_md_dev.md_ops = &cmm_md_ops;
                md_upcall_init(&m->cmm_md_dev, cmm_upcall);
                l = cmm2lu_dev(m);
                l->ld_ops = &cmm_lu_ops;

                OBD_ALLOC_PTR(m->cmm_fld);
                if (!m->cmm_fld) {
                        cmm_device_free(env, l);
                        l = ERR_PTR(-ENOMEM);
                }
        }
        RETURN(l);
}

/* context key constructor/destructor: cmm_key_init, cmm_key_fini */
LU_KEY_INIT_FINI(cmm, struct cmm_thread_info);

/* context key: cmm_thread_key */
LU_CONTEXT_KEY_DEFINE(cmm, LCT_MD_THREAD);

struct cmm_thread_info *cmm_env_info(const struct lu_env *env)
{
        struct cmm_thread_info *info;

        info = lu_context_key_get(&env->le_ctx, &cmm_thread_key);
        LASSERT(info != NULL);
        return info;
}

/* type constructor/destructor: cmm_type_init/cmm_type_fini */
LU_TYPE_INIT_FINI(cmm, &cmm_thread_key);

/*
 * Kludge: this code should be moved to mdc_device.c once mdc_(mds)_device
 * is really stacked.
 */
static int __cmm_type_init(struct lu_device_type *t)
{
        int rc;
        rc = lu_device_type_init(&mdc_device_type);
        if (rc == 0) {
                rc = cmm_type_init(t);
                if (rc)
                        lu_device_type_fini(&mdc_device_type);
        }
        return rc;
}

static void __cmm_type_fini(struct lu_device_type *t)
{
        lu_device_type_fini(&mdc_device_type);
        cmm_type_fini(t);
}

static void __cmm_type_start(struct lu_device_type *t)
{
        mdc_device_type.ldt_ops->ldto_start(&mdc_device_type);
        cmm_type_start(t);
}

static void __cmm_type_stop(struct lu_device_type *t)
{
        mdc_device_type.ldt_ops->ldto_stop(&mdc_device_type);
        cmm_type_stop(t);
}

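/*
 * Second-stage device setup: initialize the target list, attach the child
 * device, start the FLD client, publish it in the site and register the
 * procfs entries.
 */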
static int cmm_device_init(const struct lu_env *env, struct lu_device *d,
                           const char *name, struct lu_device *next)
{
        struct cmm_device *m = lu2cmm_dev(d);
        struct lu_site *ls;
        int err = 0;
        ENTRY;

        spin_lock_init(&m->cmm_tgt_guard);
        CFS_INIT_LIST_HEAD(&m->cmm_targets);
        m->cmm_tgt_count = 0;
        m->cmm_child = lu2md_dev(next);

        err = fld_client_init(m->cmm_fld, name,
                              LUSTRE_CLI_FLD_HASH_DHT);
        if (err) {
                CERROR("Can't init FLD, err %d\n", err);
                RETURN(err);
        }

        /* Assign site's fld client ref, needed for asserts in osd. */
        ls = cmm2lu_dev(m)->ld_site;
        lu_site2md(ls)->ms_client_fld = m->cmm_fld;
        err = cmm_procfs_init(m, name);

        RETURN(err);
}

static struct lu_device *cmm_device_fini(const struct lu_env *env,
                                         struct lu_device *ld)
{
        struct cmm_device *cm = lu2cmm_dev(ld);
        struct mdc_device *mc, *tmp;
        struct lu_site *ls;
        ENTRY;

        /* Finish all mdc devices */
        spin_lock(&cm->cmm_tgt_guard);
        list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
                struct lu_device *ld_m = mdc2lu_dev(mc);
                struct lu_device *ld_c = cmm2lu_dev(cm);

                list_del_init(&mc->mc_linkage);
                lu_ref_del(&ld_c->ld_reference, "mdc-child", ld_m);
                lu_device_put(ld_c);
                ld_m->ld_type->ldt_ops->ldto_device_fini(env, ld_m);
                ld_m->ld_type->ldt_ops->ldto_device_free(env, ld_m);
                cm->cmm_tgt_count--;
        }
        spin_unlock(&cm->cmm_tgt_guard);

        fld_client_fini(cm->cmm_fld);
        ls = cmm2lu_dev(cm)->ld_site;
        lu_site2md(ls)->ms_client_fld = NULL;
        cmm_procfs_fini(cm);

        RETURN(md2lu_dev(cm->cmm_child));
}

static struct lu_device_type_operations cmm_device_type_ops = {
        .ldto_init = __cmm_type_init,
        .ldto_fini = __cmm_type_fini,

        .ldto_start = __cmm_type_start,
        .ldto_stop  = __cmm_type_stop,

        .ldto_device_alloc = cmm_device_alloc,
        .ldto_device_free  = cmm_device_free,

        .ldto_device_init = cmm_device_init,
        .ldto_device_fini = cmm_device_fini
};

static struct lu_device_type cmm_device_type = {
        .ldt_tags     = LU_DEVICE_MD,
        .ldt_name     = LUSTRE_CMM_NAME,
        .ldt_ops      = &cmm_device_type_ops,
        .ldt_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD
};

struct lprocfs_vars lprocfs_cmm_obd_vars[] = {
        { 0 }
};

struct lprocfs_vars lprocfs_cmm_module_vars[] = {
        { 0 }
};

static void lprocfs_cmm_init_vars(struct lprocfs_static_vars *lvars)
{
        lvars->module_vars  = lprocfs_cmm_module_vars;
        lvars->obd_vars     = lprocfs_cmm_obd_vars;
}

static int __init cmm_mod_init(void)
{
        struct lprocfs_static_vars lvars;

        lprocfs_cmm_init_vars(&lvars);
        return class_register_type(&cmm_obd_device_ops, NULL, lvars.module_vars,
                                   LUSTRE_CMM_NAME, &cmm_device_type);
}

static void __exit cmm_mod_exit(void)
{
        class_unregister_type(LUSTRE_CMM_NAME);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Clustered Metadata Manager ("LUSTRE_CMM_NAME")");
MODULE_LICENSE("GPL");

cfs_module(cmm, "0.1.0", cmm_mod_init, cmm_mod_exit);