/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/cmm/cmm_device.c
 *
 * Lustre Cluster Metadata Manager (cmm)
 *
 * Author: Mike Pershin <tappro@clusterfs.com>
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_MDS

#include <linux/module.h>

#include <obd.h>
#include <obd_class.h>
#include <lprocfs_status.h>
#include <lustre_ver.h>
#include "cmm_internal.h"
#include "mdc_internal.h"
#ifdef HAVE_QUOTA_SUPPORT
# include <lustre_quota.h>
#endif

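/*
 * OBD operations for the CMM device type. Only the module owner is set;
 * CMM does not export any OBD-level methods of its own.
 */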
struct obd_ops cmm_obd_device_ops = {
        .o_owner           = THIS_MODULE
};

static const struct lu_device_operations cmm_lu_ops;

static inline int lu_device_is_cmm(struct lu_device *d)
{
        return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &cmm_lu_ops);
}

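/* Return the FID of the filesystem root object. This is valid only on the
 * master MDS (local index 0); any other MDS returns -EINVAL. */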
int cmm_root_get(const struct lu_env *env, struct md_device *md,
                 struct lu_fid *fid)
{
        struct cmm_device *cmm_dev = md2cmm_dev(md);
        /* valid only on master MDS */
        if (cmm_dev->cmm_local_num == 0)
                return cmm_child_ops(cmm_dev)->mdo_root_get(env,
                                     cmm_dev->cmm_child, fid);
        else
                return -EINVAL;
}

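/*
 * The handlers below are thin wrappers that delegate directly to the
 * child (next lower) metadata device.
 */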
static int cmm_statfs(const struct lu_env *env, struct md_device *md,
                      struct kstatfs *sfs)
{
        struct cmm_device *cmm_dev = md2cmm_dev(md);
        int rc;

        ENTRY;
        rc = cmm_child_ops(cmm_dev)->mdo_statfs(env,
                                                cmm_dev->cmm_child, sfs);
        RETURN(rc);
}

static int cmm_maxsize_get(const struct lu_env *env, struct md_device *md,
                           int *md_size, int *cookie_size)
{
        struct cmm_device *cmm_dev = md2cmm_dev(md);
        int rc;
        ENTRY;
        rc = cmm_child_ops(cmm_dev)->mdo_maxsize_get(env, cmm_dev->cmm_child,
                                                     md_size, cookie_size);
        RETURN(rc);
}

static int cmm_init_capa_ctxt(const struct lu_env *env, struct md_device *md,
                              int mode, unsigned long timeout, __u32 alg,
                              struct lustre_capa_key *keys)
{
        struct cmm_device *cmm_dev = md2cmm_dev(md);
        int rc;
        ENTRY;
        LASSERT(cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt);
        rc = cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt(env, cmm_dev->cmm_child,
                                                        mode, timeout, alg,
                                                        keys);
        RETURN(rc);
}

static int cmm_update_capa_key(const struct lu_env *env,
                               struct md_device *md,
                               struct lustre_capa_key *key)
{
        struct cmm_device *cmm_dev = md2cmm_dev(md);
        int rc;
        ENTRY;
        rc = cmm_child_ops(cmm_dev)->mdo_update_capa_key(env,
                                                         cmm_dev->cmm_child,
                                                         key);
        RETURN(rc);
}

static int cmm_llog_ctxt_get(const struct lu_env *env, struct md_device *m,
                             int idx, void **h)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        rc = cmm_child_ops(cmm_dev)->mdo_llog_ctxt_get(env, cmm_dev->cmm_child,
                                                       idx, h);
        RETURN(rc);
}

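/*
 * Quota handlers. They forward quota requests to the child device; most of
 * them return -EOPNOTSUPP when remote MDC targets are configured, since
 * quota is temporarily disabled in the CMD (clustered metadata) case.
 */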
#ifdef HAVE_QUOTA_SUPPORT
static int cmm_quota_notify(const struct lu_env *env, struct md_device *m)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_notify(env,
                                                          cmm_dev->cmm_child);
        RETURN(rc);
}

static int cmm_quota_setup(const struct lu_env *env, struct md_device *m,
                           void *data)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setup(env,
                                                         cmm_dev->cmm_child,
                                                         data);
        RETURN(rc);
}

static int cmm_quota_cleanup(const struct lu_env *env, struct md_device *m)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_cleanup(env,
                                                           cmm_dev->cmm_child);
        RETURN(rc);
}

static int cmm_quota_recovery(const struct lu_env *env, struct md_device *m)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_recovery(env,
                                                            cmm_dev->cmm_child);
        RETURN(rc);
}

static int cmm_quota_check(const struct lu_env *env, struct md_device *m,
                           struct obd_export *exp, __u32 type)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        /* Quota is temporarily disabled for the CMD case. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_check(env,
                                                         cmm_dev->cmm_child,
                                                         exp, type);
        RETURN(rc);
}

static int cmm_quota_on(const struct lu_env *env, struct md_device *m,
                        __u32 type)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        /* Quota is temporarily disabled for the CMD case. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_on(env,
                                                      cmm_dev->cmm_child,
                                                      type);
        RETURN(rc);
}

static int cmm_quota_off(const struct lu_env *env, struct md_device *m,
                         __u32 type)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_off(env,
                                                       cmm_dev->cmm_child,
                                                       type);
        RETURN(rc);
}

static int cmm_quota_setinfo(const struct lu_env *env, struct md_device *m,
                             __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        /* Quota is temporarily disabled for the CMD case. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setinfo(env,
                                                           cmm_dev->cmm_child,
                                                           type, id, dqinfo);
        RETURN(rc);
}

static int cmm_quota_getinfo(const struct lu_env *env,
                             const struct md_device *m,
                             __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
{
        struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
        int rc;
        ENTRY;

        /* Quota is temporarily disabled for the CMD case. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getinfo(env,
                                                           cmm_dev->cmm_child,
                                                           type, id, dqinfo);
        RETURN(rc);
}

static int cmm_quota_setquota(const struct lu_env *env, struct md_device *m,
                              __u32 type, __u32 id, struct obd_dqblk *dqblk)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        /* Quota is temporarily disabled for the CMD case. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_setquota(env,
                                                            cmm_dev->cmm_child,
                                                            type, id, dqblk);
        RETURN(rc);
}

static int cmm_quota_getquota(const struct lu_env *env,
                              const struct md_device *m,
                              __u32 type, __u32 id, struct obd_dqblk *dqblk)
{
        struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
        int rc;
        ENTRY;

        /* Quota is temporarily disabled for the CMD case. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getquota(env,
                                                            cmm_dev->cmm_child,
                                                            type, id, dqblk);
        RETURN(rc);
}

static int cmm_quota_getoinfo(const struct lu_env *env,
                              const struct md_device *m,
                              __u32 type, __u32 id, struct obd_dqinfo *dqinfo)
{
        struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
        int rc;
        ENTRY;

        /* Quota is temporarily disabled for the CMD case. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getoinfo(env,
                                                            cmm_dev->cmm_child,
                                                            type, id, dqinfo);
        RETURN(rc);
}

static int cmm_quota_getoquota(const struct lu_env *env,
                               const struct md_device *m,
                               __u32 type, __u32 id, struct obd_dqblk *dqblk)
{
        struct cmm_device *cmm_dev = md2cmm_dev((struct md_device *)m);
        int rc;
        ENTRY;

        /* Quota is temporarily disabled for the CMD case. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_getoquota(env,
                                                             cmm_dev->cmm_child,
                                                             type, id, dqblk);
        RETURN(rc);
}

static int cmm_quota_invalidate(const struct lu_env *env, struct md_device *m,
                                __u32 type)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        /* Quota is temporarily disabled for the CMD case. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_invalidate(env,
                                                              cmm_dev->cmm_child,
                                                              type);
        RETURN(rc);
}

static int cmm_quota_finvalidate(const struct lu_env *env, struct md_device *m,
                                 __u32 type)
{
        struct cmm_device *cmm_dev = md2cmm_dev(m);
        int rc;
        ENTRY;

        /* Quota is temporarily disabled for the CMD case. */
        if (cmm_dev->cmm_tgt_count)
                RETURN(-EOPNOTSUPP);

        rc = cmm_child_ops(cmm_dev)->mdo_quota.mqo_finvalidate(env,
                                                               cmm_dev->cmm_child,
                                                               type);
        RETURN(rc);
}
#endif

int cmm_iocontrol(const struct lu_env *env, struct md_device *m,
                  unsigned int cmd, int len, void *data)
{
        struct md_device *next = md2cmm_dev(m)->cmm_child;
        int rc;

        ENTRY;
        rc = next->md_ops->mdo_iocontrol(env, next, cmd, len, data);
        RETURN(rc);
}

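/*
 * Metadata device operations exported by CMM. Most entries delegate to the
 * corresponding operation of the child device.
 */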
static const struct md_device_operations cmm_md_ops = {
        .mdo_statfs          = cmm_statfs,
        .mdo_root_get        = cmm_root_get,
        .mdo_maxsize_get     = cmm_maxsize_get,
        .mdo_init_capa_ctxt  = cmm_init_capa_ctxt,
        .mdo_update_capa_key = cmm_update_capa_key,
        .mdo_llog_ctxt_get   = cmm_llog_ctxt_get,
        .mdo_iocontrol       = cmm_iocontrol,
#ifdef HAVE_QUOTA_SUPPORT
        .mdo_quota           = {
                .mqo_notify      = cmm_quota_notify,
                .mqo_setup       = cmm_quota_setup,
                .mqo_cleanup     = cmm_quota_cleanup,
                .mqo_recovery    = cmm_quota_recovery,
                .mqo_check       = cmm_quota_check,
                .mqo_on          = cmm_quota_on,
                .mqo_off         = cmm_quota_off,
                .mqo_setinfo     = cmm_quota_setinfo,
                .mqo_getinfo     = cmm_quota_getinfo,
                .mqo_setquota    = cmm_quota_setquota,
                .mqo_getquota    = cmm_quota_getquota,
                .mqo_getoinfo    = cmm_quota_getoinfo,
                .mqo_getoquota   = cmm_quota_getoquota,
                .mqo_invalidate  = cmm_quota_invalidate,
                .mqo_finvalidate = cmm_quota_finvalidate
        }
#endif
};

extern struct lu_device_type mdc_device_type;

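/*
 * Query the maximum MD and cookie sizes from the lower layer and propagate
 * them to every MDC target currently registered with this CMM device.
 */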
static int cmm_post_init_mdc(const struct lu_env *env,
                             struct cmm_device *cmm)
{
        int max_mdsize, max_cookiesize, rc;
        struct mdc_device *mc, *tmp;

        /* get the max mdsize and cookiesize from lower layer */
        rc = cmm_maxsize_get(env, &cmm->cmm_md_dev, &max_mdsize,
                             &max_cookiesize);
        if (rc)
                RETURN(rc);

        spin_lock(&cmm->cmm_tgt_guard);
        list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
                                 mc_linkage) {
                cmm_mdc_init_ea_size(env, mc, max_mdsize, max_cookiesize);
        }
        spin_unlock(&cmm->cmm_tgt_guard);
        RETURN(rc);
}

/* --- cmm_lu_operations --- */
/* Add a new MDC to the CMM: create the MDC lu_device and connect it to the
 * mdc obd device. */
static int cmm_add_mdc(const struct lu_env *env,
                       struct cmm_device *cm, struct lustre_cfg *cfg)
{
        struct lu_device_type *ldt = &mdc_device_type;
        char *p, *num = lustre_cfg_string(cfg, 2);
        struct mdc_device *mc, *tmp;
        struct lu_fld_target target;
        struct lu_device *ld;
        struct lu_device *cmm_lu = cmm2lu_dev(cm);
        mdsno_t mdc_num;
        struct lu_site *site = cmm2lu_dev(cm)->ld_site;
        int rc;
#ifdef HAVE_QUOTA_SUPPORT
        int first;
#endif
        ENTRY;

        /* Verify that no MDC with this index exists yet. */
        LASSERT(num);
        mdc_num = simple_strtol(num, &p, 10);
        if (*p) {
                CERROR("Invalid index in lustre_cfg, offset 2\n");
                RETURN(-EINVAL);
        }

        spin_lock(&cm->cmm_tgt_guard);
        list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
                                 mc_linkage) {
                if (mc->mc_num == mdc_num) {
                        spin_unlock(&cm->cmm_tgt_guard);
                        RETURN(-EEXIST);
                }
        }
        spin_unlock(&cm->cmm_tgt_guard);
        ld = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg);
        if (IS_ERR(ld))
                RETURN(PTR_ERR(ld));

        ld->ld_site = site;

        rc = ldt->ldt_ops->ldto_device_init(env, ld, NULL, NULL);
        if (rc) {
                ldt->ldt_ops->ldto_device_free(env, ld);
                RETURN(rc);
        }
        /* pass config to the just created MDC */
        rc = ld->ld_ops->ldo_process_config(env, ld, cfg);
        if (rc) {
                ldt->ldt_ops->ldto_device_fini(env, ld);
                ldt->ldt_ops->ldto_device_free(env, ld);
                RETURN(rc);
        }

        /* Re-check under the lock: another thread may have added the same
         * target while the lock was dropped for device allocation. */
        spin_lock(&cm->cmm_tgt_guard);
        list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
                                 mc_linkage) {
                if (mc->mc_num == mdc_num) {
                        spin_unlock(&cm->cmm_tgt_guard);
                        ldt->ldt_ops->ldto_device_fini(env, ld);
                        ldt->ldt_ops->ldto_device_free(env, ld);
                        RETURN(-EEXIST);
                }
        }
        mc = lu2mdc_dev(ld);
        list_add_tail(&mc->mc_linkage, &cm->cmm_targets);
        cm->cmm_tgt_count++;
#ifdef HAVE_QUOTA_SUPPORT
        first = cm->cmm_tgt_count;
#endif
        spin_unlock(&cm->cmm_tgt_guard);

        lu_device_get(cmm_lu);
        lu_ref_add(&cmm_lu->ld_reference, "mdc-child", ld);

        target.ft_srv = NULL;
        target.ft_idx = mc->mc_num;
        target.ft_exp = mc->mc_desc.cl_exp;
        fld_client_add_target(cm->cmm_fld, &target);

        if (mc->mc_num == 0) {
                /* This is the mdt0 -> mc export; FLD lookup needs this
                 * export to forward FLD lookup requests. */
                LASSERT(!lu_site2md(site)->ms_server_fld->lsf_control_exp);
                lu_site2md(site)->ms_server_fld->lsf_control_exp =
                                          mc->mc_desc.cl_exp;
        }
#ifdef HAVE_QUOTA_SUPPORT
        /* XXX: Quota is temporarily disabled for the CMD case. */
        if (first == 1) {
                CWARN("Quota is temporarily disabled for the CMD case!\n");
                cmm_child_ops(cm)->mdo_quota.mqo_off(env, cm->cmm_child, UGQUOTA);
        }
#endif
        /* Set max md size for the mdc. */
        rc = cmm_post_init_mdc(env, cm);
        RETURN(rc);
}

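/*
 * Device shutdown: remove the local FLD target, pass the cleanup config to
 * every MDC target and finish the upcall device.
 */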
static void cmm_device_shutdown(const struct lu_env *env,
                                struct cmm_device *cm,
                                struct lustre_cfg *cfg)
{
        struct mdc_device *mc, *tmp;
        ENTRY;

        /* Remove local target from FLD. */
        fld_client_del_target(cm->cmm_fld, cm->cmm_local_num);

        /* Finish all mdc devices. */
        spin_lock(&cm->cmm_tgt_guard);
        list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
                struct lu_device *ld_m = mdc2lu_dev(mc);
                fld_client_del_target(cm->cmm_fld, mc->mc_num);
                ld_m->ld_ops->ldo_process_config(env, ld_m, cfg);
        }
        spin_unlock(&cm->cmm_tgt_guard);

        /* Remove upcall device. */
        md_upcall_fini(&cm->cmm_md_dev);

        EXIT;
}

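/* Parse the local MDS index for this device from the mount configuration. */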
static int cmm_device_mount(const struct lu_env *env,
                            struct cmm_device *m, struct lustre_cfg *cfg)
{
        const char *index = lustre_cfg_string(cfg, 2);
        char *p;

        LASSERT(index != NULL);

        m->cmm_local_num = simple_strtol(index, &p, 10);
        if (*p) {
                CERROR("Invalid index in lustre_cfg\n");
                RETURN(-EINVAL);
        }

        RETURN(0);
}

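/*
 * Dispatch configuration commands: LCFG_ADD_MDC registers a new remote MDC
 * target, LCFG_SETUP configures the lower layers and then the local device,
 * LCFG_CLEANUP shuts the device down; everything else is passed straight to
 * the next device.
 */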
static int cmm_process_config(const struct lu_env *env,
                              struct lu_device *d, struct lustre_cfg *cfg)
{
        struct cmm_device *m = lu2cmm_dev(d);
        struct lu_device *next = md2lu_dev(m->cmm_child);
        int err;
        ENTRY;

        switch (cfg->lcfg_command) {
        case LCFG_ADD_MDC:
                /* On the first ADD_MDC, also add the local target. */
                if (!(m->cmm_flags & CMM_INITIALIZED)) {
                        struct lu_site *ls = cmm2lu_dev(m)->ld_site;
                        struct lu_fld_target target;

                        target.ft_srv = lu_site2md(ls)->ms_server_fld;
                        target.ft_idx = m->cmm_local_num;
                        target.ft_exp = NULL;

                        fld_client_add_target(m->cmm_fld, &target);
                }
                err = cmm_add_mdc(env, m, cfg);

                /* The first ADD_MDC marks the setup as finished. */
                if (!(m->cmm_flags & CMM_INITIALIZED))
                        m->cmm_flags |= CMM_INITIALIZED;

                break;
        case LCFG_SETUP:
        {
                /* lower layers should be set up first */
                err = next->ld_ops->ldo_process_config(env, next, cfg);
                if (err == 0)
                        err = cmm_device_mount(env, m, cfg);
                break;
        }
        case LCFG_CLEANUP:
        {
                cmm_device_shutdown(env, m, cfg);
        }
        /* fall through: the cleanup config must also reach the next layer */
        default:
                err = next->ld_ops->ldo_process_config(env, next, cfg);
        }
        RETURN(err);
}

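/* Recovery completion and prepare are forwarded unchanged to the child. */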
static int cmm_recovery_complete(const struct lu_env *env,
                                 struct lu_device *d)
{
        struct cmm_device *m = lu2cmm_dev(d);
        struct lu_device *next = md2lu_dev(m->cmm_child);
        int rc;
        ENTRY;
        rc = next->ld_ops->ldo_recovery_complete(env, next);
        RETURN(rc);
}

static int cmm_prepare(const struct lu_env *env,
                       struct lu_device *pdev,
                       struct lu_device *dev)
{
        struct cmm_device *cmm = lu2cmm_dev(dev);
        struct lu_device *next = md2lu_dev(cmm->cmm_child);
        int rc;

        ENTRY;
        rc = next->ld_ops->ldo_prepare(env, dev, next);
        RETURN(rc);
}

static const struct lu_device_operations cmm_lu_ops = {
        .ldo_object_alloc      = cmm_object_alloc,
        .ldo_process_config    = cmm_process_config,
        .ldo_recovery_complete = cmm_recovery_complete,
        .ldo_prepare           = cmm_prepare,
};

/* --- lu_device_type operations --- */
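/*
 * Upcall handler: on MD_LOV_SYNC refresh the MD/cookie sizes of all MDC
 * targets, then forward the event to the layer above.
 */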
int cmm_upcall(const struct lu_env *env, struct md_device *md,
               enum md_upcall_event ev, void *data)
{
        int rc;
        ENTRY;

        switch (ev) {
                case MD_LOV_SYNC:
                        rc = cmm_post_init_mdc(env, md2cmm_dev(md));
                        if (rc)
                                CERROR("cannot init md size, rc %d\n", rc);
                        /* fall through */
                default:
                        rc = md_do_upcall(env, md, ev, data);
        }
        RETURN(rc);
}

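/*
 * Free the CMM device: release the FLD client structure and the device
 * itself, returning the child device so the caller can continue tearing
 * down the stack.
 */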
static struct lu_device *cmm_device_free(const struct lu_env *env,
                                         struct lu_device *d)
{
        struct cmm_device *m = lu2cmm_dev(d);
        struct lu_device  *next = md2lu_dev(m->cmm_child);
        ENTRY;

        LASSERT(m->cmm_tgt_count == 0);
        LASSERT(list_empty(&m->cmm_targets));
        if (m->cmm_fld != NULL) {
                OBD_FREE_PTR(m->cmm_fld);
                m->cmm_fld = NULL;
        }
        md_device_fini(&m->cmm_md_dev);
        OBD_FREE_PTR(m);
        RETURN(next);
}

static struct lu_device *cmm_device_alloc(const struct lu_env *env,
                                          struct lu_device_type *t,
                                          struct lustre_cfg *cfg)
{
        struct lu_device  *l;
        struct cmm_device *m;
        ENTRY;

        OBD_ALLOC_PTR(m);
        if (m == NULL) {
                l = ERR_PTR(-ENOMEM);
        } else {
                md_device_init(&m->cmm_md_dev, t);
                m->cmm_md_dev.md_ops = &cmm_md_ops;
                md_upcall_init(&m->cmm_md_dev, cmm_upcall);
                l = cmm2lu_dev(m);
                l->ld_ops = &cmm_lu_ops;

                OBD_ALLOC_PTR(m->cmm_fld);
                if (!m->cmm_fld) {
                        cmm_device_free(env, l);
                        l = ERR_PTR(-ENOMEM);
                }
        }
        RETURN(l);
}

/* context key constructor/destructor: cmm_key_init, cmm_key_fini */
LU_KEY_INIT_FINI(cmm, struct cmm_thread_info);

/* context key: cmm_thread_key */
LU_CONTEXT_KEY_DEFINE(cmm, LCT_MD_THREAD);

struct cmm_thread_info *cmm_env_info(const struct lu_env *env)
{
        struct cmm_thread_info *info;

        info = lu_context_key_get(&env->le_ctx, &cmm_thread_key);
        LASSERT(info != NULL);
        return info;
}

/* type constructor/destructor: cmm_type_init/cmm_type_fini */
LU_TYPE_INIT_FINI(cmm, &cmm_thread_key);

/*
 * Kludge: this code should be moved to mdc_device.c if mdc_(mds)_device
 * is really stacked.
 */
static int __cmm_type_init(struct lu_device_type *t)
{
        int rc;
        rc = lu_device_type_init(&mdc_device_type);
        if (rc == 0) {
                rc = cmm_type_init(t);
                if (rc)
                        lu_device_type_fini(&mdc_device_type);
        }
        return rc;
}

static void __cmm_type_fini(struct lu_device_type *t)
{
        lu_device_type_fini(&mdc_device_type);
        cmm_type_fini(t);
}

static void __cmm_type_start(struct lu_device_type *t)
{
        mdc_device_type.ldt_ops->ldto_start(&mdc_device_type);
        cmm_type_start(t);
}

static void __cmm_type_stop(struct lu_device_type *t)
{
        mdc_device_type.ldt_ops->ldto_stop(&mdc_device_type);
        cmm_type_stop(t);
}

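/*
 * Device init: set up the target list and its lock, remember the child
 * device, initialize the FLD client, publish it on the site for lower
 * layers and register the procfs entries.
 */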
static int cmm_device_init(const struct lu_env *env, struct lu_device *d,
                           const char *name, struct lu_device *next)
{
        struct cmm_device *m = lu2cmm_dev(d);
        struct lu_site *ls;
        int err = 0;
        ENTRY;

        spin_lock_init(&m->cmm_tgt_guard);
        CFS_INIT_LIST_HEAD(&m->cmm_targets);
        m->cmm_tgt_count = 0;
        m->cmm_child = lu2md_dev(next);

        err = fld_client_init(m->cmm_fld, name,
                              LUSTRE_CLI_FLD_HASH_DHT);
        if (err) {
                CERROR("Can't init FLD, err %d\n", err);
                RETURN(err);
        }

        /* Assign site's fld client ref, needed for asserts in osd. */
        ls = cmm2lu_dev(m)->ld_site;
        lu_site2md(ls)->ms_client_fld = m->cmm_fld;
        err = cmm_procfs_init(m, name);

        RETURN(err);
}

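/*
 * Device fini: detach and free every MDC target, shut down the FLD client,
 * clear the site reference and remove the procfs entries; the child device
 * is returned to the caller.
 */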
static struct lu_device *cmm_device_fini(const struct lu_env *env,
                                         struct lu_device *ld)
{
        struct cmm_device *cm = lu2cmm_dev(ld);
        struct mdc_device *mc, *tmp;
        struct lu_site *ls;
        ENTRY;

        /* Finish all mdc devices */
        spin_lock(&cm->cmm_tgt_guard);
        list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
                struct lu_device *ld_m = mdc2lu_dev(mc);
                struct lu_device *ld_c = cmm2lu_dev(cm);

                list_del_init(&mc->mc_linkage);
                lu_ref_del(&ld_c->ld_reference, "mdc-child", ld_m);
                lu_device_put(ld_c);
                ld_m->ld_type->ldt_ops->ldto_device_fini(env, ld_m);
                ld_m->ld_type->ldt_ops->ldto_device_free(env, ld_m);
                cm->cmm_tgt_count--;
        }
        spin_unlock(&cm->cmm_tgt_guard);

        fld_client_fini(cm->cmm_fld);
        ls = cmm2lu_dev(cm)->ld_site;
        lu_site2md(ls)->ms_client_fld = NULL;
        cmm_procfs_fini(cm);

        RETURN(md2lu_dev(cm->cmm_child));
}

static struct lu_device_type_operations cmm_device_type_ops = {
        .ldto_init = __cmm_type_init,
        .ldto_fini = __cmm_type_fini,

        .ldto_start = __cmm_type_start,
        .ldto_stop  = __cmm_type_stop,

        .ldto_device_alloc = cmm_device_alloc,
        .ldto_device_free  = cmm_device_free,

        .ldto_device_init = cmm_device_init,
        .ldto_device_fini = cmm_device_fini
};

static struct lu_device_type cmm_device_type = {
        .ldt_tags     = LU_DEVICE_MD,
        .ldt_name     = LUSTRE_CMM_NAME,
        .ldt_ops      = &cmm_device_type_ops,
        .ldt_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD
};

struct lprocfs_vars lprocfs_cmm_obd_vars[] = {
        { 0 }
};

struct lprocfs_vars lprocfs_cmm_module_vars[] = {
        { 0 }
};

static void lprocfs_cmm_init_vars(struct lprocfs_static_vars *lvars)
{
        lvars->module_vars  = lprocfs_cmm_module_vars;
        lvars->obd_vars     = lprocfs_cmm_obd_vars;
}

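/* Module init/exit: register and unregister the CMM OBD device type. */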
static int __init cmm_mod_init(void)
{
        struct lprocfs_static_vars lvars;

        lprocfs_cmm_init_vars(&lvars);
        return class_register_type(&cmm_obd_device_ops, NULL, lvars.module_vars,
                                   LUSTRE_CMM_NAME, &cmm_device_type);
}

static void __exit cmm_mod_exit(void)
{
        class_unregister_type(LUSTRE_CMM_NAME);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Clustered Metadata Manager ("LUSTRE_CMM_NAME")");
MODULE_LICENSE("GPL");

cfs_module(cmm, "0.1.0", cmm_mod_init, cmm_mod_exit);