Whamcloud - gitweb
b=15957
[fs/lustre-release.git] / lustre / cmm / cmm_device.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/cmm_device.c
37  *
38  * Lustre Cluster Metadata Manager (cmm)
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49
50 #include <obd.h>
51 #include <obd_class.h>
52 #include <lprocfs_status.h>
53 #include <lustre_ver.h>
54 #include "cmm_internal.h"
55 #include "mdc_internal.h"
56 #ifdef HAVE_QUOTA_SUPPORT
57 # include <lustre_quota.h>
58 #endif
59
/* OBD operations vector: CMM needs only module ownership for refcounting;
 * all real functionality goes through the md/lu device layers. */
static struct obd_ops cmm_obd_device_ops = {
        .o_owner           = THIS_MODULE
};
63
64 static const struct lu_device_operations cmm_lu_ops;
65
66 static inline int lu_device_is_cmm(struct lu_device *d)
67 {
68         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &cmm_lu_ops);
69 }
70
71 int cmm_root_get(const struct lu_env *env, struct md_device *md,
72                  struct lu_fid *fid)
73 {
74         struct cmm_device *cmm_dev = md2cmm_dev(md);
75         /* valid only on master MDS */
76         if (cmm_dev->cmm_local_num == 0)
77                 return cmm_child_ops(cmm_dev)->mdo_root_get(env,
78                                      cmm_dev->cmm_child, fid);
79         else
80                 return -EINVAL;
81 }
82
83 static int cmm_statfs(const struct lu_env *env, struct md_device *md,
84                       struct kstatfs *sfs)
85 {
86         struct cmm_device *cmm_dev = md2cmm_dev(md);
87         int rc;
88
89         ENTRY;
90         rc = cmm_child_ops(cmm_dev)->mdo_statfs(env,
91                                                 cmm_dev->cmm_child, sfs);
92         RETURN (rc);
93 }
94
95 static int cmm_maxsize_get(const struct lu_env *env, struct md_device *md,
96                            int *md_size, int *cookie_size)
97 {
98         struct cmm_device *cmm_dev = md2cmm_dev(md);
99         int rc;
100         ENTRY;
101         rc = cmm_child_ops(cmm_dev)->mdo_maxsize_get(env, cmm_dev->cmm_child,
102                                                      md_size, cookie_size);
103         RETURN(rc);
104 }
105
106 static int cmm_init_capa_ctxt(const struct lu_env *env, struct md_device *md,
107                               int mode , unsigned long timeout, __u32 alg,
108                               struct lustre_capa_key *keys)
109 {
110         struct cmm_device *cmm_dev = md2cmm_dev(md);
111         int rc;
112         ENTRY;
113         LASSERT(cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt);
114         rc = cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt(env, cmm_dev->cmm_child,
115                                                         mode, timeout, alg,
116                                                         keys);
117         RETURN(rc);
118 }
119
120 static int cmm_update_capa_key(const struct lu_env *env,
121                                struct md_device *md,
122                                struct lustre_capa_key *key)
123 {
124         struct cmm_device *cmm_dev = md2cmm_dev(md);
125         int rc;
126         ENTRY;
127         rc = cmm_child_ops(cmm_dev)->mdo_update_capa_key(env,
128                                                          cmm_dev->cmm_child,
129                                                          key);
130         RETURN(rc);
131 }
132
133 #ifdef HAVE_QUOTA_SUPPORT
134 static int cmm_quota_notify(const struct lu_env *env, struct md_device *m)
135 {
136         struct cmm_device *cmm_dev = md2cmm_dev(m);
137         int rc;
138         ENTRY;
139
140         /* disable quota for CMD case temporary. */
141         if (cmm_dev->cmm_tgt_count)
142                 RETURN(-EOPNOTSUPP);
143
144         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_notify(env,
145                                                           cmm_dev->cmm_child);
146         RETURN(rc);
147 }
148
149 static int cmm_quota_setup(const struct lu_env *env, struct md_device *m,
150                            void *data)
151 {
152         struct cmm_device *cmm_dev = md2cmm_dev(m);
153         int rc;
154         ENTRY;
155
156         /* disable quota for CMD case temporary. */
157         if (cmm_dev->cmm_tgt_count)
158                 RETURN(-EOPNOTSUPP);
159
160         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setup(env,
161                                                          cmm_dev->cmm_child,
162                                                          data);
163         RETURN(rc);
164 }
165
166 static int cmm_quota_cleanup(const struct lu_env *env, struct md_device *m)
167 {
168         struct cmm_device *cmm_dev = md2cmm_dev(m);
169         int rc;
170         ENTRY;
171
172         /* disable quota for CMD case temporary. */
173         if (cmm_dev->cmm_tgt_count)
174                 RETURN(-EOPNOTSUPP);
175
176         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_cleanup(env,
177                                                            cmm_dev->cmm_child);
178         RETURN(rc);
179 }
180
181 static int cmm_quota_recovery(const struct lu_env *env, struct md_device *m)
182 {
183         struct cmm_device *cmm_dev = md2cmm_dev(m);
184         int rc;
185         ENTRY;
186
187         /* disable quota for CMD case temporary. */
188         if (cmm_dev->cmm_tgt_count)
189                 RETURN(-EOPNOTSUPP);
190
191         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_recovery(env,
192                                                             cmm_dev->cmm_child);
193         RETURN(rc);
194 }
195
196 static int cmm_quota_check(const struct lu_env *env, struct md_device *m,
197                            struct obd_export *exp, __u32 type)
198 {
199         struct cmm_device *cmm_dev = md2cmm_dev(m);
200         int rc;
201         ENTRY;
202
203         /* disable quota for CMD case temporary. */
204         if (cmm_dev->cmm_tgt_count)
205                 RETURN(-EOPNOTSUPP);
206
207         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_check(env,
208                                                          cmm_dev->cmm_child,
209                                                          exp, type);
210         RETURN(rc);
211 }
212
213 static int cmm_quota_on(const struct lu_env *env, struct md_device *m,
214                         __u32 type, __u32 id)
215 {
216         struct cmm_device *cmm_dev = md2cmm_dev(m);
217         int rc;
218         ENTRY;
219
220         /* disable quota for CMD case temporary. */
221         if (cmm_dev->cmm_tgt_count)
222                 RETURN(-EOPNOTSUPP);
223
224         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_on(env,
225                                                       cmm_dev->cmm_child,
226                                                       type, id);
227         RETURN(rc);
228 }
229
230 static int cmm_quota_off(const struct lu_env *env, struct md_device *m,
231                          __u32 type, __u32 id)
232 {
233         struct cmm_device *cmm_dev = md2cmm_dev(m);
234         int rc;
235         ENTRY;
236
237         /* disable quota for CMD case temporary. */
238         if (cmm_dev->cmm_tgt_count)
239                 RETURN(-EOPNOTSUPP);
240
241         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_off(env,
242                                                        cmm_dev->cmm_child,
243                                                        type, id);
244         RETURN(rc);
245 }
246
247 static int cmm_quota_setinfo(const struct lu_env *env, struct md_device *m,
248                              __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
249 {
250         struct cmm_device *cmm_dev = md2cmm_dev(m);
251         int rc;
252         ENTRY;
253
254         /* disable quota for CMD case temporary. */
255         if (cmm_dev->cmm_tgt_count)
256                 RETURN(-EOPNOTSUPP);
257
258         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setinfo(env,
259                                                            cmm_dev->cmm_child,
260                                                            type, id, dqinfo);
261         RETURN(rc);
262 }
263
264 static int cmm_quota_getinfo(const struct lu_env *env,
265                              const struct md_device *m,
266                              __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
267 {
268         struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
269         int rc;
270         ENTRY;
271
272         /* disable quota for CMD case temporary. */
273         if (cmm_dev->cmm_tgt_count)
274                 RETURN(-EOPNOTSUPP);
275
276         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getinfo(env,
277                                                            cmm_dev->cmm_child,
278                                                            type, id, dqinfo);
279         RETURN(rc);
280 }
281
282 static int cmm_quota_setquota(const struct lu_env *env, struct md_device *m,
283                               __u32 type, __u32 id, struct obd_dqblk *dqblk)
284 {
285         struct cmm_device *cmm_dev = md2cmm_dev(m);
286         int rc;
287         ENTRY;
288
289         /* disable quota for CMD case temporary. */
290         if (cmm_dev->cmm_tgt_count)
291                 RETURN(-EOPNOTSUPP);
292
293         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setquota(env,
294                                                             cmm_dev->cmm_child,
295                                                             type, id, dqblk);
296         RETURN(rc);
297 }
298
299 static int cmm_quota_getquota(const struct lu_env *env,
300                               const struct md_device *m,
301                               __u32 type, __u32 id, struct obd_dqblk *dqblk)
302 {
303         struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
304         int rc;
305         ENTRY;
306
307         /* disable quota for CMD case temporary. */
308         if (cmm_dev->cmm_tgt_count)
309                 RETURN(-EOPNOTSUPP);
310
311         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getquota(env,
312                                                             cmm_dev->cmm_child,
313                                                             type, id, dqblk);
314         RETURN(rc);
315 }
316
317 static int cmm_quota_getoinfo(const struct lu_env *env,
318                               const struct md_device *m,
319                               __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
320 {
321         struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
322         int rc;
323         ENTRY;
324
325         /* disable quota for CMD case temporary. */
326         if (cmm_dev->cmm_tgt_count)
327                 RETURN(-EOPNOTSUPP);
328
329         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getoinfo(env,
330                                                             cmm_dev->cmm_child,
331                                                             type, id, dqinfo);
332         RETURN(rc);
333 }
334
335 static int cmm_quota_getoquota(const struct lu_env *env,
336                                const struct md_device *m,
337                                __u32 type, __u32 id, struct obd_dqblk *dqblk)
338 {
339         struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
340         int rc;
341         ENTRY;
342
343         /* disable quota for CMD case temporary. */
344         if (cmm_dev->cmm_tgt_count)
345                 RETURN(-EOPNOTSUPP);
346
347         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getoquota(env,
348                                                              cmm_dev->cmm_child,
349                                                              type, id, dqblk);
350         RETURN(rc);
351 }
352
353 static int cmm_quota_invalidate(const struct lu_env *env, struct md_device *m,
354                                 __u32 type)
355 {
356         struct cmm_device *cmm_dev = md2cmm_dev(m);
357         int rc;
358         ENTRY;
359
360         /* disable quota for CMD case temporary. */
361         if (cmm_dev->cmm_tgt_count)
362                 RETURN(-EOPNOTSUPP);
363
364         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_invalidate(env,
365                                                               cmm_dev->cmm_child,
366                                                               type);
367         RETURN(rc);
368 }
369
370 static int cmm_quota_finvalidate(const struct lu_env *env, struct md_device *m,
371                                  __u32 type)
372 {
373         struct cmm_device *cmm_dev = md2cmm_dev(m);
374         int rc;
375         ENTRY;
376
377         /* disable quota for CMD case temporary. */
378         if (cmm_dev->cmm_tgt_count)
379                 RETURN(-EOPNOTSUPP);
380
381         rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_finvalidate(env,
382                                                                cmm_dev->cmm_child,
383                                                                type);
384         RETURN(rc);
385 }
386 #endif
387
/* Metadata device operations vector; every entry delegates to the child
 * device, with quota entries present only when quota support is built in. */
static const struct md_device_operations cmm_md_ops = {
        .mdo_statfs          = cmm_statfs,
        .mdo_root_get        = cmm_root_get,
        .mdo_maxsize_get     = cmm_maxsize_get,
        .mdo_init_capa_ctxt  = cmm_init_capa_ctxt,
        .mdo_update_capa_key = cmm_update_capa_key,
#ifdef HAVE_QUOTA_SUPPORT
        .mdo_quota           = {
                .mqo_notify      = cmm_quota_notify,
                .mqo_setup       = cmm_quota_setup,
                .mqo_cleanup     = cmm_quota_cleanup,
                .mqo_recovery    = cmm_quota_recovery,
                .mqo_check       = cmm_quota_check,
                .mqo_on          = cmm_quota_on,
                .mqo_off         = cmm_quota_off,
                .mqo_setinfo     = cmm_quota_setinfo,
                .mqo_getinfo     = cmm_quota_getinfo,
                .mqo_setquota    = cmm_quota_setquota,
                .mqo_getquota    = cmm_quota_getquota,
                .mqo_getoinfo    = cmm_quota_getoinfo,
                .mqo_getoquota   = cmm_quota_getoquota,
                .mqo_invalidate  = cmm_quota_invalidate,
                .mqo_finvalidate = cmm_quota_finvalidate
        }
#endif
};
414
415 extern struct lu_device_type mdc_device_type;
416
417 static int cmm_post_init_mdc(const struct lu_env *env,
418                              struct cmm_device *cmm)
419 {
420         int max_mdsize, max_cookiesize, rc;
421         struct mdc_device *mc, *tmp;
422
423         /* get the max mdsize and cookiesize from lower layer */
424         rc = cmm_maxsize_get(env, &cmm->cmm_md_dev, &max_mdsize,
425                                                 &max_cookiesize);
426         if (rc)
427                 RETURN(rc);
428
429         spin_lock(&cmm->cmm_tgt_guard);
430         list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
431                                  mc_linkage) {
432                 cmm_mdc_init_ea_size(env, mc, max_mdsize, max_cookiesize);
433         }
434         spin_unlock(&cmm->cmm_tgt_guard);
435         RETURN(rc);
436 }
437
438 /* --- cmm_lu_operations --- */
439 /* add new MDC to the CMM, create MDC lu_device and connect it to mdc_obd */
440 static int cmm_add_mdc(const struct lu_env *env,
441                        struct cmm_device *cm, struct lustre_cfg *cfg)
442 {
443         struct lu_device_type *ldt = &mdc_device_type;
444         char *p, *num = lustre_cfg_string(cfg, 2);
445         struct mdc_device *mc, *tmp;
446         struct lu_fld_target target;
447         struct lu_device *ld;
448         struct lu_device *cmm_lu = cmm2lu_dev(cm);
449         mdsno_t mdc_num;
450         struct lu_site *site = cmm2lu_dev(cm)->ld_site;
451         int rc;
452         ENTRY;
453
454         /* find out that there is no such mdc */
455         LASSERT(num);
456         mdc_num = simple_strtol(num, &p, 10);
457         if (*p) {
458                 CERROR("Invalid index in lustre_cgf, offset 2\n");
459                 RETURN(-EINVAL);
460         }
461
462         spin_lock(&cm->cmm_tgt_guard);
463         list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
464                                  mc_linkage) {
465                 if (mc->mc_num == mdc_num) {
466                         spin_unlock(&cm->cmm_tgt_guard);
467                         RETURN(-EEXIST);
468                 }
469         }
470         spin_unlock(&cm->cmm_tgt_guard);
471         ld = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg);
472         if (IS_ERR(ld))
473                 RETURN(PTR_ERR(ld));
474
475         ld->ld_site = site;
476
477         rc = ldt->ldt_ops->ldto_device_init(env, ld, NULL, NULL);
478         if (rc) {
479                 ldt->ldt_ops->ldto_device_free(env, ld);
480                 RETURN(rc);
481         }
482         /* pass config to the just created MDC */
483         rc = ld->ld_ops->ldo_process_config(env, ld, cfg);
484         if (rc) {
485                 ldt->ldt_ops->ldto_device_fini(env, ld);
486                 ldt->ldt_ops->ldto_device_free(env, ld);
487                 RETURN(rc);
488         }
489
490         spin_lock(&cm->cmm_tgt_guard);
491         list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
492                                  mc_linkage) {
493                 if (mc->mc_num == mdc_num) {
494                         spin_unlock(&cm->cmm_tgt_guard);
495                         ldt->ldt_ops->ldto_device_fini(env, ld);
496                         ldt->ldt_ops->ldto_device_free(env, ld);
497                         RETURN(-EEXIST);
498                 }
499         }
500         mc = lu2mdc_dev(ld);
501         list_add_tail(&mc->mc_linkage, &cm->cmm_targets);
502         cm->cmm_tgt_count++;
503         spin_unlock(&cm->cmm_tgt_guard);
504
505         lu_device_get(cmm_lu);
506         lu_ref_add(&cmm_lu->ld_reference, "mdc-child", ld);
507
508         target.ft_srv = NULL;
509         target.ft_idx = mc->mc_num;
510         target.ft_exp = mc->mc_desc.cl_exp;
511         fld_client_add_target(cm->cmm_fld, &target);
512
513         if (mc->mc_num == 0) {
514                 /* this is mdt0 -> mc export, fld lookup need this export
515                    to forward fld lookup request. */
516                 LASSERT(!lu_site2md(site)->ms_server_fld->lsf_control_exp);
517                 lu_site2md(site)->ms_server_fld->lsf_control_exp =
518                                           mc->mc_desc.cl_exp;
519         }
520         /* Set max md size for the mdc. */
521         rc = cmm_post_init_mdc(env, cm);
522         RETURN(rc);
523 }
524
525 static void cmm_device_shutdown(const struct lu_env *env,
526                                 struct cmm_device *cm,
527                                 struct lustre_cfg *cfg)
528 {
529         struct mdc_device *mc, *tmp;
530         ENTRY;
531
532         /* Remove local target from FLD. */
533         fld_client_del_target(cm->cmm_fld, cm->cmm_local_num);
534
535         /* Finish all mdc devices. */
536         spin_lock(&cm->cmm_tgt_guard);
537         list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
538                 struct lu_device *ld_m = mdc2lu_dev(mc);
539                 fld_client_del_target(cm->cmm_fld, mc->mc_num);
540                 ld_m->ld_ops->ldo_process_config(env, ld_m, cfg);
541         }
542         spin_unlock(&cm->cmm_tgt_guard);
543
544         /* remove upcall device*/
545         md_upcall_fini(&cm->cmm_md_dev);
546
547         EXIT;
548 }
549
550 static int cmm_device_mount(const struct lu_env *env,
551                             struct cmm_device *m, struct lustre_cfg *cfg)
552 {
553         const char *index = lustre_cfg_string(cfg, 2);
554         char *p;
555
556         LASSERT(index != NULL);
557
558         m->cmm_local_num = simple_strtol(index, &p, 10);
559         if (*p) {
560                 CERROR("Invalid index in lustre_cgf\n");
561                 RETURN(-EINVAL);
562         }
563
564         RETURN(0);
565 }
566
/*
 * Dispatch a lustre_cfg command.  LCFG_ADD_MDC is consumed here;
 * LCFG_SETUP and LCFG_CLEANUP are handled locally and also passed to
 * the child device; everything else goes straight to the child.
 */
