Whamcloud - gitweb
Land b_head_quota onto HEAD (20081116_0105)
[fs/lustre-release.git] / lustre / cmm / cmm_device.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/cmm_device.c
37  *
38  * Lustre Cluster Metadata Manager (cmm)
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49
50 #include <obd.h>
51 #include <obd_class.h>
52 #include <lprocfs_status.h>
53 #include <lustre_ver.h>
54 #include "cmm_internal.h"
55 #include "mdc_internal.h"
56 #ifdef HAVE_QUOTA_SUPPORT
57 # include <lustre_quota.h>
58 #endif
59
/* OBD operations table for CMM: only module ownership is needed here;
 * all real functionality goes through the md/lu device operation tables. */
static struct obd_ops cmm_obd_device_ops = {
        .o_owner           = THIS_MODULE
};
63
64 static const struct lu_device_operations cmm_lu_ops;
65
66 static inline int lu_device_is_cmm(struct lu_device *d)
67 {
68         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &cmm_lu_ops);
69 }
70
71 int cmm_root_get(const struct lu_env *env, struct md_device *md,
72                  struct lu_fid *fid)
73 {
74         struct cmm_device *cmm_dev = md2cmm_dev(md);
75         /* valid only on master MDS */
76         if (cmm_dev->cmm_local_num == 0)
77                 return cmm_child_ops(cmm_dev)->mdo_root_get(env,
78                                      cmm_dev->cmm_child, fid);
79         else
80                 return -EINVAL;
81 }
82
83 static int cmm_statfs(const struct lu_env *env, struct md_device *md,
84                       struct kstatfs *sfs)
85 {
86         struct cmm_device *cmm_dev = md2cmm_dev(md);
87         int rc;
88
89         ENTRY;
90         rc = cmm_child_ops(cmm_dev)->mdo_statfs(env,
91                                                 cmm_dev->cmm_child, sfs);
92         RETURN (rc);
93 }
94
95 static int cmm_maxsize_get(const struct lu_env *env, struct md_device *md,
96                            int *md_size, int *cookie_size)
97 {
98         struct cmm_device *cmm_dev = md2cmm_dev(md);
99         int rc;
100         ENTRY;
101         rc = cmm_child_ops(cmm_dev)->mdo_maxsize_get(env, cmm_dev->cmm_child,
102                                                      md_size, cookie_size);
103         RETURN(rc);
104 }
105
106 static int cmm_init_capa_ctxt(const struct lu_env *env, struct md_device *md,
107                               int mode , unsigned long timeout, __u32 alg,
108                               struct lustre_capa_key *keys)
109 {
110         struct cmm_device *cmm_dev = md2cmm_dev(md);
111         int rc;
112         ENTRY;
113         LASSERT(cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt);
114         rc = cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt(env, cmm_dev->cmm_child,
115                                                         mode, timeout, alg,
116                                                         keys);
117         RETURN(rc);
118 }
119
120 static int cmm_update_capa_key(const struct lu_env *env,
121                                struct md_device *md,
122                                struct lustre_capa_key *key)
123 {
124         struct cmm_device *cmm_dev = md2cmm_dev(md);
125         int rc;
126         ENTRY;
127         rc = cmm_child_ops(cmm_dev)->mdo_update_capa_key(env,
128                                                          cmm_dev->cmm_child,
129                                                          key);
130         RETURN(rc);
131 }
132
#ifdef HAVE_QUOTA_SUPPORT
/*
 * Quota operations for CMM.
 *
 * Every method below is a thin pass-through to the child MD device's
 * mdo_quota table.  All of them share one guard: quota is temporarily
 * disabled in the CMD (clustered metadata) configuration, i.e. whenever
 * remote MDC targets are attached (cmm_tgt_count != 0).  The guard was
 * previously duplicated in each wrapper; it now lives in one helper so the
 * policy can be changed in a single place.
 */

