Whamcloud - gitweb
LU-1187 lod: Fix config log and setup process for DNE
[fs/lustre-release.git] / lustre / lod / lod_dev.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, Intel Corporation.
27  *
28  */
29 /*
30  * This file is part of Lustre, http://www.lustre.org/
31  * Lustre is a trademark of Sun Microsystems, Inc.
32  *
33  * lustre/lod/lod_dev.c
34  *
35  * Lustre Logical Object Device
36  *
37  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
38  * Author: Mikhail Pershin <mike.pershin@intel.com>
39  */
40
41 #ifndef EXPORT_SYMTAB
42 # define EXPORT_SYMTAB
43 #endif
44 #define DEBUG_SUBSYSTEM S_MDS
45
46 #include <obd_class.h>
47 #include <lustre_param.h>
48
49 #include "lod_internal.h"
50
51 /**
52  * Lookup MDT/OST index \a tgt by FID \a fid.
53  *
54  * \param lod LOD to be lookup at.
55  * \param fid FID of object to find MDT/OST.
56  * \param tgt MDT/OST index to return.
57  * \param flags indidcate the FID is on MDS or OST.
58  **/
59 int lod_fld_lookup(const struct lu_env *env, struct lod_device *lod,
60                    const struct lu_fid *fid, __u32 *tgt, int flags)
61 {
62         struct lu_seq_range     range;
63         struct lu_server_fld    *server_fld;
64         int rc = 0;
65         ENTRY;
66
67         LASSERTF(fid_is_sane(fid), "Invalid FID "DFID"\n", PFID(fid));
68         if (fid_is_idif(fid)) {
69                 *tgt = fid_idif_ost_idx(fid);
70                 RETURN(rc);
71         }
72
73         if (!lod->lod_initialized || !fid_is_norm(fid)) {
74                 LASSERT(lu_site2seq(lod2lu_dev(lod)->ld_site) != NULL);
75                 *tgt = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
76                 RETURN(rc);
77         }
78
79         server_fld = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_server_fld;
80         range.lsr_flags = flags;
81         rc = fld_server_lookup(env, server_fld, fid_seq(fid), &range);
82         if (rc) {
83                 CERROR("%s: Can't find tgt by seq "LPX64", rc %d\n",
84                        lod2obd(lod)->obd_name, fid_seq(fid), rc);
85                 RETURN(rc);
86         }
87
88         *tgt = range.lsr_index;
89
90         CDEBUG(D_INFO, "LOD: got tgt %x for sequence: "
91                LPX64"\n", *tgt, fid_seq(fid));
92
93         RETURN(rc);
94 }
95
96 extern struct lu_object_operations lod_lu_obj_ops;
97 extern struct dt_object_operations lod_obj_ops;
98
99 /* Slab for OSD object allocation */
100 cfs_mem_cache_t *lod_object_kmem;
101
102 static struct lu_kmem_descr lod_caches[] = {
103         {
104                 .ckd_cache = &lod_object_kmem,
105                 .ckd_name  = "lod_obj",
106                 .ckd_size  = sizeof(struct lod_object)
107         },
108         {
109                 .ckd_cache = NULL
110         }
111 };
112
113 static struct lu_device *lod_device_fini(const struct lu_env *env,
114                                          struct lu_device *d);
115
116 struct lu_object *lod_object_alloc(const struct lu_env *env,
117                                    const struct lu_object_header *hdr,
118                                    struct lu_device *dev)
119 {
120         struct lu_object  *lu_obj;
121         struct lod_object *lo;
122
123         OBD_SLAB_ALLOC_PTR_GFP(lo, lod_object_kmem, CFS_ALLOC_IO);
124         if (lo == NULL)
125                 return NULL;
126
127         lu_obj = lod2lu_obj(lo);
128         dt_object_init(&lo->ldo_obj, NULL, dev);
129         lo->ldo_obj.do_ops = &lod_obj_ops;
130         lu_obj->lo_ops = &lod_lu_obj_ops;
131
132         return lu_obj;
133 }
134
135 static int lod_cleanup_desc_tgts(const struct lu_env *env,
136                                  struct lod_device *lod,
137                                  struct lod_tgt_descs *ltd,
138                                  struct lustre_cfg *lcfg)
139 {
140         struct lu_device  *next;
141         int rc = 0;
142         int i;
143
144         lod_getref(ltd);
145         if (ltd->ltd_tgts_size <= 0) {
146                 lod_putref(lod, ltd);
147                 return 0;
148         }
149         cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
150                 struct lod_tgt_desc *tgt;
151                 int rc1;
152
153                 tgt = LTD_TGT(ltd, i);
154                 LASSERT(tgt && tgt->ltd_tgt);
155                 next = &tgt->ltd_tgt->dd_lu_dev;
156                 rc1 = next->ld_ops->ldo_process_config(env, next, lcfg);
157                 if (rc1) {
158                         CERROR("%s: error cleaning up LOD index %u: cmd %#x"
159                                ": rc = %d\n", lod2obd(lod)->obd_name, i,
160                                lcfg->lcfg_command, rc1);
161                         rc = rc1;
162                 }
163         }
164         lod_putref(lod, ltd);
165         return rc;
166 }
167
168 static int lodname2mdt_index(char *lodname, int *index)
169 {
170         char *ptr, *tmp;
171
172         /* The lodname suppose to be fsname-MDTxxxx-mdtlov */
173         ptr = strrchr(lodname, '-');
174         if (ptr == NULL) {
175                 CERROR("invalid MDT index in '%s'\n", lodname);
176                 return -EINVAL;
177         }
178
179         if (strncmp(ptr, "-mdtlov", 7) != 0) {
180                 CERROR("invalid MDT index in '%s'\n", lodname);
181                 return -EINVAL;
182         }
183
184         if ((unsigned long)ptr - (unsigned long)lodname <= 8) {
185                 CERROR("invalid MDT index in '%s'\n", lodname);
186                 return -EINVAL;
187         }
188
189         if (strncmp(ptr - 8, "-MDT", 4) != 0) {
190                 CERROR("invalid MDT index in '%s'\n", lodname);
191                 return -EINVAL;
192         }
193
194         *index = simple_strtol(ptr - 4, &tmp, 16);
195         if (*tmp != '-' || *index > INT_MAX || *index < 0) {
196                 CERROR("invalid MDT index in '%s'\n", lodname);
197                 return -EINVAL;
198         }
199         return 0;
200 }
201
202 static int lod_process_config(const struct lu_env *env,
203                               struct lu_device *dev,
204                               struct lustre_cfg *lcfg)
205 {
206         struct lod_device *lod = lu2lod_dev(dev);
207         struct lu_device  *next = &lod->lod_child->dd_lu_dev;
208         char              *arg1;
209         int                rc = 0;
210         ENTRY;
211
212         switch(lcfg->lcfg_command) {
213         case LCFG_LOV_DEL_OBD:
214         case LCFG_LOV_ADD_INA:
215         case LCFG_LOV_ADD_OBD:
216         case LCFG_ADD_MDC: {
217                 __u32 index;
218                 __u32 mdt_index;
219                 int gen;
220                 /* lov_modify_tgts add  0:lov_mdsA  1:osp  2:0  3:1
221                  * modify_mdc_tgts add  0:lustre-MDT0001
222                  *                    1:lustre-MDT0001-mdc0002
223                  *                    2:2  3:1*/
224                 arg1 = lustre_cfg_string(lcfg, 1);
225
226                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
227                         GOTO(out, rc = -EINVAL);
228                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
229                         GOTO(out, rc = -EINVAL);
230
231                 if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
232                         char *mdt;
233                         mdt = strstr(lustre_cfg_string(lcfg, 0), "-MDT");
234                         /* 1.8 configs don't have "-MDT0000" at the end */
235                         if (mdt == NULL) {
236                                 mdt_index = 0;
237                         } else {
238                                 rc = lodname2mdt_index(
239                                         lustre_cfg_string(lcfg, 0), &mdt_index);
240                                 if (rc != 0)
241                                         GOTO(out, rc);
242                         }
243                         rc = lod_add_device(env, lod, arg1, index, gen,
244                                             mdt_index, LUSTRE_OSC_NAME, 1);
245                 } else if (lcfg->lcfg_command == LCFG_ADD_MDC) {
246                         mdt_index = index;
247                         rc = lod_add_device(env, lod, arg1, index, gen,
248                                             mdt_index, LUSTRE_MDC_NAME, 1);
249                 } else if (lcfg->lcfg_command == LCFG_LOV_ADD_INA) {
250                         /*FIXME: Add mdt_index for LCFG_LOV_ADD_INA*/
251                         mdt_index = 0;
252                         rc = lod_add_device(env, lod, arg1, index, gen,
253                                             mdt_index, LUSTRE_OSC_NAME, 0);
254                 } else {
255                         rc = lod_del_device(env, lod,
256                                             &lod->lod_ost_descs,
257                                             arg1, index, gen);
258                 }
259
260                 break;
261         }
262
263         case LCFG_PARAM: {
264                 struct lprocfs_static_vars  v = { 0 };
265                 struct obd_device         *obd = lod2obd(lod);
266
267                 lprocfs_lod_init_vars(&v);
268
269                 rc = class_process_proc_param(PARAM_LOV, v.obd_vars, lcfg, obd);
270                 if (rc > 0)
271                         rc = 0;
272                 GOTO(out, rc);
273         }
274         case LCFG_CLEANUP:
275                 lu_dev_del_linkage(dev->ld_site, dev);
276                 lod_cleanup_desc_tgts(env, lod, &lod->lod_mdt_descs, lcfg);
277                 lod_cleanup_desc_tgts(env, lod, &lod->lod_ost_descs, lcfg);
278                 /*
279                  * do cleanup on underlying storage only when
280                  * all OSPs are cleaned up, as they use that OSD as well
281                  */
282                 next = &lod->lod_child->dd_lu_dev;
283                 rc = next->ld_ops->ldo_process_config(env, next, lcfg);
284                 if (rc)
285                         CERROR("%s: can't process %u: %d\n",
286                                lod2obd(lod)->obd_name, lcfg->lcfg_command, rc);
287
288                 rc = obd_disconnect(lod->lod_child_exp);
289                 if (rc)
290                         CERROR("error in disconnect from storage: %d\n", rc);
291                 break;
292
293         default:
294                CERROR("%s: unknown command %u\n", lod2obd(lod)->obd_name,
295                       lcfg->lcfg_command);
296                rc = -EINVAL;
297                break;
298         }
299
300 out:
301         RETURN(rc);
302 }
303
304 static int lod_recovery_complete(const struct lu_env *env,
305                                  struct lu_device *dev)
306 {
307         struct lod_device   *lod = lu2lod_dev(dev);
308         struct lu_device    *next = &lod->lod_child->dd_lu_dev;
309         int                  i, rc;
310         ENTRY;
311
312         LASSERT(lod->lod_recovery_completed == 0);
313         lod->lod_recovery_completed = 1;
314
315         rc = next->ld_ops->ldo_recovery_complete(env, next);
316
317         lod_getref(&lod->lod_ost_descs);
318         if (lod->lod_osts_size > 0) {
319                 cfs_foreach_bit(lod->lod_ost_bitmap, i) {
320                         struct lod_tgt_desc *tgt;
321                         tgt = OST_TGT(lod, i);
322                         LASSERT(tgt && tgt->ltd_tgt);
323                         next = &tgt->ltd_ost->dd_lu_dev;
324                         rc = next->ld_ops->ldo_recovery_complete(env, next);
325                         if (rc)
326                                 CERROR("%s: can't complete recovery on #%d:"
327                                         "%d\n", lod2obd(lod)->obd_name, i, rc);
328                 }
329         }
330         lod_putref(lod, &lod->lod_ost_descs);
331         RETURN(rc);
332 }
333
334 static int lod_prepare(const struct lu_env *env, struct lu_device *pdev,
335                        struct lu_device *cdev)
336 {
337         struct lod_device   *lod = lu2lod_dev(cdev);
338         struct lu_device    *next = &lod->lod_child->dd_lu_dev;
339         int                  rc;
340         ENTRY;
341
342         rc = next->ld_ops->ldo_prepare(env, pdev, next);
343         if (rc != 0) {
344                 CERROR("%s: prepare bottom error: rc = %d\n",
345                        lod2obd(lod)->obd_name, rc);
346                 RETURN(rc);
347         }
348
349         lod->lod_initialized = 1;
350
351         RETURN(rc);
352 }
353
354 const struct lu_device_operations lod_lu_ops = {
355         .ldo_object_alloc       = lod_object_alloc,
356         .ldo_process_config     = lod_process_config,
357         .ldo_recovery_complete  = lod_recovery_complete,
358         .ldo_prepare            = lod_prepare,
359 };
360
361 static int lod_root_get(const struct lu_env *env,
362                         struct dt_device *dev, struct lu_fid *f)
363 {
364         return dt_root_get(env, dt2lod_dev(dev)->lod_child, f);
365 }
366
367 static int lod_statfs(const struct lu_env *env,
368                       struct dt_device *dev, struct obd_statfs *sfs)
369 {
370         return dt_statfs(env, dt2lod_dev(dev)->lod_child, sfs);
371 }
372
373 static struct thandle *lod_trans_create(const struct lu_env *env,
374                                         struct dt_device *dev)
375 {
376         return dt_trans_create(env, dt2lod_dev(dev)->lod_child);
377 }
378
379 static int lod_trans_start(const struct lu_env *env, struct dt_device *dev,
380                            struct thandle *th)
381 {
382         return dt_trans_start(env, dt2lod_dev(dev)->lod_child, th);
383 }
384
385 static int lod_trans_stop(const struct lu_env *env, struct thandle *th)
386 {
387         /* XXX: we don't know next device, will be fixed with DNE */
388         return dt_trans_stop(env, th->th_dev, th);
389 }
390
391 static void lod_conf_get(const struct lu_env *env,
392                          const struct dt_device *dev,
393                          struct dt_device_param *param)
394 {
395         dt_conf_get(env, dt2lod_dev((struct dt_device *)dev)->lod_child, param);
396 }
397
398 static int lod_sync(const struct lu_env *env, struct dt_device *dev)
399 {
400         struct lod_device   *lod = dt2lod_dev(dev);
401         struct lod_ost_desc *ost;
402         int                  rc = 0, i;
403         ENTRY;
404
405         lod_getref(&lod->lod_ost_descs);
406         lod_foreach_ost(lod, i) {
407                 ost = OST_TGT(lod, i);
408                 LASSERT(ost && ost->ltd_ost);
409                 rc = dt_sync(env, ost->ltd_ost);
410                 if (rc) {
411                         CERROR("%s: can't sync %u: %d\n",
412                                lod2obd(lod)->obd_name, i, rc);
413                         break;
414                 }
415         }
416         lod_putref(lod, &lod->lod_ost_descs);
417         if (rc == 0)
418                 rc = dt_sync(env, lod->lod_child);
419
420         RETURN(rc);
421 }
422
423 static int lod_ro(const struct lu_env *env, struct dt_device *dev)
424 {
425         return dt_ro(env, dt2lod_dev(dev)->lod_child);
426 }
427
428 static int lod_commit_async(const struct lu_env *env, struct dt_device *dev)
429 {
430         return dt_commit_async(env, dt2lod_dev(dev)->lod_child);
431 }
432
433 static int lod_init_capa_ctxt(const struct lu_env *env, struct dt_device *dev,
434                               int mode, unsigned long timeout,
435                               __u32 alg, struct lustre_capa_key *keys)
436 {
437         struct dt_device *next = dt2lod_dev(dev)->lod_child;
438         return dt_init_capa_ctxt(env, next, mode, timeout, alg, keys);
439 }
440
441 static const struct dt_device_operations lod_dt_ops = {
442         .dt_root_get         = lod_root_get,
443         .dt_statfs           = lod_statfs,
444         .dt_trans_create     = lod_trans_create,
445         .dt_trans_start      = lod_trans_start,
446         .dt_trans_stop       = lod_trans_stop,
447         .dt_conf_get         = lod_conf_get,
448         .dt_sync             = lod_sync,
449         .dt_ro               = lod_ro,
450         .dt_commit_async     = lod_commit_async,
451         .dt_init_capa_ctxt   = lod_init_capa_ctxt,
452 };
453
454 static int lod_connect_to_osd(const struct lu_env *env, struct lod_device *lod,
455                               struct lustre_cfg *cfg)
456 {
457         struct obd_connect_data *data = NULL;
458         struct obd_device       *obd;
459         char                    *nextdev = NULL, *p, *s;
460         int                      rc, len = 0;
461         ENTRY;
462
463         LASSERT(cfg);
464         LASSERT(lod->lod_child_exp == NULL);
465
466         /* compatibility hack: we still use old config logs
467          * which specify LOV, but we need to learn underlying
468          * OSD device, which is supposed to be:
469          *  <fsname>-MDTxxxx-osd
470          *
471          * 2.x MGS generates lines like the following:
472          *   #03 (176)lov_setup 0:lustre-MDT0000-mdtlov  1:(struct lov_desc)
473          * 1.8 MGS generates lines like the following:
474          *   #03 (168)lov_setup 0:lustre-mdtlov  1:(struct lov_desc)
475          *
476          * we use "-MDT" to differentiate 2.x from 1.8 */
477
478         if ((p = lustre_cfg_string(cfg, 0)) && strstr(p, "-mdtlov")) {
479                 len = strlen(p) + 1;
480                 OBD_ALLOC(nextdev, len);
481                 if (nextdev == NULL)
482                         GOTO(out, rc = -ENOMEM);
483
484                 strcpy(nextdev, p);
485                 s = strstr(nextdev, "-mdtlov");
486                 if (unlikely(s == NULL)) {
487                         CERROR("unable to parse device name %s\n",
488                                lustre_cfg_string(cfg, 0));
489                         GOTO(out, rc = -EINVAL);
490                 }
491
492                 if (strstr(nextdev, "-MDT")) {
493                         /* 2.x config */
494                         strcpy(s, "-osd");
495                 } else {
496                         /* 1.8 config */
497                         strcpy(s, "-MDT0000-osd");
498                 }
499         } else {
500                 CERROR("unable to parse device name %s\n",
501                        lustre_cfg_string(cfg, 0));
502                 GOTO(out, rc = -EINVAL);
503         }
504
505         OBD_ALLOC_PTR(data);
506         if (data == NULL)
507                 GOTO(out, rc = -ENOMEM);
508
509         obd = class_name2obd(nextdev);
510         if (obd == NULL) {
511                 CERROR("can not locate next device: %s\n", nextdev);
512                 GOTO(out, rc = -ENOTCONN);
513         }
514
515         data->ocd_connect_flags = OBD_CONNECT_VERSION;
516         data->ocd_version = LUSTRE_VERSION_CODE;
517
518         rc = obd_connect(env, &lod->lod_child_exp, obd, &obd->obd_uuid,
519                          data, NULL);
520         if (rc) {
521                 CERROR("cannot connect to next dev %s (%d)\n", nextdev, rc);
522                 GOTO(out, rc);
523         }
524
525         lod->lod_dt_dev.dd_lu_dev.ld_site =
526                 lod->lod_child_exp->exp_obd->obd_lu_dev->ld_site;
527         LASSERT(lod->lod_dt_dev.dd_lu_dev.ld_site);
528         lod->lod_child = lu2dt_dev(lod->lod_child_exp->exp_obd->obd_lu_dev);
529
530 out:
531         if (data)
532                 OBD_FREE_PTR(data);
533         if (nextdev)
534                 OBD_FREE(nextdev, len);
535         RETURN(rc);
536 }
537
538 static int lod_tgt_desc_init(struct lod_tgt_descs *ltd)
539 {
540         mutex_init(&ltd->ltd_mutex);
541         init_rwsem(&ltd->ltd_rw_sem);
542
543         /* the OST array and bitmap are allocated/grown dynamically as OSTs are
544          * added to the LOD, see lod_add_device() */
545         ltd->ltd_tgt_bitmap = CFS_ALLOCATE_BITMAP(32);
546         if (ltd->ltd_tgt_bitmap == NULL)
547                 RETURN(-ENOMEM);
548
549         ltd->ltd_tgts_size  = 32;
550         ltd->ltd_tgtnr      = 0;
551
552         ltd->ltd_death_row = 0;
553         ltd->ltd_refcount  = 0;
554         return 0;
555 }
556
557 static int lod_init0(const struct lu_env *env, struct lod_device *lod,
558                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
559 {
560         struct dt_device_param ddp;
561         struct obd_device     *obd;
562         int                    rc;
563         ENTRY;
564
565         obd = class_name2obd(lustre_cfg_string(cfg, 0));
566         if (obd == NULL) {
567                 CERROR("Cannot find obd with name %s\n",
568                        lustre_cfg_string(cfg, 0));
569                 RETURN(-ENODEV);
570         }
571
572         obd->obd_lu_dev = &lod->lod_dt_dev.dd_lu_dev;
573         lod->lod_dt_dev.dd_lu_dev.ld_obd = obd;
574         lod->lod_dt_dev.dd_lu_dev.ld_ops = &lod_lu_ops;
575         lod->lod_dt_dev.dd_ops = &lod_dt_ops;
576
577         rc = lod_connect_to_osd(env, lod, cfg);
578         if (rc)
579                 RETURN(rc);
580
581         dt_conf_get(env, &lod->lod_dt_dev, &ddp);
582         lod->lod_osd_max_easize = ddp.ddp_max_ea_size;
583
584         /* setup obd to be used with old lov code */
585         rc = lod_pools_init(lod, cfg);
586         if (rc)
587                 GOTO(out_disconnect, rc);
588
589         rc = lod_procfs_init(lod);
590         if (rc)
591                 GOTO(out_pools, rc);
592
593         spin_lock_init(&lod->lod_desc_lock);
594         spin_lock_init(&lod->lod_connects_lock);
595         lod_tgt_desc_init(&lod->lod_mdt_descs);
596         lod_tgt_desc_init(&lod->lod_ost_descs);
597
598         RETURN(0);
599
600 out_pools:
601         lod_pools_fini(lod);
602 out_disconnect:
603         obd_disconnect(lod->lod_child_exp);
604         RETURN(rc);
605 }
606
607 static struct lu_device *lod_device_free(const struct lu_env *env,
608                                          struct lu_device *lu)
609 {
610         struct lod_device *lod = lu2lod_dev(lu);
611         struct lu_device  *next = &lod->lod_child->dd_lu_dev;
612         ENTRY;
613
614         LASSERT(cfs_atomic_read(&lu->ld_ref) == 0);
615         dt_device_fini(&lod->lod_dt_dev);
616         OBD_FREE_PTR(lod);
617         RETURN(next);
618 }
619
620 static struct lu_device *lod_device_alloc(const struct lu_env *env,
621                                           struct lu_device_type *type,
622                                           struct lustre_cfg *lcfg)
623 {
624         struct lod_device *lod;
625         struct lu_device  *lu_dev;
626
627         OBD_ALLOC_PTR(lod);
628         if (lod == NULL) {
629                 lu_dev = ERR_PTR(-ENOMEM);
630         } else {
631                 int rc;
632
633                 lu_dev = lod2lu_dev(lod);
634                 dt_device_init(&lod->lod_dt_dev, type);
635                 rc = lod_init0(env, lod, type, lcfg);
636                 if (rc != 0) {
637                         lod_device_free(env, lu_dev);
638                         lu_dev = ERR_PTR(rc);
639                 }
640         }
641
642         return lu_dev;
643 }
644
645 static struct lu_device *lod_device_fini(const struct lu_env *env,
646                                          struct lu_device *d)
647 {
648         struct lod_device *lod = lu2lod_dev(d);
649         int                rc;
650         ENTRY;
651
652         lod_pools_fini(lod);
653
654         lod_procfs_fini(lod);
655
656         rc = lod_fini_tgt(lod, &lod->lod_ost_descs);
657         if (rc)
658                 CERROR("%s:can not fini ost descs %d\n",
659                         lod2obd(lod)->obd_name, rc);
660
661         rc = lod_fini_tgt(lod, &lod->lod_mdt_descs);
662         if (rc)
663                 CERROR("%s:can not fini mdt descs %d\n",
664                         lod2obd(lod)->obd_name, rc);
665
666         RETURN(NULL);
667 }
668
669 /*
670  * we use exports to track all LOD users
671  */
672 static int lod_obd_connect(const struct lu_env *env, struct obd_export **exp,
673                            struct obd_device *obd, struct obd_uuid *cluuid,
674                            struct obd_connect_data *data, void *localdata)
675 {
676         struct lod_device    *lod = lu2lod_dev(obd->obd_lu_dev);
677         struct lustre_handle  conn;
678         int                   rc;
679         ENTRY;
680
681         CDEBUG(D_CONFIG, "connect #%d\n", lod->lod_connects);
682
683         rc = class_connect(&conn, obd, cluuid);
684         if (rc)
685                 RETURN(rc);
686
687         *exp = class_conn2export(&conn);
688
689         spin_lock(&lod->lod_connects_lock);
690         lod->lod_connects++;
691         /* at the moment we expect the only user */
692         LASSERT(lod->lod_connects == 1);
693         spin_unlock(&lod->lod_connects_lock);
694
695         RETURN(0);
696 }
697
698 /*
699  * once last export (we don't count self-export) disappeared
700  * lod can be released
701  */
702 static int lod_obd_disconnect(struct obd_export *exp)
703 {
704         struct obd_device *obd = exp->exp_obd;
705         struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
706         int                rc, release = 0;
707         ENTRY;
708
709         /* Only disconnect the underlying layers on the final disconnect. */
710         spin_lock(&lod->lod_connects_lock);
711         lod->lod_connects--;
712         if (lod->lod_connects != 0) {
713                 /* why should there be more than 1 connect? */
714                 spin_unlock(&lod->lod_connects_lock);
715                 CERROR("%s: disconnect #%d\n", exp->exp_obd->obd_name,
716                        lod->lod_connects);
717                 goto out;
718         }
719         spin_unlock(&lod->lod_connects_lock);
720
721         /* the last user of lod has gone, let's release the device */
722         release = 1;
723
724 out:
725         rc = class_disconnect(exp); /* bz 9811 */
726
727         if (rc == 0 && release)
728                 class_manual_cleanup(obd);
729         RETURN(rc);
730 }
731
732 LU_KEY_INIT(lod, struct lod_thread_info);
733
734 static void lod_key_fini(const struct lu_context *ctx,
735                 struct lu_context_key *key, void *data)
736 {
737         struct lod_thread_info *info = data;
738         /* allocated in lod_get_lov_ea
739          * XXX: this is overload, a tread may have such store but used only
740          * once. Probably better would be pool of such stores per LOD.
741          */
742         if (info->lti_ea_store) {
743                 OBD_FREE_LARGE(info->lti_ea_store, info->lti_ea_store_size);
744                 info->lti_ea_store = NULL;
745                 info->lti_ea_store_size = 0;
746         }
747         OBD_FREE_PTR(info);
748 }
749
750 /* context key: lod_thread_key */
751 LU_CONTEXT_KEY_DEFINE(lod, LCT_MD_THREAD);
752
753 LU_TYPE_INIT_FINI(lod, &lod_thread_key);
754
755 static struct lu_device_type_operations lod_device_type_ops = {
756         .ldto_init           = lod_type_init,
757         .ldto_fini           = lod_type_fini,
758
759         .ldto_start          = lod_type_start,
760         .ldto_stop           = lod_type_stop,
761
762         .ldto_device_alloc   = lod_device_alloc,
763         .ldto_device_free    = lod_device_free,
764
765         .ldto_device_fini    = lod_device_fini
766 };
767
768 static struct lu_device_type lod_device_type = {
769         .ldt_tags     = LU_DEVICE_DT,
770         .ldt_name     = LUSTRE_LOD_NAME,
771         .ldt_ops      = &lod_device_type_ops,
772         .ldt_ctx_tags = LCT_MD_THREAD,
773 };
774
775 static int lod_obd_health_check(const struct lu_env *env,
776                 struct obd_device *obd)
777 {
778         struct lod_device   *d = lu2lod_dev(obd->obd_lu_dev);
779         struct lod_ost_desc *ost;
780         int                  i, rc = 1;
781         ENTRY;
782
783         LASSERT(d);
784         lod_getref(&d->lod_ost_descs);
785         lod_foreach_ost(d, i) {
786                 ost = OST_TGT(d, i);
787                 LASSERT(ost && ost->ltd_ost);
788                 rc = obd_health_check(env, ost->ltd_exp->exp_obd);
789                 /* one healthy device is enough */
790                 if (rc == 0)
791                         break;
792         }
793         lod_putref(d, &d->lod_ost_descs);
794         RETURN(rc);
795 }
796
797 static struct obd_ops lod_obd_device_ops = {
798         .o_owner        = THIS_MODULE,
799         .o_connect      = lod_obd_connect,
800         .o_disconnect   = lod_obd_disconnect,
801         .o_health_check = lod_obd_health_check,
802         .o_pool_new     = lod_pool_new,
803         .o_pool_rem     = lod_pool_remove,
804         .o_pool_add     = lod_pool_add,
805         .o_pool_del     = lod_pool_del,
806 };
807
808 static int __init lod_mod_init(void)
809 {
810         struct lprocfs_static_vars  lvars = { 0 };
811         cfs_proc_dir_entry_t       *lov_proc_dir;
812         int                         rc;
813
814         rc = lu_kmem_init(lod_caches);
815         if (rc)
816                 return rc;
817
818         lprocfs_lod_init_vars(&lvars);
819
820         rc = class_register_type(&lod_obd_device_ops, NULL, lvars.module_vars,
821                                  LUSTRE_LOD_NAME, &lod_device_type);
822         if (rc) {
823                 lu_kmem_fini(lod_caches);
824                 return rc;
825         }
826
827         /* create "lov" entry in procfs for compatibility purposes */
828         lov_proc_dir = lprocfs_srch(proc_lustre_root, "lov");
829         if (lov_proc_dir == NULL) {
830                 lov_proc_dir = lprocfs_register("lov", proc_lustre_root,
831                                                 NULL, NULL);
832                 if (IS_ERR(lov_proc_dir))
833                         CERROR("lod: can't create compat entry \"lov\": %d\n",
834                                (int)PTR_ERR(lov_proc_dir));
835         }
836
837         return rc;
838 }
839
840 static void __exit lod_mod_exit(void)
841 {
842
843         lprocfs_try_remove_proc_entry("lov", proc_lustre_root);
844
845         class_unregister_type(LUSTRE_LOD_NAME);
846         lu_kmem_fini(lod_caches);
847 }
848
849 MODULE_AUTHOR("Whamcloud, Inc. <http://www.whamcloud.com/>");
850 MODULE_DESCRIPTION("Lustre Logical Object Device ("LUSTRE_LOD_NAME")");
851 MODULE_LICENSE("GPL");
852
853 module_init(lod_mod_init);
854 module_exit(lod_mod_exit);
855