static int cmm_process_config(const struct lu_env *env,
                              struct lu_device *d, struct lustre_cfg *cfg)
{
        struct cmm_device *m = lu2cmm_dev(d);
        struct lu_device *next = md2lu_dev(m->cmm_child);
        int err;
        ENTRY;

        switch(cfg->lcfg_command) {
        case LCFG_ADD_MDC:
                /* On first ADD_MDC add also local target. */
                if (!(m->cmm_flags & CMM_INITIALIZED)) {
                        struct lu_site *ls = cmm2lu_dev(m)->ld_site;
                        struct lu_fld_target target;

                        target.ft_srv = lu_site2md(ls)->ms_server_fld;
                        target.ft_idx = m->cmm_local_num;
                        target.ft_exp = NULL;

                        fld_client_add_target(m->cmm_fld, &target);
                }
                err = cmm_add_mdc(env, m, cfg);

                /* The first ADD_MDC can be counted as setup is finished. */
                if (!(m->cmm_flags & CMM_INITIALIZED))
                        m->cmm_flags |= CMM_INITIALIZED;

                break;
        case LCFG_SETUP:
        {
                /* lower layers should be set up at first */
                err = next->ld_ops->ldo_process_config(env, next, cfg);
                if (err == 0)
                        err = cmm_device_mount(env, m, cfg);
                break;
        }
        case LCFG_CLEANUP:
        {
                cmm_device_shutdown(env, m, cfg);
        }
        /* fall through: CLEANUP must also reach the child device, which
         * the default case does -- it is also where 'err' gets set. */
        default:
                err = next->ld_ops->ldo_process_config(env, next, cfg);
        }
        RETURN(err);
}
612
613 static int cmm_recovery_complete(const struct lu_env *env,
614                                  struct lu_device *d)
615 {
616         struct cmm_device *m = lu2cmm_dev(d);
617         struct lu_device *next = md2lu_dev(m->cmm_child);
618         int rc;
619         ENTRY;
620         rc = next->ld_ops->ldo_recovery_complete(env, next);
621         RETURN(rc);
622 }
623
624 static int cmm_prepare(const struct lu_env *env,
625                        struct lu_device *pdev,
626                        struct lu_device *dev)
627 {
628         struct cmm_device *cmm = lu2cmm_dev(dev);
629         struct lu_device *next = md2lu_dev(cmm->cmm_child);
630         int rc;
631
632         ENTRY;
633         rc = next->ld_ops->ldo_prepare(env, dev, next);
634         RETURN(rc);
635 }
636
/* Generic lu_device operations for CMM; also the identity tag tested by
 * lu_device_is_cmm(). */
