Whamcloud - gitweb
LU-1187 lod: Add remote object for DNE
[fs/lustre-release.git] / lustre / lod / lod_dev.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, Intel Corporation.
27  *
28  */
29 /*
30  * This file is part of Lustre, http://www.lustre.org/
31  * Lustre is a trademark of Sun Microsystems, Inc.
32  *
33  * lustre/lod/lod_dev.c
34  *
35  * Lustre Logical Object Device
36  *
37  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
38  * Author: Mikhail Pershin <mike.pershin@intel.com>
39  */
40
41 #ifndef EXPORT_SYMTAB
42 # define EXPORT_SYMTAB
43 #endif
44 #define DEBUG_SUBSYSTEM S_MDS
45
46 #include <obd_class.h>
47 #include <lustre_param.h>
48
49 #include "lod_internal.h"
50
51 /**
52  * Lookup MDT/OST index \a tgt by FID \a fid.
53  *
54  * \param lod LOD to be lookup at.
55  * \param fid FID of object to find MDT/OST.
56  * \param tgt MDT/OST index to return.
57  * \param flags indidcate the FID is on MDS or OST.
58  **/
59 int lod_fld_lookup(const struct lu_env *env, struct lod_device *lod,
60                    const struct lu_fid *fid, __u32 *tgt, int flags)
61 {
62         struct lu_seq_range     range;
63         struct lu_server_fld    *server_fld;
64         int rc = 0;
65         ENTRY;
66
67         LASSERTF(fid_is_sane(fid), "Invalid FID "DFID"\n", PFID(fid));
68         if (fid_is_idif(fid)) {
69                 *tgt = fid_idif_ost_idx(fid);
70                 RETURN(rc);
71         }
72
73         if (!lod->lod_initialized || !fid_is_norm(fid)) {
74                 LASSERT(lu_site2seq(lod2lu_dev(lod)->ld_site) != NULL);
75                 *tgt = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
76                 RETURN(rc);
77         }
78
79         server_fld = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_server_fld;
80         range.lsr_flags = flags;
81         rc = fld_server_lookup(env, server_fld, fid_seq(fid), &range);
82         if (rc) {
83                 CERROR("%s: Can't find tgt by seq "LPX64", rc %d\n",
84                        lod2obd(lod)->obd_name, fid_seq(fid), rc);
85                 RETURN(rc);
86         }
87
88         *tgt = range.lsr_index;
89
90         CDEBUG(D_INFO, "LOD: got tgt %x for sequence: "
91                LPX64"\n", *tgt, fid_seq(fid));
92
93         RETURN(rc);
94 }
95
96 extern struct lu_object_operations lod_lu_obj_ops;
97 extern struct lu_object_operations lod_lu_robj_ops;
98 extern struct dt_object_operations lod_obj_ops;
99
100 /* Slab for OSD object allocation */
101 cfs_mem_cache_t *lod_object_kmem;
102
103 static struct lu_kmem_descr lod_caches[] = {
104         {
105                 .ckd_cache = &lod_object_kmem,
106                 .ckd_name  = "lod_obj",
107                 .ckd_size  = sizeof(struct lod_object)
108         },
109         {
110                 .ckd_cache = NULL
111         }
112 };
113
114 static struct lu_device *lod_device_fini(const struct lu_env *env,
115                                          struct lu_device *d);
116
117 struct lu_object *lod_object_alloc(const struct lu_env *env,
118                                    const struct lu_object_header *hdr,
119                                    struct lu_device *dev)
120 {
121         struct lod_object       *lod_obj;
122         struct lu_object        *lu_obj;
123         const struct lu_fid     *fid = &hdr->loh_fid;
124         mdsno_t                 mds;
125         int                     rc = 0;
126         ENTRY;
127
128         OBD_SLAB_ALLOC_PTR_GFP(lod_obj, lod_object_kmem, CFS_ALLOC_IO);
129         if (lod_obj == NULL)
130                 RETURN(ERR_PTR(-ENOMEM));
131
132         rc = lod_fld_lookup(env, lu2lod_dev(dev), fid, &mds, LU_SEQ_RANGE_MDT);
133         if (rc) {
134                 OBD_SLAB_FREE_PTR(lod_obj, lod_object_kmem);
135                 RETURN(ERR_PTR(rc));
136         }
137
138         lod_obj->ldo_mds_num = mds;
139         lu_obj = lod2lu_obj(lod_obj);
140         dt_object_init(&lod_obj->ldo_obj, NULL, dev);
141         lod_obj->ldo_obj.do_ops = &lod_obj_ops;
142         if (likely(mds == lu_site2seq(dev->ld_site)->ss_node_id))
143                 lu_obj->lo_ops = &lod_lu_obj_ops;
144         else
145                 lu_obj->lo_ops = &lod_lu_robj_ops;
146         RETURN(lu_obj);
147 }
148
149 static int lod_cleanup_desc_tgts(const struct lu_env *env,
150                                  struct lod_device *lod,
151                                  struct lod_tgt_descs *ltd,
152                                  struct lustre_cfg *lcfg)
153 {
154         struct lu_device  *next;
155         int rc = 0;
156         int i;
157
158         lod_getref(ltd);
159         if (ltd->ltd_tgts_size <= 0) {
160                 lod_putref(lod, ltd);
161                 return 0;
162         }
163         cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
164                 struct lod_tgt_desc *tgt;
165                 int rc1;
166
167                 tgt = LTD_TGT(ltd, i);
168                 LASSERT(tgt && tgt->ltd_tgt);
169                 next = &tgt->ltd_tgt->dd_lu_dev;
170                 rc1 = next->ld_ops->ldo_process_config(env, next, lcfg);
171                 if (rc1) {
172                         CERROR("%s: error cleaning up LOD index %u: cmd %#x"
173                                ": rc = %d\n", lod2obd(lod)->obd_name, i,
174                                lcfg->lcfg_command, rc1);
175                         rc = rc1;
176                 }
177         }
178         lod_putref(lod, ltd);
179         return rc;
180 }
181
182 static int lodname2mdt_index(char *lodname, int *index)
183 {
184         char *ptr, *tmp;
185
186         /* The lodname suppose to be fsname-MDTxxxx-mdtlov */
187         ptr = strrchr(lodname, '-');
188         if (ptr == NULL) {
189                 CERROR("invalid MDT index in '%s'\n", lodname);
190                 return -EINVAL;
191         }
192
193         if (strncmp(ptr, "-mdtlov", 7) != 0) {
194                 CERROR("invalid MDT index in '%s'\n", lodname);
195                 return -EINVAL;
196         }
197
198         if ((unsigned long)ptr - (unsigned long)lodname <= 8) {
199                 CERROR("invalid MDT index in '%s'\n", lodname);
200                 return -EINVAL;
201         }
202
203         if (strncmp(ptr - 8, "-MDT", 4) != 0) {
204                 CERROR("invalid MDT index in '%s'\n", lodname);
205                 return -EINVAL;
206         }
207
208         *index = simple_strtol(ptr - 4, &tmp, 16);
209         if (*tmp != '-' || *index > INT_MAX || *index < 0) {
210                 CERROR("invalid MDT index in '%s'\n", lodname);
211                 return -EINVAL;
212         }
213         return 0;
214 }
215
216 static int lod_process_config(const struct lu_env *env,
217                               struct lu_device *dev,
218                               struct lustre_cfg *lcfg)
219 {
220         struct lod_device *lod = lu2lod_dev(dev);
221         struct lu_device  *next = &lod->lod_child->dd_lu_dev;
222         char              *arg1;
223         int                rc = 0;
224         ENTRY;
225
226         switch(lcfg->lcfg_command) {
227         case LCFG_LOV_DEL_OBD:
228         case LCFG_LOV_ADD_INA:
229         case LCFG_LOV_ADD_OBD:
230         case LCFG_ADD_MDC: {
231                 __u32 index;
232                 __u32 mdt_index;
233                 int gen;
234                 /* lov_modify_tgts add  0:lov_mdsA  1:osp  2:0  3:1
235                  * modify_mdc_tgts add  0:lustre-MDT0001
236                  *                    1:lustre-MDT0001-mdc0002
237                  *                    2:2  3:1*/
238                 arg1 = lustre_cfg_string(lcfg, 1);
239
240                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
241                         GOTO(out, rc = -EINVAL);
242                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
243                         GOTO(out, rc = -EINVAL);
244
245                 if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
246                         char *mdt;
247                         mdt = strstr(lustre_cfg_string(lcfg, 0), "-MDT");
248                         /* 1.8 configs don't have "-MDT0000" at the end */
249                         if (mdt == NULL) {
250                                 mdt_index = 0;
251                         } else {
252                                 rc = lodname2mdt_index(
253                                         lustre_cfg_string(lcfg, 0), &mdt_index);
254                                 if (rc != 0)
255                                         GOTO(out, rc);
256                         }
257                         rc = lod_add_device(env, lod, arg1, index, gen,
258                                             mdt_index, LUSTRE_OSC_NAME, 1);
259                 } else if (lcfg->lcfg_command == LCFG_ADD_MDC) {
260                         mdt_index = index;
261                         rc = lod_add_device(env, lod, arg1, index, gen,
262                                             mdt_index, LUSTRE_MDC_NAME, 1);
263                 } else if (lcfg->lcfg_command == LCFG_LOV_ADD_INA) {
264                         /*FIXME: Add mdt_index for LCFG_LOV_ADD_INA*/
265                         mdt_index = 0;
266                         rc = lod_add_device(env, lod, arg1, index, gen,
267                                             mdt_index, LUSTRE_OSC_NAME, 0);
268                 } else {
269                         rc = lod_del_device(env, lod,
270                                             &lod->lod_ost_descs,
271                                             arg1, index, gen);
272                 }
273
274                 break;
275         }
276
277         case LCFG_PARAM: {
278                 struct lprocfs_static_vars  v = { 0 };
279                 struct obd_device         *obd = lod2obd(lod);
280
281                 lprocfs_lod_init_vars(&v);
282
283                 rc = class_process_proc_param(PARAM_LOV, v.obd_vars, lcfg, obd);
284                 if (rc > 0)
285                         rc = 0;
286                 GOTO(out, rc);
287         }
288         case LCFG_CLEANUP:
289                 lu_dev_del_linkage(dev->ld_site, dev);
290                 lod_cleanup_desc_tgts(env, lod, &lod->lod_mdt_descs, lcfg);
291                 lod_cleanup_desc_tgts(env, lod, &lod->lod_ost_descs, lcfg);
292                 /*
293                  * do cleanup on underlying storage only when
294                  * all OSPs are cleaned up, as they use that OSD as well
295                  */
296                 next = &lod->lod_child->dd_lu_dev;
297                 rc = next->ld_ops->ldo_process_config(env, next, lcfg);
298                 if (rc)
299                         CERROR("%s: can't process %u: %d\n",
300                                lod2obd(lod)->obd_name, lcfg->lcfg_command, rc);
301
302                 rc = obd_disconnect(lod->lod_child_exp);
303                 if (rc)
304                         CERROR("error in disconnect from storage: %d\n", rc);
305                 break;
306
307         default:
308                CERROR("%s: unknown command %u\n", lod2obd(lod)->obd_name,
309                       lcfg->lcfg_command);
310                rc = -EINVAL;
311                break;
312         }
313
314 out:
315         RETURN(rc);
316 }
317
318 static int lod_recovery_complete(const struct lu_env *env,
319                                  struct lu_device *dev)
320 {
321         struct lod_device   *lod = lu2lod_dev(dev);
322         struct lu_device    *next = &lod->lod_child->dd_lu_dev;
323         int                  i, rc;
324         ENTRY;
325
326         LASSERT(lod->lod_recovery_completed == 0);
327         lod->lod_recovery_completed = 1;
328
329         rc = next->ld_ops->ldo_recovery_complete(env, next);
330
331         lod_getref(&lod->lod_ost_descs);
332         if (lod->lod_osts_size > 0) {
333                 cfs_foreach_bit(lod->lod_ost_bitmap, i) {
334                         struct lod_tgt_desc *tgt;
335                         tgt = OST_TGT(lod, i);
336                         LASSERT(tgt && tgt->ltd_tgt);
337                         next = &tgt->ltd_ost->dd_lu_dev;
338                         rc = next->ld_ops->ldo_recovery_complete(env, next);
339                         if (rc)
340                                 CERROR("%s: can't complete recovery on #%d:"
341                                         "%d\n", lod2obd(lod)->obd_name, i, rc);
342                 }
343         }
344         lod_putref(lod, &lod->lod_ost_descs);
345         RETURN(rc);
346 }
347
348 static int lod_prepare(const struct lu_env *env, struct lu_device *pdev,
349                        struct lu_device *cdev)
350 {
351         struct lod_device   *lod = lu2lod_dev(cdev);
352         struct lu_device    *next = &lod->lod_child->dd_lu_dev;
353         int                  rc;
354         ENTRY;
355
356         rc = next->ld_ops->ldo_prepare(env, pdev, next);
357         if (rc != 0) {
358                 CERROR("%s: prepare bottom error: rc = %d\n",
359                        lod2obd(lod)->obd_name, rc);
360                 RETURN(rc);
361         }
362
363         lod->lod_initialized = 1;
364
365         RETURN(rc);
366 }
367
368 const struct lu_device_operations lod_lu_ops = {
369         .ldo_object_alloc       = lod_object_alloc,
370         .ldo_process_config     = lod_process_config,
371         .ldo_recovery_complete  = lod_recovery_complete,
372         .ldo_prepare            = lod_prepare,
373 };
374
375 static int lod_root_get(const struct lu_env *env,
376                         struct dt_device *dev, struct lu_fid *f)
377 {
378         return dt_root_get(env, dt2lod_dev(dev)->lod_child, f);
379 }
380
381 static int lod_statfs(const struct lu_env *env,
382                       struct dt_device *dev, struct obd_statfs *sfs)
383 {
384         return dt_statfs(env, dt2lod_dev(dev)->lod_child, sfs);
385 }
386
387 static struct thandle *lod_trans_create(const struct lu_env *env,
388                                         struct dt_device *dev)
389 {
390         return dt_trans_create(env, dt2lod_dev(dev)->lod_child);
391 }
392
393 static int lod_trans_start(const struct lu_env *env, struct dt_device *dev,
394                            struct thandle *th)
395 {
396         return dt_trans_start(env, dt2lod_dev(dev)->lod_child, th);
397 }
398
399 static int lod_trans_stop(const struct lu_env *env, struct thandle *th)
400 {
401         /* XXX: we don't know next device, will be fixed with DNE */
402         return dt_trans_stop(env, th->th_dev, th);
403 }
404
405 static void lod_conf_get(const struct lu_env *env,
406                          const struct dt_device *dev,
407                          struct dt_device_param *param)
408 {
409         dt_conf_get(env, dt2lod_dev((struct dt_device *)dev)->lod_child, param);
410 }
411
412 static int lod_sync(const struct lu_env *env, struct dt_device *dev)
413 {
414         struct lod_device   *lod = dt2lod_dev(dev);
415         struct lod_ost_desc *ost;
416         int                  rc = 0, i;
417         ENTRY;
418
419         lod_getref(&lod->lod_ost_descs);
420         lod_foreach_ost(lod, i) {
421                 ost = OST_TGT(lod, i);
422                 LASSERT(ost && ost->ltd_ost);
423                 rc = dt_sync(env, ost->ltd_ost);
424                 if (rc) {
425                         CERROR("%s: can't sync %u: %d\n",
426                                lod2obd(lod)->obd_name, i, rc);
427                         break;
428                 }
429         }
430         lod_putref(lod, &lod->lod_ost_descs);
431         if (rc == 0)
432                 rc = dt_sync(env, lod->lod_child);
433
434         RETURN(rc);
435 }
436
437 static int lod_ro(const struct lu_env *env, struct dt_device *dev)
438 {
439         return dt_ro(env, dt2lod_dev(dev)->lod_child);
440 }
441
442 static int lod_commit_async(const struct lu_env *env, struct dt_device *dev)
443 {
444         return dt_commit_async(env, dt2lod_dev(dev)->lod_child);
445 }
446
447 static int lod_init_capa_ctxt(const struct lu_env *env, struct dt_device *dev,
448                               int mode, unsigned long timeout,
449                               __u32 alg, struct lustre_capa_key *keys)
450 {
451         struct dt_device *next = dt2lod_dev(dev)->lod_child;
452         return dt_init_capa_ctxt(env, next, mode, timeout, alg, keys);
453 }
454
455 static const struct dt_device_operations lod_dt_ops = {
456         .dt_root_get         = lod_root_get,
457         .dt_statfs           = lod_statfs,
458         .dt_trans_create     = lod_trans_create,
459         .dt_trans_start      = lod_trans_start,
460         .dt_trans_stop       = lod_trans_stop,
461         .dt_conf_get         = lod_conf_get,
462         .dt_sync             = lod_sync,
463         .dt_ro               = lod_ro,
464         .dt_commit_async     = lod_commit_async,
465         .dt_init_capa_ctxt   = lod_init_capa_ctxt,
466 };
467
468 static int lod_connect_to_osd(const struct lu_env *env, struct lod_device *lod,
469                               struct lustre_cfg *cfg)
470 {
471         struct obd_connect_data *data = NULL;
472         struct obd_device       *obd;
473         char                    *nextdev = NULL, *p, *s;
474         int                      rc, len = 0;
475         ENTRY;
476
477         LASSERT(cfg);
478         LASSERT(lod->lod_child_exp == NULL);
479
480         /* compatibility hack: we still use old config logs
481          * which specify LOV, but we need to learn underlying
482          * OSD device, which is supposed to be:
483          *  <fsname>-MDTxxxx-osd
484          *
485          * 2.x MGS generates lines like the following:
486          *   #03 (176)lov_setup 0:lustre-MDT0000-mdtlov  1:(struct lov_desc)
487          * 1.8 MGS generates lines like the following:
488          *   #03 (168)lov_setup 0:lustre-mdtlov  1:(struct lov_desc)
489          *
490          * we use "-MDT" to differentiate 2.x from 1.8 */
491
492         if ((p = lustre_cfg_string(cfg, 0)) && strstr(p, "-mdtlov")) {
493                 len = strlen(p) + 1;
494                 OBD_ALLOC(nextdev, len);
495                 if (nextdev == NULL)
496                         GOTO(out, rc = -ENOMEM);
497
498                 strcpy(nextdev, p);
499                 s = strstr(nextdev, "-mdtlov");
500                 if (unlikely(s == NULL)) {
501                         CERROR("unable to parse device name %s\n",
502                                lustre_cfg_string(cfg, 0));
503                         GOTO(out, rc = -EINVAL);
504                 }
505
506                 if (strstr(nextdev, "-MDT")) {
507                         /* 2.x config */
508                         strcpy(s, "-osd");
509                 } else {
510                         /* 1.8 config */
511                         strcpy(s, "-MDT0000-osd");
512                 }
513         } else {
514                 CERROR("unable to parse device name %s\n",
515                        lustre_cfg_string(cfg, 0));
516                 GOTO(out, rc = -EINVAL);
517         }
518
519         OBD_ALLOC_PTR(data);
520         if (data == NULL)
521                 GOTO(out, rc = -ENOMEM);
522
523         obd = class_name2obd(nextdev);
524         if (obd == NULL) {
525                 CERROR("can not locate next device: %s\n", nextdev);
526                 GOTO(out, rc = -ENOTCONN);
527         }
528
529         data->ocd_connect_flags = OBD_CONNECT_VERSION;
530         data->ocd_version = LUSTRE_VERSION_CODE;
531
532         rc = obd_connect(env, &lod->lod_child_exp, obd, &obd->obd_uuid,
533                          data, NULL);
534         if (rc) {
535                 CERROR("cannot connect to next dev %s (%d)\n", nextdev, rc);
536                 GOTO(out, rc);
537         }
538
539         lod->lod_dt_dev.dd_lu_dev.ld_site =
540                 lod->lod_child_exp->exp_obd->obd_lu_dev->ld_site;
541         LASSERT(lod->lod_dt_dev.dd_lu_dev.ld_site);
542         lod->lod_child = lu2dt_dev(lod->lod_child_exp->exp_obd->obd_lu_dev);
543
544 out:
545         if (data)
546                 OBD_FREE_PTR(data);
547         if (nextdev)
548                 OBD_FREE(nextdev, len);
549         RETURN(rc);
550 }
551
552 static int lod_tgt_desc_init(struct lod_tgt_descs *ltd)
553 {
554         mutex_init(&ltd->ltd_mutex);
555         init_rwsem(&ltd->ltd_rw_sem);
556
557         /* the OST array and bitmap are allocated/grown dynamically as OSTs are
558          * added to the LOD, see lod_add_device() */
559         ltd->ltd_tgt_bitmap = CFS_ALLOCATE_BITMAP(32);
560         if (ltd->ltd_tgt_bitmap == NULL)
561                 RETURN(-ENOMEM);
562
563         ltd->ltd_tgts_size  = 32;
564         ltd->ltd_tgtnr      = 0;
565
566         ltd->ltd_death_row = 0;
567         ltd->ltd_refcount  = 0;
568         return 0;
569 }
570
571 static int lod_init0(const struct lu_env *env, struct lod_device *lod,
572                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
573 {
574         struct dt_device_param ddp;
575         struct obd_device     *obd;
576         int                    rc;
577         ENTRY;
578
579         obd = class_name2obd(lustre_cfg_string(cfg, 0));
580         if (obd == NULL) {
581                 CERROR("Cannot find obd with name %s\n",
582                        lustre_cfg_string(cfg, 0));
583                 RETURN(-ENODEV);
584         }
585
586         obd->obd_lu_dev = &lod->lod_dt_dev.dd_lu_dev;
587         lod->lod_dt_dev.dd_lu_dev.ld_obd = obd;
588         lod->lod_dt_dev.dd_lu_dev.ld_ops = &lod_lu_ops;
589         lod->lod_dt_dev.dd_ops = &lod_dt_ops;
590
591         rc = lod_connect_to_osd(env, lod, cfg);
592         if (rc)
593                 RETURN(rc);
594
595         dt_conf_get(env, &lod->lod_dt_dev, &ddp);
596         lod->lod_osd_max_easize = ddp.ddp_max_ea_size;
597
598         /* setup obd to be used with old lov code */
599         rc = lod_pools_init(lod, cfg);
600         if (rc)
601                 GOTO(out_disconnect, rc);
602
603         rc = lod_procfs_init(lod);
604         if (rc)
605                 GOTO(out_pools, rc);
606
607         spin_lock_init(&lod->lod_desc_lock);
608         spin_lock_init(&lod->lod_connects_lock);
609         lod_tgt_desc_init(&lod->lod_mdt_descs);
610         lod_tgt_desc_init(&lod->lod_ost_descs);
611
612         RETURN(0);
613
614 out_pools:
615         lod_pools_fini(lod);
616 out_disconnect:
617         obd_disconnect(lod->lod_child_exp);
618         RETURN(rc);
619 }
620
621 static struct lu_device *lod_device_free(const struct lu_env *env,
622                                          struct lu_device *lu)
623 {
624         struct lod_device *lod = lu2lod_dev(lu);
625         struct lu_device  *next = &lod->lod_child->dd_lu_dev;
626         ENTRY;
627
628         LASSERT(cfs_atomic_read(&lu->ld_ref) == 0);
629         dt_device_fini(&lod->lod_dt_dev);
630         OBD_FREE_PTR(lod);
631         RETURN(next);
632 }
633
634 static struct lu_device *lod_device_alloc(const struct lu_env *env,
635                                           struct lu_device_type *type,
636                                           struct lustre_cfg *lcfg)
637 {
638         struct lod_device *lod;
639         struct lu_device  *lu_dev;
640
641         OBD_ALLOC_PTR(lod);
642         if (lod == NULL) {
643                 lu_dev = ERR_PTR(-ENOMEM);
644         } else {
645                 int rc;
646
647                 lu_dev = lod2lu_dev(lod);
648                 dt_device_init(&lod->lod_dt_dev, type);
649                 rc = lod_init0(env, lod, type, lcfg);
650                 if (rc != 0) {
651                         lod_device_free(env, lu_dev);
652                         lu_dev = ERR_PTR(rc);
653                 }
654         }
655
656         return lu_dev;
657 }
658
659 static struct lu_device *lod_device_fini(const struct lu_env *env,
660                                          struct lu_device *d)
661 {
662         struct lod_device *lod = lu2lod_dev(d);
663         int                rc;
664         ENTRY;
665
666         lod_pools_fini(lod);
667
668         lod_procfs_fini(lod);
669
670         rc = lod_fini_tgt(lod, &lod->lod_ost_descs);
671         if (rc)
672                 CERROR("%s:can not fini ost descs %d\n",
673                         lod2obd(lod)->obd_name, rc);
674
675         rc = lod_fini_tgt(lod, &lod->lod_mdt_descs);
676         if (rc)
677                 CERROR("%s:can not fini mdt descs %d\n",
678                         lod2obd(lod)->obd_name, rc);
679
680         RETURN(NULL);
681 }
682
683 /*
684  * we use exports to track all LOD users
685  */
686 static int lod_obd_connect(const struct lu_env *env, struct obd_export **exp,
687                            struct obd_device *obd, struct obd_uuid *cluuid,
688                            struct obd_connect_data *data, void *localdata)
689 {
690         struct lod_device    *lod = lu2lod_dev(obd->obd_lu_dev);
691         struct lustre_handle  conn;
692         int                   rc;
693         ENTRY;
694
695         CDEBUG(D_CONFIG, "connect #%d\n", lod->lod_connects);
696
697         rc = class_connect(&conn, obd, cluuid);
698         if (rc)
699                 RETURN(rc);
700
701         *exp = class_conn2export(&conn);
702
703         spin_lock(&lod->lod_connects_lock);
704         lod->lod_connects++;
705         /* at the moment we expect the only user */
706         LASSERT(lod->lod_connects == 1);
707         spin_unlock(&lod->lod_connects_lock);
708
709         RETURN(0);
710 }
711
712 /*
713  * once last export (we don't count self-export) disappeared
714  * lod can be released
715  */
716 static int lod_obd_disconnect(struct obd_export *exp)
717 {
718         struct obd_device *obd = exp->exp_obd;
719         struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
720         int                rc, release = 0;
721         ENTRY;
722
723         /* Only disconnect the underlying layers on the final disconnect. */
724         spin_lock(&lod->lod_connects_lock);
725         lod->lod_connects--;
726         if (lod->lod_connects != 0) {
727                 /* why should there be more than 1 connect? */
728                 spin_unlock(&lod->lod_connects_lock);
729                 CERROR("%s: disconnect #%d\n", exp->exp_obd->obd_name,
730                        lod->lod_connects);
731                 goto out;
732         }
733         spin_unlock(&lod->lod_connects_lock);
734
735         /* the last user of lod has gone, let's release the device */
736         release = 1;
737
738 out:
739         rc = class_disconnect(exp); /* bz 9811 */
740
741         if (rc == 0 && release)
742                 class_manual_cleanup(obd);
743         RETURN(rc);
744 }
745
746 LU_KEY_INIT(lod, struct lod_thread_info);
747
748 static void lod_key_fini(const struct lu_context *ctx,
749                 struct lu_context_key *key, void *data)
750 {
751         struct lod_thread_info *info = data;
752         /* allocated in lod_get_lov_ea
753          * XXX: this is overload, a tread may have such store but used only
754          * once. Probably better would be pool of such stores per LOD.
755          */
756         if (info->lti_ea_store) {
757                 OBD_FREE_LARGE(info->lti_ea_store, info->lti_ea_store_size);
758                 info->lti_ea_store = NULL;
759                 info->lti_ea_store_size = 0;
760         }
761         OBD_FREE_PTR(info);
762 }
763
764 /* context key: lod_thread_key */
765 LU_CONTEXT_KEY_DEFINE(lod, LCT_MD_THREAD);
766
767 LU_TYPE_INIT_FINI(lod, &lod_thread_key);
768
769 static struct lu_device_type_operations lod_device_type_ops = {
770         .ldto_init           = lod_type_init,
771         .ldto_fini           = lod_type_fini,
772
773         .ldto_start          = lod_type_start,
774         .ldto_stop           = lod_type_stop,
775
776         .ldto_device_alloc   = lod_device_alloc,
777         .ldto_device_free    = lod_device_free,
778
779         .ldto_device_fini    = lod_device_fini
780 };
781
782 static struct lu_device_type lod_device_type = {
783         .ldt_tags     = LU_DEVICE_DT,
784         .ldt_name     = LUSTRE_LOD_NAME,
785         .ldt_ops      = &lod_device_type_ops,
786         .ldt_ctx_tags = LCT_MD_THREAD,
787 };
788
789 static int lod_obd_health_check(const struct lu_env *env,
790                 struct obd_device *obd)
791 {
792         struct lod_device   *d = lu2lod_dev(obd->obd_lu_dev);
793         struct lod_ost_desc *ost;
794         int                  i, rc = 1;
795         ENTRY;
796
797         LASSERT(d);
798         lod_getref(&d->lod_ost_descs);
799         lod_foreach_ost(d, i) {
800                 ost = OST_TGT(d, i);
801                 LASSERT(ost && ost->ltd_ost);
802                 rc = obd_health_check(env, ost->ltd_exp->exp_obd);
803                 /* one healthy device is enough */
804                 if (rc == 0)
805                         break;
806         }
807         lod_putref(d, &d->lod_ost_descs);
808         RETURN(rc);
809 }
810
811 static struct obd_ops lod_obd_device_ops = {
812         .o_owner        = THIS_MODULE,
813         .o_connect      = lod_obd_connect,
814         .o_disconnect   = lod_obd_disconnect,
815         .o_health_check = lod_obd_health_check,
816         .o_pool_new     = lod_pool_new,
817         .o_pool_rem     = lod_pool_remove,
818         .o_pool_add     = lod_pool_add,
819         .o_pool_del     = lod_pool_del,
820 };
821
822 static int __init lod_mod_init(void)
823 {
824         struct lprocfs_static_vars  lvars = { 0 };
825         cfs_proc_dir_entry_t       *lov_proc_dir;
826         int                         rc;
827
828         rc = lu_kmem_init(lod_caches);
829         if (rc)
830                 return rc;
831
832         lprocfs_lod_init_vars(&lvars);
833
834         rc = class_register_type(&lod_obd_device_ops, NULL, lvars.module_vars,
835                                  LUSTRE_LOD_NAME, &lod_device_type);
836         if (rc) {
837                 lu_kmem_fini(lod_caches);
838                 return rc;
839         }
840
841         /* create "lov" entry in procfs for compatibility purposes */
842         lov_proc_dir = lprocfs_srch(proc_lustre_root, "lov");
843         if (lov_proc_dir == NULL) {
844                 lov_proc_dir = lprocfs_register("lov", proc_lustre_root,
845                                                 NULL, NULL);
846                 if (IS_ERR(lov_proc_dir))
847                         CERROR("lod: can't create compat entry \"lov\": %d\n",
848                                (int)PTR_ERR(lov_proc_dir));
849         }
850
851         return rc;
852 }
853
854 static void __exit lod_mod_exit(void)
855 {
856
857         lprocfs_try_remove_proc_entry("lov", proc_lustre_root);
858
859         class_unregister_type(LUSTRE_LOD_NAME);
860         lu_kmem_fini(lod_caches);
861 }
862
863 MODULE_AUTHOR("Whamcloud, Inc. <http://www.whamcloud.com/>");
864 MODULE_DESCRIPTION("Lustre Logical Object Device ("LUSTRE_LOD_NAME")");
865 MODULE_LICENSE("GPL");
866
867 module_init(lod_mod_init);
868 module_exit(lod_mod_exit);
869