Whamcloud - gitweb
LU-1866 lfsck: LFSCK 1.5 user space control
[fs/lustre-release.git] / lustre / mdd / mdd_lfsck.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, Intel Corporation.
24  */
25 /*
26  * lustre/mdd/mdd_lfsck.c
27  *
28  * Top-level entry points into mdd module
29  *
30  * LFSCK controller, which scans the whole device through low layer
31  * iteration APIs, drives all lfsck compeonents, controls the speed.
32  *
33  * Author: Fan Yong <yong.fan@whamcloud.com>
34  */
35
36 #ifndef EXPORT_SYMTAB
37 # define EXPORT_SYMTAB
38 #endif
39 #define DEBUG_SUBSYSTEM S_MDS
40
41 #include <lustre/lustre_idl.h>
42 #include <lustre_fid.h>
43 #include <obd_support.h>
44
45 #include "mdd_internal.h"
46 #include "mdd_lfsck.h"
47
48 #define HALF_SEC                        (CFS_HZ >> 1)
49 #define LFSCK_CHECKPOINT_INTERVAL       60
50 #define MDS_DIR_DUMMY_START             0xffffffffffffffffULL
51
52 const char lfsck_bookmark_name[] = "lfsck_bookmark";
53 const char lfsck_namespace_name[] = "lfsck_namespace";
54
55 static const char *lfsck_status_names[] = {
56         "init",
57         "scanning-phase1",
58         "scanning-phase2",
59         "completed",
60         "failed",
61         "stopped",
62         "paused",
63         "crashed",
64         NULL
65 };
66
67 static const char *lfsck_flags_names[] = {
68         "scanned-once",
69         "inconsistent",
70         "upgrade",
71         NULL
72 };
73
74 static const char *lfsck_param_names[] = {
75         "failout",
76         "dryrun",
77         NULL
78 };
79
80 /* misc functions */
81
82 static inline struct mdd_device *mdd_lfsck2mdd(struct md_lfsck *lfsck)
83 {
84         return container_of0(lfsck, struct mdd_device, mdd_lfsck);
85 }
86
87 static inline char *mdd_lfsck2name(struct md_lfsck *lfsck)
88 {
89         struct mdd_device *mdd = mdd_lfsck2mdd(lfsck);
90
91         return mdd2obd_dev(mdd)->obd_name;
92 }
93
94 static inline void mdd_lfsck_component_get(struct lfsck_component *com)
95 {
96         atomic_inc(&com->lc_ref);
97 }
98
99 static inline void mdd_lfsck_component_put(const struct lu_env *env,
100                                            struct lfsck_component *com)
101 {
102         if (atomic_dec_and_test(&com->lc_ref)) {
103                 if (com->lc_obj != NULL)
104                         lu_object_put(env, &com->lc_obj->do_lu);
105                 if (com->lc_file_ram != NULL)
106                         OBD_FREE(com->lc_file_ram, com->lc_file_size);
107                 if (com->lc_file_disk != NULL)
108                         OBD_FREE(com->lc_file_disk, com->lc_file_size);
109                 OBD_FREE_PTR(com);
110         }
111 }
112
113 static inline struct lfsck_component *
114 __mdd_lfsck_component_find(struct md_lfsck *lfsck, __u16 type, cfs_list_t *list)
115 {
116         struct lfsck_component *com;
117
118         cfs_list_for_each_entry(com, list, lc_link) {
119                 if (com->lc_type == type)
120                         return com;
121         }
122         return NULL;
123 }
124
125 static struct lfsck_component *
126 mdd_lfsck_component_find(struct md_lfsck *lfsck, __u16 type)
127 {
128         struct lfsck_component *com;
129
130         spin_lock(&lfsck->ml_lock);
131         com = __mdd_lfsck_component_find(lfsck, type, &lfsck->ml_list_scan);
132         if (com != NULL)
133                 goto unlock;
134
135         com = __mdd_lfsck_component_find(lfsck, type,
136                                          &lfsck->ml_list_double_scan);
137         if (com != NULL)
138                 goto unlock;
139
140         com = __mdd_lfsck_component_find(lfsck, type, &lfsck->ml_list_idle);
141
142 unlock:
143         if (com != NULL)
144                 mdd_lfsck_component_get(com);
145         spin_unlock(&lfsck->ml_lock);
146         return com;
147 }
148
149 static void mdd_lfsck_component_cleanup(const struct lu_env *env,
150                                         struct lfsck_component *com)
151 {
152         if (!cfs_list_empty(&com->lc_link))
153                 cfs_list_del_init(&com->lc_link);
154         if (!cfs_list_empty(&com->lc_link_dir))
155                 cfs_list_del_init(&com->lc_link_dir);
156
157         mdd_lfsck_component_put(env, com);
158 }
159
160 static int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
161                            const char *prefix)
162 {
163         int save = *len;
164         int flag;
165         int rc;
166         int i;
167
168         rc = snprintf(*buf, *len, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
169         if (rc <= 0)
170                 return -ENOSPC;
171
172         *buf += rc;
173         *len -= rc;
174         for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
175                 if (flag & bits) {
176                         bits &= ~flag;
177                         rc = snprintf(*buf, *len, "%s%c", names[i],
178                                       bits != 0 ? ',' : '\n');
179                         if (rc <= 0)
180                                 return -ENOSPC;
181
182                         *buf += rc;
183                         *len -= rc;
184                 }
185         }
186         return save - *len;
187 }
188
189 static int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix)
190 {
191         int rc;
192
193         if (time != 0)
194                 rc = snprintf(*buf, *len, "%s: "LPU64" seconds\n", prefix,
195                               cfs_time_current_sec() - time);
196         else
197                 rc = snprintf(*buf, *len, "%s: N/A\n", prefix);
198         if (rc <= 0)
199                 return -ENOSPC;
200
201         *buf += rc;
202         *len -= rc;
203         return rc;
204 }
205
206 static int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
207                           const char *prefix)
208 {
209         int rc;
210
211         if (fid_is_zero(&pos->lp_dir_parent)) {
212                 if (pos->lp_oit_cookie == 0)
213                         rc = snprintf(*buf, *len, "%s: N/A, N/A, N/A\n",
214                                       prefix);
215                 else
216                         rc = snprintf(*buf, *len, "%s: "LPU64", N/A, N/A\n",
217                                       prefix, pos->lp_oit_cookie);
218         } else {
219                 rc = snprintf(*buf, *len, "%s: "LPU64", "DFID", "LPU64"\n",
220                               prefix, pos->lp_oit_cookie,
221                               PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
222         }
223         if (rc <= 0)
224                 return -ENOSPC;
225
226         *buf += rc;
227         *len -= rc;
228         return rc;
229 }
230
231 static void mdd_lfsck_pos_fill(const struct lu_env *env, struct md_lfsck *lfsck,
232                                struct lfsck_position *pos, bool oit_processed,
233                                bool dir_processed)
234 {
235         const struct dt_it_ops *iops = &lfsck->ml_obj_oit->do_index_ops->dio_it;
236
237         spin_lock(&lfsck->ml_lock);
238         if (unlikely(lfsck->ml_di_oit == NULL)) {
239                 spin_unlock(&lfsck->ml_lock);
240                 memset(pos, 0, sizeof(*pos));
241                 return;
242         }
243
244         pos->lp_oit_cookie = iops->store(env, lfsck->ml_di_oit);
245
246         LASSERT(pos->lp_oit_cookie > 0);
247
248         if (!oit_processed)
249                 pos->lp_oit_cookie--;
250
251         if (lfsck->ml_di_dir != NULL) {
252                 struct dt_object *dto = lfsck->ml_obj_dir;
253
254                 pos->lp_dir_parent = *lu_object_fid(&dto->do_lu);
255                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
256                                                         lfsck->ml_di_dir);
257
258                 LASSERT(pos->lp_dir_cookie != MDS_DIR_DUMMY_START);
259
260                 if (pos->lp_dir_cookie == MDS_DIR_END_OFF)
261                         LASSERT(dir_processed);
262
263                 /* For the dir which just to be processed,
264                  * lp_dir_cookie will become MDS_DIR_DUMMY_START,
265                  * which can be correctly handled by mdd_lfsck_prep. */
266                 if (!dir_processed)
267                         pos->lp_dir_cookie--;
268         } else {
269                 fid_zero(&pos->lp_dir_parent);
270                 pos->lp_dir_cookie = 0;
271         }
272         spin_unlock(&lfsck->ml_lock);
273 }
274
275 static inline void mdd_lfsck_pos_set_zero(struct lfsck_position *pos)
276 {
277         memset(pos, 0, sizeof(*pos));
278 }
279
280 static inline int mdd_lfsck_pos_is_zero(const struct lfsck_position *pos)
281 {
282         return pos->lp_oit_cookie == 0 && fid_is_zero(&pos->lp_dir_parent);
283 }
284
285 static inline int mdd_lfsck_pos_is_eq(const struct lfsck_position *pos1,
286                                       const struct lfsck_position *pos2)
287 {
288         if (pos1->lp_oit_cookie < pos2->lp_oit_cookie)
289                 return -1;
290
291         if (pos1->lp_oit_cookie > pos2->lp_oit_cookie)
292                 return 1;
293
294         if (fid_is_zero(&pos1->lp_dir_parent) &&
295             !fid_is_zero(&pos2->lp_dir_parent))
296                 return -1;
297
298         if (!fid_is_zero(&pos1->lp_dir_parent) &&
299             fid_is_zero(&pos2->lp_dir_parent))
300                 return 1;
301
302         if (fid_is_zero(&pos1->lp_dir_parent) &&
303             fid_is_zero(&pos2->lp_dir_parent))
304                 return 0;
305
306         LASSERT(lu_fid_eq(&pos1->lp_dir_parent, &pos2->lp_dir_parent));
307
308         if (pos1->lp_dir_cookie < pos2->lp_dir_cookie)
309                 return -1;
310
311         if (pos1->lp_dir_cookie > pos2->lp_dir_cookie)
312                 return 1;
313
314         return 0;
315 }
316
317 static void mdd_lfsck_close_dir(const struct lu_env *env,
318                                 struct md_lfsck *lfsck)
319 {
320         struct dt_object        *dir_obj  = lfsck->ml_obj_dir;
321         const struct dt_it_ops  *dir_iops = &dir_obj->do_index_ops->dio_it;
322         struct dt_it            *dir_di   = lfsck->ml_di_dir;
323
324         spin_lock(&lfsck->ml_lock);
325         lfsck->ml_di_dir = NULL;
326         spin_unlock(&lfsck->ml_lock);
327
328         dir_iops->put(env, dir_di);
329         dir_iops->fini(env, dir_di);
330         lfsck->ml_obj_dir = NULL;
331         lu_object_put(env, &dir_obj->do_lu);
332 }
333
334 static void __mdd_lfsck_set_speed(struct md_lfsck *lfsck, __u32 limit)
335 {
336         lfsck->ml_bookmark_ram.lb_speed_limit = limit;
337         if (limit != LFSCK_SPEED_NO_LIMIT) {
338                 if (limit > CFS_HZ) {
339                         lfsck->ml_sleep_rate = limit / CFS_HZ;
340                         lfsck->ml_sleep_jif = 1;
341                 } else {
342                         lfsck->ml_sleep_rate = 1;
343                         lfsck->ml_sleep_jif = CFS_HZ / limit;
344                 }
345         } else {
346                 lfsck->ml_sleep_jif = 0;
347                 lfsck->ml_sleep_rate = 0;
348         }
349 }
350
351 static void mdd_lfsck_control_speed(struct md_lfsck *lfsck)
352 {
353         struct ptlrpc_thread *thread = &lfsck->ml_thread;
354         struct l_wait_info    lwi;
355
356         if (lfsck->ml_sleep_jif > 0 &&
357             lfsck->ml_new_scanned >= lfsck->ml_sleep_rate) {
358                 spin_lock(&lfsck->ml_lock);
359                 if (likely(lfsck->ml_sleep_jif > 0 &&
360                            lfsck->ml_new_scanned >= lfsck->ml_sleep_rate)) {
361                         lwi = LWI_TIMEOUT_INTR(lfsck->ml_sleep_jif, NULL,
362                                                LWI_ON_SIGNAL_NOOP, NULL);
363                         spin_unlock(&lfsck->ml_lock);
364
365                         l_wait_event(thread->t_ctl_waitq,
366                                      !thread_is_running(thread),
367                                      &lwi);
368                         lfsck->ml_new_scanned = 0;
369                 } else {
370                         spin_unlock(&lfsck->ml_lock);
371                 }
372         }
373 }
374
375 /* lfsck_bookmark file ops */
376
377 static void inline mdd_lfsck_bookmark_to_cpu(struct lfsck_bookmark *des,
378                                              struct lfsck_bookmark *src)
379 {
380         des->lb_magic = le32_to_cpu(src->lb_magic);
381         des->lb_version = le16_to_cpu(src->lb_version);
382         des->lb_param = le16_to_cpu(src->lb_param);
383         des->lb_speed_limit = le32_to_cpu(src->lb_speed_limit);
384 }
385
386 static void inline mdd_lfsck_bookmark_to_le(struct lfsck_bookmark *des,
387                                             struct lfsck_bookmark *src)
388 {
389         des->lb_magic = cpu_to_le32(src->lb_magic);
390         des->lb_version = cpu_to_le16(src->lb_version);
391         des->lb_param = cpu_to_le16(src->lb_param);
392         des->lb_speed_limit = cpu_to_le32(src->lb_speed_limit);
393 }
394
395 static int mdd_lfsck_bookmark_load(const struct lu_env *env,
396                                    struct md_lfsck *lfsck)
397 {
398         loff_t pos = 0;
399         int    len = sizeof(struct lfsck_bookmark);
400         int    rc;
401
402         rc = dt_record_read(env, lfsck->ml_bookmark_obj,
403                             mdd_buf_get(env, &lfsck->ml_bookmark_disk, len),
404                             &pos);
405         if (rc == 0) {
406                 struct lfsck_bookmark *bm = &lfsck->ml_bookmark_ram;
407
408                 mdd_lfsck_bookmark_to_cpu(bm, &lfsck->ml_bookmark_disk);
409                 if (bm->lb_magic != LFSCK_BOOKMARK_MAGIC) {
410                         CWARN("%.16s: invalid lfsck_bookmark magic "
411                               "0x%x != 0x%x\n", mdd_lfsck2name(lfsck),
412                               bm->lb_magic, LFSCK_BOOKMARK_MAGIC);
413                         /* Process it as new lfsck_bookmark. */
414                         rc = -ENODATA;
415                 }
416         } else {
417                 if (rc == -EFAULT && pos == 0)
418                         /* return -ENODATA for empty lfsck_bookmark. */
419                         rc = -ENODATA;
420                 else
421                         CERROR("%.16s: fail to load lfsck_bookmark, "
422                                "expected = %d, rc = %d\n",
423                                mdd_lfsck2name(lfsck), len, rc);
424         }
425         return rc;
426 }
427
428 static int mdd_lfsck_bookmark_store(const struct lu_env *env,
429                                     struct md_lfsck *lfsck)
430 {
431         struct mdd_device *mdd    = mdd_lfsck2mdd(lfsck);
432         struct thandle    *handle;
433         struct dt_object  *obj    = lfsck->ml_bookmark_obj;
434         loff_t             pos    = 0;
435         int                len    = sizeof(struct lfsck_bookmark);
436         int                rc;
437         ENTRY;
438
439         mdd_lfsck_bookmark_to_le(&lfsck->ml_bookmark_disk,
440                                  &lfsck->ml_bookmark_ram);
441         handle = dt_trans_create(env, mdd->mdd_bottom);
442         if (IS_ERR(handle)) {
443                 rc = PTR_ERR(handle);
444                 CERROR("%.16s: fail to create trans for storing "
445                        "lfsck_bookmark: %d\n,", mdd_lfsck2name(lfsck), rc);
446                 RETURN(rc);
447         }
448
449         rc = dt_declare_record_write(env, obj, len, 0, handle);
450         if (rc != 0) {
451                 CERROR("%.16s: fail to declare trans for storing "
452                        "lfsck_bookmark: %d\n,", mdd_lfsck2name(lfsck), rc);
453                 GOTO(out, rc);
454         }
455
456         rc = dt_trans_start_local(env, mdd->mdd_bottom, handle);
457         if (rc != 0) {
458                 CERROR("%.16s: fail to start trans for storing "
459                        "lfsck_bookmark: %d\n,", mdd_lfsck2name(lfsck), rc);
460                 GOTO(out, rc);
461         }
462
463         rc = dt_record_write(env, obj,
464                              mdd_buf_get(env, &lfsck->ml_bookmark_disk, len),
465                              &pos, handle);
466         if (rc != 0)
467                 CERROR("%.16s: fail to store lfsck_bookmark, expected = %d, "
468                        "rc = %d\n", mdd_lfsck2name(lfsck), len, rc);
469
470         GOTO(out, rc);
471
472 out:
473         dt_trans_stop(env, mdd->mdd_bottom, handle);
474         return rc;
475 }
476
477 static int mdd_lfsck_bookmark_init(const struct lu_env *env,
478                                    struct md_lfsck *lfsck)
479 {
480         struct lfsck_bookmark *mb = &lfsck->ml_bookmark_ram;
481         int rc;
482
483         memset(mb, 0, sizeof(mb));
484         mb->lb_magic = LFSCK_BOOKMARK_MAGIC;
485         mb->lb_version = LFSCK_VERSION_V2;
486         mutex_lock(&lfsck->ml_mutex);
487         rc = mdd_lfsck_bookmark_store(env, lfsck);
488         mutex_unlock(&lfsck->ml_mutex);
489         return rc;
490 }
491
492 /* lfsck_namespace file ops */
493
494 static void inline mdd_lfsck_position_to_cpu(struct lfsck_position *des,
495                                              struct lfsck_position *src)
496 {
497         des->lp_oit_cookie = le64_to_cpu(src->lp_oit_cookie);
498         fid_le_to_cpu(&des->lp_dir_parent, &src->lp_dir_parent);
499         des->lp_dir_cookie = le64_to_cpu(src->lp_dir_cookie);
500 }
501
502 static void inline mdd_lfsck_position_to_le(struct lfsck_position *des,
503                                              struct lfsck_position *src)
504 {
505         des->lp_oit_cookie = cpu_to_le64(src->lp_oit_cookie);
506         fid_cpu_to_le(&des->lp_dir_parent, &src->lp_dir_parent);
507         des->lp_dir_cookie = cpu_to_le64(src->lp_dir_cookie);
508 }
509
510 static void inline mdd_lfsck_namespace_to_cpu(struct lfsck_namespace *des,
511                                               struct lfsck_namespace *src)
512 {
513         des->ln_magic = le32_to_cpu(src->ln_magic);
514         des->ln_status = le32_to_cpu(src->ln_status);
515         des->ln_flags = le32_to_cpu(src->ln_flags);
516         des->ln_success_count = le32_to_cpu(src->ln_success_count);
517         des->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
518         des->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
519         des->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
520         des->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
521         des->ln_time_last_checkpoint =
522                                 le64_to_cpu(src->ln_time_last_checkpoint);
523         mdd_lfsck_position_to_cpu(&des->ln_pos_latest_start,
524                                   &src->ln_pos_latest_start);
525         mdd_lfsck_position_to_cpu(&des->ln_pos_last_checkpoint,
526                                   &src->ln_pos_last_checkpoint);
527         mdd_lfsck_position_to_cpu(&des->ln_pos_first_inconsistent,
528                                   &src->ln_pos_first_inconsistent);
529         des->ln_items_checked = le64_to_cpu(src->ln_items_checked);
530         des->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
531         des->ln_items_failed = le64_to_cpu(src->ln_items_failed);
532         des->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
533         des->ln_mlinked_checked = le64_to_cpu(src->ln_mlinked_checked);
534         des->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
535         des->ln_objs_repaired_phase2 =
536                                 le64_to_cpu(src->ln_objs_repaired_phase2);
537         des->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
538         des->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
539         des->ln_objs_lost_found = le64_to_cpu(src->ln_objs_lost_found);
540         fid_le_to_cpu(&des->ln_fid_latest_scanned_phase2,
541                       &src->ln_fid_latest_scanned_phase2);
542 }
543
544 static void inline mdd_lfsck_namespace_to_le(struct lfsck_namespace *des,
545                                              struct lfsck_namespace *src)
546 {
547         des->ln_magic = cpu_to_le32(src->ln_magic);
548         des->ln_status = cpu_to_le32(src->ln_status);
549         des->ln_flags = cpu_to_le32(src->ln_flags);
550         des->ln_success_count = cpu_to_le32(src->ln_success_count);
551         des->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
552         des->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
553         des->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
554         des->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
555         des->ln_time_last_checkpoint =
556                                 cpu_to_le64(src->ln_time_last_checkpoint);
557         mdd_lfsck_position_to_le(&des->ln_pos_latest_start,
558                                  &src->ln_pos_latest_start);
559         mdd_lfsck_position_to_le(&des->ln_pos_last_checkpoint,
560                                  &src->ln_pos_last_checkpoint);
561         mdd_lfsck_position_to_le(&des->ln_pos_first_inconsistent,
562                                  &src->ln_pos_first_inconsistent);
563         des->ln_items_checked = cpu_to_le64(src->ln_items_checked);
564         des->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
565         des->ln_items_failed = cpu_to_le64(src->ln_items_failed);
566         des->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
567         des->ln_mlinked_checked = cpu_to_le64(src->ln_mlinked_checked);
568         des->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
569         des->ln_objs_repaired_phase2 =
570                                 cpu_to_le64(src->ln_objs_repaired_phase2);
571         des->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
572         des->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
573         des->ln_objs_lost_found = cpu_to_le64(src->ln_objs_lost_found);
574         fid_cpu_to_le(&des->ln_fid_latest_scanned_phase2,
575                       &src->ln_fid_latest_scanned_phase2);
576 }
577
578 /**
579  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
580  * \retval 0: succeed.
581  * \retval -ve: failed cases.
582  */
583 static int mdd_lfsck_namespace_load(const struct lu_env *env,
584                                     struct lfsck_component *com)
585 {
586         int len = com->lc_file_size;
587         int rc;
588
589         rc = dt_xattr_get(env, com->lc_obj,
590                           mdd_buf_get(env, com->lc_file_disk, len),
591                           XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
592         if (rc == len) {
593                 struct lfsck_namespace *ns = com->lc_file_ram;
594
595                 mdd_lfsck_namespace_to_cpu(ns,
596                                 (struct lfsck_namespace *)com->lc_file_disk);
597                 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
598                         CWARN("%.16s: invalid lfsck_namespace magic "
599                               "0x%x != 0x%x\n",
600                               mdd_lfsck2name(com->lc_lfsck),
601                               ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
602                         rc = 1;
603                 } else {
604                         rc = 0;
605                 }
606         } else if (rc != -ENODATA) {
607                 CERROR("%.16s: fail to load lfsck_namespace, expected = %d, "
608                        "rc = %d\n", mdd_lfsck2name(com->lc_lfsck), len, rc);
609                 if (rc >= 0)
610                         rc = 1;
611         }
612         return rc;
613 }
614
615 static int mdd_lfsck_namespace_store(const struct lu_env *env,
616                                      struct lfsck_component *com, bool init)
617 {
618         struct dt_object  *obj    = com->lc_obj;
619         struct md_lfsck   *lfsck  = com->lc_lfsck;
620         struct mdd_device *mdd    = mdd_lfsck2mdd(lfsck);
621         struct thandle    *handle;
622         int                len    = com->lc_file_size;
623         int                rc;
624         ENTRY;
625
626         mdd_lfsck_namespace_to_le((struct lfsck_namespace *)com->lc_file_disk,
627                                   (struct lfsck_namespace *)com->lc_file_ram);
628         handle = dt_trans_create(env, mdd->mdd_bottom);
629         if (IS_ERR(handle)) {
630                 rc = PTR_ERR(handle);
631                 CERROR("%.16s: fail to create trans for storing "
632                        "lfsck_namespace: %d\n,", mdd_lfsck2name(lfsck), rc);
633                 RETURN(rc);
634         }
635
636         rc = dt_declare_xattr_set(env, obj,
637                                   mdd_buf_get(env, com->lc_file_disk, len),
638                                   XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
639         if (rc != 0) {
640                 CERROR("%.16s: fail to declare trans for storing "
641                        "lfsck_namespace: %d\n,", mdd_lfsck2name(lfsck), rc);
642                 GOTO(out, rc);
643         }
644
645         rc = dt_trans_start_local(env, mdd->mdd_bottom, handle);
646         if (rc != 0) {
647                 CERROR("%.16s: fail to start trans for storing "
648                        "lfsck_namespace: %d\n,", mdd_lfsck2name(lfsck), rc);
649                 GOTO(out, rc);
650         }
651
652         rc = dt_xattr_set(env, obj,
653                           mdd_buf_get(env, com->lc_file_disk, len),
654                           XATTR_NAME_LFSCK_NAMESPACE,
655                           init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
656                           handle, BYPASS_CAPA);
657         if (rc != 0)
658                 CERROR("%.16s: fail to store lfsck_namespace, len = %d, "
659                        "rc = %d\n", mdd_lfsck2name(lfsck), len, rc);
660
661         GOTO(out, rc);
662
663 out:
664         dt_trans_stop(env, mdd->mdd_bottom, handle);
665         return rc;
666 }
667
668 static int mdd_lfsck_namespace_init(const struct lu_env *env,
669                                     struct lfsck_component *com)
670 {
671         struct lfsck_namespace *ns = (struct lfsck_namespace *)com->lc_file_ram;
672         int rc;
673
674         memset(ns, 0, sizeof(*ns));
675         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
676         ns->ln_status = LS_INIT;
677         down_write(&com->lc_sem);
678         rc = mdd_lfsck_namespace_store(env, com, true);
679         up_write(&com->lc_sem);
680         return rc;
681 }
682
683 static int mdd_declare_lfsck_namespace_unlink(const struct lu_env *env,
684                                               struct mdd_device *mdd,
685                                               struct dt_object *p,
686                                               struct dt_object *c,
687                                               const char *name,
688                                               struct thandle *handle)
689 {
690         int rc;
691
692         rc = dt_declare_delete(env, p, (const struct dt_key *)name, handle);
693         if (rc != 0)
694                 return rc;
695
696         rc = dt_declare_ref_del(env, c, handle);
697         if (rc != 0)
698                 return rc;
699
700         rc = dt_declare_destroy(env, c, handle);
701         return rc;
702 }
703
704 static int mdd_lfsck_namespace_unlink(const struct lu_env *env,
705                                       struct mdd_device *mdd,
706                                       struct lfsck_component *com)
707 {
708         struct mdd_thread_info  *info   = mdd_env_info(env);
709         struct lu_fid           *fid    = &info->mti_fid;
710         struct dt_object        *child  = com->lc_obj;
711         struct dt_object        *parent;
712         struct thandle          *handle;
713         int                      rc;
714         ENTRY;
715
716         parent = dt_store_resolve(env, mdd->mdd_bottom, "", fid);
717         if (IS_ERR(parent))
718                 RETURN(rc = PTR_ERR(parent));
719
720         if (dt_try_as_dir(env, parent))
721                 GOTO(out, rc = -ENOTDIR);
722
723         handle = dt_trans_create(env, mdd->mdd_bottom);
724         if (IS_ERR(handle))
725                 GOTO(out, rc = PTR_ERR(handle));
726
727         rc = mdd_declare_lfsck_namespace_unlink(env, mdd, parent, child,
728                                                 lfsck_namespace_name, handle);
729         if (rc != 0)
730                 GOTO(stop, rc);
731
732         rc = dt_trans_start_local(env, mdd->mdd_bottom, handle);
733         if (rc != 0)
734                 GOTO(stop, rc);
735
736         rc = dt_delete(env, parent, (struct dt_key *)lfsck_namespace_name,
737                        handle, BYPASS_CAPA);
738         if (rc != 0)
739                 GOTO(stop, rc);
740
741         rc = child->do_ops->do_ref_del(env, child, handle);
742         if (rc != 0) {
743                 lu_local_obj_fid(fid, LFSCK_NAMESPACE_OID);
744                 rc = dt_insert(env, parent,
745                                (const struct dt_rec*)fid,
746                                (const struct dt_key *)lfsck_namespace_name,
747                                handle, BYPASS_CAPA, 1);
748
749                 GOTO(stop, rc);
750         }
751
752
753         rc = dt_destroy(env, child, handle);
754         if (rc == 0) {
755                 lu_object_put(env, &child->do_lu);
756                 com->lc_obj = NULL;
757         }
758
759         GOTO(stop, rc);
760
761 stop:
762         dt_trans_stop(env, mdd->mdd_bottom, handle);
763
764 out:
765         lu_object_put(env, &parent->do_lu);
766         return rc;
767 }
768
769 /* namespace APIs */
770
771 static int mdd_lfsck_namespace_reset(const struct lu_env *env,
772                                      struct lfsck_component *com, bool init)
773 {
774         struct mdd_thread_info  *info = mdd_env_info(env);
775         struct lu_fid           *fid  = &info->mti_fid;
776         struct lfsck_namespace  *ns   = (struct lfsck_namespace *)com->lc_file_ram;
777         struct mdd_device       *mdd  = mdd_lfsck2mdd(com->lc_lfsck);
778         struct md_object        *mdo;
779         struct dt_object        *dto;
780         int                      rc;
781         ENTRY;
782
783         down_write(&com->lc_sem);
784         if (init) {
785                 memset(ns, 0, sizeof(*ns));
786         } else {
787                 __u32 count = ns->ln_success_count;
788                 __u64 last_time = ns->ln_time_last_complete;
789
790                 memset(ns, 0, sizeof(*ns));
791                 ns->ln_success_count = count;
792                 ns->ln_time_last_complete = last_time;
793         }
794         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
795         ns->ln_status = LS_INIT;
796
797         rc = mdd_lfsck_namespace_unlink(env, mdd, com);
798         if (rc != 0)
799                 GOTO(out, rc);
800
801         lu_local_obj_fid(fid, LFSCK_NAMESPACE_OID);
802         mdo = llo_store_create_index(env, &mdd->mdd_md_dev, mdd->mdd_bottom, "",
803                                      lfsck_namespace_name, fid,
804                                      &dt_lfsck_features);
805         if (IS_ERR(mdo))
806                 GOTO(out, rc = PTR_ERR(mdo));
807
808         lu_object_put(env, &mdo->mo_lu);
809         dto = dt_store_open(env, mdd->mdd_bottom, "", lfsck_namespace_name, fid);
810         if (IS_ERR(dto))
811                 GOTO(out, rc = PTR_ERR(dto));
812
813         com->lc_obj = dto;
814         rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
815         if (rc != 0)
816                 GOTO(out, rc);
817
818         rc = mdd_lfsck_namespace_store(env, com, true);
819
820         GOTO(out, rc);
821
822 out:
823         up_write(&com->lc_sem);
824         return rc;
825 }
826
827 static void
828 mdd_lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
829                          bool oit, bool new_checked)
830 {
831         struct lfsck_namespace *ns = (struct lfsck_namespace *)com->lc_file_ram;
832
833         down_write(&com->lc_sem);
834         if (new_checked)
835                 com->lc_new_checked++;
836         ns->ln_items_failed++;
837         if (mdd_lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
838                 mdd_lfsck_pos_fill(env, com->lc_lfsck,
839                                    &ns->ln_pos_first_inconsistent, oit, !oit);
840         up_write(&com->lc_sem);
841 }
842
843 static int mdd_lfsck_namespace_checkpoint(const struct lu_env *env,
844                                           struct lfsck_component *com,
845                                           bool init)
846 {
847         struct md_lfsck         *lfsck = com->lc_lfsck;
848         struct lfsck_namespace  *ns    =
849                                 (struct lfsck_namespace *)com->lc_file_ram;
850         int                      rc;
851
852         if (com->lc_new_checked == 0 && !init)
853                 return 0;
854
855         down_write(&com->lc_sem);
856
857         ns->ln_pos_last_checkpoint = lfsck->ml_pos_current;
858         if (init) {
859                 ns->ln_time_last_checkpoint = ns->ln_time_latest_start;
860                 ns->ln_pos_latest_start = lfsck->ml_pos_current;
861         } else {
862                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
863                                 HALF_SEC - lfsck->ml_time_last_checkpoint);
864                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
865                 ns->ln_items_checked += com->lc_new_checked;
866                 com->lc_new_checked = 0;
867         }
868
869         rc = mdd_lfsck_namespace_store(env, com, false);
870
871         up_write(&com->lc_sem);
872         return rc;
873 }
874
875 static int mdd_lfsck_namespace_prep(const struct lu_env *env,
876                                     struct lfsck_component *com)
877 {
878         struct md_lfsck         *lfsck  = com->lc_lfsck;
879         struct lfsck_namespace  *ns     =
880                                 (struct lfsck_namespace *)com->lc_file_ram;
881         struct lfsck_position   *pos    = &com->lc_pos_start;
882
883         if (ns->ln_status == LS_COMPLETED) {
884                 int rc;
885
886                 rc = mdd_lfsck_namespace_reset(env, com, false);
887                 if (rc != 0)
888                         return rc;
889         }
890
891         down_write(&com->lc_sem);
892
893         ns->ln_time_latest_start = cfs_time_current_sec();
894
895         spin_lock(&lfsck->ml_lock);
896         if (ns->ln_flags & LF_SCANNED_ONCE) {
897                 if (!lfsck->ml_drop_dryrun ||
898                     mdd_lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
899                         ns->ln_status = LS_SCANNING_PHASE2;
900                         cfs_list_del_init(&com->lc_link);
901                         cfs_list_add_tail(&com->lc_link,
902                                           &lfsck->ml_list_double_scan);
903                         if (!cfs_list_empty(&com->lc_link_dir))
904                                 cfs_list_del_init(&com->lc_link_dir);
905                         mdd_lfsck_pos_set_zero(pos);
906                 } else {
907                         ns->ln_status = LS_SCANNING_PHASE1;
908                         ns->ln_run_time_phase1 = 0;
909                         ns->ln_run_time_phase2 = 0;
910                         ns->ln_items_checked = 0;
911                         ns->ln_items_repaired = 0;
912                         ns->ln_items_failed = 0;
913                         ns->ln_dirs_checked = 0;
914                         ns->ln_mlinked_checked = 0;
915                         ns->ln_objs_checked_phase2 = 0;
916                         ns->ln_objs_repaired_phase2 = 0;
917                         ns->ln_objs_failed_phase2 = 0;
918                         ns->ln_objs_nlink_repaired = 0;
919                         ns->ln_objs_lost_found = 0;
920                         fid_zero(&ns->ln_fid_latest_scanned_phase2);
921                         if (cfs_list_empty(&com->lc_link_dir))
922                                 cfs_list_add_tail(&com->lc_link_dir,
923                                                   &lfsck->ml_list_dir);
924                         *pos = ns->ln_pos_first_inconsistent;
925                 }
926         } else {
927                 ns->ln_status = LS_SCANNING_PHASE1;
928                 if (cfs_list_empty(&com->lc_link_dir))
929                         cfs_list_add_tail(&com->lc_link_dir,
930                                           &lfsck->ml_list_dir);
931                 if (!lfsck->ml_drop_dryrun ||
932                     mdd_lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
933                         *pos = ns->ln_pos_last_checkpoint;
934                         pos->lp_oit_cookie++;
935                         if (!fid_is_zero(&pos->lp_dir_parent)) {
936                                 if (pos->lp_dir_cookie == MDS_DIR_END_OFF) {
937                                         fid_zero(&pos->lp_dir_parent);
938                                 } else {
939                                         pos->lp_dir_cookie++;
940                                 }
941                         }
942                 } else {
943                         *pos = ns->ln_pos_first_inconsistent;
944                 }
945         }
946         spin_unlock(&lfsck->ml_lock);
947
948         up_write(&com->lc_sem);
949         return 0;
950 }
951
952 static int mdd_lfsck_namespace_exec_oit(const struct lu_env *env,
953                                         struct lfsck_component *com,
954                                         struct mdd_object *obj)
955 {
956         down_write(&com->lc_sem);
957         com->lc_new_checked++;
958         if (S_ISDIR(mdd_object_type(obj)))
959                 ((struct lfsck_namespace *)com->lc_file_ram)->ln_dirs_checked++;
960         up_write(&com->lc_sem);
961         return 0;
962 }
963
964 /* XXX: to be implemented in other patch.  */
965 static int mdd_lfsck_namespace_exec_dir(const struct lu_env *env,
966                                         struct lfsck_component *com,
967                                         struct mdd_object *obj,
968                                         struct lu_dirent *ent)
969 {
970         struct lfsck_namespace     *ns       =
971                                 (struct lfsck_namespace *)com->lc_file_ram;
972         const struct lu_name       *cname;
973         int                         repaired;
974
975         cname = mdd_name_get_const(env, ent->lde_name, ent->lde_namelen);
976         down_write(&com->lc_sem);
977         com->lc_new_checked++;
978
979         if (ent->lde_attrs & LUDA_UPGRADE) {
980                 ns->ln_flags |= LF_UPGRADE;
981                 repaired = 1;
982         } else if (ent->lde_attrs & LUDA_REPAIR) {
983                 ns->ln_flags |= LF_INCONSISTENT;
984                 repaired = 1;
985         } else {
986                 repaired = 0;
987         }
988
989         ns->ln_items_repaired += repaired;
990         up_write(&com->lc_sem);
991         return 0;
992 }
993
994 static int mdd_lfsck_namespace_post(const struct lu_env *env,
995                                     struct lfsck_component *com,
996                                     int result)
997 {
998         struct md_lfsck         *lfsck = com->lc_lfsck;
999         struct lfsck_namespace  *ns    =
1000                                 (struct lfsck_namespace *)com->lc_file_ram;
1001         int                      rc;
1002
1003         down_write(&com->lc_sem);
1004
1005         spin_lock(&lfsck->ml_lock);
1006         if (result > 0) {
1007                 ns->ln_status = LS_SCANNING_PHASE2;
1008                 ns->ln_flags |= LF_SCANNED_ONCE;
1009                 ns->ln_flags &= ~LF_UPGRADE;
1010                 cfs_list_del_init(&com->lc_link);
1011                 cfs_list_del_init(&com->lc_link_dir);
1012                 cfs_list_add_tail(&com->lc_link, &lfsck->ml_list_double_scan);
1013         } else if (result == 0) {
1014                 if (lfsck->ml_paused) {
1015                         ns->ln_status = LS_PAUSED;
1016                 } else {
1017                         ns->ln_status = LS_STOPPED;
1018                         cfs_list_del_init(&com->lc_link);
1019                         cfs_list_del_init(&com->lc_link_dir);
1020                         cfs_list_add_tail(&com->lc_link, &lfsck->ml_list_idle);
1021                 }
1022         } else {
1023                 ns->ln_status = LS_FAILED;
1024                 cfs_list_del_init(&com->lc_link);
1025                 cfs_list_del_init(&com->lc_link_dir);
1026                 cfs_list_add_tail(&com->lc_link, &lfsck->ml_list_idle);
1027         }
1028         spin_unlock(&lfsck->ml_lock);
1029
1030         ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1031                                 HALF_SEC - lfsck->ml_time_last_checkpoint);
1032         ns->ln_time_last_checkpoint = cfs_time_current_sec();
1033         ns->ln_items_checked += com->lc_new_checked;
1034         com->lc_new_checked = 0;
1035
1036         rc = mdd_lfsck_namespace_store(env, com, false);
1037
1038         up_write(&com->lc_sem);
1039         return rc;
1040 }
1041
1042 static int
1043 mdd_lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
1044                          char *buf, int len)
1045 {
1046         struct md_lfsck         *lfsck = com->lc_lfsck;
1047         struct lfsck_bookmark   *bk    = &lfsck->ml_bookmark_ram;
1048         struct lfsck_namespace  *ns    =
1049                                 (struct lfsck_namespace *)com->lc_file_ram;
1050         int                      save  = len;
1051         int                      ret   = -ENOSPC;
1052         int                      rc;
1053
1054         down_read(&com->lc_sem);
1055         rc = snprintf(buf, len,
1056                       "name: lfsck_namespace\n"
1057                       "magic: 0x%x\n"
1058                       "version: %d\n"
1059                       "status: %s\n",
1060                       ns->ln_magic,
1061                       bk->lb_version,
1062                       lfsck_status_names[ns->ln_status]);
1063         if (rc <= 0)
1064                 goto out;
1065
1066         buf += rc;
1067         len -= rc;
1068         rc = lfsck_bits_dump(&buf, &len, ns->ln_flags, lfsck_flags_names,
1069                              "flags");
1070         if (rc < 0)
1071                 goto out;
1072
1073         rc = lfsck_bits_dump(&buf, &len, bk->lb_param, lfsck_param_names,
1074                              "param");
1075         if (rc < 0)
1076                 goto out;
1077
1078         rc = lfsck_time_dump(&buf, &len, ns->ln_time_last_complete,
1079                              "time_since_last_completed");
1080         if (rc < 0)
1081                 goto out;
1082
1083         rc = lfsck_time_dump(&buf, &len, ns->ln_time_latest_start,
1084                              "time_since_latest_start");
1085         if (rc < 0)
1086                 goto out;
1087
1088         rc = lfsck_time_dump(&buf, &len, ns->ln_time_last_checkpoint,
1089                              "time_since_last_checkpoint");
1090         if (rc < 0)
1091                 goto out;
1092
1093         rc = lfsck_pos_dump(&buf, &len, &ns->ln_pos_latest_start,
1094                             "latest_start_position");
1095         if (rc < 0)
1096                 goto out;
1097
1098         rc = lfsck_pos_dump(&buf, &len, &ns->ln_pos_last_checkpoint,
1099                             "last_checkpoint_position");
1100         if (rc < 0)
1101                 goto out;
1102
1103         rc = lfsck_pos_dump(&buf, &len, &ns->ln_pos_first_inconsistent,
1104                             "first_failure_position");
1105         if (rc < 0)
1106                 goto out;
1107
1108         if (ns->ln_status == LS_SCANNING_PHASE1) {
1109                 struct lfsck_position pos;
1110                 cfs_duration_t duration = cfs_time_current() -
1111                                           lfsck->ml_time_last_checkpoint;
1112                 __u64 checked = ns->ln_items_checked + com->lc_new_checked;
1113                 __u64 speed = checked;
1114                 __u64 new_checked = com->lc_new_checked * CFS_HZ;
1115                 __u32 rtime = ns->ln_run_time_phase1 +
1116                               cfs_duration_sec(duration + HALF_SEC);
1117
1118                 if (duration != 0)
1119                         do_div(new_checked, duration);
1120                 if (rtime != 0)
1121                         do_div(speed, rtime);
1122                 rc = snprintf(buf, len,
1123                               "checked_phase1: "LPU64"\n"
1124                               "checked_phase2: "LPU64"\n"
1125                               "updated_phase1: "LPU64"\n"
1126                               "updated_phase2: "LPU64"\n"
1127                               "failed_phase1: "LPU64"\n"
1128                               "failed_phase2: "LPU64"\n"
1129                               "dirs: "LPU64"\n"
1130                               "M-linked: "LPU64"\n"
1131                               "nlinks_repaired: "LPU64"\n"
1132                               "lost_found: "LPU64"\n"
1133                               "success_count: %u\n"
1134                               "run_time_phase1: %u seconds\n"
1135                               "run_time_phase2: %u seconds\n"
1136                               "average_speed_phase1: "LPU64" items/sec\n"
1137                               "average_speed_phase2: N/A\n"
1138                               "real-time_speed_phase1: "LPU64" items/sec\n"
1139                               "real-time_speed_phase2: N/A\n",
1140                               checked,
1141                               ns->ln_objs_checked_phase2,
1142                               ns->ln_items_repaired,
1143                               ns->ln_objs_repaired_phase2,
1144                               ns->ln_items_failed,
1145                               ns->ln_objs_failed_phase2,
1146                               ns->ln_dirs_checked,
1147                               ns->ln_mlinked_checked,
1148                               ns->ln_objs_nlink_repaired,
1149                               ns->ln_objs_lost_found,
1150                               ns->ln_success_count,
1151                               rtime,
1152                               ns->ln_run_time_phase2,
1153                               speed,
1154                               new_checked);
1155                 if (rc <= 0)
1156                         goto out;
1157
1158                 buf += rc;
1159                 len -= rc;
1160                 mdd_lfsck_pos_fill(env, lfsck, &pos, true, true);
1161                 rc = lfsck_pos_dump(&buf, &len, &pos, "current_position");
1162                 if (rc <= 0)
1163                         goto out;
1164         } else if (ns->ln_status == LS_SCANNING_PHASE2) {
1165                 cfs_duration_t duration = cfs_time_current() -
1166                                           lfsck->ml_time_last_checkpoint;
1167                 __u64 checked = ns->ln_objs_checked_phase2 +
1168                                 com->lc_new_checked;
1169                 __u64 speed1 = ns->ln_items_checked;
1170                 __u64 speed2 = checked;
1171                 __u64 new_checked = com->lc_new_checked * CFS_HZ;
1172                 __u32 rtime = ns->ln_run_time_phase2 +
1173                               cfs_duration_sec(duration + HALF_SEC);
1174
1175                 if (duration != 0)
1176                         do_div(new_checked, duration);
1177                 if (ns->ln_run_time_phase1 != 0)
1178                         do_div(speed1, ns->ln_run_time_phase1);
1179                 if (rtime != 0)
1180                         do_div(speed2, rtime);
1181                 rc = snprintf(buf, len,
1182                               "checked_phase1: "LPU64"\n"
1183                               "checked_phase2: "LPU64"\n"
1184                               "updated_phase1: "LPU64"\n"
1185                               "updated_phase2: "LPU64"\n"
1186                               "failed_phase1: "LPU64"\n"
1187                               "failed_phase2: "LPU64"\n"
1188                               "dirs: "LPU64"\n"
1189                               "M-linked: "LPU64"\n"
1190                               "nlinks_repaired: "LPU64"\n"
1191                               "lost_found: "LPU64"\n"
1192                               "success_count: %u\n"
1193                               "run_time_phase1: %u seconds\n"
1194                               "run_time_phase2: %u seconds\n"
1195                               "average_speed_phase1: "LPU64" items/sec\n"
1196                               "average_speed_phase2: "LPU64" objs/sec\n"
1197                               "real-time_speed_phase1: N/A\n"
1198                               "real-time_speed_phase2: "LPU64" objs/sec\n"
1199                               "current_position: "DFID"\n",
1200                               ns->ln_items_checked,
1201                               checked,
1202                               ns->ln_items_repaired,
1203                               ns->ln_objs_repaired_phase2,
1204                               ns->ln_items_failed,
1205                               ns->ln_objs_failed_phase2,
1206                               ns->ln_dirs_checked,
1207                               ns->ln_mlinked_checked,
1208                               ns->ln_objs_nlink_repaired,
1209                               ns->ln_objs_lost_found,
1210                               ns->ln_success_count,
1211                               ns->ln_run_time_phase1,
1212                               rtime,
1213                               speed1,
1214                               speed2,
1215                               new_checked,
1216                               PFID(&ns->ln_fid_latest_scanned_phase2));
1217                 if (rc <= 0)
1218                         goto out;
1219
1220                 buf += rc;
1221                 len -= rc;
1222         } else {
1223                 __u64 speed1 = ns->ln_items_checked;
1224                 __u64 speed2 = ns->ln_objs_checked_phase2;
1225
1226                 if (ns->ln_run_time_phase1 != 0)
1227                         do_div(speed1, ns->ln_run_time_phase1);
1228                 if (ns->ln_run_time_phase2 != 0)
1229                         do_div(speed2, ns->ln_run_time_phase2);
1230                 rc = snprintf(buf, len,
1231                               "checked_phase1: "LPU64"\n"
1232                               "checked_phase2: "LPU64"\n"
1233                               "updated_phase1: "LPU64"\n"
1234                               "updated_phase2: "LPU64"\n"
1235                               "failed_phase1: "LPU64"\n"
1236                               "failed_phase2: "LPU64"\n"
1237                               "dirs: "LPU64"\n"
1238                               "M-linked: "LPU64"\n"
1239                               "nlinks_repaired: "LPU64"\n"
1240                               "lost_found: "LPU64"\n"
1241                               "success_count: %u\n"
1242                               "run_time_phase1: %u seconds\n"
1243                               "run_time_phase2: %u seconds\n"
1244                               "average_speed_phase1: "LPU64" items/sec\n"
1245                               "average_speed_phase2: "LPU64" objs/sec\n"
1246                               "real-time_speed_phase1: N/A\n"
1247                               "real-time_speed_phase2: N/A\n"
1248                               "current_position: N/A\n",
1249                               ns->ln_items_checked,
1250                               ns->ln_objs_checked_phase2,
1251                               ns->ln_items_repaired,
1252                               ns->ln_objs_repaired_phase2,
1253                               ns->ln_items_failed,
1254                               ns->ln_objs_failed_phase2,
1255                               ns->ln_dirs_checked,
1256                               ns->ln_mlinked_checked,
1257                               ns->ln_objs_nlink_repaired,
1258                               ns->ln_objs_lost_found,
1259                               ns->ln_success_count,
1260                               ns->ln_run_time_phase1,
1261                               ns->ln_run_time_phase2,
1262                               speed1,
1263                               speed2);
1264                 if (rc <= 0)
1265                         goto out;
1266
1267                 buf += rc;
1268                 len -= rc;
1269         }
1270         ret = save - len;
1271
1272 out:
1273         up_read(&com->lc_sem);
1274         return ret;
1275 }
1276
1277 /* XXX: to be implemented in other patch.  */
1278 static int mdd_lfsck_namespace_double_scan(const struct lu_env *env,
1279                                            struct lfsck_component *com)
1280 {
1281         struct md_lfsck         *lfsck  = com->lc_lfsck;
1282         struct lfsck_bookmark   *bk     = &lfsck->ml_bookmark_ram;
1283         struct lfsck_namespace  *ns     =
1284                                 (struct lfsck_namespace *)com->lc_file_ram;
1285         int                      rc;
1286
1287         down_write(&com->lc_sem);
1288
1289         ns->ln_time_last_checkpoint = cfs_time_current_sec();
1290         com->lc_new_checked = 0;
1291         com->lc_journal = 0;
1292
1293         ns->ln_status = LS_COMPLETED;
1294         if (!(bk->lb_param & LPF_DRYRUN))
1295                 ns->ln_flags &=
1296                 ~(LF_SCANNED_ONCE | LF_INCONSISTENT | LF_UPGRADE);
1297         ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
1298         ns->ln_success_count++;
1299
1300         spin_lock(&lfsck->ml_lock);
1301         cfs_list_del_init(&com->lc_link);
1302         cfs_list_add_tail(&com->lc_link, &lfsck->ml_list_idle);
1303         spin_unlock(&lfsck->ml_lock);
1304
1305         rc = mdd_lfsck_namespace_store(env, com, false);
1306
1307         up_write(&com->lc_sem);
1308         return rc;
1309 }
1310
1311 static struct lfsck_operations mdd_lfsck_namespace_ops = {
1312         .lfsck_reset            = mdd_lfsck_namespace_reset,
1313         .lfsck_fail             = mdd_lfsck_namespace_fail,
1314         .lfsck_checkpoint       = mdd_lfsck_namespace_checkpoint,
1315         .lfsck_prep             = mdd_lfsck_namespace_prep,
1316         .lfsck_exec_oit         = mdd_lfsck_namespace_exec_oit,
1317         .lfsck_exec_dir         = mdd_lfsck_namespace_exec_dir,
1318         .lfsck_post             = mdd_lfsck_namespace_post,
1319         .lfsck_dump             = mdd_lfsck_namespace_dump,
1320         .lfsck_double_scan      = mdd_lfsck_namespace_double_scan,
1321 };
1322
1323 /* LFSCK component setup/cleanup functions */
1324
1325 static int mdd_lfsck_namespace_setup(const struct lu_env *env,
1326                                      struct md_lfsck *lfsck)
1327 {
1328         struct mdd_device      *mdd = mdd_lfsck2mdd(lfsck);
1329         struct lfsck_component *com;
1330         struct lfsck_namespace *ns;
1331         struct dt_object       *obj;
1332         int                     rc;
1333         ENTRY;
1334
1335         OBD_ALLOC_PTR(com);
1336         if (com == NULL)
1337                 RETURN(-ENOMEM);
1338
1339         CFS_INIT_LIST_HEAD(&com->lc_link);
1340         CFS_INIT_LIST_HEAD(&com->lc_link_dir);
1341         init_rwsem(&com->lc_sem);
1342         atomic_set(&com->lc_ref, 1);
1343         com->lc_lfsck = lfsck;
1344         com->lc_type = LT_NAMESPACE;
1345         com->lc_ops = &mdd_lfsck_namespace_ops;
1346         com->lc_file_size = sizeof(struct lfsck_namespace);
1347         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
1348         if (com->lc_file_ram == NULL)
1349                 GOTO(out, rc = -ENOMEM);
1350
1351         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
1352         if (com->lc_file_disk == NULL)
1353                 GOTO(out, rc = -ENOMEM);
1354
1355         obj = dt_store_open(env, mdd->mdd_bottom, "", lfsck_namespace_name,
1356                             &mdd_env_info(env)->mti_fid);
1357         if (IS_ERR(obj))
1358                 GOTO(out, rc = PTR_ERR(obj));
1359
1360         com->lc_obj = obj;
1361         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
1362         if (rc != 0)
1363                 GOTO(out, rc);
1364
1365         rc = mdd_lfsck_namespace_load(env, com);
1366         if (rc > 0)
1367                 rc = mdd_lfsck_namespace_reset(env, com, true);
1368         else if (rc == -ENODATA)
1369                 rc = mdd_lfsck_namespace_init(env, com);
1370         if (rc != 0)
1371                 GOTO(out, rc);
1372
1373         ns = (struct lfsck_namespace *)com->lc_file_ram;
1374         switch (ns->ln_status) {
1375         case LS_INIT:
1376         case LS_COMPLETED:
1377         case LS_FAILED:
1378         case LS_STOPPED:
1379                 cfs_list_add_tail(&com->lc_link, &lfsck->ml_list_idle);
1380                 break;
1381         default:
1382                 CERROR("%s: unknown status: %u\n",
1383                        mdd_lfsck2name(lfsck), ns->ln_status);
1384                 /* fall through */
1385         case LS_SCANNING_PHASE1:
1386         case LS_SCANNING_PHASE2:
1387                 /* No need to store the status to disk right now.
1388                  * If the system crashed before the status stored,
1389                  * it will be loaded back when next time. */
1390                 ns->ln_status = LS_CRASHED;
1391                 /* fall through */
1392         case LS_PAUSED:
1393         case LS_CRASHED:
1394                 cfs_list_add_tail(&com->lc_link, &lfsck->ml_list_scan);
1395                 cfs_list_add_tail(&com->lc_link_dir, &lfsck->ml_list_dir);
1396                 break;
1397         }
1398
1399         GOTO(out, rc = 0);
1400
1401 out:
1402         if (rc != 0)
1403                 mdd_lfsck_component_cleanup(env, com);
1404         return rc;
1405 }
1406
1407 /* helper functions for framework */
1408
1409 static int object_is_client_visible(const struct lu_env *env,
1410                                     struct mdd_device *mdd,
1411                                     struct mdd_object *obj)
1412 {
1413         struct lu_fid *fid   = &mdd_env_info(env)->mti_fid;
1414         int            depth = 0;
1415         int            rc;
1416
1417         LASSERT(S_ISDIR(mdd_object_type(obj)));
1418
1419         while (1) {
1420                 if (mdd_is_root(mdd, mdo2fid(obj))) {
1421                         if (depth > 0)
1422                                 mdd_object_put(env, obj);
1423                         return 1;
1424                 }
1425
1426                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1427                 if (unlikely(mdd_is_dead_obj(obj))) {
1428                         mdd_read_unlock(env, obj);
1429                         if (depth > 0)
1430                                 mdd_object_put(env, obj);
1431                         return 0;
1432                 }
1433
1434                 rc = dt_xattr_get(env, mdd_object_child(obj),
1435                                   mdd_buf_get(env, NULL, 0), XATTR_NAME_LINK,
1436                                   BYPASS_CAPA);
1437                 mdd_read_unlock(env, obj);
1438                 if (rc >= 0) {
1439                         if (depth > 0)
1440                                 mdd_object_put(env, obj);
1441                         return 1;
1442                 }
1443
1444                 if (rc < 0 && rc != -ENODATA) {
1445                         if (depth > 0)
1446                                 mdd_object_put(env, obj);
1447                         return rc;
1448                 }
1449
1450                 rc = mdd_parent_fid(env, obj, fid);
1451                 if (depth > 0)
1452                         mdd_object_put(env, obj);
1453                 if (rc != 0)
1454                         return rc;
1455
1456                 if (unlikely(lu_fid_eq(fid, &mdd->mdd_local_root_fid)))
1457                         return 0;
1458
1459                 obj = mdd_object_find(env, mdd, fid);
1460                 if (obj == NULL)
1461                         return 0;
1462                 else if (IS_ERR(obj))
1463                         return PTR_ERR(obj);
1464
1465                 /* XXX: need more processing for remote object in the future. */
1466                 if (!mdd_object_exists(obj) || mdd_object_remote(obj)) {
1467                         mdd_object_put(env, obj);
1468                         return 0;
1469                 }
1470
1471                 depth++;
1472         }
1473         return 0;
1474 }
1475
1476 static void mdd_lfsck_unpack_ent(struct lu_dirent *ent)
1477 {
1478         fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
1479         ent->lde_hash = le64_to_cpu(ent->lde_hash);
1480         ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
1481         ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
1482         ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
1483
1484         /* Make sure the name is terminated with '0'.
1485          * The data (type) after ent::lde_name maybe
1486          * broken, but we do not care. */
1487         ent->lde_name[ent->lde_namelen] = 0;
1488 }
1489
1490 /* LFSCK wrap functions */
1491
1492 static void mdd_lfsck_fail(const struct lu_env *env, struct md_lfsck *lfsck,
1493                            bool oit, bool new_checked)
1494 {
1495         struct lfsck_component *com;
1496
1497         cfs_list_for_each_entry(com, &lfsck->ml_list_scan, lc_link) {
1498                 com->lc_ops->lfsck_fail(env, com, oit, new_checked);
1499         }
1500 }
1501
1502 static int mdd_lfsck_checkpoint(const struct lu_env *env,
1503                                 struct md_lfsck *lfsck, bool oit)
1504 {
1505         struct lfsck_component *com;
1506         int                     rc;
1507
1508         if (likely(cfs_time_beforeq(cfs_time_current(),
1509                                     lfsck->ml_time_next_checkpoint)))
1510                 return 0;
1511
1512         mdd_lfsck_pos_fill(env, lfsck, &lfsck->ml_pos_current, oit, !oit);
1513         cfs_list_for_each_entry(com, &lfsck->ml_list_scan, lc_link) {
1514                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
1515                 if (rc != 0)
1516                         return rc;;
1517         }
1518
1519         lfsck->ml_time_last_checkpoint = cfs_time_current();
1520         lfsck->ml_time_next_checkpoint = lfsck->ml_time_last_checkpoint +
1521                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1522         return 0;
1523 }
1524
1525 static int mdd_lfsck_prep(struct lu_env *env, struct md_lfsck *lfsck)
1526 {
1527         struct mdd_device      *mdd     = mdd_lfsck2mdd(lfsck);
1528         struct mdd_object      *obj     = NULL;
1529         struct dt_object       *dt_obj;
1530         struct lfsck_component *com;
1531         struct lfsck_component *next;
1532         struct lfsck_position  *pos     = NULL;
1533         const struct dt_it_ops *iops    =
1534                                 &lfsck->ml_obj_oit->do_index_ops->dio_it;
1535         struct dt_it           *di;
1536         int                     rc;
1537         ENTRY;
1538
1539         LASSERT(lfsck->ml_obj_dir == NULL);
1540         LASSERT(lfsck->ml_di_dir == NULL);
1541
1542         cfs_list_for_each_entry_safe(com, next, &lfsck->ml_list_scan, lc_link) {
1543                 com->lc_new_checked = 0;
1544                 if (lfsck->ml_bookmark_ram.lb_param & LPF_DRYRUN)
1545                         com->lc_journal = 0;
1546
1547                 rc = com->lc_ops->lfsck_prep(env, com);
1548                 if (rc != 0)
1549                         RETURN(rc);
1550
1551                 if ((pos == NULL) ||
1552                     (!mdd_lfsck_pos_is_zero(&com->lc_pos_start) &&
1553                      mdd_lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
1554                         pos = &com->lc_pos_start;
1555         }
1556
1557         /* Init otable-based iterator. */
1558         if (pos == NULL) {
1559                 rc = iops->load(env, lfsck->ml_di_oit, 0);
1560                 GOTO(out, rc = (rc >= 0 ? 0 : rc));
1561         }
1562
1563         rc = iops->load(env, lfsck->ml_di_oit, pos->lp_oit_cookie);
1564         if (rc < 0)
1565                 GOTO(out, rc);
1566
1567         if (fid_is_zero(&pos->lp_dir_parent))
1568                 GOTO(out, rc = 0);
1569
1570         /* Find the directory for namespace-based traverse. */
1571         obj = mdd_object_find(env, mdd, &pos->lp_dir_parent);
1572         if (obj == NULL)
1573                 GOTO(out, rc = 0);
1574         else if (IS_ERR(obj))
1575                 RETURN(PTR_ERR(obj));
1576
1577         /* XXX: need more processing for remote object in the future. */
1578         if (!mdd_object_exists(obj) || mdd_object_remote(obj) ||
1579             unlikely(!S_ISDIR(mdd_object_type(obj))))
1580                 GOTO(out, rc = 0);
1581
1582         if (unlikely(mdd_is_dead_obj(obj)))
1583                 GOTO(out, rc = 0);
1584
1585         dt_obj = mdd_object_child(obj);
1586         if (unlikely(!dt_try_as_dir(env, dt_obj)))
1587                 GOTO(out, rc = -ENOTDIR);
1588
1589         /* Init the namespace-based directory traverse. */
1590         iops = &dt_obj->do_index_ops->dio_it;
1591         di = iops->init(env, dt_obj, lfsck->ml_args_dir, BYPASS_CAPA);
1592         if (IS_ERR(di))
1593                 GOTO(out, rc = PTR_ERR(di));
1594
1595         rc = iops->load(env, di, pos->lp_dir_cookie);
1596         if (rc == 0)
1597                 rc = iops->next(env, di);
1598         else if (rc > 0)
1599                 rc = 0;
1600
1601         if (rc != 0) {
1602                 iops->put(env, di);
1603                 iops->fini(env, di);
1604                 GOTO(out, rc);
1605         }
1606
1607         lfsck->ml_obj_dir = dt_obj;
1608         spin_lock(&lfsck->ml_lock);
1609         lfsck->ml_di_dir = di;
1610         spin_unlock(&lfsck->ml_lock);
1611         obj = NULL;
1612
1613         GOTO(out, rc = 0);
1614
1615 out:
1616         if (obj != NULL)
1617                 mdd_object_put(env, obj);
1618
1619         if (rc != 0)
1620                 return (rc > 0 ? 0 : rc);
1621
1622         mdd_lfsck_pos_fill(env, lfsck, &lfsck->ml_pos_current, false, false);
1623         cfs_list_for_each_entry(com, &lfsck->ml_list_scan, lc_link) {
1624                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
1625                 if (rc != 0)
1626                         break;
1627         }
1628
1629         lfsck->ml_time_last_checkpoint = cfs_time_current();
1630         lfsck->ml_time_next_checkpoint = lfsck->ml_time_last_checkpoint +
1631                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1632         return rc;
1633 }
1634
1635 static int mdd_lfsck_exec_oit(const struct lu_env *env, struct md_lfsck *lfsck,
1636                               struct mdd_object *obj)
1637 {
1638         struct lfsck_component *com;
1639         struct dt_object       *dt_obj;
1640         const struct dt_it_ops *iops;
1641         struct dt_it           *di;
1642         int                     rc;
1643         ENTRY;
1644
1645         LASSERT(lfsck->ml_obj_dir == NULL);
1646
1647         cfs_list_for_each_entry(com, &lfsck->ml_list_scan, lc_link) {
1648                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
1649                 if (rc != 0)
1650                         RETURN(rc);
1651         }
1652
1653         if (!S_ISDIR(mdd_object_type(obj)) ||
1654             cfs_list_empty(&lfsck->ml_list_dir))
1655                RETURN(0);
1656
1657         rc = object_is_client_visible(env, mdd_lfsck2mdd(lfsck), obj);
1658         if (rc <= 0)
1659                 GOTO(out, rc);
1660
1661         if (unlikely(mdd_is_dead_obj(obj)))
1662                 GOTO(out, rc = 0);
1663
1664         dt_obj = mdd_object_child(obj);
1665         if (unlikely(!dt_try_as_dir(env, dt_obj)))
1666                 GOTO(out, rc = -ENOTDIR);
1667
1668         iops = &dt_obj->do_index_ops->dio_it;
1669         di = iops->init(env, dt_obj, lfsck->ml_args_dir, BYPASS_CAPA);
1670         if (IS_ERR(di))
1671                 GOTO(out, rc = PTR_ERR(di));
1672
1673         rc = iops->load(env, di, 0);
1674         if (rc == 0)
1675                 rc = iops->next(env, di);
1676         else if (rc > 0)
1677                 rc = 0;
1678
1679         if (rc != 0) {
1680                 iops->put(env, di);
1681                 iops->fini(env, di);
1682                 GOTO(out, rc);
1683         }
1684
1685         mdd_object_get(obj);
1686         lfsck->ml_obj_dir = dt_obj;
1687         spin_lock(&lfsck->ml_lock);
1688         lfsck->ml_di_dir = di;
1689         spin_unlock(&lfsck->ml_lock);
1690
1691         GOTO(out, rc = 0);
1692
1693 out:
1694         if (rc < 0)
1695                 mdd_lfsck_fail(env, lfsck, false, false);
1696         return (rc > 0 ? 0 : rc);
1697 }
1698
1699 static int mdd_lfsck_exec_dir(const struct lu_env *env, struct md_lfsck *lfsck,
1700                               struct mdd_object *obj, struct lu_dirent *ent)
1701 {
1702         struct lfsck_component *com;
1703         int                     rc;
1704
1705         cfs_list_for_each_entry(com, &lfsck->ml_list_scan, lc_link) {
1706                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
1707                 if (rc != 0)
1708                         return rc;
1709         }
1710         return 0;
1711 }
1712
1713 static int mdd_lfsck_post(const struct lu_env *env, struct md_lfsck *lfsck,
1714                           int result)
1715 {
1716         struct lfsck_component *com;
1717         struct lfsck_component *next;
1718         int                     rc;
1719
1720         mdd_lfsck_pos_fill(env, lfsck, &lfsck->ml_pos_current, true, true);
1721         cfs_list_for_each_entry_safe(com, next, &lfsck->ml_list_scan, lc_link) {
1722                 rc = com->lc_ops->lfsck_post(env, com, result);
1723                 if (rc != 0)
1724                         return rc;
1725         }
1726
1727         lfsck->ml_time_last_checkpoint = cfs_time_current();
1728         lfsck->ml_time_next_checkpoint = lfsck->ml_time_last_checkpoint +
1729                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1730         return result;
1731 }
1732
1733 static int mdd_lfsck_double_scan(const struct lu_env *env,
1734                                  struct md_lfsck *lfsck)
1735 {
1736         struct lfsck_component *com;
1737         struct lfsck_component *next;
1738         int                     rc;
1739
1740         cfs_list_for_each_entry_safe(com, next, &lfsck->ml_list_double_scan,
1741                                      lc_link) {
1742                 if (lfsck->ml_bookmark_ram.lb_param & LPF_DRYRUN)
1743                         com->lc_journal = 0;
1744
1745                 rc = com->lc_ops->lfsck_double_scan(env, com);
1746                 if (rc != 0)
1747                         return rc;
1748         }
1749         return 0;
1750 }
1751
1752 /* LFSCK engines */
1753
1754 static int mdd_lfsck_dir_engine(const struct lu_env *env,
1755                                 struct md_lfsck *lfsck)
1756 {
1757         struct mdd_thread_info  *info   = mdd_env_info(env);
1758         struct mdd_device       *mdd    = mdd_lfsck2mdd(lfsck);
1759         const struct dt_it_ops  *iops   =
1760                         &lfsck->ml_obj_dir->do_index_ops->dio_it;
1761         struct dt_it            *di     = lfsck->ml_di_dir;
1762         struct lu_dirent        *ent    = &info->mti_ent;
1763         struct lu_fid           *fid    = &info->mti_fid;
1764         struct lfsck_bookmark   *bk     = &lfsck->ml_bookmark_ram;
1765         struct ptlrpc_thread    *thread = &lfsck->ml_thread;
1766         int                      rc;
1767         ENTRY;
1768
1769         do {
1770                 struct mdd_object *child;
1771
1772                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY2) &&
1773                     cfs_fail_val > 0) {
1774                         struct l_wait_info lwi;
1775
1776                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
1777                                           NULL, NULL);
1778                         l_wait_event(thread->t_ctl_waitq,
1779                                      !thread_is_running(thread),
1780                                      &lwi);
1781                 }
1782
1783                 lfsck->ml_new_scanned++;
1784                 rc = iops->rec(env, di, (struct dt_rec *)ent,
1785                                lfsck->ml_args_dir);
1786                 if (rc != 0) {
1787                         mdd_lfsck_fail(env, lfsck, false, true);
1788                         if (bk->lb_param & LPF_FAILOUT)
1789                                 RETURN(rc);
1790                         else
1791                                 goto checkpoint;
1792                 }
1793
1794                 mdd_lfsck_unpack_ent(ent);
1795                 if (ent->lde_attrs & LUDA_IGNORE)
1796                         goto checkpoint;
1797
1798                 *fid = ent->lde_fid;
1799                 child = mdd_object_find(env, mdd, fid);
1800                 if (child == NULL) {
1801                         goto checkpoint;
1802                 } else if (IS_ERR(child)) {
1803                         mdd_lfsck_fail(env, lfsck, false, true);
1804                         if (bk->lb_param & LPF_FAILOUT)
1805                                 RETURN(PTR_ERR(child));
1806                         else
1807                                 goto checkpoint;
1808                 }
1809
1810                 /* XXX: need more processing for remote object in the future. */
1811                 if (mdd_object_exists(child) && !mdd_object_remote(child))
1812                         rc = mdd_lfsck_exec_dir(env, lfsck, child, ent);
1813                 mdd_object_put(env, child);
1814                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
1815                         RETURN(rc);
1816
1817 checkpoint:
1818                 rc = mdd_lfsck_checkpoint(env, lfsck, false);
1819                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
1820                         RETURN(rc);
1821
1822                 /* Rate control. */
1823                 mdd_lfsck_control_speed(lfsck);
1824                 if (unlikely(!thread_is_running(thread)))
1825                         RETURN(0);
1826
1827                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
1828                         spin_lock(&lfsck->ml_lock);
1829                         thread_set_flags(thread, SVC_STOPPING);
1830                         spin_unlock(&lfsck->ml_lock);
1831                         RETURN(-EINVAL);
1832                 }
1833
1834                 rc = iops->next(env, di);
1835         } while (rc == 0);
1836
1837         if (rc > 0 && !lfsck->ml_oit_over)
1838                 mdd_lfsck_close_dir(env, lfsck);
1839
1840         RETURN(rc);
1841 }
1842
1843 static int mdd_lfsck_oit_engine(const struct lu_env *env,
1844                                 struct md_lfsck *lfsck)
1845 {
1846         struct mdd_thread_info  *info   = mdd_env_info(env);
1847         struct mdd_device       *mdd    = mdd_lfsck2mdd(lfsck);
1848         const struct dt_it_ops  *iops   =
1849                                 &lfsck->ml_obj_oit->do_index_ops->dio_it;
1850         struct dt_it            *di     = lfsck->ml_di_oit;
1851         struct lu_fid           *fid    = &info->mti_fid;
1852         struct lfsck_bookmark   *bk     = &lfsck->ml_bookmark_ram;
1853         struct ptlrpc_thread    *thread = &lfsck->ml_thread;
1854         int                      rc;
1855         ENTRY;
1856
1857         do {
1858                 struct mdd_object *target;
1859
1860                 if (lfsck->ml_di_dir != NULL) {
1861                         rc = mdd_lfsck_dir_engine(env, lfsck);
1862                         if (rc <= 0)
1863                                 RETURN(rc);
1864                 }
1865
1866                 if (unlikely(lfsck->ml_oit_over))
1867                         RETURN(1);
1868
1869                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY1) &&
1870                     cfs_fail_val > 0) {
1871                         struct l_wait_info lwi;
1872
1873                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
1874                                           NULL, NULL);
1875                         l_wait_event(thread->t_ctl_waitq,
1876                                      !thread_is_running(thread),
1877                                      &lwi);
1878                 }
1879
1880                 lfsck->ml_new_scanned++;
1881                 rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
1882                 if (rc != 0) {
1883                         mdd_lfsck_fail(env, lfsck, true, true);
1884                         if (bk->lb_param & LPF_FAILOUT)
1885                                 RETURN(rc);
1886                         else
1887                                 goto checkpoint;
1888                 }
1889
1890                 target = mdd_object_find(env, mdd, fid);
1891                 if (target == NULL) {
1892                         goto checkpoint;
1893                 } else if (IS_ERR(target)) {
1894                         mdd_lfsck_fail(env, lfsck, true, true);
1895                         if (bk->lb_param & LPF_FAILOUT)
1896                                 RETURN(PTR_ERR(target));
1897                         else
1898                                 goto checkpoint;
1899                 }
1900
1901                 /* XXX: In fact, low layer otable-based iteration should not
1902                  *      return agent object. But before LU-2646 resolved, we
1903                  *      need more processing for agent object. */
1904                 if (mdd_object_exists(target) && !mdd_object_remote(target))
1905                         rc = mdd_lfsck_exec_oit(env, lfsck, target);
1906                 mdd_object_put(env, target);
1907                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
1908                         RETURN(rc);
1909
1910 checkpoint:
1911                 rc = mdd_lfsck_checkpoint(env, lfsck, true);
1912                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
1913                         RETURN(rc);
1914
1915                 /* Rate control. */
1916                 mdd_lfsck_control_speed(lfsck);
1917
1918                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL1)) {
1919                         spin_lock(&lfsck->ml_lock);
1920                         thread_set_flags(thread, SVC_STOPPING);
1921                         spin_unlock(&lfsck->ml_lock);
1922                         RETURN(-EINVAL);
1923                 }
1924
1925                 rc = iops->next(env, di);
1926                 if (rc > 0)
1927                         lfsck->ml_oit_over = 1;
1928
1929                 if (unlikely(!thread_is_running(thread)))
1930                         RETURN(0);
1931         } while (rc == 0 || lfsck->ml_di_dir != NULL);
1932
1933         RETURN(rc);
1934 }
1935
1936 static int mdd_lfsck_main(void *args)
1937 {
1938         struct lu_env            env;
1939         struct md_lfsck         *lfsck    = (struct md_lfsck *)args;
1940         struct ptlrpc_thread    *thread   = &lfsck->ml_thread;
1941         struct dt_object        *oit_obj  = lfsck->ml_obj_oit;
1942         const struct dt_it_ops  *oit_iops = &oit_obj->do_index_ops->dio_it;
1943         struct dt_it            *oit_di;
1944         int                      rc;
1945         ENTRY;
1946
1947         cfs_daemonize("lfsck");
1948         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1949         if (rc != 0) {
1950                 CERROR("%s: LFSCK, fail to init env, rc = %d\n",
1951                        mdd_lfsck2name(lfsck), rc);
1952                 GOTO(noenv, rc);
1953         }
1954
1955         oit_di = oit_iops->init(&env, oit_obj, lfsck->ml_args_oit, BYPASS_CAPA);
1956         if (IS_ERR(oit_di)) {
1957                 rc = PTR_ERR(oit_di);
1958                 CERROR("%s: LFSCK, fail to init iteration, rc = %d\n",
1959                        mdd_lfsck2name(lfsck), rc);
1960                 GOTO(fini_env, rc);
1961         }
1962
1963         spin_lock(&lfsck->ml_lock);
1964         lfsck->ml_di_oit = oit_di;
1965         spin_unlock(&lfsck->ml_lock);
1966         rc = mdd_lfsck_prep(&env, lfsck);
1967         if (rc != 0)
1968                 GOTO(fini_oit, rc);
1969
1970         CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = 0x%x, dir_flags = 0x%x, "
1971                "oit_cookie = "LPU64", dir_cookie = "LPU64", parent = "DFID
1972                ", pid = %d\n", lfsck->ml_args_oit, lfsck->ml_args_dir,
1973                lfsck->ml_pos_current.lp_oit_cookie,
1974                lfsck->ml_pos_current.lp_dir_cookie,
1975                PFID(&lfsck->ml_pos_current.lp_dir_parent),
1976                cfs_curproc_pid());
1977
1978         spin_lock(&lfsck->ml_lock);
1979         thread_set_flags(thread, SVC_RUNNING);
1980         spin_unlock(&lfsck->ml_lock);
1981         cfs_waitq_broadcast(&thread->t_ctl_waitq);
1982
1983         if (!cfs_list_empty(&lfsck->ml_list_scan) ||
1984             cfs_list_empty(&lfsck->ml_list_double_scan))
1985                 rc = mdd_lfsck_oit_engine(&env, lfsck);
1986         else
1987                 rc = 1;
1988
1989         CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = 0x%x, dir_flags = 0x%x, "
1990                "oit_cookie = "LPU64", dir_cookie = "LPU64", parent = "DFID
1991                ", pid = %d, rc = %d\n", lfsck->ml_args_oit, lfsck->ml_args_dir,
1992                lfsck->ml_pos_current.lp_oit_cookie,
1993                lfsck->ml_pos_current.lp_dir_cookie,
1994                PFID(&lfsck->ml_pos_current.lp_dir_parent),
1995                cfs_curproc_pid(), rc);
1996
1997         if (lfsck->ml_paused && cfs_list_empty(&lfsck->ml_list_scan))
1998                 oit_iops->put(&env, oit_di);
1999
2000         rc = mdd_lfsck_post(&env, lfsck, rc);
2001         if (lfsck->ml_di_dir != NULL)
2002                 mdd_lfsck_close_dir(&env, lfsck);
2003
2004 fini_oit:
2005         spin_lock(&lfsck->ml_lock);
2006         lfsck->ml_di_oit = NULL;
2007         spin_unlock(&lfsck->ml_lock);
2008
2009         oit_iops->fini(&env, oit_di);
2010         if (rc == 1) {
2011                 if (!cfs_list_empty(&lfsck->ml_list_double_scan))
2012                         rc = mdd_lfsck_double_scan(&env, lfsck);
2013                 else
2014                         rc = 0;
2015         }
2016
2017         /* XXX: Purge the pinned objects in the future. */
2018
2019 fini_env:
2020         lu_env_fini(&env);
2021
2022 noenv:
2023         spin_lock(&lfsck->ml_lock);
2024         thread_set_flags(thread, SVC_STOPPED);
2025         cfs_waitq_broadcast(&thread->t_ctl_waitq);
2026         spin_unlock(&lfsck->ml_lock);
2027         return rc;
2028 }
2029
2030 /* external interfaces */
2031
2032 int mdd_lfsck_set_speed(const struct lu_env *env, struct md_lfsck *lfsck,
2033                         __u32 limit)
2034 {
2035         int rc;
2036
2037         mutex_lock(&lfsck->ml_mutex);
2038         __mdd_lfsck_set_speed(lfsck, limit);
2039         rc = mdd_lfsck_bookmark_store(env, lfsck);
2040         mutex_unlock(&lfsck->ml_mutex);
2041         return rc;
2042 }
2043
2044 int mdd_lfsck_dump(const struct lu_env *env, struct md_lfsck *lfsck,
2045                    __u16 type, char *buf, int len)
2046 {
2047         struct lfsck_component *com;
2048         int                     rc;
2049
2050         if (!lfsck->ml_initialized)
2051                 return -ENODEV;
2052
2053         com = mdd_lfsck_component_find(lfsck, type);
2054         if (com == NULL)
2055                 return -ENOTSUPP;
2056
2057         rc = com->lc_ops->lfsck_dump(env, com, buf, len);
2058         mdd_lfsck_component_put(env, com);
2059         return rc;
2060 }
2061
2062 int mdd_lfsck_start(const struct lu_env *env, struct md_lfsck *lfsck,
2063                     struct lfsck_start *start)
2064 {
2065         struct lfsck_bookmark  *bk     = &lfsck->ml_bookmark_ram;
2066         struct ptlrpc_thread   *thread = &lfsck->ml_thread;
2067         struct lfsck_component *com;
2068         struct l_wait_info      lwi    = { 0 };
2069         bool                    dirty  = false;
2070         int                     rc     = 0;
2071         __u16                   valid  = 0;
2072         __u16                   flags  = 0;
2073         ENTRY;
2074
2075         if (lfsck->ml_obj_oit == NULL)
2076                 RETURN(-ENOTSUPP);
2077
2078         /* start == NULL means auto trigger paused LFSCK. */
2079         if (start == NULL && cfs_list_empty(&lfsck->ml_list_scan))
2080                 RETURN(0);
2081
2082         mutex_lock(&lfsck->ml_mutex);
2083         spin_lock(&lfsck->ml_lock);
2084         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2085                 spin_unlock(&lfsck->ml_lock);
2086                 mutex_unlock(&lfsck->ml_mutex);
2087                 RETURN(-EALREADY);
2088         }
2089
2090         spin_unlock(&lfsck->ml_lock);
2091
2092         lfsck->ml_paused = 0;
2093         lfsck->ml_oit_over = 0;
2094         lfsck->ml_drop_dryrun = 0;
2095         lfsck->ml_new_scanned = 0;
2096
2097         /* For auto trigger. */
2098         if (start == NULL)
2099                 goto trigger;
2100
2101         start->ls_version = bk->lb_version;
2102         if (start->ls_valid & LSV_SPEED_LIMIT) {
2103                 __mdd_lfsck_set_speed(lfsck, start->ls_speed_limit);
2104                 dirty = true;
2105         }
2106
2107         if (start->ls_valid & LSV_ERROR_HANDLE) {
2108                 valid |= DOIV_ERROR_HANDLE;
2109                 if (start->ls_flags & LPF_FAILOUT)
2110                         flags |= DOIF_FAILOUT;
2111
2112                 if ((start->ls_flags & LPF_FAILOUT) &&
2113                     !(bk->lb_param & LPF_FAILOUT)) {
2114                         bk->lb_param |= LPF_FAILOUT;
2115                         dirty = true;
2116                 } else if (!(start->ls_flags & LPF_FAILOUT) &&
2117                            (bk->lb_param & LPF_FAILOUT)) {
2118                         bk->lb_param &= ~LPF_FAILOUT;
2119                         dirty = true;
2120                 }
2121         }
2122
2123         if (start->ls_valid & LSV_DRYRUN) {
2124                 if ((start->ls_flags & LPF_DRYRUN) &&
2125                     !(bk->lb_param & LPF_DRYRUN)) {
2126                         bk->lb_param |= LPF_DRYRUN;
2127                         dirty = true;
2128                 } else if (!(start->ls_flags & LPF_DRYRUN) &&
2129                            (bk->lb_param & LPF_DRYRUN)) {
2130                         bk->lb_param &= ~LPF_DRYRUN;
2131                         lfsck->ml_drop_dryrun = 1;
2132                         dirty = true;
2133                 }
2134         }
2135
2136         if (dirty) {
2137                 rc = mdd_lfsck_bookmark_store(env, lfsck);
2138                 if (rc != 0)
2139                         GOTO(out, rc);
2140         }
2141
2142         if (start->ls_flags & LPF_RESET)
2143                 flags |= DOIF_RESET;
2144
2145         if (start->ls_active != 0) {
2146                 struct lfsck_component *next;
2147                 __u16 type = 1;
2148
2149                 if (start->ls_active == LFSCK_TYPES_ALL)
2150                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2151
2152                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2153                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2154                         GOTO(out, rc = -ENOTSUPP);
2155                 }
2156
2157                 cfs_list_for_each_entry_safe(com, next,
2158                                              &lfsck->ml_list_scan, lc_link) {
2159                         if (!(com->lc_type & start->ls_active)) {
2160                                 rc = com->lc_ops->lfsck_post(env, com, 0);
2161                                 if (rc != 0)
2162                                         GOTO(out, rc);
2163                         }
2164                 }
2165
2166                 while (start->ls_active != 0) {
2167                         if (type & start->ls_active) {
2168                                 com = __mdd_lfsck_component_find(lfsck, type,
2169                                                         &lfsck->ml_list_idle);
2170                                 if (com != NULL) {
2171                                         /* The component status will be updated
2172                                          * when its prep() is called later by
2173                                          * the LFSCK main engine. */
2174                                         cfs_list_del_init(&com->lc_link);
2175                                         cfs_list_add_tail(&com->lc_link,
2176                                                           &lfsck->ml_list_scan);
2177                                 }
2178                                 start->ls_active &= ~type;
2179                         }
2180                         type <<= 1;
2181                 }
2182         }
2183
2184         cfs_list_for_each_entry(com, &lfsck->ml_list_scan, lc_link) {
2185                 start->ls_active |= com->lc_type;
2186                 if (flags & DOIF_RESET) {
2187                         rc = com->lc_ops->lfsck_reset(env, com, false);
2188                         if (rc != 0)
2189                                 GOTO(out, rc);
2190                 }
2191         }
2192
2193 trigger:
2194         lfsck->ml_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
2195         if (bk->lb_param & LPF_DRYRUN)
2196                 lfsck->ml_args_dir |= LUDA_VERIFY_DRYRUN;
2197
2198         if (bk->lb_param & LPF_FAILOUT) {
2199                 valid |= DOIV_ERROR_HANDLE;
2200                 flags |= DOIF_FAILOUT;
2201         }
2202
2203         if (!cfs_list_empty(&lfsck->ml_list_scan))
2204                 flags |= DOIF_OUTUSED;
2205
2206         lfsck->ml_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2207         thread_set_flags(thread, 0);
2208         rc = cfs_create_thread(mdd_lfsck_main, lfsck, 0);
2209         if (rc < 0)
2210                 CERROR("%s: cannot start LFSCK thread, rc = %d\n",
2211                        mdd_lfsck2name(lfsck), rc);
2212         else
2213                 l_wait_event(thread->t_ctl_waitq,
2214                              thread_is_running(thread) ||
2215                              thread_is_stopped(thread),
2216                              &lwi);
2217
2218         GOTO(out, rc = 0);
2219
2220 out:
2221         mutex_unlock(&lfsck->ml_mutex);
2222         return (rc < 0 ? rc : 0);
2223 }
2224
2225 int mdd_lfsck_stop(const struct lu_env *env, struct md_lfsck *lfsck,
2226                    bool pause)
2227 {
2228         struct ptlrpc_thread *thread = &lfsck->ml_thread;
2229         struct l_wait_info    lwi    = { 0 };
2230         ENTRY;
2231
2232         mutex_lock(&lfsck->ml_mutex);
2233         spin_lock(&lfsck->ml_lock);
2234         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2235                 spin_unlock(&lfsck->ml_lock);
2236                 mutex_unlock(&lfsck->ml_mutex);
2237                 RETURN(-EALREADY);
2238         }
2239
2240         if (pause)
2241                 lfsck->ml_paused = 1;
2242         thread_set_flags(thread, SVC_STOPPING);
2243         /* The LFSCK thread may be sleeping on low layer wait queue,
2244          * wake it up. */
2245         if (likely(lfsck->ml_di_oit != NULL))
2246                 lfsck->ml_obj_oit->do_index_ops->dio_it.put(env,
2247                                                             lfsck->ml_di_oit);
2248         spin_unlock(&lfsck->ml_lock);
2249
2250         cfs_waitq_broadcast(&thread->t_ctl_waitq);
2251         l_wait_event(thread->t_ctl_waitq,
2252                      thread_is_stopped(thread),
2253                      &lwi);
2254         mutex_unlock(&lfsck->ml_mutex);
2255
2256         RETURN(0);
2257 }
2258
2259 static const struct lu_fid lfsck_it_fid = { .f_seq = FID_SEQ_LOCAL_FILE,
2260                                             .f_oid = OTABLE_IT_OID,
2261                                             .f_ver = 0 };
2262
2263 int mdd_lfsck_setup(const struct lu_env *env, struct mdd_device *mdd)
2264 {
2265         struct md_lfsck  *lfsck = &mdd->mdd_lfsck;
2266         struct dt_object *obj;
2267         int               rc;
2268         ENTRY;
2269
2270         LASSERT(!lfsck->ml_initialized);
2271
2272         lfsck->ml_initialized = 1;
2273         mutex_init(&lfsck->ml_mutex);
2274         spin_lock_init(&lfsck->ml_lock);
2275         CFS_INIT_LIST_HEAD(&lfsck->ml_list_scan);
2276         CFS_INIT_LIST_HEAD(&lfsck->ml_list_dir);
2277         CFS_INIT_LIST_HEAD(&lfsck->ml_list_double_scan);
2278         CFS_INIT_LIST_HEAD(&lfsck->ml_list_idle);
2279         cfs_waitq_init(&lfsck->ml_thread.t_ctl_waitq);
2280
2281         obj = dt_locate(env, mdd->mdd_bottom, &lfsck_it_fid);
2282         if (IS_ERR(obj))
2283                 RETURN(PTR_ERR(obj));
2284
2285         lfsck->ml_obj_oit = obj;
2286         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
2287         if (rc != 0) {
2288                 if (rc == -ENOTSUPP)
2289                         rc = 0;
2290
2291                 RETURN(rc);
2292         }
2293
2294         obj = dt_store_open(env, mdd->mdd_bottom, "", lfsck_bookmark_name,
2295                             &mdd_env_info(env)->mti_fid);
2296         if (IS_ERR(obj))
2297                 RETURN(PTR_ERR(obj));
2298
2299         lfsck->ml_bookmark_obj = obj;
2300         rc = mdd_lfsck_bookmark_load(env, lfsck);
2301         if (rc == -ENODATA)
2302                 rc = mdd_lfsck_bookmark_init(env, lfsck);
2303         if (rc != 0)
2304                 RETURN(rc);
2305
2306         rc = mdd_lfsck_namespace_setup(env, lfsck);
2307         /* XXX: LFSCK components initialization to be added here. */
2308
2309         RETURN(rc);
2310 }
2311
2312 void mdd_lfsck_cleanup(const struct lu_env *env, struct mdd_device *mdd)
2313 {
2314         struct md_lfsck         *lfsck  = &mdd->mdd_lfsck;
2315         struct ptlrpc_thread    *thread = &lfsck->ml_thread;
2316         struct lfsck_component  *com;
2317
2318         if (!lfsck->ml_initialized)
2319                 return;
2320
2321         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
2322
2323         if (lfsck->ml_obj_oit != NULL) {
2324                 lu_object_put(env, &lfsck->ml_obj_oit->do_lu);
2325                 lfsck->ml_obj_oit = NULL;
2326         }
2327
2328         LASSERT(lfsck->ml_obj_dir == NULL);
2329
2330         if (lfsck->ml_bookmark_obj != NULL) {
2331                 lu_object_put(env, &lfsck->ml_bookmark_obj->do_lu);
2332                 lfsck->ml_bookmark_obj = NULL;
2333         }
2334
2335         while (!cfs_list_empty(&lfsck->ml_list_scan)) {
2336                 com = cfs_list_entry(lfsck->ml_list_scan.next,
2337                                      struct lfsck_component,
2338                                      lc_link);
2339                 mdd_lfsck_component_cleanup(env, com);
2340         }
2341
2342         LASSERT(cfs_list_empty(&lfsck->ml_list_dir));
2343
2344         while (!cfs_list_empty(&lfsck->ml_list_double_scan)) {
2345                 com = cfs_list_entry(lfsck->ml_list_double_scan.next,
2346                                      struct lfsck_component,
2347                                      lc_link);
2348                 mdd_lfsck_component_cleanup(env, com);
2349         }
2350
2351         while (!cfs_list_empty(&lfsck->ml_list_idle)) {
2352                 com = cfs_list_entry(lfsck->ml_list_idle.next,
2353                                      struct lfsck_component,
2354                                      lc_link);
2355                 mdd_lfsck_component_cleanup(env, com);
2356         }
2357 }