static const struct lu_device_operations cmm_lu_ops = {
        .ldo_object_alloc      = cmm_object_alloc,
        .ldo_process_config    = cmm_process_config,
        .ldo_recovery_complete = cmm_recovery_complete,
        .ldo_prepare           = cmm_prepare,
};
643
644 /* --- lu_device_type operations --- */
645 int cmm_upcall(const struct lu_env *env, struct md_device *md,
646                enum md_upcall_event ev)
647 {
648         int rc;
649         ENTRY;
650
651         switch (ev) {
652                 case MD_LOV_SYNC:
653                         rc = cmm_post_init_mdc(env, md2cmm_dev(md));
654                         if (rc)
655                                 CERROR("can not init md size %d\n", rc);
656                         /* fall through */
657                 default:
658                         rc = md_do_upcall(env, md, ev);
659         }
660         RETURN(rc);
661 }
662
/*
 * Free the CMM device and its FLD client.  Returns the child device so
 * generic code can continue tear-down with it.
 */
static struct lu_device *cmm_device_free(const struct lu_env *env,
                                         struct lu_device *d)
{
        struct cmm_device *m = lu2cmm_dev(d);
        struct lu_device  *next = md2lu_dev(m->cmm_child);
        ENTRY;

        /* All MDC targets must already be gone (see cmm_device_fini()). */
        LASSERT(m->cmm_tgt_count == 0);
        /* NOTE(review): on the cmm_device_alloc() error path cmm_targets
         * has not been through CFS_INIT_LIST_HEAD yet (it is only
         * zero-filled), so this list_empty() check may not hold there
         * -- verify. */
        LASSERT(list_empty(&m->cmm_targets));
        if (m->cmm_fld != NULL) {
                OBD_FREE_PTR(m->cmm_fld);
                m->cmm_fld = NULL;
        }
        md_device_fini(&m->cmm_md_dev);
        OBD_FREE_PTR(m);
        RETURN(next);
}
680
681 static struct lu_device *cmm_device_alloc(const struct lu_env *env,
682                                           struct lu_device_type *t,
683                                           struct lustre_cfg *cfg)
684 {
685         struct lu_device  *l;
686         struct cmm_device *m;
687         ENTRY;
688
689         OBD_ALLOC_PTR(m);
690         if (m == NULL) {
691                 l = ERR_PTR(-ENOMEM);
692         } else {
693                 md_device_init(&m->cmm_md_dev, t);
694                 m->cmm_md_dev.md_ops = &cmm_md_ops;
695                 md_upcall_init(&m->cmm_md_dev, cmm_upcall);
696                 l = cmm2lu_dev(m);
697                 l->ld_ops = &cmm_lu_ops;
698
699                 OBD_ALLOC_PTR(m->cmm_fld);
700                 if (!m->cmm_fld) {
701                         cmm_device_free(env, l);
702                         l = ERR_PTR(-ENOMEM);
703                 }
704         }
705         RETURN(l);
706 }
707
/* context key constructor/destructor: cmm_key_init, cmm_key_fini */
LU_KEY_INIT_FINI(cmm, struct cmm_thread_info);

