Whamcloud - gitweb
8411ed978fb87fa38862ed7ed984ce7d71d3e9ab
[fs/lustre-release.git] / lustre / lod / lod_dev.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2012, Intel, Inc.
27  *
28  */
29 /*
30  * This file is part of Lustre, http://www.lustre.org/
31  * Lustre is a trademark of Sun Microsystems, Inc.
32  *
33  * lustre/lod/lod_dev.c
34  *
35  * Lustre Logical Object Device
36  *
37  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
38  * Author: Mikhail Pershin <mike.pershin@intel.com>
39  */
40
41 #ifndef EXPORT_SYMTAB
42 # define EXPORT_SYMTAB
43 #endif
44 #define DEBUG_SUBSYSTEM S_MDS
45
46 #include <obd_class.h>
47 #include <lustre_param.h>
48
49 #include "lod_internal.h"
50
51 /* Slab for OSD object allocation */
52 cfs_mem_cache_t *lod_object_kmem;
53
54 static struct lu_kmem_descr lod_caches[] = {
55         {
56                 .ckd_cache = &lod_object_kmem,
57                 .ckd_name  = "lod_obj",
58                 .ckd_size  = sizeof(struct lod_object)
59         },
60         {
61                 .ckd_cache = NULL
62         }
63 };
64
65 static struct lu_device *lod_device_fini(const struct lu_env *env,
66                                          struct lu_device *d);
67
68 static int lod_process_config(const struct lu_env *env,
69                               struct lu_device *dev,
70                               struct lustre_cfg *lcfg)
71 {
72         struct lod_device *lod = lu2lod_dev(dev);
73         struct lu_device  *next = &lod->lod_child->dd_lu_dev;
74         char              *arg1;
75         int                rc, i;
76         ENTRY;
77
78         switch(lcfg->lcfg_command) {
79
80         case LCFG_LOV_DEL_OBD:
81         case LCFG_LOV_ADD_INA:
82         case LCFG_LOV_ADD_OBD: {
83                 __u32 index;
84                 int gen;
85                 /* lov_modify_tgts add  0:lov_mdsA  1:osp  2:0  3:1 */
86                 arg1 = lustre_cfg_string(lcfg, 1);
87
88                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
89                         GOTO(out, rc = -EINVAL);
90                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
91                         GOTO(out, rc = -EINVAL);
92
93                 rc = -EINVAL;
94                 break;
95         }
96
97         case LCFG_PARAM: {
98                 struct lprocfs_static_vars  v = { 0 };
99                 struct obd_device         *obd = lod2obd(lod);
100
101                 lprocfs_lod_init_vars(&v);
102
103                 rc = class_process_proc_param(PARAM_LOV, v.obd_vars, lcfg, obd);
104                 if (rc > 0)
105                         rc = 0;
106                 GOTO(out, rc);
107          }
108
109         case LCFG_CLEANUP:
110                 lu_dev_del_linkage(dev->ld_site, dev);
111                 lod_getref(lod);
112                 lod_foreach_ost(lod, i) {
113                         struct lod_ost_desc *ost;
114                         ost = OST_TGT(lod, i);
115                         LASSERT(ost && ost->ltd_ost);
116                         next = &ost->ltd_ost->dd_lu_dev;
117                         rc = next->ld_ops->ldo_process_config(env, next, lcfg);
118                         if (rc)
119                                 CERROR("%s: can't process %u: %d\n",
120                                        lod2obd(lod)->obd_name,
121                                        lcfg->lcfg_command, rc);
122                 }
123                 lod_putref(lod);
124
125                 /*
126                  * do cleanup on underlying storage only when
127                  * all OSPs are cleaned up, as they use that OSD as well
128                  */
129                 next = &lod->lod_child->dd_lu_dev;
130                 rc = next->ld_ops->ldo_process_config(env, next, lcfg);
131                 if (rc)
132                         CERROR("%s: can't process %u: %d\n",
133                                lod2obd(lod)->obd_name, lcfg->lcfg_command, rc);
134
135                 rc = obd_disconnect(lod->lod_child_exp);
136                 if (rc)
137                         CERROR("error in disconnect from storage: %d\n", rc);
138                 break;
139
140         default:
141                CERROR("%s: unknown command %u\n", lod2obd(lod)->obd_name,
142                       lcfg->lcfg_command);
143                rc = -EINVAL;
144                break;
145         }
146
147 out:
148         RETURN(rc);
149 }
150
151 static int lod_recovery_complete(const struct lu_env *env,
152                                  struct lu_device *dev)
153 {
154         struct lod_device   *lod = lu2lod_dev(dev);
155         struct lu_device    *next = &lod->lod_child->dd_lu_dev;
156         struct lod_ost_desc *ost;
157         int                  i, rc;
158         ENTRY;
159
160         LASSERT(lod->lod_recovery_completed == 0);
161         lod->lod_recovery_completed = 1;
162
163         rc = next->ld_ops->ldo_recovery_complete(env, next);
164
165         lod_getref(lod);
166         lod_foreach_ost(lod, i) {
167                 ost = OST_TGT(lod, i);
168                 LASSERT(ost && ost->ltd_ost);
169                 next = &ost->ltd_ost->dd_lu_dev;
170                 rc = next->ld_ops->ldo_recovery_complete(env, next);
171                 if (rc)
172                         CERROR("%s: can't complete recovery on #%d: %d\n",
173                                lod2obd(lod)->obd_name, i, rc);
174         }
175         lod_putref(lod);
176
177         RETURN(rc);
178 }
179
180 static int lod_prepare(const struct lu_env *env, struct lu_device *pdev,
181                        struct lu_device *cdev)
182 {
183         struct lod_device   *lod = lu2lod_dev(cdev);
184         struct lu_device    *next = &lod->lod_child->dd_lu_dev;
185         int                  rc;
186         ENTRY;
187
188         rc = next->ld_ops->ldo_prepare(env, pdev, next);
189
190         RETURN(rc);
191 }
192
193 const struct lu_device_operations lod_lu_ops = {
194         .ldo_process_config     = lod_process_config,
195         .ldo_recovery_complete  = lod_recovery_complete,
196         .ldo_prepare            = lod_prepare,
197 };
198
199 static int lod_root_get(const struct lu_env *env,
200                         struct dt_device *dev, struct lu_fid *f)
201 {
202         return dt_root_get(env, dt2lod_dev(dev)->lod_child, f);
203 }
204
205 static int lod_statfs(const struct lu_env *env,
206                       struct dt_device *dev, struct obd_statfs *sfs)
207 {
208         return dt_statfs(env, dt2lod_dev(dev)->lod_child, sfs);
209 }
210
211 static struct thandle *lod_trans_create(const struct lu_env *env,
212                                         struct dt_device *dev)
213 {
214         return dt_trans_create(env, dt2lod_dev(dev)->lod_child);
215 }
216
217 static int lod_trans_start(const struct lu_env *env, struct dt_device *dev,
218                            struct thandle *th)
219 {
220         return dt_trans_start(env, dt2lod_dev(dev)->lod_child, th);
221 }
222
223 static int lod_trans_stop(const struct lu_env *env, struct thandle *th)
224 {
225         /* XXX: we don't know next device, will be fixed with DNE */
226         return dt_trans_stop(env, th->th_dev, th);
227 }
228
229 static void lod_conf_get(const struct lu_env *env,
230                          const struct dt_device *dev,
231                          struct dt_device_param *param)
232 {
233         dt_conf_get(env, dt2lod_dev((struct dt_device *)dev)->lod_child, param);
234 }
235
236 static int lod_sync(const struct lu_env *env, struct dt_device *dev)
237 {
238         struct lod_device   *lod = dt2lod_dev(dev);
239         struct lod_ost_desc *ost;
240         int                  rc = 0, i;
241         ENTRY;
242
243         lod_getref(lod);
244         lod_foreach_ost(lod, i) {
245                 ost = OST_TGT(lod, i);
246                 LASSERT(ost && ost->ltd_ost);
247                 rc = dt_sync(env, ost->ltd_ost);
248                 if (rc) {
249                         CERROR("%s: can't sync %u: %d\n",
250                                lod2obd(lod)->obd_name, i, rc);
251                         break;
252                 }
253         }
254         lod_putref(lod);
255         if (rc == 0)
256                 rc = dt_sync(env, lod->lod_child);
257
258         RETURN(rc);
259 }
260
261 static int lod_ro(const struct lu_env *env, struct dt_device *dev)
262 {
263         return dt_ro(env, dt2lod_dev(dev)->lod_child);
264 }
265
266 static int lod_commit_async(const struct lu_env *env, struct dt_device *dev)
267 {
268         return dt_commit_async(env, dt2lod_dev(dev)->lod_child);
269 }
270
271 static int lod_init_capa_ctxt(const struct lu_env *env, struct dt_device *dev,
272                               int mode, unsigned long timeout,
273                               __u32 alg, struct lustre_capa_key *keys)
274 {
275         struct dt_device *next = dt2lod_dev(dev)->lod_child;
276         return dt_init_capa_ctxt(env, next, mode, timeout, alg, keys);
277 }
278
279 static const struct dt_device_operations lod_dt_ops = {
280         .dt_root_get         = lod_root_get,
281         .dt_statfs           = lod_statfs,
282         .dt_trans_create     = lod_trans_create,
283         .dt_trans_start      = lod_trans_start,
284         .dt_trans_stop       = lod_trans_stop,
285         .dt_conf_get         = lod_conf_get,
286         .dt_sync             = lod_sync,
287         .dt_ro               = lod_ro,
288         .dt_commit_async     = lod_commit_async,
289         .dt_init_capa_ctxt   = lod_init_capa_ctxt,
290 };
291
292 static int lod_connect_to_osd(const struct lu_env *env, struct lod_device *lod,
293                               struct lustre_cfg *cfg)
294 {
295         struct obd_connect_data *data = NULL;
296         struct obd_device       *obd;
297         char                    *nextdev = NULL, *p, *s;
298         int                      rc, len = 0;
299         ENTRY;
300
301         LASSERT(cfg);
302         LASSERT(lod->lod_child_exp == NULL);
303
304         /* compatibility hack: we still use old config logs
305          * which specify LOV, but we need to learn underlying
306          * OSD device, which is supposed to be:
307          *  <fsname>-MDTxxxx-osd
308          *
309          * 2.x MGS generates lines like the following:
310          *   #03 (176)lov_setup 0:lustre-MDT0000-mdtlov  1:(struct lov_desc)
311          * 1.8 MGS generates lines like the following:
312          *   #03 (168)lov_setup 0:lustre-mdtlov  1:(struct lov_desc)
313          *
314          * we use "-MDT" to differentiate 2.x from 1.8 */
315
316         if ((p = lustre_cfg_string(cfg, 0)) && strstr(p, "-mdtlov")) {
317                 len = strlen(p) + 1;
318                 OBD_ALLOC(nextdev, len);
319                 if (nextdev == NULL)
320                         GOTO(out, rc = -ENOMEM);
321
322                 strcpy(nextdev, p);
323                 s = strstr(nextdev, "-mdtlov");
324                 if (unlikely(s == NULL)) {
325                         CERROR("unable to parse device name %s\n",
326                                lustre_cfg_string(cfg, 0));
327                         GOTO(out, rc = -EINVAL);
328                 }
329
330                 if (strstr(nextdev, "-MDT")) {
331                         /* 2.x config */
332                         strcpy(s, "-osd");
333                 } else {
334                         /* 1.8 config */
335                         strcpy(s, "-MDT0000-osd");
336                 }
337         } else {
338                 CERROR("unable to parse device name %s\n",
339                        lustre_cfg_string(cfg, 0));
340                 GOTO(out, rc = -EINVAL);
341         }
342
343         OBD_ALLOC_PTR(data);
344         if (data == NULL)
345                 GOTO(out, rc = -ENOMEM);
346
347         obd = class_name2obd(nextdev);
348         if (obd == NULL) {
349                 CERROR("can not locate next device: %s\n", nextdev);
350                 GOTO(out, rc = -ENOTCONN);
351         }
352
353         data->ocd_connect_flags = OBD_CONNECT_VERSION;
354         data->ocd_version = LUSTRE_VERSION_CODE;
355
356         rc = obd_connect(env, &lod->lod_child_exp, obd, &obd->obd_uuid,
357                          data, NULL);
358         if (rc) {
359                 CERROR("cannot connect to next dev %s (%d)\n", nextdev, rc);
360                 GOTO(out, rc);
361         }
362
363         lod->lod_dt_dev.dd_lu_dev.ld_site =
364                 lod->lod_child_exp->exp_obd->obd_lu_dev->ld_site;
365         LASSERT(lod->lod_dt_dev.dd_lu_dev.ld_site);
366         lod->lod_child = lu2dt_dev(lod->lod_child_exp->exp_obd->obd_lu_dev);
367
368 out:
369         if (data)
370                 OBD_FREE_PTR(data);
371         if (nextdev)
372                 OBD_FREE(nextdev, len);
373         RETURN(rc);
374 }
375
376 static int lod_init0(const struct lu_env *env, struct lod_device *lod,
377                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
378 {
379         struct dt_device_param ddp;
380         struct proc_dir_entry *lov_proc_dir;
381         struct obd_device     *obd;
382         int                    rc;
383         ENTRY;
384
385         obd = class_name2obd(lustre_cfg_string(cfg, 0));
386         if (obd == NULL) {
387                 CERROR("Cannot find obd with name %s\n",
388                        lustre_cfg_string(cfg, 0));
389                 RETURN(-ENODEV);
390         }
391
392         obd->obd_lu_dev = &lod->lod_dt_dev.dd_lu_dev;
393         lod->lod_dt_dev.dd_lu_dev.ld_obd = obd;
394         lod->lod_dt_dev.dd_lu_dev.ld_ops = &lod_lu_ops;
395         lod->lod_dt_dev.dd_ops = &lod_dt_ops;
396
397         rc = lod_connect_to_osd(env, lod, cfg);
398         if (rc)
399                 RETURN(rc);
400
401         dt_conf_get(env, &lod->lod_dt_dev, &ddp);
402         lod->lod_osd_max_easize = ddp.ddp_max_ea_size;
403
404         /* for compatibility we link old procfs's OSC entries to osp ones */
405         lov_proc_dir = lprocfs_srch(proc_lustre_root, "lov");
406         if (lov_proc_dir) {
407                 cfs_proc_dir_entry_t *symlink = NULL;
408                 char *name;
409                 OBD_ALLOC(name, strlen(obd->obd_name) + 1);
410                 if (name) {
411                         strcpy(name, obd->obd_name);
412                         if (strstr(name, "lov"))
413                                 symlink = lprocfs_add_symlink(name,
414                                                 lov_proc_dir,
415                                                 "../lod/%s",
416                                                 obd->obd_name);
417                         OBD_FREE(name, strlen(obd->obd_name) + 1);
418                         lod->lod_symlink = symlink;
419                 }
420         }
421
422         cfs_mutex_init(&lod->lod_mutex);
423         cfs_init_rwsem(&lod->lod_rw_sem);
424         cfs_spin_lock_init(&lod->lod_desc_lock);
425
426         RETURN(0);
427
428         obd_disconnect(lod->lod_child_exp);
429         RETURN(rc);
430 }
431
432 static struct lu_device *lod_device_free(const struct lu_env *env,
433                                          struct lu_device *lu)
434 {
435         struct lod_device *lod = lu2lod_dev(lu);
436         struct lu_device  *next = &lod->lod_child->dd_lu_dev;
437         ENTRY;
438
439         LASSERT(cfs_atomic_read(&lu->ld_ref) == 0);
440         dt_device_fini(&lod->lod_dt_dev);
441         OBD_FREE_PTR(lod);
442         RETURN(next);
443 }
444
445 static struct lu_device *lod_device_alloc(const struct lu_env *env,
446                                           struct lu_device_type *type,
447                                           struct lustre_cfg *lcfg)
448 {
449         struct lod_device *lod;
450         struct lu_device  *lu_dev;
451
452         OBD_ALLOC_PTR(lod);
453         if (lod == NULL) {
454                 lu_dev = ERR_PTR(-ENOMEM);
455         } else {
456                 int rc;
457
458                 lu_dev = lod2lu_dev(lod);
459                 dt_device_init(&lod->lod_dt_dev, type);
460                 rc = lod_init0(env, lod, type, lcfg);
461                 if (rc != 0) {
462                         lod_device_free(env, lu_dev);
463                         lu_dev = ERR_PTR(rc);
464                 }
465         }
466
467         return lu_dev;
468 }
469
470 static struct lu_device *lod_device_fini(const struct lu_env *env,
471                                          struct lu_device *d)
472 {
473         struct lod_device *lod = lu2lod_dev(d);
474         ENTRY;
475
476         if (lod->lod_symlink)
477                 lprocfs_remove(&lod->lod_symlink);
478
479         RETURN(NULL);
480 }
481
482 /*
483  * we use exports to track all LOD users
484  */
485 static int lod_obd_connect(const struct lu_env *env, struct obd_export **exp,
486                            struct obd_device *obd, struct obd_uuid *cluuid,
487                            struct obd_connect_data *data, void *localdata)
488 {
489         struct lod_device    *lod = lu2lod_dev(obd->obd_lu_dev);
490         struct lustre_handle  conn;
491         int                   rc;
492         ENTRY;
493
494         CDEBUG(D_CONFIG, "connect #%d\n", lod->lod_connects);
495
496         rc = class_connect(&conn, obd, cluuid);
497         if (rc)
498                 RETURN(rc);
499
500         *exp = class_conn2export(&conn);
501
502         cfs_mutex_lock(&lod->lod_mutex);
503         lod->lod_connects++;
504         /* at the moment we expect the only user */
505         LASSERT(lod->lod_connects == 1);
506         cfs_mutex_unlock(&lod->lod_mutex);
507
508         RETURN(0);
509 }
510
511 /*
512  * once last export (we don't count self-export) disappeared
513  * lod can be released
514  */
515 static int lod_obd_disconnect(struct obd_export *exp)
516 {
517         struct obd_device *obd = exp->exp_obd;
518         struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
519         int                rc, release = 0;
520         ENTRY;
521
522         /* Only disconnect the underlying layers on the final disconnect. */
523         cfs_mutex_lock(&lod->lod_mutex);
524         lod->lod_connects--;
525         if (lod->lod_connects != 0) {
526                 /* why should there be more than 1 connect? */
527                 cfs_mutex_unlock(&lod->lod_mutex);
528                 CERROR("%s: disconnect #%d\n", exp->exp_obd->obd_name,
529                        lod->lod_connects);
530                 goto out;
531         }
532         cfs_mutex_unlock(&lod->lod_mutex);
533
534         /* the last user of lod has gone, let's release the device */
535         release = 1;
536
537 out:
538         rc = class_disconnect(exp); /* bz 9811 */
539
540         if (rc == 0 && release)
541                 class_manual_cleanup(obd);
542         RETURN(rc);
543 }
544
545 LU_KEY_INIT(lod, struct lod_thread_info);
546
547 static void lod_key_fini(const struct lu_context *ctx,
548                 struct lu_context_key *key, void *data)
549 {
550         struct lod_thread_info *info = data;
551         /* allocated in lod_get_lov_ea
552          * XXX: this is overload, a tread may have such store but used only
553          * once. Probably better would be pool of such stores per LOD.
554          */
555         if (info->lti_ea_store) {
556                 OBD_FREE_LARGE(info->lti_ea_store, info->lti_ea_store_size);
557                 info->lti_ea_store = NULL;
558                 info->lti_ea_store_size = 0;
559         }
560         OBD_FREE_PTR(info);
561 }
562
563 /* context key: lod_thread_key */
564 LU_CONTEXT_KEY_DEFINE(lod, LCT_MD_THREAD);
565
566 LU_TYPE_INIT_FINI(lod, &lod_thread_key);
567
568 static struct lu_device_type_operations lod_device_type_ops = {
569         .ldto_init           = lod_type_init,
570         .ldto_fini           = lod_type_fini,
571
572         .ldto_start          = lod_type_start,
573         .ldto_stop           = lod_type_stop,
574
575         .ldto_device_alloc   = lod_device_alloc,
576         .ldto_device_free    = lod_device_free,
577
578         .ldto_device_fini    = lod_device_fini
579 };
580
581 static struct lu_device_type lod_device_type = {
582         .ldt_tags     = LU_DEVICE_DT,
583         .ldt_name     = LUSTRE_LOD_NAME,
584         .ldt_ops      = &lod_device_type_ops,
585         .ldt_ctx_tags = LCT_MD_THREAD,
586 };
587
588 static int lod_obd_health_check(const struct lu_env *env,
589                 struct obd_device *obd)
590 {
591         struct lod_device   *d = lu2lod_dev(obd->obd_lu_dev);
592         struct lod_ost_desc *ost;
593         int                  i, rc = 1;
594         ENTRY;
595
596         LASSERT(d);
597         lod_getref(d);
598         lod_foreach_ost(d, i) {
599                 ost = OST_TGT(d, i);
600                 LASSERT(ost && ost->ltd_ost);
601                 rc = obd_health_check(env, ost->ltd_exp->exp_obd);
602                 /* one healthy device is enough */
603                 if (rc == 0)
604                         break;
605         }
606         lod_putref(d);
607         RETURN(rc);
608 }
609
610 static struct obd_ops lod_obd_device_ops = {
611         .o_owner        = THIS_MODULE,
612         .o_connect      = lod_obd_connect,
613         .o_disconnect   = lod_obd_disconnect,
614         .o_health_check = lod_obd_health_check,
615 };
616
617 static int __init lod_mod_init(void)
618 {
619         struct lprocfs_static_vars  lvars = { 0 };
620         cfs_proc_dir_entry_t       *lov_proc_dir;
621         int                         rc;
622
623         rc = lu_kmem_init(lod_caches);
624         if (rc)
625                 return rc;
626
627         lprocfs_lod_init_vars(&lvars);
628
629         rc = class_register_type(&lod_obd_device_ops, NULL, lvars.module_vars,
630                                  LUSTRE_LOD_NAME, &lod_device_type);
631         if (rc) {
632                 lu_kmem_fini(lod_caches);
633                 return rc;
634         }
635
636         /* create "lov" entry in procfs for compatibility purposes */
637         lov_proc_dir = lprocfs_srch(proc_lustre_root, "lov");
638         if (lov_proc_dir == NULL) {
639                 lov_proc_dir = lprocfs_register("lov", proc_lustre_root,
640                                                 NULL, NULL);
641                 if (IS_ERR(lov_proc_dir))
642                         CERROR("lod: can't create compat entry \"lov\": %d\n",
643                                (int)PTR_ERR(lov_proc_dir));
644         }
645
646         return rc;
647 }
648
649 static void __exit lod_mod_exit(void)
650 {
651
652         lprocfs_try_remove_proc_entry("lov", proc_lustre_root);
653
654         class_unregister_type(LUSTRE_LOD_NAME);
655         lu_kmem_fini(lod_caches);
656 }
657
658 MODULE_AUTHOR("Whamcloud, Inc. <http://www.whamcloud.com/>");
659 MODULE_DESCRIPTION("Lustre Logical Object Device ("LUSTRE_LOD_NAME")");
660 MODULE_LICENSE("GPL");
661
662 module_init(lod_mod_init);
663 module_exit(lod_mod_exit);
664