/* Returns non-zero when quota operations must be refused (CMD case). */
static inline int cmm_quota_disabled(struct cmm_device *cmm_dev)
{
        /* disable quota for CMD case temporary. */
        return cmm_dev->cmm_tgt_count != 0;
}

static int cmm_quota_notify(const struct lu_env *env, struct md_device *m)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        if (cmm_quota_disabled(cmm_dev))
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_notify(env,
                                                          cmm_dev->cmm_child);
        RETURN(rc);
}

static int cmm_quota_setup(const struct lu_env *env, struct md_device *m,
                           void *data)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        if (cmm_quota_disabled(cmm_dev))
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setup(env,
                                                         cmm_dev->cmm_child,
                                                         data);
        RETURN(rc);
}

static int cmm_quota_cleanup(const struct lu_env *env, struct md_device *m)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        if (cmm_quota_disabled(cmm_dev))
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_cleanup(env,
                                                           cmm_dev->cmm_child);
        RETURN(rc);
}

static int cmm_quota_recovery(const struct lu_env *env, struct md_device *m)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        if (cmm_quota_disabled(cmm_dev))
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_recovery(env,
                                                            cmm_dev->cmm_child);
        RETURN(rc);
}

static int cmm_quota_check(const struct lu_env *env, struct md_device *m,
                           struct obd_export *exp, __u32 type)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        if (cmm_quota_disabled(cmm_dev))
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_check(env,
                                                         cmm_dev->cmm_child,
                                                         exp, type);
        RETURN(rc);
}

static int cmm_quota_on(const struct lu_env *env, struct md_device *m,
                        __u32 type, __u32 id)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        if (cmm_quota_disabled(cmm_dev))
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_on(env,
                                                      cmm_dev->cmm_child,
                                                      type, id);
        RETURN(rc);
}

static int cmm_quota_off(const struct lu_env *env, struct md_device *m,
                         __u32 type, __u32 id)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        if (cmm_quota_disabled(cmm_dev))
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_off(env,
                                                       cmm_dev->cmm_child,
                                                       type, id);
        RETURN(rc);
}

static int cmm_quota_setinfo(const struct lu_env *env, struct md_device *m,
                             __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        if (cmm_quota_disabled(cmm_dev))
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setinfo(env,
                                                           cmm_dev->cmm_child,
                                                           type, id, dqinfo);
        RETURN(rc);
}

static int cmm_quota_getinfo(const struct lu_env *env,
                             const struct md_device *m,
                             __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
{
        /* const is cast away to reuse md2cmm_dev(); the device itself is
         * not modified here. */
        struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
        int rc;
        ENTRY;

        if (cmm_quota_disabled(cmm_dev))
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getinfo(env,
                                                           cmm_dev->cmm_child,
                                                           type, id, dqinfo);
        RETURN(rc);
}

static int cmm_quota_setquota(const struct lu_env *env, struct md_device *m,
                              __u32 type, __u32 id, struct obd_dqblk *dqblk)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        if (cmm_quota_disabled(cmm_dev))
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setquota(env,
                                                            cmm_dev->cmm_child,
                                                            type, id, dqblk);
        RETURN(rc);
}

static int cmm_quota_getquota(const struct lu_env *env,
                              const struct md_device *m,
                              __u32 type, __u32 id, struct obd_dqblk *dqblk)
{
        struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
        int rc;
        ENTRY;

        if (cmm_quota_disabled(cmm_dev))
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getquota(env,
                                                            cmm_dev->cmm_child,
                                                            type, id, dqblk);
        RETURN(rc);
}