/* context key: cmm_thread_key (valid in MD-thread contexts) */
LU_CONTEXT_KEY_DEFINE(cmm, LCT_MD_THREAD);

/* Return the per-thread cmm_thread_info attached to \a env; the key
 * must have been registered, so the value is asserted non-NULL. */
struct cmm_thread_info *cmm_env_info(const struct lu_env *env)
{
        struct cmm_thread_info *info;

        info = lu_context_key_get(&env->le_ctx, &cmm_thread_key);
        LASSERT(info != NULL);
        return info;
}

/* type constructor/destructor: cmm_type_init/cmm_type_fini */
LU_TYPE_INIT_FINI(cmm, &cmm_thread_key);
725
726 static int cmm_device_init(const struct lu_env *env, struct lu_device *d,
727                            const char *name, struct lu_device *next)
728 {
729         struct cmm_device *m = lu2cmm_dev(d);
730         struct lu_site *ls;
731         int err = 0;
732         ENTRY;
733
734         spin_lock_init(&m->cmm_tgt_guard);
735         CFS_INIT_LIST_HEAD(&m->cmm_targets);
736         m->cmm_tgt_count = 0;
737         m->cmm_child = lu2md_dev(next);
738
739         err = fld_client_init(m->cmm_fld, name,
740                               LUSTRE_CLI_FLD_HASH_DHT);
741         if (err) {
742                 CERROR("Can't init FLD, err %d\n", err);
743                 RETURN(err);
744         }
745
746         /* Assign site's fld client ref, needed for asserts in osd. */
747         ls = cmm2lu_dev(m)->ld_site;
748         lu_site2md(ls)->ms_client_fld = m->cmm_fld;
749         err = cmm_procfs_init(m, name);
750
751         RETURN(err);
752 }
753
/*
 * Finish the CMM device: detach, fini and free every MDC target, shut
 * the FLD client down, clear the site reference and remove the /proc
 * entries.  Returns the child device for the caller to continue with.
 */
