Whamcloud - gitweb
LU-1187 lod: reorganize lod_ost
[fs/lustre-release.git] / lustre / lod / lod_dev.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, Intel Corporation.
27  *
28  */
29 /*
30  * This file is part of Lustre, http://www.lustre.org/
31  * Lustre is a trademark of Sun Microsystems, Inc.
32  *
33  * lustre/lod/lod_dev.c
34  *
35  * Lustre Logical Object Device
36  *
37  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
38  * Author: Mikhail Pershin <mike.pershin@intel.com>
39  */
40
41 #ifndef EXPORT_SYMTAB
42 # define EXPORT_SYMTAB
43 #endif
44 #define DEBUG_SUBSYSTEM S_MDS
45
46 #include <obd_class.h>
47 #include <lustre_param.h>
48
49 #include "lod_internal.h"
50
51 /**
52  * Lookup MDT/OST index \a tgt by FID \a fid.
53  *
54  * \param lod LOD to be lookup at.
55  * \param fid FID of object to find MDT/OST.
56  * \param tgt MDT/OST index to return.
57  * \param flags indidcate the FID is on MDS or OST.
58  **/
59 int lod_fld_lookup(const struct lu_env *env, struct lod_device *lod,
60                    const struct lu_fid *fid, __u32 *tgt, int flags)
61 {
62         struct lu_seq_range     range;
63         struct lu_server_fld    *server_fld;
64         int rc = 0;
65         ENTRY;
66
67         LASSERTF(fid_is_sane(fid), "Invalid FID "DFID"\n", PFID(fid));
68         if (fid_is_idif(fid)) {
69                 *tgt = fid_idif_ost_idx(fid);
70                 RETURN(rc);
71         }
72
73         if (!lod->lod_initialized || !fid_is_norm(fid)) {
74                 LASSERT(lu_site2seq(lod2lu_dev(lod)->ld_site) != NULL);
75                 *tgt = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
76                 RETURN(rc);
77         }
78
79         server_fld = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_server_fld;
80         range.lsr_flags = flags;
81         rc = fld_server_lookup(env, server_fld, fid_seq(fid), &range);
82         if (rc) {
83                 CERROR("%s: Can't find tgt by seq "LPX64", rc %d\n",
84                        lod2obd(lod)->obd_name, fid_seq(fid), rc);
85                 RETURN(rc);
86         }
87
88         *tgt = range.lsr_index;
89
90         CDEBUG(D_INFO, "LOD: got tgt %x for sequence: "
91                LPX64"\n", *tgt, fid_seq(fid));
92
93         RETURN(rc);
94 }
95
96 extern struct lu_object_operations lod_lu_obj_ops;
97 extern struct dt_object_operations lod_obj_ops;
98
99 /* Slab for OSD object allocation */
100 cfs_mem_cache_t *lod_object_kmem;
101
102 static struct lu_kmem_descr lod_caches[] = {
103         {
104                 .ckd_cache = &lod_object_kmem,
105                 .ckd_name  = "lod_obj",
106                 .ckd_size  = sizeof(struct lod_object)
107         },
108         {
109                 .ckd_cache = NULL
110         }
111 };
112
113 static struct lu_device *lod_device_fini(const struct lu_env *env,
114                                          struct lu_device *d);
115
116 struct lu_object *lod_object_alloc(const struct lu_env *env,
117                                    const struct lu_object_header *hdr,
118                                    struct lu_device *dev)
119 {
120         struct lu_object  *lu_obj;
121         struct lod_object *lo;
122
123         OBD_SLAB_ALLOC_PTR_GFP(lo, lod_object_kmem, CFS_ALLOC_IO);
124         if (lo == NULL)
125                 return NULL;
126
127         lu_obj = lod2lu_obj(lo);
128         dt_object_init(&lo->ldo_obj, NULL, dev);
129         lo->ldo_obj.do_ops = &lod_obj_ops;
130         lu_obj->lo_ops = &lod_lu_obj_ops;
131
132         return lu_obj;
133 }
134
135 static int lod_cleanup_desc_tgts(const struct lu_env *env,
136                                  struct lod_device *lod,
137                                  struct lod_tgt_descs *ltd,
138                                  struct lustre_cfg *lcfg)
139 {
140         struct lu_device  *next;
141         int rc = 0;
142         int i;
143
144         lod_getref(ltd);
145         if (ltd->ltd_tgts_size <= 0) {
146                 lod_putref(lod, ltd);
147                 return 0;
148         }
149         cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
150                 struct lod_tgt_desc *tgt;
151                 int rc1;
152
153                 tgt = LTD_TGT(ltd, i);
154                 LASSERT(tgt && tgt->ltd_tgt);
155                 next = &tgt->ltd_tgt->dd_lu_dev;
156                 rc1 = next->ld_ops->ldo_process_config(env, next, lcfg);
157                 if (rc1) {
158                         CERROR("%s: error cleaning up LOD index %u: cmd %#x"
159                                ": rc = %d\n", lod2obd(lod)->obd_name, i,
160                                lcfg->lcfg_command, rc1);
161                         rc = rc1;
162                 }
163         }
164         lod_putref(lod, ltd);
165         return rc;
166 }
167
168 static int lod_process_config(const struct lu_env *env,
169                               struct lu_device *dev,
170                               struct lustre_cfg *lcfg)
171 {
172         struct lod_device *lod = lu2lod_dev(dev);
173         struct lu_device  *next = &lod->lod_child->dd_lu_dev;
174         char              *arg1;
175         int                rc;
176         ENTRY;
177
178         switch(lcfg->lcfg_command) {
179         case LCFG_LOV_DEL_OBD:
180         case LCFG_LOV_ADD_INA:
181         case LCFG_LOV_ADD_OBD: {
182                 __u32 index;
183                 int gen;
184                 /* lov_modify_tgts add  0:lov_mdsA  1:osp  2:0  3:1 */
185                 arg1 = lustre_cfg_string(lcfg, 1);
186
187                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
188                         GOTO(out, rc = -EINVAL);
189                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
190                         GOTO(out, rc = -EINVAL);
191
192                 if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD)
193                         rc = lod_add_device(env, lod, arg1, index, gen, 1);
194                 else if (lcfg->lcfg_command == LCFG_LOV_ADD_INA)
195                         rc = lod_add_device(env, lod, arg1, index, gen, 0);
196                 else
197                         rc = lod_del_device(env, lod,
198                                             &lod->lod_ost_descs,
199                                             arg1, index, gen);
200
201                 break;
202         }
203
204         case LCFG_PARAM: {
205                 struct lprocfs_static_vars  v = { 0 };
206                 struct obd_device         *obd = lod2obd(lod);
207
208                 lprocfs_lod_init_vars(&v);
209
210                 rc = class_process_proc_param(PARAM_LOV, v.obd_vars, lcfg, obd);
211                 if (rc > 0)
212                         rc = 0;
213                 GOTO(out, rc);
214         }
215         case LCFG_CLEANUP:
216                 lu_dev_del_linkage(dev->ld_site, dev);
217                 lod_cleanup_desc_tgts(env, lod, &lod->lod_mdt_descs, lcfg);
218                 lod_cleanup_desc_tgts(env, lod, &lod->lod_ost_descs, lcfg);
219                 /*
220                  * do cleanup on underlying storage only when
221                  * all OSPs are cleaned up, as they use that OSD as well
222                  */
223                 next = &lod->lod_child->dd_lu_dev;
224                 rc = next->ld_ops->ldo_process_config(env, next, lcfg);
225                 if (rc)
226                         CERROR("%s: can't process %u: %d\n",
227                                lod2obd(lod)->obd_name, lcfg->lcfg_command, rc);
228
229                 rc = obd_disconnect(lod->lod_child_exp);
230                 if (rc)
231                         CERROR("error in disconnect from storage: %d\n", rc);
232                 break;
233
234         default:
235                CERROR("%s: unknown command %u\n", lod2obd(lod)->obd_name,
236                       lcfg->lcfg_command);
237                rc = -EINVAL;
238                break;
239         }
240
241 out:
242         RETURN(rc);
243 }
244
245 static int lod_recovery_complete(const struct lu_env *env,
246                                  struct lu_device *dev)
247 {
248         struct lod_device   *lod = lu2lod_dev(dev);
249         struct lu_device    *next = &lod->lod_child->dd_lu_dev;
250         int                  i, rc;
251         ENTRY;
252
253         LASSERT(lod->lod_recovery_completed == 0);
254         lod->lod_recovery_completed = 1;
255
256         rc = next->ld_ops->ldo_recovery_complete(env, next);
257
258         lod_getref(&lod->lod_ost_descs);
259         if (lod->lod_osts_size > 0) {
260                 cfs_foreach_bit(lod->lod_ost_bitmap, i) {
261                         struct lod_tgt_desc *tgt;
262                         tgt = OST_TGT(lod, i);
263                         LASSERT(tgt && tgt->ltd_tgt);
264                         next = &tgt->ltd_ost->dd_lu_dev;
265                         rc = next->ld_ops->ldo_recovery_complete(env, next);
266                         if (rc)
267                                 CERROR("%s: can't complete recovery on #%d:"
268                                         "%d\n", lod2obd(lod)->obd_name, i, rc);
269                 }
270         }
271         lod_putref(lod, &lod->lod_ost_descs);
272         RETURN(rc);
273 }
274
275 static int lod_prepare(const struct lu_env *env, struct lu_device *pdev,
276                        struct lu_device *cdev)
277 {
278         struct lod_device   *lod = lu2lod_dev(cdev);
279         struct lu_device    *next = &lod->lod_child->dd_lu_dev;
280         int                  rc;
281         ENTRY;
282
283         rc = next->ld_ops->ldo_prepare(env, pdev, next);
284         if (rc != 0) {
285                 CERROR("%s: prepare bottom error: rc = %d\n",
286                        lod2obd(lod)->obd_name, rc);
287                 RETURN(rc);
288         }
289
290         lod->lod_initialized = 1;
291
292         RETURN(rc);
293 }
294
295 const struct lu_device_operations lod_lu_ops = {
296         .ldo_object_alloc       = lod_object_alloc,
297         .ldo_process_config     = lod_process_config,
298         .ldo_recovery_complete  = lod_recovery_complete,
299         .ldo_prepare            = lod_prepare,
300 };
301
302 static int lod_root_get(const struct lu_env *env,
303                         struct dt_device *dev, struct lu_fid *f)
304 {
305         return dt_root_get(env, dt2lod_dev(dev)->lod_child, f);
306 }
307
308 static int lod_statfs(const struct lu_env *env,
309                       struct dt_device *dev, struct obd_statfs *sfs)
310 {
311         return dt_statfs(env, dt2lod_dev(dev)->lod_child, sfs);
312 }
313
314 static struct thandle *lod_trans_create(const struct lu_env *env,
315                                         struct dt_device *dev)
316 {
317         return dt_trans_create(env, dt2lod_dev(dev)->lod_child);
318 }
319
320 static int lod_trans_start(const struct lu_env *env, struct dt_device *dev,
321                            struct thandle *th)
322 {
323         return dt_trans_start(env, dt2lod_dev(dev)->lod_child, th);
324 }
325
326 static int lod_trans_stop(const struct lu_env *env, struct thandle *th)
327 {
328         /* XXX: we don't know next device, will be fixed with DNE */
329         return dt_trans_stop(env, th->th_dev, th);
330 }
331
332 static void lod_conf_get(const struct lu_env *env,
333                          const struct dt_device *dev,
334                          struct dt_device_param *param)
335 {
336         dt_conf_get(env, dt2lod_dev((struct dt_device *)dev)->lod_child, param);
337 }
338
339 static int lod_sync(const struct lu_env *env, struct dt_device *dev)
340 {
341         struct lod_device   *lod = dt2lod_dev(dev);
342         struct lod_ost_desc *ost;
343         int                  rc = 0, i;
344         ENTRY;
345
346         lod_getref(&lod->lod_ost_descs);
347         lod_foreach_ost(lod, i) {
348                 ost = OST_TGT(lod, i);
349                 LASSERT(ost && ost->ltd_ost);
350                 rc = dt_sync(env, ost->ltd_ost);
351                 if (rc) {
352                         CERROR("%s: can't sync %u: %d\n",
353                                lod2obd(lod)->obd_name, i, rc);
354                         break;
355                 }
356         }
357         lod_putref(lod, &lod->lod_ost_descs);
358         if (rc == 0)
359                 rc = dt_sync(env, lod->lod_child);
360
361         RETURN(rc);
362 }
363
364 static int lod_ro(const struct lu_env *env, struct dt_device *dev)
365 {
366         return dt_ro(env, dt2lod_dev(dev)->lod_child);
367 }
368
369 static int lod_commit_async(const struct lu_env *env, struct dt_device *dev)
370 {
371         return dt_commit_async(env, dt2lod_dev(dev)->lod_child);
372 }
373
374 static int lod_init_capa_ctxt(const struct lu_env *env, struct dt_device *dev,
375                               int mode, unsigned long timeout,
376                               __u32 alg, struct lustre_capa_key *keys)
377 {
378         struct dt_device *next = dt2lod_dev(dev)->lod_child;
379         return dt_init_capa_ctxt(env, next, mode, timeout, alg, keys);
380 }
381
382 static const struct dt_device_operations lod_dt_ops = {
383         .dt_root_get         = lod_root_get,
384         .dt_statfs           = lod_statfs,
385         .dt_trans_create     = lod_trans_create,
386         .dt_trans_start      = lod_trans_start,
387         .dt_trans_stop       = lod_trans_stop,
388         .dt_conf_get         = lod_conf_get,
389         .dt_sync             = lod_sync,
390         .dt_ro               = lod_ro,
391         .dt_commit_async     = lod_commit_async,
392         .dt_init_capa_ctxt   = lod_init_capa_ctxt,
393 };
394
395 static int lod_connect_to_osd(const struct lu_env *env, struct lod_device *lod,
396                               struct lustre_cfg *cfg)
397 {
398         struct obd_connect_data *data = NULL;
399         struct obd_device       *obd;
400         char                    *nextdev = NULL, *p, *s;
401         int                      rc, len = 0;
402         ENTRY;
403
404         LASSERT(cfg);
405         LASSERT(lod->lod_child_exp == NULL);
406
407         /* compatibility hack: we still use old config logs
408          * which specify LOV, but we need to learn underlying
409          * OSD device, which is supposed to be:
410          *  <fsname>-MDTxxxx-osd
411          *
412          * 2.x MGS generates lines like the following:
413          *   #03 (176)lov_setup 0:lustre-MDT0000-mdtlov  1:(struct lov_desc)
414          * 1.8 MGS generates lines like the following:
415          *   #03 (168)lov_setup 0:lustre-mdtlov  1:(struct lov_desc)
416          *
417          * we use "-MDT" to differentiate 2.x from 1.8 */
418
419         if ((p = lustre_cfg_string(cfg, 0)) && strstr(p, "-mdtlov")) {
420                 len = strlen(p) + 1;
421                 OBD_ALLOC(nextdev, len);
422                 if (nextdev == NULL)
423                         GOTO(out, rc = -ENOMEM);
424
425                 strcpy(nextdev, p);
426                 s = strstr(nextdev, "-mdtlov");
427                 if (unlikely(s == NULL)) {
428                         CERROR("unable to parse device name %s\n",
429                                lustre_cfg_string(cfg, 0));
430                         GOTO(out, rc = -EINVAL);
431                 }
432
433                 if (strstr(nextdev, "-MDT")) {
434                         /* 2.x config */
435                         strcpy(s, "-osd");
436                 } else {
437                         /* 1.8 config */
438                         strcpy(s, "-MDT0000-osd");
439                 }
440         } else {
441                 CERROR("unable to parse device name %s\n",
442                        lustre_cfg_string(cfg, 0));
443                 GOTO(out, rc = -EINVAL);
444         }
445
446         OBD_ALLOC_PTR(data);
447         if (data == NULL)
448                 GOTO(out, rc = -ENOMEM);
449
450         obd = class_name2obd(nextdev);
451         if (obd == NULL) {
452                 CERROR("can not locate next device: %s\n", nextdev);
453                 GOTO(out, rc = -ENOTCONN);
454         }
455
456         data->ocd_connect_flags = OBD_CONNECT_VERSION;
457         data->ocd_version = LUSTRE_VERSION_CODE;
458
459         rc = obd_connect(env, &lod->lod_child_exp, obd, &obd->obd_uuid,
460                          data, NULL);
461         if (rc) {
462                 CERROR("cannot connect to next dev %s (%d)\n", nextdev, rc);
463                 GOTO(out, rc);
464         }
465
466         lod->lod_dt_dev.dd_lu_dev.ld_site =
467                 lod->lod_child_exp->exp_obd->obd_lu_dev->ld_site;
468         LASSERT(lod->lod_dt_dev.dd_lu_dev.ld_site);
469         lod->lod_child = lu2dt_dev(lod->lod_child_exp->exp_obd->obd_lu_dev);
470
471 out:
472         if (data)
473                 OBD_FREE_PTR(data);
474         if (nextdev)
475                 OBD_FREE(nextdev, len);
476         RETURN(rc);
477 }
478
479 static int lod_tgt_desc_init(struct lod_tgt_descs *ltd)
480 {
481         mutex_init(&ltd->ltd_mutex);
482         init_rwsem(&ltd->ltd_rw_sem);
483
484         /* the OST array and bitmap are allocated/grown dynamically as OSTs are
485          * added to the LOD, see lod_add_device() */
486         ltd->ltd_tgt_bitmap = CFS_ALLOCATE_BITMAP(32);
487         if (ltd->ltd_tgt_bitmap == NULL)
488                 RETURN(-ENOMEM);
489
490         ltd->ltd_tgts_size  = 32;
491         ltd->ltd_tgtnr      = 0;
492
493         ltd->ltd_death_row = 0;
494         ltd->ltd_refcount  = 0;
495         return 0;
496 }
497
498 static int lod_init0(const struct lu_env *env, struct lod_device *lod,
499                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
500 {
501         struct dt_device_param ddp;
502         struct obd_device     *obd;
503         int                    rc;
504         ENTRY;
505
506         obd = class_name2obd(lustre_cfg_string(cfg, 0));
507         if (obd == NULL) {
508                 CERROR("Cannot find obd with name %s\n",
509                        lustre_cfg_string(cfg, 0));
510                 RETURN(-ENODEV);
511         }
512
513         obd->obd_lu_dev = &lod->lod_dt_dev.dd_lu_dev;
514         lod->lod_dt_dev.dd_lu_dev.ld_obd = obd;
515         lod->lod_dt_dev.dd_lu_dev.ld_ops = &lod_lu_ops;
516         lod->lod_dt_dev.dd_ops = &lod_dt_ops;
517
518         rc = lod_connect_to_osd(env, lod, cfg);
519         if (rc)
520                 RETURN(rc);
521
522         dt_conf_get(env, &lod->lod_dt_dev, &ddp);
523         lod->lod_osd_max_easize = ddp.ddp_max_ea_size;
524
525         /* setup obd to be used with old lov code */
526         rc = lod_pools_init(lod, cfg);
527         if (rc)
528                 GOTO(out_disconnect, rc);
529
530         rc = lod_procfs_init(lod);
531         if (rc)
532                 GOTO(out_pools, rc);
533
534         spin_lock_init(&lod->lod_desc_lock);
535         spin_lock_init(&lod->lod_connects_lock);
536         lod_tgt_desc_init(&lod->lod_mdt_descs);
537         lod_tgt_desc_init(&lod->lod_ost_descs);
538
539         RETURN(0);
540
541 out_pools:
542         lod_pools_fini(lod);
543 out_disconnect:
544         obd_disconnect(lod->lod_child_exp);
545         RETURN(rc);
546 }
547
548 static struct lu_device *lod_device_free(const struct lu_env *env,
549                                          struct lu_device *lu)
550 {
551         struct lod_device *lod = lu2lod_dev(lu);
552         struct lu_device  *next = &lod->lod_child->dd_lu_dev;
553         ENTRY;
554
555         LASSERT(cfs_atomic_read(&lu->ld_ref) == 0);
556         dt_device_fini(&lod->lod_dt_dev);
557         OBD_FREE_PTR(lod);
558         RETURN(next);
559 }
560
561 static struct lu_device *lod_device_alloc(const struct lu_env *env,
562                                           struct lu_device_type *type,
563                                           struct lustre_cfg *lcfg)
564 {
565         struct lod_device *lod;
566         struct lu_device  *lu_dev;
567
568         OBD_ALLOC_PTR(lod);
569         if (lod == NULL) {
570                 lu_dev = ERR_PTR(-ENOMEM);
571         } else {
572                 int rc;
573
574                 lu_dev = lod2lu_dev(lod);
575                 dt_device_init(&lod->lod_dt_dev, type);
576                 rc = lod_init0(env, lod, type, lcfg);
577                 if (rc != 0) {
578                         lod_device_free(env, lu_dev);
579                         lu_dev = ERR_PTR(rc);
580                 }
581         }
582
583         return lu_dev;
584 }
585
586 static struct lu_device *lod_device_fini(const struct lu_env *env,
587                                          struct lu_device *d)
588 {
589         struct lod_device *lod = lu2lod_dev(d);
590         int                rc;
591         ENTRY;
592
593         lod_pools_fini(lod);
594
595         lod_procfs_fini(lod);
596
597         rc = lod_fini_tgt(lod, &lod->lod_ost_descs);
598         if (rc)
599                 CERROR("%s:can not fini ost descs %d\n",
600                         lod2obd(lod)->obd_name, rc);
601
602         rc = lod_fini_tgt(lod, &lod->lod_mdt_descs);
603         if (rc)
604                 CERROR("%s:can not fini mdt descs %d\n",
605                         lod2obd(lod)->obd_name, rc);
606
607         RETURN(NULL);
608 }
609
610 /*
611  * we use exports to track all LOD users
612  */
613 static int lod_obd_connect(const struct lu_env *env, struct obd_export **exp,
614                            struct obd_device *obd, struct obd_uuid *cluuid,
615                            struct obd_connect_data *data, void *localdata)
616 {
617         struct lod_device    *lod = lu2lod_dev(obd->obd_lu_dev);
618         struct lustre_handle  conn;
619         int                   rc;
620         ENTRY;
621
622         CDEBUG(D_CONFIG, "connect #%d\n", lod->lod_connects);
623
624         rc = class_connect(&conn, obd, cluuid);
625         if (rc)
626                 RETURN(rc);
627
628         *exp = class_conn2export(&conn);
629
630         spin_lock(&lod->lod_connects_lock);
631         lod->lod_connects++;
632         /* at the moment we expect the only user */
633         LASSERT(lod->lod_connects == 1);
634         spin_unlock(&lod->lod_connects_lock);
635
636         RETURN(0);
637 }
638
639 /*
640  * once last export (we don't count self-export) disappeared
641  * lod can be released
642  */
643 static int lod_obd_disconnect(struct obd_export *exp)
644 {
645         struct obd_device *obd = exp->exp_obd;
646         struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
647         int                rc, release = 0;
648         ENTRY;
649
650         /* Only disconnect the underlying layers on the final disconnect. */
651         spin_lock(&lod->lod_connects_lock);
652         lod->lod_connects--;
653         if (lod->lod_connects != 0) {
654                 /* why should there be more than 1 connect? */
655                 spin_unlock(&lod->lod_connects_lock);
656                 CERROR("%s: disconnect #%d\n", exp->exp_obd->obd_name,
657                        lod->lod_connects);
658                 goto out;
659         }
660         spin_unlock(&lod->lod_connects_lock);
661
662         /* the last user of lod has gone, let's release the device */
663         release = 1;
664
665 out:
666         rc = class_disconnect(exp); /* bz 9811 */
667
668         if (rc == 0 && release)
669                 class_manual_cleanup(obd);
670         RETURN(rc);
671 }
672
673 LU_KEY_INIT(lod, struct lod_thread_info);
674
675 static void lod_key_fini(const struct lu_context *ctx,
676                 struct lu_context_key *key, void *data)
677 {
678         struct lod_thread_info *info = data;
679         /* allocated in lod_get_lov_ea
680          * XXX: this is overload, a tread may have such store but used only
681          * once. Probably better would be pool of such stores per LOD.
682          */
683         if (info->lti_ea_store) {
684                 OBD_FREE_LARGE(info->lti_ea_store, info->lti_ea_store_size);
685                 info->lti_ea_store = NULL;
686                 info->lti_ea_store_size = 0;
687         }
688         OBD_FREE_PTR(info);
689 }
690
691 /* context key: lod_thread_key */
692 LU_CONTEXT_KEY_DEFINE(lod, LCT_MD_THREAD);
693
694 LU_TYPE_INIT_FINI(lod, &lod_thread_key);
695
696 static struct lu_device_type_operations lod_device_type_ops = {
697         .ldto_init           = lod_type_init,
698         .ldto_fini           = lod_type_fini,
699
700         .ldto_start          = lod_type_start,
701         .ldto_stop           = lod_type_stop,
702
703         .ldto_device_alloc   = lod_device_alloc,
704         .ldto_device_free    = lod_device_free,
705
706         .ldto_device_fini    = lod_device_fini
707 };
708
709 static struct lu_device_type lod_device_type = {
710         .ldt_tags     = LU_DEVICE_DT,
711         .ldt_name     = LUSTRE_LOD_NAME,
712         .ldt_ops      = &lod_device_type_ops,
713         .ldt_ctx_tags = LCT_MD_THREAD,
714 };
715
716 static int lod_obd_health_check(const struct lu_env *env,
717                 struct obd_device *obd)
718 {
719         struct lod_device   *d = lu2lod_dev(obd->obd_lu_dev);
720         struct lod_ost_desc *ost;
721         int                  i, rc = 1;
722         ENTRY;
723
724         LASSERT(d);
725         lod_getref(&d->lod_ost_descs);
726         lod_foreach_ost(d, i) {
727                 ost = OST_TGT(d, i);
728                 LASSERT(ost && ost->ltd_ost);
729                 rc = obd_health_check(env, ost->ltd_exp->exp_obd);
730                 /* one healthy device is enough */
731                 if (rc == 0)
732                         break;
733         }
734         lod_putref(d, &d->lod_ost_descs);
735         RETURN(rc);
736 }
737
738 static struct obd_ops lod_obd_device_ops = {
739         .o_owner        = THIS_MODULE,
740         .o_connect      = lod_obd_connect,
741         .o_disconnect   = lod_obd_disconnect,
742         .o_health_check = lod_obd_health_check,
743         .o_pool_new     = lod_pool_new,
744         .o_pool_rem     = lod_pool_remove,
745         .o_pool_add     = lod_pool_add,
746         .o_pool_del     = lod_pool_del,
747 };
748
749 static int __init lod_mod_init(void)
750 {
751         struct lprocfs_static_vars  lvars = { 0 };
752         cfs_proc_dir_entry_t       *lov_proc_dir;
753         int                         rc;
754
755         rc = lu_kmem_init(lod_caches);
756         if (rc)
757                 return rc;
758
759         lprocfs_lod_init_vars(&lvars);
760
761         rc = class_register_type(&lod_obd_device_ops, NULL, lvars.module_vars,
762                                  LUSTRE_LOD_NAME, &lod_device_type);
763         if (rc) {
764                 lu_kmem_fini(lod_caches);
765                 return rc;
766         }
767
768         /* create "lov" entry in procfs for compatibility purposes */
769         lov_proc_dir = lprocfs_srch(proc_lustre_root, "lov");
770         if (lov_proc_dir == NULL) {
771                 lov_proc_dir = lprocfs_register("lov", proc_lustre_root,
772                                                 NULL, NULL);
773                 if (IS_ERR(lov_proc_dir))
774                         CERROR("lod: can't create compat entry \"lov\": %d\n",
775                                (int)PTR_ERR(lov_proc_dir));
776         }
777
778         return rc;
779 }
780
781 static void __exit lod_mod_exit(void)
782 {
783
784         lprocfs_try_remove_proc_entry("lov", proc_lustre_root);
785
786         class_unregister_type(LUSTRE_LOD_NAME);
787         lu_kmem_fini(lod_caches);
788 }
789
790 MODULE_AUTHOR("Whamcloud, Inc. <http://www.whamcloud.com/>");
791 MODULE_DESCRIPTION("Lustre Logical Object Device ("LUSTRE_LOD_NAME")");
792 MODULE_LICENSE("GPL");
793
794 module_init(lod_mod_init);
795 module_exit(lod_mod_exit);
796