static int cmm_quota_getoinfo(const struct lu_env *env,
                              const struct md_device *m,
                              __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
{
        struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
        int rc;
        ENTRY;

        if (cmm_quota_disabled(cmm_dev))
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getoinfo(env,
                                                            cmm_dev->cmm_child,
                                                            type, id, dqinfo);
        RETURN(rc);
}

static int cmm_quota_getoquota(const struct lu_env *env,
                               const struct md_device *m,
                               __u32 type, __u32 id, struct obd_dqblk *dqblk)
{
        struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
        int rc;
        ENTRY;

        if (cmm_quota_disabled(cmm_dev))
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getoquota(env,
                                                             cmm_dev->cmm_child,
                                                             type, id, dqblk);
        RETURN(rc);
}

static int cmm_quota_invalidate(const struct lu_env *env, struct md_device *m,
                                __u32 type)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        if (cmm_quota_disabled(cmm_dev))
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_invalidate(env,
                                                              cmm_dev->cmm_child,
                                                              type);
        RETURN(rc);
}

static int cmm_quota_finvalidate(const struct lu_env *env, struct md_device *m,
                                 __u32 type)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        if (cmm_quota_disabled(cmm_dev))
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_finvalidate(env,
                                                               cmm_dev->cmm_child,
                                                               type);
        RETURN(rc);
}
#endif
387
/*
 * md_device_operations exported by CMM.  Each method forwards to the child
 * MD device via the wrappers above; quota methods are only wired in when
 * built with HAVE_QUOTA_SUPPORT.
 */