static struct lu_device *cmm_device_fini(const struct lu_env *env,
                                         struct lu_device *ld)
{
        struct cmm_device *cm = lu2cmm_dev(ld);
        struct mdc_device *mc, *tmp;
        struct lu_site *ls;
        ENTRY;

        /* Finish all mdc devices */
        spin_lock(&cm->cmm_tgt_guard);
        list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
                struct lu_device *ld_m = mdc2lu_dev(mc);
                struct lu_device *ld_c = cmm2lu_dev(cm);

                list_del_init(&mc->mc_linkage);
                /* Drop the reference taken in cmm_add_mdc(). */
                lu_ref_del(&ld_c->ld_reference, "mdc-child", ld_m);
                lu_device_put(ld_c);
                ld_m->ld_type->ldt_ops->ldto_device_fini(env, ld_m);
                ld_m->ld_type->ldt_ops->ldto_device_free(env, ld_m);
                cm->cmm_tgt_count--;
        }
        spin_unlock(&cm->cmm_tgt_guard);

        fld_client_fini(cm->cmm_fld);
        /* Clear the site reference published by cmm_device_init(). */
        ls = cmm2lu_dev(cm)->ld_site;
        lu_site2md(ls)->ms_client_fld = NULL;
        cmm_procfs_fini(cm);

        RETURN (md2lu_dev(cm->cmm_child));
}
784
/* Device-type operations: type lifecycle, start/stop, and per-device
 * allocation/initialization hooks. */
