/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License version 2 for more details.  A copy is
 * included in the COPYING file that accompanied this code.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2012, Intel Corporation.
 */
/*
 * lustre/mdd/mdd_lfsck.c
 *
 * Top-level entry points into mdd module
 *
 * LFSCK controller, which scans the whole device through low layer
 * iteration APIs, drives all lfsck components, and controls the speed.
 *
 * Author: Fan Yong <yong.fan@whamcloud.com>
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_MDS

#include <lustre/lustre_idl.h>
#include <lustre_fid.h>
#include <obd_support.h>

#include "mdd_internal.h"
#include "mdd_lfsck.h"

#define HALF_SEC                        (CFS_HZ >> 1)
#define LFSCK_CHECKPOINT_INTERVAL       60

const char lfsck_bookmark_name[] = "lfsck_bookmark";

/* misc functions */

static inline struct mdd_device *mdd_lfsck2mdd(struct md_lfsck *lfsck)
{
        return container_of0(lfsck, struct mdd_device, mdd_lfsck);
}

static inline char *mdd_lfsck2name(struct md_lfsck *lfsck)
{
        struct mdd_device *mdd = mdd_lfsck2mdd(lfsck);

        return mdd2obd_dev(mdd)->obd_name;
}

static inline void mdd_lfsck_component_put(const struct lu_env *env,
                                           struct lfsck_component *com)
{
        if (atomic_dec_and_test(&com->lc_ref)) {
                if (com->lc_obj != NULL)
                        lu_object_put(env, &com->lc_obj->do_lu);
                if (com->lc_file_ram != NULL)
                        OBD_FREE(com->lc_file_ram, com->lc_file_size);
                if (com->lc_file_disk != NULL)
                        OBD_FREE(com->lc_file_disk, com->lc_file_size);
                OBD_FREE_PTR(com);
        }
}

static inline struct lfsck_component *
__mdd_lfsck_component_find(struct md_lfsck *lfsck, __u16 type, cfs_list_t *list)
{
        struct lfsck_component *com;

        cfs_list_for_each_entry(com, list, lc_link) {
                if (com->lc_type == type)
                        return com;
        }
        return NULL;
}

static void mdd_lfsck_component_cleanup(const struct lu_env *env,
                                        struct lfsck_component *com)
{
        if (!cfs_list_empty(&com->lc_link))
                cfs_list_del_init(&com->lc_link);
        if (!cfs_list_empty(&com->lc_link_dir))
                cfs_list_del_init(&com->lc_link_dir);

        mdd_lfsck_component_put(env, com);
}

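/* Convert the given speed limit (assumed to be in objects per second) into
 * the internal throttling parameters: ml_sleep_rate is how many objects may
 * be scanned between sleeps, ml_sleep_jif is how many jiffies to sleep each
 * time. LFSCK_SPEED_NO_LIMIT disables the throttling. */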
static void __mdd_lfsck_set_speed(struct md_lfsck *lfsck, __u32 limit)
{
        lfsck->ml_bookmark_ram.lb_speed_limit = limit;
        if (limit != LFSCK_SPEED_NO_LIMIT) {
                if (limit > CFS_HZ) {
                        lfsck->ml_sleep_rate = limit / CFS_HZ;
                        lfsck->ml_sleep_jif = 1;
                } else {
                        lfsck->ml_sleep_rate = 1;
                        lfsck->ml_sleep_jif = CFS_HZ / limit;
                }
        } else {
                lfsck->ml_sleep_jif = 0;
                lfsck->ml_sleep_rate = 0;
        }
}

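/* Throttle the main engine: once ml_new_scanned reaches ml_sleep_rate,
 * sleep for ml_sleep_jif jiffies (waking up early if the thread is told to
 * stop) and reset the counter. The condition is re-checked under ml_lock to
 * avoid racing with a concurrent speed limit change. */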
static void mdd_lfsck_control_speed(struct md_lfsck *lfsck)
{
        struct ptlrpc_thread *thread = &lfsck->ml_thread;
        struct l_wait_info    lwi;

        if (lfsck->ml_sleep_jif > 0 &&
            lfsck->ml_new_scanned >= lfsck->ml_sleep_rate) {
                spin_lock(&lfsck->ml_lock);
                if (likely(lfsck->ml_sleep_jif > 0 &&
                           lfsck->ml_new_scanned >= lfsck->ml_sleep_rate)) {
                        lwi = LWI_TIMEOUT_INTR(lfsck->ml_sleep_jif, NULL,
                                               LWI_ON_SIGNAL_NOOP, NULL);
                        spin_unlock(&lfsck->ml_lock);

                        l_wait_event(thread->t_ctl_waitq,
                                     !thread_is_running(thread),
                                     &lwi);
                        lfsck->ml_new_scanned = 0;
                } else {
                        spin_unlock(&lfsck->ml_lock);
                }
        }
}

/* lfsck_bookmark file ops */

static inline void mdd_lfsck_bookmark_to_cpu(struct lfsck_bookmark *des,
                                             struct lfsck_bookmark *src)
{
        des->lb_magic = le32_to_cpu(src->lb_magic);
        des->lb_version = le16_to_cpu(src->lb_version);
        des->lb_param = le16_to_cpu(src->lb_param);
        des->lb_speed_limit = le32_to_cpu(src->lb_speed_limit);
}

static inline void mdd_lfsck_bookmark_to_le(struct lfsck_bookmark *des,
                                            struct lfsck_bookmark *src)
{
        des->lb_magic = cpu_to_le32(src->lb_magic);
        des->lb_version = cpu_to_le16(src->lb_version);
        des->lb_param = cpu_to_le16(src->lb_param);
        des->lb_speed_limit = cpu_to_le32(src->lb_speed_limit);
}

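/* Load the on-disk lfsck_bookmark into ml_bookmark_disk and convert it to
 * the CPU-endian copy in ml_bookmark_ram. Return -ENODATA if the file is
 * empty or its magic does not match, so that the caller can initialize a
 * fresh bookmark instead. */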
static int mdd_lfsck_bookmark_load(const struct lu_env *env,
                                   struct md_lfsck *lfsck)
{
        loff_t pos = 0;
        int    len = sizeof(struct lfsck_bookmark);
        int    rc;

        rc = dt_record_read(env, lfsck->ml_bookmark_obj,
                            mdd_buf_get(env, &lfsck->ml_bookmark_disk, len),
                            &pos);
        if (rc == 0) {
                struct lfsck_bookmark *bm = &lfsck->ml_bookmark_ram;

                mdd_lfsck_bookmark_to_cpu(bm, &lfsck->ml_bookmark_disk);
                if (bm->lb_magic != LFSCK_BOOKMARK_MAGIC) {
                        CWARN("%.16s: invalid lfsck_bookmark magic "
                              "0x%x != 0x%x\n", mdd_lfsck2name(lfsck),
                              bm->lb_magic, LFSCK_BOOKMARK_MAGIC);
                        /* Process it as a new lfsck_bookmark. */
                        rc = -ENODATA;
                }
        } else {
                if (rc == -EFAULT && pos == 0)
                        /* Return -ENODATA for an empty lfsck_bookmark. */
                        rc = -ENODATA;
                else
                        CERROR("%.16s: fail to load lfsck_bookmark, "
                               "expected = %d, rc = %d\n",
                               mdd_lfsck2name(lfsck), len, rc);
        }
        return rc;
}

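/* Write the in-memory bookmark to the lfsck_bookmark file: convert it to
 * little-endian, then declare, start and stop a local transaction around a
 * single record write at offset 0. Callers serialize updates via ml_mutex. */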
static int mdd_lfsck_bookmark_store(const struct lu_env *env,
                                    struct md_lfsck *lfsck)
{
        struct mdd_device *mdd    = mdd_lfsck2mdd(lfsck);
        struct thandle    *handle;
        struct dt_object  *obj    = lfsck->ml_bookmark_obj;
        loff_t             pos    = 0;
        int                len    = sizeof(struct lfsck_bookmark);
        int                rc;
        ENTRY;

        mdd_lfsck_bookmark_to_le(&lfsck->ml_bookmark_disk,
                                 &lfsck->ml_bookmark_ram);
        handle = dt_trans_create(env, mdd->mdd_bottom);
        if (IS_ERR(handle)) {
                rc = PTR_ERR(handle);
                CERROR("%.16s: fail to create trans for storing "
                       "lfsck_bookmark: %d\n", mdd_lfsck2name(lfsck), rc);
                RETURN(rc);
        }

        rc = dt_declare_record_write(env, obj, len, 0, handle);
        if (rc != 0) {
                CERROR("%.16s: fail to declare trans for storing "
                       "lfsck_bookmark: %d\n", mdd_lfsck2name(lfsck), rc);
                GOTO(out, rc);
        }

        rc = dt_trans_start_local(env, mdd->mdd_bottom, handle);
        if (rc != 0) {
                CERROR("%.16s: fail to start trans for storing "
                       "lfsck_bookmark: %d\n", mdd_lfsck2name(lfsck), rc);
                GOTO(out, rc);
        }

        rc = dt_record_write(env, obj,
                             mdd_buf_get(env, &lfsck->ml_bookmark_disk, len),
                             &pos, handle);
        if (rc != 0)
                CERROR("%.16s: fail to store lfsck_bookmark, expected = %d, "
                       "rc = %d\n", mdd_lfsck2name(lfsck), len, rc);

        GOTO(out, rc);

out:
        dt_trans_stop(env, mdd->mdd_bottom, handle);
        return rc;
}

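/* Reset the in-memory bookmark to its defaults (current magic and version,
 * everything else zeroed) and store it to create a fresh lfsck_bookmark
 * file. */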
static int mdd_lfsck_bookmark_init(const struct lu_env *env,
                                   struct md_lfsck *lfsck)
{
        struct lfsck_bookmark *mb = &lfsck->ml_bookmark_ram;
        int rc;

        memset(mb, 0, sizeof(*mb));
        mb->lb_magic = LFSCK_BOOKMARK_MAGIC;
        mb->lb_version = LFSCK_VERSION_V1;
        mutex_lock(&lfsck->ml_mutex);
        rc = mdd_lfsck_bookmark_store(env, lfsck);
        mutex_unlock(&lfsck->ml_mutex);
        return rc;
}

/* LFSCK engines */

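/* The LFSCK main engine thread: attach a lu_env, initialize and load the
 * object-table iterator, then walk the objects on the device, applying rate
 * control between records. On exit it distinguishes a pause (device going
 * down, position handed back via iops->put()) from a normal stop, and
 * finally marks the thread stopped and wakes up any waiters. */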
static int mdd_lfsck_main(void *args)
{
        struct lu_env            env;
        struct md_lfsck         *lfsck  = (struct md_lfsck *)args;
        struct ptlrpc_thread    *thread = &lfsck->ml_thread;
        struct dt_object        *obj    = lfsck->ml_obj_oit;
        const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
        struct dt_it            *di;
        struct lu_fid           *fid;
        int                      rc;
        ENTRY;

        cfs_daemonize("lfsck");
        rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
        if (rc != 0) {
                CERROR("%s: LFSCK, fail to init env, rc = %d\n",
                       mdd_lfsck2name(lfsck), rc);
                GOTO(noenv, rc);
        }

        di = iops->init(&env, obj, lfsck->ml_args_oit, BYPASS_CAPA);
        if (IS_ERR(di)) {
                rc = PTR_ERR(di);
                CERROR("%s: LFSCK, fail to init iteration, rc = %d\n",
                       mdd_lfsck2name(lfsck), rc);
                GOTO(fini_env, rc);
        }

        CDEBUG(D_LFSCK, "LFSCK: flags = 0x%x, pid = %d\n",
               lfsck->ml_args_oit, cfs_curproc_pid());

        spin_lock(&lfsck->ml_lock);
        thread_set_flags(thread, SVC_RUNNING);
        spin_unlock(&lfsck->ml_lock);
        cfs_waitq_broadcast(&thread->t_ctl_waitq);

        /* The call of iops->load() will unplug the low layer iteration. */
        rc = iops->load(&env, di, 0);
        if (rc != 0)
                GOTO(out, rc);

        CDEBUG(D_LFSCK, "LFSCK: iteration start: pos = %s\n",
               (char *)iops->key(&env, di));

        lfsck->ml_new_scanned = 0;
        fid = &mdd_env_info(&env)->mti_fid;
        while (rc == 0) {
                iops->rec(&env, di, (struct dt_rec *)fid, 0);

                /* XXX: here, perform LFSCK when some LFSCK component(s)
                 *      are introduced in the future. */
                lfsck->ml_new_scanned++;

                /* XXX: here, make checkpoint when some LFSCK component(s)
                 *      are introduced in the future. */

                /* Rate control. */
                mdd_lfsck_control_speed(lfsck);
                if (unlikely(!thread_is_running(thread)))
                        GOTO(out, rc = 0);

                rc = iops->next(&env, di);
        }

        GOTO(out, rc);

out:
        if (lfsck->ml_paused) {
                /* XXX: It is a hack here: if the lfsck is still running when
                 *      the MDS umounts, it should be restarted automatically
                 *      after the MDS remounts.
                 *
                 *      To support that, we need to record the lfsck status in
                 *      the lfsck on-disk bookmark file. But now, there is no
                 *      lfsck component under the lfsck framework. To avoid
                 *      introducing unnecessary bookmark incompatibility
                 *      issues, we write nothing to the lfsck bookmark file
                 *      for now.
                 *
                 *      Instead, we will reuse the dt_it_ops::put() method to
                 *      notify the low layer iterator to process such case.
                 *
                 *      It is just a temporary solution, and will be replaced
                 *      when some lfsck component is introduced in the
                 *      future. */
                iops->put(&env, di);
                CDEBUG(D_LFSCK, "LFSCK: iteration paused: pos = %s, rc = %d\n",
                       (char *)iops->key(&env, di), rc);
        } else {
                CDEBUG(D_LFSCK, "LFSCK: iteration stop: pos = %s, rc = %d\n",
                       (char *)iops->key(&env, di), rc);
        }
        iops->fini(&env, di);

fini_env:
        lu_env_fini(&env);

noenv:
        spin_lock(&lfsck->ml_lock);
        thread_set_flags(thread, SVC_STOPPED);
        cfs_waitq_broadcast(&thread->t_ctl_waitq);
        spin_unlock(&lfsck->ml_lock);
        return rc;
}

/* external interfaces */

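/* Update the speed limit at runtime and persist it in the bookmark file. */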
int mdd_lfsck_set_speed(const struct lu_env *env, struct md_lfsck *lfsck,
                        __u32 limit)
{
        int rc;

        mutex_lock(&lfsck->ml_mutex);
        __mdd_lfsck_set_speed(lfsck, limit);
        rc = mdd_lfsck_bookmark_store(env, lfsck);
        mutex_unlock(&lfsck->ml_mutex);
        return rc;
}

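/* Start (or resume) LFSCK on this device. A NULL @start means a paused
 * LFSCK is being triggered automatically. Otherwise the given options
 * (speed limit, failout, dryrun, reset, active types) are applied to the
 * bookmark and to the component lists, the iterator flags in ml_args_oit
 * are composed, and the mdd_lfsck_main engine thread is spawned. Returns
 * -EALREADY if the engine thread is already running. */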
int mdd_lfsck_start(const struct lu_env *env, struct md_lfsck *lfsck,
                    struct lfsck_start *start)
{
        struct lfsck_bookmark  *bk     = &lfsck->ml_bookmark_ram;
        struct ptlrpc_thread   *thread = &lfsck->ml_thread;
        struct lfsck_component *com;
        struct l_wait_info      lwi    = { 0 };
        bool                    dirty  = false;
        int                     rc     = 0;
        __u16                   valid  = 0;
        __u16                   flags  = 0;
        ENTRY;

        if (lfsck->ml_obj_oit == NULL)
                RETURN(-ENOTSUPP);

        /* start == NULL means auto trigger paused LFSCK. */
        if (start == NULL && cfs_list_empty(&lfsck->ml_list_scan))
                RETURN(0);

        mutex_lock(&lfsck->ml_mutex);
        spin_lock(&lfsck->ml_lock);
        if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
                spin_unlock(&lfsck->ml_lock);
                mutex_unlock(&lfsck->ml_mutex);
                RETURN(-EALREADY);
        }

        spin_unlock(&lfsck->ml_lock);

        lfsck->ml_paused = 0;
        lfsck->ml_oit_over = 0;
        lfsck->ml_drop_dryrun = 0;
        lfsck->ml_new_scanned = 0;

        /* For auto trigger. */
        if (start == NULL)
                goto trigger;

        start->ls_version = bk->lb_version;
        if (start->ls_valid & LSV_SPEED_LIMIT) {
                __mdd_lfsck_set_speed(lfsck, start->ls_speed_limit);
                dirty = true;
        }

        if (start->ls_valid & LSV_ERROR_HANDLE) {
                valid |= DOIV_ERROR_HANDLE;
                if (start->ls_flags & LPF_FAILOUT)
                        flags |= DOIF_FAILOUT;

                if ((start->ls_flags & LPF_FAILOUT) &&
                    !(bk->lb_param & LPF_FAILOUT)) {
                        bk->lb_param |= LPF_FAILOUT;
                        dirty = true;
                } else if (!(start->ls_flags & LPF_FAILOUT) &&
                           (bk->lb_param & LPF_FAILOUT)) {
                        bk->lb_param &= ~LPF_FAILOUT;
                        dirty = true;
                }
        }

        if (start->ls_valid & LSV_DRYRUN) {
                if ((start->ls_flags & LPF_DRYRUN) &&
                    !(bk->lb_param & LPF_DRYRUN)) {
                        bk->lb_param |= LPF_DRYRUN;
                        dirty = true;
                } else if (!(start->ls_flags & LPF_DRYRUN) &&
                           (bk->lb_param & LPF_DRYRUN)) {
                        bk->lb_param &= ~LPF_DRYRUN;
                        lfsck->ml_drop_dryrun = 1;
                        dirty = true;
                }
        }

        if (dirty) {
                rc = mdd_lfsck_bookmark_store(env, lfsck);
                if (rc != 0)
                        GOTO(out, rc);
        }

        if (start->ls_flags & LPF_RESET)
                flags |= DOIF_RESET;

        if (start->ls_active != 0) {
                struct lfsck_component *next;
                __u16 type = 1;

                if (start->ls_active == LFSCK_TYPES_ALL)
                        start->ls_active = LFSCK_TYPES_SUPPORTED;

                if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
                        start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
                        GOTO(out, rc = -ENOTSUPP);
                }

                cfs_list_for_each_entry_safe(com, next,
                                             &lfsck->ml_list_scan, lc_link) {
                        if (!(com->lc_type & start->ls_active)) {
                                rc = com->lc_ops->lfsck_post(env, com, 0);
                                if (rc != 0)
                                        GOTO(out, rc);
                        }
                }

                while (start->ls_active != 0) {
                        if (type & start->ls_active) {
                                com = __mdd_lfsck_component_find(lfsck, type,
                                                        &lfsck->ml_list_idle);
                                if (com != NULL) {
                                        /* The component status will be updated
                                         * when its prep() is called later by
                                         * the LFSCK main engine. */
                                        cfs_list_del_init(&com->lc_link);
                                        cfs_list_add_tail(&com->lc_link,
                                                          &lfsck->ml_list_scan);
                                }
                                start->ls_active &= ~type;
                        }
                        type <<= 1;
                }
        }

        cfs_list_for_each_entry(com, &lfsck->ml_list_scan, lc_link) {
                start->ls_active |= com->lc_type;
                if (flags & DOIF_RESET) {
                        rc = com->lc_ops->lfsck_reset(env, com, false);
                        if (rc != 0)
                                GOTO(out, rc);
                }
        }

trigger:
        lfsck->ml_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
        if (bk->lb_param & LPF_DRYRUN)
                lfsck->ml_args_dir |= LUDA_VERIFY_DRYRUN;

        if (bk->lb_param & LPF_FAILOUT) {
                valid |= DOIV_ERROR_HANDLE;
                flags |= DOIF_FAILOUT;
        }

        if (!cfs_list_empty(&lfsck->ml_list_scan))
                flags |= DOIF_OUTUSED;

        lfsck->ml_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
        thread_set_flags(thread, 0);
        rc = cfs_create_thread(mdd_lfsck_main, lfsck, 0);
        if (rc < 0)
                CERROR("%s: cannot start LFSCK thread, rc = %d\n",
                       mdd_lfsck2name(lfsck), rc);
        else
                l_wait_event(thread->t_ctl_waitq,
                             thread_is_running(thread) ||
                             thread_is_stopped(thread),
                             &lwi);

        GOTO(out, rc = 0);

out:
        mutex_unlock(&lfsck->ml_mutex);
        return (rc < 0 ? rc : 0);
}

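/* Stop the LFSCK engine thread. @pause indicates the stop is caused by
 * device shutdown rather than a user request, so ml_paused is set for the
 * engine to see. If the engine is sleeping inside the low layer iterator,
 * it is woken up through dio_it.put() before waiting for it to report
 * stopped. Returns -EALREADY if no engine thread is running. */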
int mdd_lfsck_stop(const struct lu_env *env, struct md_lfsck *lfsck,
                   bool pause)
{
        struct ptlrpc_thread *thread = &lfsck->ml_thread;
        struct l_wait_info    lwi    = { 0 };
        ENTRY;

        mutex_lock(&lfsck->ml_mutex);
        spin_lock(&lfsck->ml_lock);
        if (thread_is_init(thread) || thread_is_stopped(thread)) {
                spin_unlock(&lfsck->ml_lock);
                mutex_unlock(&lfsck->ml_mutex);
                RETURN(-EALREADY);
        }

        if (pause)
                lfsck->ml_paused = 1;
        thread_set_flags(thread, SVC_STOPPING);
        /* The LFSCK thread may be sleeping on the low layer wait queue,
         * wake it up. */
        if (likely(lfsck->ml_di_oit != NULL))
                lfsck->ml_obj_oit->do_index_ops->dio_it.put(env,
                                                            lfsck->ml_di_oit);
        spin_unlock(&lfsck->ml_lock);

        cfs_waitq_broadcast(&thread->t_ctl_waitq);
        l_wait_event(thread->t_ctl_waitq,
                     thread_is_stopped(thread),
                     &lwi);
        mutex_unlock(&lfsck->ml_mutex);

        RETURN(0);
}

static const struct lu_fid lfsck_it_fid = { .f_seq = FID_SEQ_LOCAL_FILE,
                                            .f_oid = OTABLE_IT_OID,
                                            .f_ver = 0 };

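/* Set up LFSCK for this mdd device: initialize the locks, component lists
 * and wait queue, locate the object-table iterator object (LFSCK is simply
 * skipped if the backend does not support it), then open the lfsck_bookmark
 * file and load or initialize the bookmark. */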
int mdd_lfsck_setup(const struct lu_env *env, struct mdd_device *mdd)
{
        struct md_lfsck  *lfsck = &mdd->mdd_lfsck;
        struct dt_object *obj;
        int               rc;
        ENTRY;

        LASSERT(!lfsck->ml_initialized);

        lfsck->ml_initialized = 1;
        mutex_init(&lfsck->ml_mutex);
        spin_lock_init(&lfsck->ml_lock);
        CFS_INIT_LIST_HEAD(&lfsck->ml_list_scan);
        CFS_INIT_LIST_HEAD(&lfsck->ml_list_dir);
        CFS_INIT_LIST_HEAD(&lfsck->ml_list_double_scan);
        CFS_INIT_LIST_HEAD(&lfsck->ml_list_idle);
        cfs_waitq_init(&lfsck->ml_thread.t_ctl_waitq);

        obj = dt_locate(env, mdd->mdd_bottom, &lfsck_it_fid);
        if (IS_ERR(obj))
                RETURN(PTR_ERR(obj));

        lfsck->ml_obj_oit = obj;
        rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
        if (rc != 0) {
                if (rc == -ENOTSUPP)
                        rc = 0;

                RETURN(rc);
        }

        obj = dt_store_open(env, mdd->mdd_bottom, "", lfsck_bookmark_name,
                            &mdd_env_info(env)->mti_fid);
        if (IS_ERR(obj))
                RETURN(PTR_ERR(obj));

        lfsck->ml_bookmark_obj = obj;
        rc = mdd_lfsck_bookmark_load(env, lfsck);
        if (rc == -ENODATA)
                rc = mdd_lfsck_bookmark_init(env, lfsck);

        /* XXX: LFSCK components initialization to be added here. */

        RETURN(rc);
}

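/* Tear down LFSCK: the engine thread must already be stopped; release the
 * iterator and bookmark objects and clean up every component left on the
 * scan, double-scan and idle lists. */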
void mdd_lfsck_cleanup(const struct lu_env *env, struct mdd_device *mdd)
{
        struct md_lfsck         *lfsck  = &mdd->mdd_lfsck;
        struct ptlrpc_thread    *thread = &lfsck->ml_thread;
        struct lfsck_component  *com;

        if (!lfsck->ml_initialized)
                return;

        LASSERT(thread_is_init(thread) || thread_is_stopped(thread));

        if (lfsck->ml_obj_oit != NULL) {
                lu_object_put(env, &lfsck->ml_obj_oit->do_lu);
                lfsck->ml_obj_oit = NULL;
        }

        LASSERT(lfsck->ml_obj_dir == NULL);

        if (lfsck->ml_bookmark_obj != NULL) {
                lu_object_put(env, &lfsck->ml_bookmark_obj->do_lu);
                lfsck->ml_bookmark_obj = NULL;
        }

        while (!cfs_list_empty(&lfsck->ml_list_scan)) {
                com = cfs_list_entry(lfsck->ml_list_scan.next,
                                     struct lfsck_component,
                                     lc_link);
                mdd_lfsck_component_cleanup(env, com);
        }

        LASSERT(cfs_list_empty(&lfsck->ml_list_dir));

        while (!cfs_list_empty(&lfsck->ml_list_double_scan)) {
                com = cfs_list_entry(lfsck->ml_list_double_scan.next,
                                     struct lfsck_component,
                                     lc_link);
                mdd_lfsck_component_cleanup(env, com);
        }

        while (!cfs_list_empty(&lfsck->ml_list_idle)) {
                com = cfs_list_entry(lfsck->ml_list_idle.next,
                                     struct lfsck_component,
                                     lc_link);
                mdd_lfsck_component_cleanup(env, com);
        }
}