static const struct md_device_operations cmm_md_ops = {
        .mdo_statfs          = cmm_statfs,
        .mdo_root_get        = cmm_root_get,
        .mdo_maxsize_get     = cmm_maxsize_get,
        .mdo_init_capa_ctxt  = cmm_init_capa_ctxt,
        .mdo_update_capa_key = cmm_update_capa_key,
#ifdef HAVE_QUOTA_SUPPORT
        .mdo_quota           = {
                .mqo_notify      = cmm_quota_notify,
                .mqo_setup       = cmm_quota_setup,
                .mqo_cleanup     = cmm_quota_cleanup,
                .mqo_recovery    = cmm_quota_recovery,
                .mqo_check       = cmm_quota_check,
                .mqo_on          = cmm_quota_on,
                .mqo_off         = cmm_quota_off,
                .mqo_setinfo     = cmm_quota_setinfo,
                .mqo_getinfo     = cmm_quota_getinfo,
                .mqo_setquota    = cmm_quota_setquota,
                .mqo_getquota    = cmm_quota_getquota,
                .mqo_getoinfo    = cmm_quota_getoinfo,
                .mqo_getoquota   = cmm_quota_getoquota,
                .mqo_invalidate  = cmm_quota_invalidate,
                .mqo_finvalidate = cmm_quota_finvalidate
        }
#endif
};
414
415 extern struct lu_device_type mdc_device_type;
416
417 static int cmm_post_init_mdc(const struct lu_env *env,
418                              struct cmm_device *cmm)
419 {
420         int max_mdsize, max_cookiesize, rc;
421         struct mdc_device *mc, *tmp;
422
423         /* get the max mdsize and cookiesize from lower layer */
424         rc = cmm_maxsize_get(env, &cmm->cmm_md_dev, &max_mdsize,
425                                                 &max_cookiesize);
426         if (rc)
427                 RETURN(rc);
428
429         spin_lock(&cmm->cmm_tgt_guard);
430         list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
431                                  mc_linkage) {
432                 cmm_mdc_init_ea_size(env, mc, max_mdsize, max_cookiesize);
433         }
434         spin_unlock(&cmm->cmm_tgt_guard);
435         RETURN(rc);
436 }
437
438 /* --- cmm_lu_operations --- */
439 /* add new MDC to the CMM, create MDC lu_device and connect it to mdc_obd */
440 static int cmm_add_mdc(const struct lu_env *env,
441                        struct cmm_device *cm, struct lustre_cfg *cfg)
442 {
443         struct lu_device_type *ldt = &mdc_device_type;
444         char *p, *num = lustre_cfg_string(cfg, 2);
445         struct mdc_device *mc, *tmp;
446         struct lu_fld_target target;
447         struct lu_device *ld;
448         struct lu_device *cmm_lu = cmm2lu_dev(cm);
449         mdsno_t mdc_num;
450         int rc;
451         ENTRY;
452
453         /* find out that there is no such mdc */
454         LASSERT(num);
455         mdc_num = simple_strtol(num, &p, 10);
456         if (*p) {
457                 CERROR("Invalid index in lustre_cgf, offset 2\n");
458                 RETURN(-EINVAL);
459         }
460
461         spin_lock(&cm->cmm_tgt_guard);
462         list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
463                                  mc_linkage) {
464                 if (mc->mc_num == mdc_num) {
465                         spin_unlock(&cm->cmm_tgt_guard);
466                         RETURN(-EEXIST);
467                 }
468         }
469         spin_unlock(&cm->cmm_tgt_guard);
470         ld = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg);
471         if (IS_ERR(ld))
472                 RETURN(PTR_ERR(ld));
473
474         ld->ld_site = cmm2lu_dev(cm)->ld_site;
475
476         rc = ldt->ldt_ops->ldto_device_init(env, ld, NULL, NULL);
477         if (rc) {
478                 ldt->ldt_ops->ldto_device_free(env, ld);
479                 RETURN(rc);
480         }
481         /* pass config to the just created MDC */
482         rc = ld->ld_ops->ldo_process_config(env, ld, cfg);
483         if (rc) {
484                 ldt->ldt_ops->ldto_device_fini(env, ld);
485                 ldt->ldt_ops->ldto_device_free(env, ld);
486                 RETURN(rc);
487         }
488
489         spin_lock(&cm->cmm_tgt_guard);
490         list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
491                                  mc_linkage) {
492                 if (mc->mc_num == mdc_num) {
493                         spin_unlock(&cm->cmm_tgt_guard);
494                         ldt->ldt_ops->ldto_device_fini(env, ld);
495                         ldt->ldt_ops->ldto_device_free(env, ld);
496                         RETURN(-EEXIST);
497                 }
498         }
499         mc = lu2mdc_dev(ld);
500         list_add_tail(&mc->mc_linkage, &cm->cmm_targets);
501         cm->cmm_tgt_count++;
502         spin_unlock(&cm->cmm_tgt_guard);
503
504         lu_device_get(cmm_lu);
505         lu_ref_add(&cmm_lu->ld_reference, "mdc-child", ld);
506
507         target.ft_srv = NULL;
508         target.ft_idx = mc->mc_num;
509         target.ft_exp = mc->mc_desc.cl_exp;
510         fld_client_add_target(cm->cmm_fld, &target);
511
512         /* Set max md size for the mdc. */
513         rc = cmm_post_init_mdc(env, cm);
514         RETURN(rc);
515 }
516
/*
 * Shut the CMM device down (LCFG_CLEANUP path): remove the local target and
 * every MDC target from the FLD client, forward the cleanup config to each
 * MDC child device, then detach the upcall.  The MDC devices themselves are
 * torn down later, in cmm_device_fini().
 */
static void cmm_device_shutdown(const struct lu_env *env,
                                struct cmm_device *cm,
                                struct lustre_cfg *cfg)
{
        struct mdc_device *mc, *tmp;
        ENTRY;

        /* Remove local target from FLD. */
        fld_client_del_target(cm->cmm_fld, cm->cmm_local_num);