static struct lu_device_type_operations cmm_device_type_ops = {
        .ldto_init = cmm_type_init,
        .ldto_fini = cmm_type_fini,

        .ldto_start = cmm_type_start,
        .ldto_stop  = cmm_type_stop,

        .ldto_device_alloc = cmm_device_alloc,
        .ldto_device_free  = cmm_device_free,

        .ldto_device_init = cmm_device_init,
        .ldto_device_fini = cmm_device_fini
};
798
/* The CMM device type, registered with the class layer at module init. */
static struct lu_device_type cmm_device_type = {
        .ldt_tags     = LU_DEVICE_MD,
        .ldt_name     = LUSTRE_CMM_NAME,
        .ldt_ops      = &cmm_device_type_ops,
        .ldt_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD
};
805
/* No CMM-specific /proc variables yet; tables hold only the terminator. */
struct lprocfs_vars lprocfs_cmm_obd_vars[] = {
        { 0 }
};

struct lprocfs_vars lprocfs_cmm_module_vars[] = {
        { 0 }
};
813
814 static void lprocfs_cmm_init_vars(struct lprocfs_static_vars *lvars)
815 {
816     lvars->module_vars  = lprocfs_cmm_module_vars;
817     lvars->obd_vars     = lprocfs_cmm_obd_vars;
818 }
819
/* Module entry point: register the CMM OBD type with the class layer. */
static int __init cmm_mod_init(void)
{
        struct lprocfs_static_vars lvars;

        lprocfs_cmm_init_vars(&lvars);
        return class_register_type(&cmm_obd_device_ops, NULL, lvars.module_vars,
                                   LUSTRE_CMM_NAME, &cmm_device_type);
}
828
/* Module exit point: unregister the CMM OBD type. */
static void __exit cmm_mod_exit(void)
{
        class_unregister_type(LUSTRE_CMM_NAME);
}
833
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Clustered Metadata Manager ("LUSTRE_CMM_NAME")");
MODULE_LICENSE("GPL");

/* Register the module with the CFS wrapper (name, version, init, exit). */
cfs_module(cmm, "0.1.0", cmm_mod_init, cmm_mod_exit);