Whamcloud - gitweb
- some cleanups;
[fs/lustre-release.git] / lustre / cmm / cmm_device.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/cmm/cmm_device.c
5  *  Lustre Cluster Metadata Manager (cmm)
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Mike Pershin <tappro@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lprocfs_status.h>
39 #include <lustre_ver.h>
40 #include "cmm_internal.h"
41 #include "mdc_internal.h"
42
43 static struct obd_ops cmm_obd_device_ops = {
44         .o_owner           = THIS_MODULE
45 };
46
47 static struct lu_device_operations cmm_lu_ops;
48
49 static inline int lu_device_is_cmm(struct lu_device *d)
50 {
51         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &cmm_lu_ops);
52 }
53
54 int cmm_root_get(const struct lu_env *env, struct md_device *md,
55                  struct lu_fid *fid)
56 {
57         struct cmm_device *cmm_dev = md2cmm_dev(md);
58         /* valid only on master MDS */
59         if (cmm_dev->cmm_local_num == 0)
60                 return cmm_child_ops(cmm_dev)->mdo_root_get(env,
61                                      cmm_dev->cmm_child, fid);
62         else
63                 return -EINVAL;
64 }
65
66 static int cmm_statfs(const struct lu_env *env, struct md_device *md,
67                       struct kstatfs *sfs)
68 {
69         struct cmm_device *cmm_dev = md2cmm_dev(md);
70         int rc;
71
72         ENTRY;
73         rc = cmm_child_ops(cmm_dev)->mdo_statfs(env,
74                                                 cmm_dev->cmm_child, sfs);
75         RETURN (rc);
76 }
77
78 static int cmm_maxsize_get(const struct lu_env *env, struct md_device *md,
79                            int *md_size, int *cookie_size)
80 {
81         struct cmm_device *cmm_dev = md2cmm_dev(md);
82         int rc;
83         ENTRY;
84         rc = cmm_child_ops(cmm_dev)->mdo_maxsize_get(env, cmm_dev->cmm_child,
85                                                      md_size, cookie_size);
86         RETURN(rc);
87 }
88
89 static int cmm_init_capa_ctxt(const struct lu_env *env, struct md_device *md,
90                               int mode , unsigned long timeout, __u32 alg,
91                               struct lustre_capa_key *keys)
92 {
93         struct cmm_device *cmm_dev = md2cmm_dev(md);
94         int rc;
95         ENTRY;
96         LASSERT(cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt);
97         rc = cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt(env, cmm_dev->cmm_child,
98                                                         mode, timeout, alg,
99                                                         keys);
100         RETURN(rc);
101 }
102
103 static int cmm_update_capa_key(const struct lu_env *env,
104                                struct md_device *md,
105                                struct lustre_capa_key *key)
106 {
107         struct cmm_device *cmm_dev = md2cmm_dev(md);
108         int rc;
109         ENTRY;
110         rc = cmm_child_ops(cmm_dev)->mdo_update_capa_key(env,
111                                                          cmm_dev->cmm_child,
112                                                          key);
113         RETURN(rc);
114 }
115
116 static struct md_device_operations cmm_md_ops = {
117         .mdo_statfs         = cmm_statfs,
118         .mdo_root_get       = cmm_root_get,
119         .mdo_maxsize_get    = cmm_maxsize_get,
120         .mdo_init_capa_ctxt = cmm_init_capa_ctxt,
121         .mdo_update_capa_key= cmm_update_capa_key,
122 };
123
124 extern struct lu_device_type mdc_device_type;
125
126 static int cmm_post_init_mdc(const struct lu_env *env, 
127                              struct cmm_device *cmm)
128 {
129         int max_mdsize, max_cookiesize, rc;
130         struct mdc_device *mc, *tmp;
131
132         /* get the max mdsize and cookiesize from lower layer */
133         rc = cmm_maxsize_get(env, &cmm->cmm_md_dev, &max_mdsize, 
134                                                 &max_cookiesize);
135         if (rc)
136                 RETURN(rc);
137
138         spin_lock(&cmm->cmm_tgt_guard);
139         list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
140                                  mc_linkage) {
141                 mdc_init_ea_size(env, mc, max_mdsize, max_cookiesize);
142         }
143         spin_unlock(&cmm->cmm_tgt_guard); 
144         RETURN(rc);
145 }
146
147 /* --- cmm_lu_operations --- */
148 /* add new MDC to the CMM, create MDC lu_device and connect it to mdc_obd */
149 static int cmm_add_mdc(const struct lu_env *env,
150                        struct cmm_device *cm, struct lustre_cfg *cfg)
151 {
152         struct lu_device_type *ldt = &mdc_device_type;
153         char *p, *num = lustre_cfg_string(cfg, 2);
154         struct mdc_device *mc, *tmp;
155         struct lu_fld_target target;
156         struct lu_device *ld;
157         struct lu_site *ls;
158         mdsno_t mdc_num;
159         int rc;
160         ENTRY;
161
162         /* find out that there is no such mdc */
163         LASSERT(num);
164         mdc_num = simple_strtol(num, &p, 10);
165         if (*p) {
166                 CERROR("Invalid index in lustre_cgf, offset 2\n");
167                 RETURN(-EINVAL);
168         }
169
170         spin_lock(&cm->cmm_tgt_guard);
171         list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
172                                  mc_linkage) {
173                 if (mc->mc_num == mdc_num) {
174                         spin_unlock(&cm->cmm_tgt_guard);
175                         RETURN(-EEXIST);
176                 }
177         }
178         spin_unlock(&cm->cmm_tgt_guard);
179         ld = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg);
180         ld->ld_site = cmm2lu_dev(cm)->ld_site;
181
182         rc = ldt->ldt_ops->ldto_device_init(env, ld, NULL);
183         if (rc) {
184                 ldt->ldt_ops->ldto_device_free(env, ld);
185                 RETURN (rc);
186         }
187         /* pass config to the just created MDC */
188         rc = ld->ld_ops->ldo_process_config(env, ld, cfg);
189         if (rc)
190                 RETURN(rc);
191
192         spin_lock(&cm->cmm_tgt_guard);
193         list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
194                                  mc_linkage) {
195                 if (mc->mc_num == mdc_num) {
196                         spin_unlock(&cm->cmm_tgt_guard);
197                         ldt->ldt_ops->ldto_device_fini(env, ld);
198                         ldt->ldt_ops->ldto_device_free(env, ld);
199                         RETURN(-EEXIST);
200                 }
201         }
202         mc = lu2mdc_dev(ld);
203         list_add_tail(&mc->mc_linkage, &cm->cmm_targets);
204         cm->cmm_tgt_count++;
205         spin_unlock(&cm->cmm_tgt_guard);
206
207         lu_device_get(cmm2lu_dev(cm));
208
209         ls = cm->cmm_md_dev.md_lu_dev.ld_site;
210
211         target.ft_srv = NULL;
212         target.ft_idx = mc->mc_num;
213         target.ft_exp = mc->mc_desc.cl_exp;
214
215         fld_client_add_target(ls->ls_client_fld, &target);
216         
217         /* set max md size for the mdc */
218         rc = cmm_post_init_mdc(env, cm);
219         
220         RETURN(rc);
221 }
222
223 static void cmm_device_shutdown(const struct lu_env *env,
224                                 struct cmm_device *cm,
225                                 struct lustre_cfg *cfg)
226 {
227         struct mdc_device *mc, *tmp;
228         ENTRY;
229
230         /* finish all mdc devices */
231         spin_lock(&cm->cmm_tgt_guard);
232         list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
233                 struct lu_device *ld_m = mdc2lu_dev(mc);
234                 ld_m->ld_ops->ldo_process_config(env, ld_m, cfg);
235         }
236         spin_unlock(&cm->cmm_tgt_guard);
237
238         EXIT;
239 }
240 static int cmm_device_mount(const struct lu_env *env,
241                             struct cmm_device *m, struct lustre_cfg *cfg)
242 {
243         const char *index = lustre_cfg_string(cfg, 2);
244         char *p;
245
246         LASSERT(index != NULL);
247
248         m->cmm_local_num = simple_strtol(index, &p, 10);
249         if (*p) {
250                 CERROR("Invalid index in lustre_cgf\n");
251                 RETURN(-EINVAL);
252         }
253
254         RETURN(0);
255 }
256
257 static int cmm_process_config(const struct lu_env *env,
258                               struct lu_device *d, struct lustre_cfg *cfg)
259 {
260         struct cmm_device *m = lu2cmm_dev(d);
261         struct lu_device *next = md2lu_dev(m->cmm_child);
262         int err;
263         ENTRY;
264
265         switch(cfg->lcfg_command) {
266         case LCFG_ADD_MDC:
267                 err = cmm_add_mdc(env, m, cfg);
268                 /* the first ADD_MDC can be counted as setup is finished */
269                 if ((m->cmm_flags & CMM_INITIALIZED) == 0)
270                         m->cmm_flags |= CMM_INITIALIZED;
271                 break;
272         case LCFG_SETUP:
273         {
274                 /* lower layers should be set up at first */
275                 err = next->ld_ops->ldo_process_config(env, next, cfg);
276                 if (err == 0)
277                         err = cmm_device_mount(env, m, cfg);
278                 break;
279         }
280         case LCFG_CLEANUP:
281         {
282                 cmm_device_shutdown(env, m, cfg);
283         }
284         default:
285                 err = next->ld_ops->ldo_process_config(env, next, cfg);
286         }
287         RETURN(err);
288 }
289
290 static int cmm_recovery_complete(const struct lu_env *env,
291                                  struct lu_device *d)
292 {
293         struct cmm_device *m = lu2cmm_dev(d);
294         struct lu_device *next = md2lu_dev(m->cmm_child);
295         int rc;
296         ENTRY;
297         rc = next->ld_ops->ldo_recovery_complete(env, next);
298         RETURN(rc);
299 }
300
301 static struct lu_device_operations cmm_lu_ops = {
302         .ldo_object_alloc      = cmm_object_alloc,
303         .ldo_process_config    = cmm_process_config,
304         .ldo_recovery_complete = cmm_recovery_complete
305 };
306
307 /* --- lu_device_type operations --- */
308 int cmm_upcall(const struct lu_env *env, struct md_device *md,
309                enum md_upcall_event ev)
310 {
311         struct md_device *upcall_dev;
312         int rc;
313         ENTRY;
314
315         upcall_dev = md->md_upcall.mu_upcall_dev;
316
317         LASSERT(upcall_dev);
318         switch (ev) {
319                 case MD_LOV_SYNC:
320                         rc = cmm_post_init_mdc(env, md2cmm_dev(md)); 
321                         if (rc) 
322                                 CERROR("can not init md size %d\n", rc);
323                 default:
324                         rc = upcall_dev->md_upcall.mu_upcall(env, 
325                                         md->md_upcall.mu_upcall_dev, ev);
326         }
327         RETURN(rc);
328 }
329
330 static struct lu_device *cmm_device_alloc(const struct lu_env *env,
331                                           struct lu_device_type *t,
332                                           struct lustre_cfg *cfg)
333 {
334         struct lu_device  *l;
335         struct cmm_device *m;
336
337         ENTRY;
338
339         OBD_ALLOC_PTR(m);
340         if (m == NULL) {
341                 l = ERR_PTR(-ENOMEM);
342         } else {
343                 md_device_init(&m->cmm_md_dev, t);
344                 m->cmm_md_dev.md_ops = &cmm_md_ops;
345                 m->cmm_md_dev.md_upcall.mu_upcall = cmm_upcall;
346                 l = cmm2lu_dev(m);
347                 l->ld_ops = &cmm_lu_ops;
348         }
349
350         RETURN (l);
351 }
352
353 static void cmm_device_free(const struct lu_env *env, struct lu_device *d)
354 {
355         struct cmm_device *m = lu2cmm_dev(d);
356
357         LASSERT(m->cmm_tgt_count == 0);
358         LASSERT(list_empty(&m->cmm_targets));
359         md_device_fini(&m->cmm_md_dev);
360         OBD_FREE_PTR(m);
361 }
362
363 /* context key constructor/destructor */
364 static void *cmm_key_init(const struct lu_context *ctx,
365                           struct lu_context_key *key)
366 {
367         struct cmm_thread_info *info;
368
369         CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
370         OBD_ALLOC_PTR(info);
371         if (info == NULL)
372                 info = ERR_PTR(-ENOMEM);
373         return info;
374 }
375
376 static void cmm_key_fini(const struct lu_context *ctx,
377                          struct lu_context_key *key, void *data)
378 {
379         struct cmm_thread_info *info = data;
380         OBD_FREE_PTR(info);
381 }
382
383 static struct lu_context_key cmm_thread_key = {
384         .lct_tags = LCT_MD_THREAD,
385         .lct_init = cmm_key_init,
386         .lct_fini = cmm_key_fini
387 };
388
389 struct cmm_thread_info *cmm_env_info(const struct lu_env *env)
390 {
391         struct cmm_thread_info *info;
392
393         info = lu_context_key_get(&env->le_ctx, &cmm_thread_key);
394         LASSERT(info != NULL);
395         return info;
396 }
397
398 static int cmm_type_init(struct lu_device_type *t)
399 {
400         return lu_context_key_register(&cmm_thread_key);
401 }
402
403 static void cmm_type_fini(struct lu_device_type *t)
404 {
405         lu_context_key_degister(&cmm_thread_key);
406 }
407
408 static int cmm_device_init(const struct lu_env *env,
409                            struct lu_device *d, struct lu_device *next)
410 {
411         struct cmm_device *m = lu2cmm_dev(d);
412         int err = 0;
413         ENTRY;
414
415         spin_lock_init(&m->cmm_tgt_guard);
416         INIT_LIST_HEAD(&m->cmm_targets);
417         m->cmm_tgt_count = 0;
418         m->cmm_child = lu2md_dev(next);
419
420         RETURN(err);
421 }
422
423 static struct lu_device *cmm_device_fini(const struct lu_env *env,
424                                          struct lu_device *ld)
425 {
426         struct cmm_device *cm = lu2cmm_dev(ld);
427         struct mdc_device *mc, *tmp;
428         ENTRY;
429         /* finish all mdc devices */
430         spin_lock(&cm->cmm_tgt_guard);
431         list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
432                 struct lu_device *ld_m = mdc2lu_dev(mc);
433
434                 list_del_init(&mc->mc_linkage);
435                 lu_device_put(cmm2lu_dev(cm));
436                 ld_m->ld_type->ldt_ops->ldto_device_fini(env, ld_m);
437                 ld_m->ld_type->ldt_ops->ldto_device_free(env, ld_m);
438                 cm->cmm_tgt_count--;
439         }
440         spin_unlock(&cm->cmm_tgt_guard);
441
442         RETURN (md2lu_dev(cm->cmm_child));
443 }
444
445 static struct lu_device_type_operations cmm_device_type_ops = {
446         .ldto_init = cmm_type_init,
447         .ldto_fini = cmm_type_fini,
448
449         .ldto_device_alloc = cmm_device_alloc,
450         .ldto_device_free  = cmm_device_free,
451
452         .ldto_device_init = cmm_device_init,
453         .ldto_device_fini = cmm_device_fini
454 };
455
456 static struct lu_device_type cmm_device_type = {
457         .ldt_tags     = LU_DEVICE_MD,
458         .ldt_name     = LUSTRE_CMM_NAME,
459         .ldt_ops      = &cmm_device_type_ops,
460         .ldt_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD
461 };
462
463 struct lprocfs_vars lprocfs_cmm_obd_vars[] = {
464         { 0 }
465 };
466
467 struct lprocfs_vars lprocfs_cmm_module_vars[] = {
468         { 0 }
469 };
470
471 LPROCFS_INIT_VARS(cmm, lprocfs_cmm_module_vars, lprocfs_cmm_obd_vars);
472
473 static int __init cmm_mod_init(void)
474 {
475         struct lprocfs_static_vars lvars;
476
477         printk(KERN_INFO "Lustre: Clustered Metadata Manager; info@clusterfs.com\n");
478
479         lprocfs_init_vars(cmm, &lvars);
480         return class_register_type(&cmm_obd_device_ops, NULL, lvars.module_vars,
481                                    LUSTRE_CMM_NAME, &cmm_device_type);
482 }
483
484 static void __exit cmm_mod_exit(void)
485 {
486         class_unregister_type(LUSTRE_CMM_NAME);
487 }
488
489 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
490 MODULE_DESCRIPTION("Lustre Clustered Metadata Manager ("LUSTRE_CMM_NAME")");
491 MODULE_LICENSE("GPL");
492
493 cfs_module(cmm, "0.1.0", cmm_mod_init, cmm_mod_exit);