        /* Finish all mdc devices. */
        spin_lock(&cm->cmm_tgt_guard);
        list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
                struct lu_device *ld_m = mdc2lu_dev(mc);
                /* NOTE(review): ldo_process_config() is invoked under a
                 * spinlock here — verify the MDC cleanup handler never
                 * sleeps on this path. */
                fld_client_del_target(cm->cmm_fld, mc->mc_num);
                ld_m->ld_ops->ldo_process_config(env, ld_m, cfg);
        }
        spin_unlock(&cm->cmm_tgt_guard);

        /* remove upcall device*/
        md_upcall_fini(&cm->cmm_md_dev);

        EXIT;
}
541
542 static int cmm_device_mount(const struct lu_env *env,
543                             struct cmm_device *m, struct lustre_cfg *cfg)
544 {
545         const char *index = lustre_cfg_string(cfg, 2);
546         char *p;
547
548         LASSERT(index != NULL);
549
550         m->cmm_local_num = simple_strtol(index, &p, 10);
551         if (*p) {
552                 CERROR("Invalid index in lustre_cgf\n");
553                 RETURN(-EINVAL);
554         }
555
556         RETURN(0);
557 }
558
/*
 * Dispatch lustre_cfg commands for the CMM device.  ADD_MDC is handled
 * locally; SETUP is passed down first and then handled; CLEANUP is handled
 * locally and then passed down; everything else goes straight to the child.
 */
static int cmm_process_config(const struct lu_env *env,
                              struct lu_device *d, struct lustre_cfg *cfg)
{
        struct cmm_device *m = lu2cmm_dev(d);
        struct lu_device *next = md2lu_dev(m->cmm_child);
        int err;
        ENTRY;

        switch(cfg->lcfg_command) {
        case LCFG_ADD_MDC:
                /* On first ADD_MDC add also local target. */
                if (!(m->cmm_flags & CMM_INITIALIZED)) {
                        struct lu_site *ls = cmm2lu_dev(m)->ld_site;
                        struct lu_fld_target target;

                        target.ft_srv = lu_site2md(ls)->ms_server_fld;
                        target.ft_idx = m->cmm_local_num;
                        target.ft_exp = NULL;

                        fld_client_add_target(m->cmm_fld, &target);
                }
                err = cmm_add_mdc(env, m, cfg);

                /* The first ADD_MDC can be counted as setup is finished. */
                if (!(m->cmm_flags & CMM_INITIALIZED))
                        m->cmm_flags |= CMM_INITIALIZED;

                break;
        case LCFG_SETUP:
        {
                /* lower layers should be set up at first */
                err = next->ld_ops->ldo_process_config(env, next, cfg);
                if (err == 0)
                        err = cmm_device_mount(env, m, cfg);
                break;
        }
        case LCFG_CLEANUP:
        {
                cmm_device_shutdown(env, m, cfg);
        }
        /* fall through: cleanup (like any unhandled command) must also be
         * propagated to the child device. */
        default:
                err = next->ld_ops->ldo_process_config(env, next, cfg);
        }
        RETURN(err);
}
604
605 static int cmm_recovery_complete(const struct lu_env *env,
606                                  struct lu_device *d)
607 {
608         struct cmm_device *m = lu2cmm_dev(d);
609         struct lu_device *next = md2lu_dev(m->cmm_child);
610         int rc;
611         ENTRY;
612         rc = next->ld_ops->ldo_recovery_complete(env, next);
613         RETURN(rc);
614 }
615
/* lu_device operations for CMM; the address of this table also serves as
 * the device-type tag checked by lu_device_is_cmm(). */
static const struct lu_device_operations cmm_lu_ops = {
        .ldo_object_alloc      = cmm_object_alloc,
        .ldo_process_config    = cmm_process_config,
        .ldo_recovery_complete = cmm_recovery_complete
};
621
622 /* --- lu_device_type operations --- */
623 int cmm_upcall(const struct lu_env *env, struct md_device *md,
624                enum md_upcall_event ev)
625 {
626         int rc;
627         ENTRY;
628
629         switch (ev) {
630                 case MD_LOV_SYNC:
631                         rc = cmm_post_init_mdc(env, md2cmm_dev(md));
632                         if (rc)
633                                 CERROR("can not init md size %d\n", rc);
634                         /* fall through */
635                 default:
636                         rc = md_do_upcall(env, md, ev);
637         }
638         RETURN(rc);
639 }
640
/*
 * Free the CMM device and return the child lu_device so the stack teardown
 * can continue downward.  All MDC targets must already be gone (asserted).
 *
 * NOTE(review): this is also reached from the cmm_device_alloc() error
 * path, where cmm_targets may never have been initialized (OBD_ALLOC_PTR
 * zero-fill only) — verify the LASSERTs hold on that path.
 */
static struct lu_device *cmm_device_free(const struct lu_env *env,
                                         struct lu_device *d)
{
        struct cmm_device *m = lu2cmm_dev(d);
        struct lu_device  *next = md2lu_dev(m->cmm_child);
        ENTRY;

        LASSERT(m->cmm_tgt_count == 0);
        LASSERT(list_empty(&m->cmm_targets));
        if (m->cmm_fld != NULL) {
                OBD_FREE_PTR(m->cmm_fld);
                m->cmm_fld = NULL;
        }
        md_device_fini(&m->cmm_md_dev);
        OBD_FREE_PTR(m);
        RETURN(next);
}
658
659 static struct lu_device *cmm_device_alloc(const struct lu_env *env,
660                                           struct lu_device_type *t,
661                                           struct lustre_cfg *cfg)
662 {
663         struct lu_device  *l;
664         struct cmm_device *m;
665         ENTRY;
666
667         OBD_ALLOC_PTR(m);
668         if (m == NULL) {
669                 l = ERR_PTR(-ENOMEM);
670         } else {
671                 md_device_init(&m->cmm_md_dev, t);
672                 m->cmm_md_dev.md_ops = &cmm_md_ops;
673                 md_upcall_init(&m->cmm_md_dev, cmm_upcall);
674                 l = cmm2lu_dev(m);
675                 l->ld_ops = &cmm_lu_ops;
676
677                 OBD_ALLOC_PTR(m->cmm_fld);
678                 if (!m->cmm_fld) {
679                         cmm_device_free(env, l);
680                         l = ERR_PTR(-ENOMEM);
681                 }
682         }
683         RETURN(l);
684 }
685
/* context key constructor/destructor: cmm_key_init, cmm_key_fini */
LU_KEY_INIT_FINI(cmm, struct cmm_thread_info);

/* context key: cmm_thread_key */
LU_CONTEXT_KEY_DEFINE(cmm, LCT_MD_THREAD);

/* Fetch this thread's cmm_thread_info from the lu_env context; the key is
 * registered for MD threads, so the value must exist (asserted). */
struct cmm_thread_info *cmm_env_info(const struct lu_env *env)
{
        struct cmm_thread_info *info;

        info = lu_context_key_get(&env->le_ctx, &cmm_thread_key);
        LASSERT(info != NULL);
        return info;
}

/* type constructor/destructor: cmm_type_init/cmm_type_fini */
LU_TYPE_INIT_FINI(cmm, &cmm_thread_key);
703
704 static int cmm_device_init(const struct lu_env *env, struct lu_device *d,
705                            const char *name, struct lu_device *next)
706 {
707         struct cmm_device *m = lu2cmm_dev(d);
708         struct lu_site *ls;
709         int err = 0;
710         ENTRY;
711
712         spin_lock_init(&m->cmm_tgt_guard);
713         CFS_INIT_LIST_HEAD(&m->cmm_targets);
714         m->cmm_tgt_count = 0;
715         m->cmm_child = lu2md_dev(next);
716
717         err = fld_client_init(m->cmm_fld, name,
718                               LUSTRE_CLI_FLD_HASH_DHT);
719         if (err) {
720                 CERROR("Can't init FLD, err %d\n", err);
721                 RETURN(err);
722         }
723
724         /* Assign site's fld client ref, needed for asserts in osd. */
725         ls = cmm2lu_dev(m)->ld_site;
726         lu_site2md(ls)->ms_client_fld = m->cmm_fld;
727         err = cmm_procfs_init(m, name);
728
729         RETURN(err);
730 }
731
/*
 * Tear the CMM device down: unlink and destroy every MDC target (dropping
 * the reference each one holds on the CMM device), finalize the FLD client,
 * clear the site's fld pointer and remove the procfs entry.  Returns the
 * child lu_device so stack teardown can continue downward.
 */
static struct lu_device *cmm_device_fini(const struct lu_env *env,
                                         struct lu_device *ld)
{
        struct cmm_device *cm = lu2cmm_dev(ld);
        struct mdc_device *mc, *tmp;
        struct lu_site *ls;
        ENTRY;

        /* Finish all mdc devices */
        spin_lock(&cm->cmm_tgt_guard);
        list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
                struct lu_device *ld_m = mdc2lu_dev(mc);
                struct lu_device *ld_c = cmm2lu_dev(cm);

                list_del_init(&mc->mc_linkage);
                /* Drop the reference taken in cmm_add_mdc(). */
                lu_ref_del(&ld_c->ld_reference, "mdc-child", ld_m);
                lu_device_put(ld_c);
                /* NOTE(review): ldto_device_fini/free are called under a
                 * spinlock here — verify they cannot sleep on this path. */
                ld_m->ld_type->ldt_ops->ldto_device_fini(env, ld_m);
                ld_m->ld_type->ldt_ops->ldto_device_free(env, ld_m);
                cm->cmm_tgt_count--;
        }
        spin_unlock(&cm->cmm_tgt_guard);

        fld_client_fini(cm->cmm_fld);
        ls = cmm2lu_dev(cm)->ld_site;
        lu_site2md(ls)->ms_client_fld = NULL;
        cmm_procfs_fini(cm);

        RETURN (md2lu_dev(cm->cmm_child));
}
762
/* lu_device_type operations: type/key lifecycle comes from the LU_TYPE
 * macros above; device lifecycle is alloc -> init -> fini -> free. */
static struct lu_device_type_operations cmm_device_type_ops = {
        .ldto_init = cmm_type_init,
        .ldto_fini = cmm_type_fini,

        .ldto_start = cmm_type_start,
        .ldto_stop  = cmm_type_stop,

        .ldto_device_alloc = cmm_device_alloc,
        .ldto_device_free  = cmm_device_free,

        .ldto_device_init = cmm_device_init,
        .ldto_device_fini = cmm_device_fini
};
776
/* Device type registered with the class subsystem in cmm_mod_init(). */
static struct lu_device_type cmm_device_type = {
        .ldt_tags     = LU_DEVICE_MD,
        .ldt_name     = LUSTRE_CMM_NAME,
        .ldt_ops      = &cmm_device_type_ops,
        .ldt_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD
};
783
/* Per-obd lprocfs variables (currently none; sentinel-terminated). */
struct lprocfs_vars lprocfs_cmm_obd_vars[] = {
        { 0 }
};

/* Module-level lprocfs variables (currently none; sentinel-terminated). */
struct lprocfs_vars lprocfs_cmm_module_vars[] = {
        { 0 }
};
791
792 static void lprocfs_cmm_init_vars(struct lprocfs_static_vars *lvars)
793 {
794     lvars->module_vars  = lprocfs_cmm_module_vars;
795     lvars->obd_vars     = lprocfs_cmm_obd_vars;
796 }
797
/* Module entry point: register the CMM obd type (with its lprocfs tables
 * and lu_device_type) with the class subsystem. */
static int __init cmm_mod_init(void)
{
        struct lprocfs_static_vars lvars;

        lprocfs_cmm_init_vars(&lvars);
        return class_register_type(&cmm_obd_device_ops, NULL, lvars.module_vars,
                                   LUSTRE_CMM_NAME, &cmm_device_type);
}
806
/* Module exit point: unregister the CMM obd type by name. */
static void __exit cmm_mod_exit(void)
{
        class_unregister_type(LUSTRE_CMM_NAME);
}
811
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Clustered Metadata Manager ("LUSTRE_CMM_NAME")");
MODULE_LICENSE("GPL");

/* Hook module init/exit into the libcfs module glue. */
cfs_module(cmm, "0.1.0", cmm_mod_init, cmm_mod_exit);