Whamcloud - gitweb
LU-1267 lfsck: framework (2) for MDT-OST consistency
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 /* define lfsck thread key */
46 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
47
48 static void lfsck_key_fini(const struct lu_context *ctx,
49                            struct lu_context_key *key, void *data)
50 {
51         struct lfsck_thread_info *info = data;
52
53         lu_buf_free(&info->lti_linkea_buf);
54         OBD_FREE_PTR(info);
55 }
56
57 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
58 LU_KEY_INIT_GENERIC(lfsck);
59
/* All registered LFSCK instances, linked through lfsck_instance::li_link. */
static CFS_LIST_HEAD(lfsck_instance_list);
/* Target descriptors (OST / MDT side) that are not attached to an instance
 * table yet; they are matched against an instance by ltd_key.
 * NOTE(review): these two heads have no static initializer here, presumably
 * they are set up with INIT_LIST_HEAD() elsewhere before use -- confirm. */
static struct list_head lfsck_ost_orphan_list;
static struct list_head lfsck_mdt_orphan_list;
/* Held while walking or changing the instance list and the orphan lists. */
static DEFINE_SPINLOCK(lfsck_instance_lock);
64
/* Printable name for each LS_* status value; looked up by
 * lfsck_status2names(), which range-checks the index against LS_MAX. */
static const char *lfsck_status_names[] = {
        [LS_INIT]               = "init",
        [LS_SCANNING_PHASE1]    = "scanning-phase1",
        [LS_SCANNING_PHASE2]    = "scanning-phase2",
        [LS_COMPLETED]          = "completed",
        [LS_FAILED]             = "failed",
        [LS_STOPPED]            = "stopped",
        [LS_PAUSED]             = "paused",
        [LS_CRASHED]            = "crashed",
        [LS_PARTIAL]            = "partial"
};
76
/* Printable names of the LFSCK flag bits, NULL-terminated.
 * NOTE(review): presumably entry N names bit N of the flags word, as
 * expected by lfsck_bits_dump() -- confirm against the callers. */
const char *lfsck_flags_names[] = {
        "scanned-once",
        "inconsistent",
        "upgrade",
        "incomplete",
        "crashed_lastid",
        NULL
};
85
/* Printable names of the LFSCK parameter bits.
 * The leading NULL leaves the first slot without a name; lfsck_bits_dump()
 * skips NULL entries, so that bit is never printed.
 * NOTE(review): presumably entry N names bit N of the lb_param word (e.g.
 * "dryrun" for LPF_DRYRUN) -- confirm against the LPF_* definitions. */
const char *lfsck_param_names[] = {
        NULL,
        "failout",
        "dryrun",
        NULL
};
92
93 const char *lfsck_status2names(enum lfsck_status status)
94 {
95         if (unlikely(status < 0 || status >= LS_MAX))
96                 return "unknown";
97
98         return lfsck_status_names[status];
99 }
100
101 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
102 {
103         spin_lock_init(&ltds->ltd_lock);
104         init_rwsem(&ltds->ltd_rw_sem);
105         INIT_LIST_HEAD(&ltds->ltd_orphan);
106         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
107         if (ltds->ltd_tgts_bitmap == NULL)
108                 return -ENOMEM;
109
110         return 0;
111 }
112
113 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
114 {
115         struct lfsck_tgt_desc   *ltd;
116         struct lfsck_tgt_desc   *next;
117         int                      idx;
118
119         down_write(&ltds->ltd_rw_sem);
120
121         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
122                                  ltd_orphan_list) {
123                 list_del_init(&ltd->ltd_orphan_list);
124                 lfsck_tgt_put(ltd);
125         }
126
127         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
128                 up_write(&ltds->ltd_rw_sem);
129
130                 return;
131         }
132
133         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
134                 ltd = LTD_TGT(ltds, idx);
135                 if (likely(ltd != NULL)) {
136                         LASSERT(list_empty(&ltd->ltd_layout_list));
137
138                         ltds->ltd_tgtnr--;
139                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
140                         LTD_TGT(ltds, idx) = NULL;
141                         lfsck_tgt_put(ltd);
142                 }
143         }
144
145         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
146                  ltds->ltd_tgtnr);
147
148         for (idx = 0; idx < TGT_PTRS; idx++) {
149                 if (ltds->ltd_tgts_idx[idx] != NULL) {
150                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
151                         ltds->ltd_tgts_idx[idx] = NULL;
152                 }
153         }
154
155         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
156         ltds->ltd_tgts_bitmap = NULL;
157         up_write(&ltds->ltd_rw_sem);
158 }
159
160 static int __lfsck_add_target(const struct lu_env *env,
161                               struct lfsck_instance *lfsck,
162                               struct lfsck_tgt_desc *ltd,
163                               bool for_ost, bool locked)
164 {
165         struct lfsck_tgt_descs *ltds;
166         __u32                   index = ltd->ltd_index;
167         int                     rc    = 0;
168         ENTRY;
169
170         if (for_ost)
171                 ltds = &lfsck->li_ost_descs;
172         else
173                 ltds = &lfsck->li_mdt_descs;
174
175         if (!locked)
176                 down_write(&ltds->ltd_rw_sem);
177
178         LASSERT(ltds->ltd_tgts_bitmap != NULL);
179
180         if (index >= ltds->ltd_tgts_bitmap->size) {
181                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
182                                     (__u32)BITS_PER_LONG);
183                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
184                 cfs_bitmap_t *new_bitmap;
185
186                 while (newsize < index + 1)
187                         newsize <<= 1;
188
189                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
190                 if (new_bitmap == NULL)
191                         GOTO(unlock, rc = -ENOMEM);
192
193                 if (ltds->ltd_tgtnr > 0)
194                         cfs_bitmap_copy(new_bitmap, old_bitmap);
195                 ltds->ltd_tgts_bitmap = new_bitmap;
196                 CFS_FREE_BITMAP(old_bitmap);
197         }
198
199         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
200                 CERROR("%s: the device %s (%u) is registered already\n",
201                        lfsck_lfsck2name(lfsck),
202                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
203                 GOTO(unlock, rc = -EEXIST);
204         }
205
206         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
207                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
208                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
209                         GOTO(unlock, rc = -ENOMEM);
210         }
211
212         LTD_TGT(ltds, index) = ltd;
213         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
214         ltds->ltd_tgtnr++;
215
216         GOTO(unlock, rc = 0);
217
218 unlock:
219         if (!locked)
220                 up_write(&ltds->ltd_rw_sem);
221
222         return rc;
223 }
224
225 static int lfsck_add_target_from_orphan(const struct lu_env *env,
226                                         struct lfsck_instance *lfsck)
227 {
228         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
229         struct lfsck_tgt_desc   *ltd;
230         struct lfsck_tgt_desc   *next;
231         struct list_head        *head    = &lfsck_ost_orphan_list;
232         int                      rc;
233         bool                     for_ost = true;
234
235 again:
236         spin_lock(&lfsck_instance_lock);
237         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
238                 if (ltd->ltd_key == lfsck->li_bottom) {
239                         list_del_init(&ltd->ltd_orphan_list);
240                         list_add_tail(&ltd->ltd_orphan_list,
241                                       &ltds->ltd_orphan);
242                 }
243         }
244         spin_unlock(&lfsck_instance_lock);
245
246         down_write(&ltds->ltd_rw_sem);
247         while (!list_empty(&ltds->ltd_orphan)) {
248                 ltd = list_entry(ltds->ltd_orphan.next,
249                                  struct lfsck_tgt_desc,
250                                  ltd_orphan_list);
251                 list_del_init(&ltd->ltd_orphan_list);
252                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
253                 /* Do not hold the semaphore for too long time. */
254                 up_write(&ltds->ltd_rw_sem);
255                 if (rc != 0)
256                         return rc;
257
258                 down_write(&ltds->ltd_rw_sem);
259         }
260         up_write(&ltds->ltd_rw_sem);
261
262         if (for_ost) {
263                 ltds = &lfsck->li_mdt_descs;
264                 head = &lfsck_mdt_orphan_list;
265                 for_ost = false;
266                 goto again;
267         }
268
269         return 0;
270 }
271
272 static inline struct lfsck_component *
273 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type, cfs_list_t *list)
274 {
275         struct lfsck_component *com;
276
277         cfs_list_for_each_entry(com, list, lc_link) {
278                 if (com->lc_type == type)
279                         return com;
280         }
281         return NULL;
282 }
283
284 static struct lfsck_component *
285 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
286 {
287         struct lfsck_component *com;
288
289         spin_lock(&lfsck->li_lock);
290         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
291         if (com != NULL)
292                 goto unlock;
293
294         com = __lfsck_component_find(lfsck, type,
295                                      &lfsck->li_list_double_scan);
296         if (com != NULL)
297                 goto unlock;
298
299         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
300
301 unlock:
302         if (com != NULL)
303                 lfsck_component_get(com);
304         spin_unlock(&lfsck->li_lock);
305         return com;
306 }
307
308 void lfsck_component_cleanup(const struct lu_env *env,
309                              struct lfsck_component *com)
310 {
311         if (!cfs_list_empty(&com->lc_link))
312                 cfs_list_del_init(&com->lc_link);
313         if (!cfs_list_empty(&com->lc_link_dir))
314                 cfs_list_del_init(&com->lc_link_dir);
315
316         lfsck_component_put(env, com);
317 }
318
319 void lfsck_instance_cleanup(const struct lu_env *env,
320                             struct lfsck_instance *lfsck)
321 {
322         struct ptlrpc_thread    *thread = &lfsck->li_thread;
323         struct lfsck_component  *com;
324         ENTRY;
325
326         LASSERT(list_empty(&lfsck->li_link));
327         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
328
329         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
330         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
331
332         if (lfsck->li_obj_oit != NULL) {
333                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
334                 lfsck->li_obj_oit = NULL;
335         }
336
337         LASSERT(lfsck->li_obj_dir == NULL);
338
339         while (!cfs_list_empty(&lfsck->li_list_scan)) {
340                 com = cfs_list_entry(lfsck->li_list_scan.next,
341                                      struct lfsck_component,
342                                      lc_link);
343                 lfsck_component_cleanup(env, com);
344         }
345
346         LASSERT(cfs_list_empty(&lfsck->li_list_dir));
347
348         while (!cfs_list_empty(&lfsck->li_list_double_scan)) {
349                 com = cfs_list_entry(lfsck->li_list_double_scan.next,
350                                      struct lfsck_component,
351                                      lc_link);
352                 lfsck_component_cleanup(env, com);
353         }
354
355         while (!cfs_list_empty(&lfsck->li_list_idle)) {
356                 com = cfs_list_entry(lfsck->li_list_idle.next,
357                                      struct lfsck_component,
358                                      lc_link);
359                 lfsck_component_cleanup(env, com);
360         }
361
362         if (lfsck->li_bookmark_obj != NULL) {
363                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
364                 lfsck->li_bookmark_obj = NULL;
365         }
366
367         if (lfsck->li_los != NULL) {
368                 local_oid_storage_fini(env, lfsck->li_los);
369                 lfsck->li_los = NULL;
370         }
371
372         OBD_FREE_PTR(lfsck);
373 }
374
375 static inline struct lfsck_instance *
376 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
377 {
378         struct lfsck_instance *lfsck;
379
380         cfs_list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
381                 if (lfsck->li_bottom == key) {
382                         if (ref)
383                                 lfsck_instance_get(lfsck);
384                         if (unlink)
385                                 list_del_init(&lfsck->li_link);
386
387                         return lfsck;
388                 }
389         }
390
391         return NULL;
392 }
393
394 static inline struct lfsck_instance *lfsck_instance_find(struct dt_device *key,
395                                                          bool ref, bool unlink)
396 {
397         struct lfsck_instance *lfsck;
398
399         spin_lock(&lfsck_instance_lock);
400         lfsck = __lfsck_instance_find(key, ref, unlink);
401         spin_unlock(&lfsck_instance_lock);
402
403         return lfsck;
404 }
405
406 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
407 {
408         struct lfsck_instance *tmp;
409
410         spin_lock(&lfsck_instance_lock);
411         cfs_list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
412                 if (lfsck->li_bottom == tmp->li_bottom) {
413                         spin_unlock(&lfsck_instance_lock);
414                         return -EEXIST;
415                 }
416         }
417
418         cfs_list_add_tail(&lfsck->li_link, &lfsck_instance_list);
419         spin_unlock(&lfsck_instance_lock);
420         return 0;
421 }
422
/*
 * Print the names of the bits set in \a bits as one line of the form
 * "<prefix>: name1,name2\n" ("<prefix>:\n" if no bit is set).
 *
 * names[N] is the name of bit N; a NULL entry means that the bit has no
 * printable name and is skipped. The table must be long enough to cover
 * the highest bit set in \a bits, there is no way to check that here.
 *
 * \a buf is the output cursor and \a len the room left at the cursor; both
 * are advanced by what has been written. The line is always terminated by
 * a newline, even if the highest bit set has no name.
 *
 * Returns the number of bytes written, or -ENOSPC if the output does not
 * fit; the cursor is never moved beyond the end of the buffer.
 */
int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
                    const char *prefix)
{
        unsigned int left    = bits;
        bool         newline = (bits == 0);
        int          save    = *len;
        int          rc;
        int          i;

        if (*len <= 0)
                return -ENOSPC;

        rc = snprintf(*buf, *len, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
        /* snprintf() returns the untruncated length: rc >= *len means
         * that the output has been cut. */
        if (rc <= 0 || rc >= *len)
                return -ENOSPC;

        *buf += rc;
        *len -= rc;
        for (i = 0; left != 0; i++) {
                /* Unsigned shift, so that bit 31 is well defined. */
                unsigned int flag = 1U << i;

                if (!(left & flag))
                        continue;

                left &= ~flag;
                if (names[i] == NULL)
                        continue;

                if (left == 0)
                        newline = true;

                rc = snprintf(*buf, *len, "%s%c", names[i],
                              newline ? '\n' : ',');
                if (rc <= 0 || rc >= *len)
                        return -ENOSPC;

                *buf += rc;
                *len -= rc;
        }

        /* The last bit(s) had no name: the line is still open. */
        if (!newline) {
                rc = snprintf(*buf, *len, "\n");
                if (rc <= 0 || rc >= *len)
                        return -ENOSPC;

                *buf += rc;
                *len -= rc;
        }

        return save - *len;
}
453
454 int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix)
455 {
456         int rc;
457
458         if (time != 0)
459                 rc = snprintf(*buf, *len, "%s: "LPU64" seconds\n", prefix,
460                               cfs_time_current_sec() - time);
461         else
462                 rc = snprintf(*buf, *len, "%s: N/A\n", prefix);
463         if (rc <= 0)
464                 return -ENOSPC;
465
466         *buf += rc;
467         *len -= rc;
468         return rc;
469 }
470
471 int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
472                    const char *prefix)
473 {
474         int rc;
475
476         if (fid_is_zero(&pos->lp_dir_parent)) {
477                 if (pos->lp_oit_cookie == 0)
478                         rc = snprintf(*buf, *len, "%s: N/A, N/A, N/A\n",
479                                       prefix);
480                 else
481                         rc = snprintf(*buf, *len, "%s: "LPU64", N/A, N/A\n",
482                                       prefix, pos->lp_oit_cookie);
483         } else {
484                 rc = snprintf(*buf, *len, "%s: "LPU64", "DFID", "LPU64"\n",
485                               prefix, pos->lp_oit_cookie,
486                               PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
487         }
488         if (rc <= 0)
489                 return -ENOSPC;
490
491         *buf += rc;
492         *len -= rc;
493         return rc;
494 }
495
496 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
497                     struct lfsck_position *pos, bool init)
498 {
499         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
500
501         if (unlikely(lfsck->li_di_oit == NULL)) {
502                 memset(pos, 0, sizeof(*pos));
503                 return;
504         }
505
506         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
507         if (!lfsck->li_current_oit_processed && !init)
508                 pos->lp_oit_cookie--;
509
510         LASSERT(pos->lp_oit_cookie > 0);
511
512         if (lfsck->li_di_dir != NULL) {
513                 struct dt_object *dto = lfsck->li_obj_dir;
514
515                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
516                                                         lfsck->li_di_dir);
517
518                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
519                         fid_zero(&pos->lp_dir_parent);
520                         pos->lp_dir_cookie = 0;
521                 } else {
522                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
523                 }
524         } else {
525                 fid_zero(&pos->lp_dir_parent);
526                 pos->lp_dir_cookie = 0;
527         }
528 }
529
530 static void __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
531 {
532         lfsck->li_bookmark_ram.lb_speed_limit = limit;
533         if (limit != LFSCK_SPEED_NO_LIMIT) {
534                 if (limit > HZ) {
535                         lfsck->li_sleep_rate = limit / HZ;
536                         lfsck->li_sleep_jif = 1;
537                 } else {
538                         lfsck->li_sleep_rate = 1;
539                         lfsck->li_sleep_jif = HZ / limit;
540                 }
541         } else {
542                 lfsck->li_sleep_jif = 0;
543                 lfsck->li_sleep_rate = 0;
544         }
545 }
546
547 void lfsck_control_speed(struct lfsck_instance *lfsck)
548 {
549         struct ptlrpc_thread *thread = &lfsck->li_thread;
550         struct l_wait_info    lwi;
551
552         if (lfsck->li_sleep_jif > 0 &&
553             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
554                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
555                                        LWI_ON_SIGNAL_NOOP, NULL);
556
557                 l_wait_event(thread->t_ctl_waitq,
558                              !thread_is_running(thread),
559                              &lwi);
560                 lfsck->li_new_scanned = 0;
561         }
562 }
563
564 void lfsck_control_speed_by_self(struct lfsck_component *com)
565 {
566         struct lfsck_instance   *lfsck  = com->lc_lfsck;
567         struct ptlrpc_thread    *thread = &lfsck->li_thread;
568         struct l_wait_info       lwi;
569
570         if (lfsck->li_sleep_jif > 0 &&
571             com->lc_new_scanned >= lfsck->li_sleep_rate) {
572                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
573                                        LWI_ON_SIGNAL_NOOP, NULL);
574
575                 l_wait_event(thread->t_ctl_waitq,
576                              !thread_is_running(thread),
577                              &lwi);
578                 com->lc_new_scanned = 0;
579         }
580 }
581
582 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
583                             struct lu_fid *fid)
584 {
585         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
586                      !dt_try_as_dir(env, obj)))
587                 return -ENOTDIR;
588
589         return dt_lookup(env, obj, (struct dt_rec *)fid,
590                          (const struct dt_key *)"..", BYPASS_CAPA);
591 }
592
593 static int lfsck_needs_scan_dir(const struct lu_env *env,
594                                 struct lfsck_instance *lfsck,
595                                 struct dt_object *obj)
596 {
597         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
598         int            depth = 0;
599         int            rc;
600
601         if (!lfsck->li_master || !S_ISDIR(lfsck_object_type(obj)) ||
602             cfs_list_empty(&lfsck->li_list_dir))
603                RETURN(0);
604
605         while (1) {
606                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
607                  *      which is the agent directory to manage the objects
608                  *      which name entries reside on remote MDTs. Related
609                  *      consistency verification will be processed in LFSCK
610                  *      phase III. */
611                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
612                         if (depth > 0)
613                                 lfsck_object_put(env, obj);
614                         return 1;
615                 }
616
617                 /* .lustre doesn't contain "real" user objects, no need lfsck */
618                 if (fid_is_dot_lustre(lfsck_dto2fid(obj))) {
619                         if (depth > 0)
620                                 lfsck_object_put(env, obj);
621                         return 0;
622                 }
623
624                 dt_read_lock(env, obj, MOR_TGT_CHILD);
625                 if (unlikely(lfsck_is_dead_obj(obj))) {
626                         dt_read_unlock(env, obj);
627                         if (depth > 0)
628                                 lfsck_object_put(env, obj);
629                         return 0;
630                 }
631
632                 rc = dt_xattr_get(env, obj,
633                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
634                                   BYPASS_CAPA);
635                 dt_read_unlock(env, obj);
636                 if (rc >= 0) {
637                         if (depth > 0)
638                                 lfsck_object_put(env, obj);
639                         return 1;
640                 }
641
642                 if (rc < 0 && rc != -ENODATA) {
643                         if (depth > 0)
644                                 lfsck_object_put(env, obj);
645                         return rc;
646                 }
647
648                 rc = lfsck_parent_fid(env, obj, fid);
649                 if (depth > 0)
650                         lfsck_object_put(env, obj);
651                 if (rc != 0)
652                         return rc;
653
654                 if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
655                         return 0;
656
657                 obj = lfsck_object_find(env, lfsck, fid);
658                 if (obj == NULL)
659                         return 0;
660                 else if (IS_ERR(obj))
661                         return PTR_ERR(obj);
662
663                 if (!dt_object_exists(obj)) {
664                         lfsck_object_put(env, obj);
665                         return 0;
666                 }
667
668                 /* Currently, only client visible directory can be remote. */
669                 if (dt_object_remote(obj)) {
670                         lfsck_object_put(env, obj);
671                         return 1;
672                 }
673
674                 depth++;
675         }
676         return 0;
677 }
678
679 struct lfsck_thread_args *lfsck_thread_args_init(struct lfsck_instance *lfsck,
680                                                  struct lfsck_component *com)
681 {
682         struct lfsck_thread_args *lta;
683         int                       rc;
684
685         OBD_ALLOC_PTR(lta);
686         if (lta == NULL)
687                 return ERR_PTR(-ENOMEM);
688
689         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
690         if (rc != 0) {
691                 OBD_FREE_PTR(lta);
692                 return ERR_PTR(rc);
693         }
694
695         lta->lta_lfsck = lfsck_instance_get(lfsck);
696         if (com != NULL)
697                 lta->lta_com = lfsck_component_get(com);
698
699         return lta;
700 }
701
702 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
703 {
704         if (lta->lta_com != NULL)
705                 lfsck_component_put(&lta->lta_env, lta->lta_com);
706         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
707         lu_env_fini(&lta->lta_env);
708         OBD_FREE_PTR(lta);
709 }
710
711 /* LFSCK wrap functions */
712
713 void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
714                 bool new_checked)
715 {
716         struct lfsck_component *com;
717
718         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
719                 com->lc_ops->lfsck_fail(env, com, new_checked);
720         }
721 }
722
723 int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck)
724 {
725         struct lfsck_component *com;
726         int                     rc  = 0;
727         int                     rc1 = 0;
728
729         if (likely(cfs_time_beforeq(cfs_time_current(),
730                                     lfsck->li_time_next_checkpoint)))
731                 return 0;
732
733         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
734         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
735                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
736                 if (rc != 0)
737                         rc1 = rc;
738         }
739
740         lfsck->li_time_last_checkpoint = cfs_time_current();
741         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
742                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
743         return rc1 != 0 ? rc1 : rc;
744 }
745
/*
 * Prepare the instance for a new scanning run.
 *
 * Every component on the scan list is prepared through its lfsck_prep()
 * method, and one of the components' start positions is picked as the
 * position to begin from. The otable-based iterator is loaded at that
 * position and, on the master with a non-zero parent FID in the position,
 * the namespace-based directory iterator is set up as well. On success an
 * initial checkpoint is taken for every component.
 *
 * Returns 0 on success or a negative errno. For the failures that go
 * through the "out" label, every component on the scan list gets its
 * lfsck_post() method called with the error.
 */
int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck)
{
        struct dt_object       *obj     = NULL;
        struct lfsck_component *com;
        struct lfsck_component *next;
        struct lfsck_position  *pos     = NULL;
        const struct dt_it_ops *iops    =
                                &lfsck->li_obj_oit->do_index_ops->dio_it;
        struct dt_it           *di;
        int                     rc;
        ENTRY;

        LASSERT(lfsck->li_obj_dir == NULL);
        LASSERT(lfsck->li_di_dir == NULL);

        lfsck->li_current_oit_processed = 0;
        cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
                com->lc_new_checked = 0;
                if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
                        com->lc_journal = 0;

                rc = com->lc_ops->lfsck_prep(env, com);
                if (rc != 0)
                        GOTO(out, rc);

                /* Take the first component's start position, then replace
                 * it by any later non-zero one for which lfsck_pos_is_eq()
                 * is positive (presumably: one that is ahead of the
                 * current choice -- confirm). */
                if ((pos == NULL) ||
                    (!lfsck_pos_is_zero(&com->lc_pos_start) &&
                     lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
                        pos = &com->lc_pos_start;
        }

        /* Init otable-based iterator. */
        if (pos == NULL) {
                /* No component on the scan list: start from the beginning;
                 * a positive result from load() is recorded as "OIT over". */
                rc = iops->load(env, lfsck->li_di_oit, 0);
                if (rc > 0) {
                        lfsck->li_oit_over = 1;
                        rc = 0;
                }

                GOTO(out, rc);
        }

        rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
        if (rc < 0)
                GOTO(out, rc);
        else if (rc > 0)
                lfsck->li_oit_over = 1;

        /* No directory position to restore. */
        if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
                GOTO(out, rc = 0);

        /* Find the directory for namespace-based traverse. */
        obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
        if (obj == NULL)
                GOTO(out, rc = 0);
        else if (IS_ERR(obj))
                /* NOTE(review): this returns without going through "out",
                 * so unlike the other failures no lfsck_post() is called
                 * for the components -- confirm that this is intended. */
                RETURN(PTR_ERR(obj));

        /* XXX: Currently, skip remote object, the consistency for
         *      remote object will be processed in LFSCK phase III. */
        if (!dt_object_exists(obj) || dt_object_remote(obj) ||
            unlikely(!S_ISDIR(lfsck_object_type(obj))))
                GOTO(out, rc = 0);

        if (unlikely(!dt_try_as_dir(env, obj)))
                GOTO(out, rc = -ENOTDIR);

        /* Init the namespace-based directory traverse. */
        iops = &obj->do_index_ops->dio_it;
        di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
        if (IS_ERR(di))
                GOTO(out, rc = PTR_ERR(di));

        LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);

        /* Step to the next entry if load() returned 0, or returned a
         * positive value for a non-zero cookie; a positive value for a
         * zero cookie counts as success without stepping. */
        rc = iops->load(env, di, pos->lp_dir_cookie);
        if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
                rc = iops->next(env, di);
        else if (rc > 0)
                rc = 0;

        if (rc != 0) {
                iops->put(env, di);
                iops->fini(env, di);
                GOTO(out, rc);
        }

        /* The instance keeps its own reference on the directory; the one
         * from lfsck_object_find() is dropped at "out". */
        lfsck->li_obj_dir = lfsck_object_get(obj);
        lfsck->li_cookie_dir = iops->store(env, di);
        spin_lock(&lfsck->li_lock);
        lfsck->li_di_dir = di;
        spin_unlock(&lfsck->li_lock);

        GOTO(out, rc = 0);

out:
        if (obj != NULL)
                lfsck_object_put(env, obj);

        if (rc < 0) {
                cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
                                             lc_link)
                        com->lc_ops->lfsck_post(env, com, rc, true);

                return rc;
        }

        /* A positive rc is not treated as a failure: start over from 0
         * and take the initial checkpoint, stopping at the first error. */
        rc = 0;
        lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
        cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
                rc = com->lc_ops->lfsck_checkpoint(env, com, true);
                if (rc != 0)
                        break;
        }

        lfsck->li_time_last_checkpoint = cfs_time_current();
        lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
                                cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
        return rc;
}
866
867 int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
868                    struct dt_object *obj)
869 {
870         struct lfsck_component *com;
871         const struct dt_it_ops *iops;
872         struct dt_it           *di;
873         int                     rc;
874         ENTRY;
875
876         LASSERT(lfsck->li_obj_dir == NULL);
877
878         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
879                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
880                 if (rc != 0)
881                         RETURN(rc);
882         }
883
884         rc = lfsck_needs_scan_dir(env, lfsck, obj);
885         if (rc <= 0)
886                 GOTO(out, rc);
887
888         if (unlikely(!dt_try_as_dir(env, obj)))
889                 GOTO(out, rc = -ENOTDIR);
890
891         iops = &obj->do_index_ops->dio_it;
892         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
893         if (IS_ERR(di))
894                 GOTO(out, rc = PTR_ERR(di));
895
896         rc = iops->load(env, di, 0);
897         if (rc == 0)
898                 rc = iops->next(env, di);
899         else if (rc > 0)
900                 rc = 0;
901
902         if (rc != 0) {
903                 iops->put(env, di);
904                 iops->fini(env, di);
905                 GOTO(out, rc);
906         }
907
908         lfsck->li_obj_dir = lfsck_object_get(obj);
909         lfsck->li_cookie_dir = iops->store(env, di);
910         spin_lock(&lfsck->li_lock);
911         lfsck->li_di_dir = di;
912         spin_unlock(&lfsck->li_lock);
913
914         GOTO(out, rc = 0);
915
916 out:
917         if (rc < 0)
918                 lfsck_fail(env, lfsck, false);
919         return (rc > 0 ? 0 : rc);
920 }
921
922 int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
923                    struct dt_object *obj, struct lu_dirent *ent)
924 {
925         struct lfsck_component *com;
926         int                     rc;
927
928         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
929                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
930                 if (rc != 0)
931                         return rc;
932         }
933         return 0;
934 }
935
936 int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
937                int result)
938 {
939         struct lfsck_component *com;
940         struct lfsck_component *next;
941         int                     rc  = 0;
942         int                     rc1 = 0;
943
944         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
945         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
946                 rc = com->lc_ops->lfsck_post(env, com, result, false);
947                 if (rc != 0)
948                         rc1 = rc;
949         }
950
951         lfsck->li_time_last_checkpoint = cfs_time_current();
952         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
953                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
954
955         /* Ignore some component post failure to make other can go ahead. */
956         return result;
957 }
958
959 int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck)
960 {
961         struct lfsck_component *com;
962         struct lfsck_component *next;
963         int                     rc;
964
965         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
966                                      lc_link) {
967                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
968                         com->lc_journal = 0;
969
970                 rc = com->lc_ops->lfsck_double_scan(env, com);
971                 if (rc != 0)
972                         return rc;
973         }
974         return 0;
975 }
976
977 /* external interfaces */
978
979 int lfsck_get_speed(struct dt_device *key, void *buf, int len)
980 {
981         struct lu_env           env;
982         struct lfsck_instance  *lfsck;
983         int                     rc;
984         ENTRY;
985
986         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
987         if (rc != 0)
988                 RETURN(rc);
989
990         lfsck = lfsck_instance_find(key, true, false);
991         if (likely(lfsck != NULL)) {
992                 rc = snprintf(buf, len, "%u\n",
993                               lfsck->li_bookmark_ram.lb_speed_limit);
994                 lfsck_instance_put(&env, lfsck);
995         } else {
996                 rc = -ENODEV;
997         }
998
999         lu_env_fini(&env);
1000
1001         RETURN(rc);
1002 }
1003 EXPORT_SYMBOL(lfsck_get_speed);
1004
1005 int lfsck_set_speed(struct dt_device *key, int val)
1006 {
1007         struct lu_env           env;
1008         struct lfsck_instance  *lfsck;
1009         int                     rc;
1010         ENTRY;
1011
1012         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1013         if (rc != 0)
1014                 RETURN(rc);
1015
1016         lfsck = lfsck_instance_find(key, true, false);
1017         if (likely(lfsck != NULL)) {
1018                 mutex_lock(&lfsck->li_mutex);
1019                 __lfsck_set_speed(lfsck, val);
1020                 rc = lfsck_bookmark_store(&env, lfsck);
1021                 mutex_unlock(&lfsck->li_mutex);
1022                 lfsck_instance_put(&env, lfsck);
1023         } else {
1024                 rc = -ENODEV;
1025         }
1026
1027         lu_env_fini(&env);
1028
1029         RETURN(rc);
1030 }
1031 EXPORT_SYMBOL(lfsck_set_speed);
1032
1033 int lfsck_dump(struct dt_device *key, void *buf, int len, enum lfsck_type type)
1034 {
1035         struct lu_env           env;
1036         struct lfsck_instance  *lfsck;
1037         struct lfsck_component *com;
1038         int                     rc;
1039         ENTRY;
1040
1041         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1042         if (rc != 0)
1043                 RETURN(rc);
1044
1045         lfsck = lfsck_instance_find(key, true, false);
1046         if (likely(lfsck != NULL)) {
1047                 com = lfsck_component_find(lfsck, type);
1048                 if (likely(com != NULL)) {
1049                         rc = com->lc_ops->lfsck_dump(&env, com, buf, len);
1050                         lfsck_component_put(&env, com);
1051                 } else {
1052                         rc = -ENOTSUPP;
1053                 }
1054
1055                 lfsck_instance_put(&env, lfsck);
1056         } else {
1057                 rc = -ENODEV;
1058         }
1059
1060         lu_env_fini(&env);
1061
1062         RETURN(rc);
1063 }
1064 EXPORT_SYMBOL(lfsck_dump);
1065
1066 int lfsck_start(const struct lu_env *env, struct dt_device *key,
1067                 struct lfsck_start_param *lsp)
1068 {
1069         struct lfsck_start              *start  = lsp->lsp_start;
1070         struct lfsck_instance           *lfsck;
1071         struct lfsck_bookmark           *bk;
1072         struct ptlrpc_thread            *thread;
1073         struct lfsck_component          *com;
1074         struct l_wait_info               lwi    = { 0 };
1075         struct lfsck_thread_args        *lta;
1076         bool                             dirty  = false;
1077         long                             rc     = 0;
1078         __u16                            valid  = 0;
1079         __u16                            flags  = 0;
1080         __u16                            type   = 1;
1081         ENTRY;
1082
1083         lfsck = lfsck_instance_find(key, true, false);
1084         if (unlikely(lfsck == NULL))
1085                 RETURN(-ENODEV);
1086
1087         /* start == NULL means auto trigger paused LFSCK. */
1088         if ((start == NULL) &&
1089             (cfs_list_empty(&lfsck->li_list_scan) ||
1090              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
1091                 GOTO(put, rc = 0);
1092
1093         bk = &lfsck->li_bookmark_ram;
1094         thread = &lfsck->li_thread;
1095         mutex_lock(&lfsck->li_mutex);
1096         spin_lock(&lfsck->li_lock);
1097         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
1098                 rc = -EALREADY;
1099                 while (start->ls_active != 0) {
1100                         if (type & start->ls_active) {
1101                                 com = __lfsck_component_find(lfsck, type,
1102                                                         &lfsck->li_list_scan);
1103                                 if (com == NULL)
1104                                         com = __lfsck_component_find(lfsck,
1105                                                 type,
1106                                                 &lfsck->li_list_double_scan);
1107                                 if (com == NULL) {
1108                                         rc = -EBUSY;
1109                                         break;
1110                                 } else {
1111                                         start->ls_active &= ~type;
1112                                 }
1113                         }
1114                         type <<= 1;
1115                 }
1116                 spin_unlock(&lfsck->li_lock);
1117                 GOTO(out, rc);
1118         }
1119         spin_unlock(&lfsck->li_lock);
1120
1121         lfsck->li_namespace = lsp->lsp_namespace;
1122         lfsck->li_paused = 0;
1123         lfsck->li_oit_over = 0;
1124         lfsck->li_drop_dryrun = 0;
1125         lfsck->li_new_scanned = 0;
1126
1127         /* For auto trigger. */
1128         if (start == NULL)
1129                 goto trigger;
1130
1131         start->ls_version = bk->lb_version;
1132         if (start->ls_valid & LSV_SPEED_LIMIT) {
1133                 __lfsck_set_speed(lfsck, start->ls_speed_limit);
1134                 dirty = true;
1135         }
1136
1137         if (start->ls_valid & LSV_ERROR_HANDLE) {
1138                 valid |= DOIV_ERROR_HANDLE;
1139                 if (start->ls_flags & LPF_FAILOUT)
1140                         flags |= DOIF_FAILOUT;
1141
1142                 if ((start->ls_flags & LPF_FAILOUT) &&
1143                     !(bk->lb_param & LPF_FAILOUT)) {
1144                         bk->lb_param |= LPF_FAILOUT;
1145                         dirty = true;
1146                 } else if (!(start->ls_flags & LPF_FAILOUT) &&
1147                            (bk->lb_param & LPF_FAILOUT)) {
1148                         bk->lb_param &= ~LPF_FAILOUT;
1149                         dirty = true;
1150                 }
1151         }
1152
1153         if (start->ls_valid & LSV_DRYRUN) {
1154                 valid |= DOIV_DRYRUN;
1155                 if (start->ls_flags & LPF_DRYRUN)
1156                         flags |= DOIF_DRYRUN;
1157
1158                 if ((start->ls_flags & LPF_DRYRUN) &&
1159                     !(bk->lb_param & LPF_DRYRUN)) {
1160                         bk->lb_param |= LPF_DRYRUN;
1161                         dirty = true;
1162                 } else if (!(start->ls_flags & LPF_DRYRUN) &&
1163                            (bk->lb_param & LPF_DRYRUN)) {
1164                         bk->lb_param &= ~LPF_DRYRUN;
1165                         lfsck->li_drop_dryrun = 1;
1166                         dirty = true;
1167                 }
1168         }
1169
1170         if (dirty) {
1171                 rc = lfsck_bookmark_store(env, lfsck);
1172                 if (rc != 0)
1173                         GOTO(out, rc);
1174         }
1175
1176         if (start->ls_flags & LPF_RESET)
1177                 flags |= DOIF_RESET;
1178
1179         if (start->ls_active != 0) {
1180                 struct lfsck_component *next;
1181
1182                 if (start->ls_active == LFSCK_TYPES_ALL)
1183                         start->ls_active = LFSCK_TYPES_SUPPORTED;
1184
1185                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
1186                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
1187                         GOTO(out, rc = -ENOTSUPP);
1188                 }
1189
1190                 cfs_list_for_each_entry_safe(com, next,
1191                                              &lfsck->li_list_scan, lc_link) {
1192                         if (!(com->lc_type & start->ls_active)) {
1193                                 rc = com->lc_ops->lfsck_post(env, com, 0,
1194                                                              false);
1195                                 if (rc != 0)
1196                                         GOTO(out, rc);
1197                         }
1198                 }
1199
1200                 while (start->ls_active != 0) {
1201                         if (type & start->ls_active) {
1202                                 com = __lfsck_component_find(lfsck, type,
1203                                                         &lfsck->li_list_idle);
1204                                 if (com != NULL) {
1205                                         /* The component status will be updated
1206                                          * when its prep() is called later by
1207                                          * the LFSCK main engine. */
1208                                         cfs_list_del_init(&com->lc_link);
1209                                         cfs_list_add_tail(&com->lc_link,
1210                                                           &lfsck->li_list_scan);
1211                                 }
1212                                 start->ls_active &= ~type;
1213                         }
1214                         type <<= 1;
1215                 }
1216         }
1217
1218         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1219                 start->ls_active |= com->lc_type;
1220                 if (flags & DOIF_RESET) {
1221                         rc = com->lc_ops->lfsck_reset(env, com, false);
1222                         if (rc != 0)
1223                                 GOTO(out, rc);
1224                 }
1225         }
1226
1227 trigger:
1228         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
1229         if (bk->lb_param & LPF_DRYRUN) {
1230                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
1231                 valid |= DOIV_DRYRUN;
1232                 flags |= DOIF_DRYRUN;
1233         }
1234
1235         if (bk->lb_param & LPF_FAILOUT) {
1236                 valid |= DOIV_ERROR_HANDLE;
1237                 flags |= DOIF_FAILOUT;
1238         }
1239
1240         if (!cfs_list_empty(&lfsck->li_list_scan))
1241                 flags |= DOIF_OUTUSED;
1242
1243         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
1244         thread_set_flags(thread, 0);
1245         lta = lfsck_thread_args_init(lfsck, NULL);
1246         if (IS_ERR(lta))
1247                 GOTO(out, rc = PTR_ERR(lta));
1248
1249         rc = PTR_ERR(kthread_run(lfsck_master_engine, lta, "lfsck"));
1250         if (IS_ERR_VALUE(rc)) {
1251                 CERROR("%s: cannot start LFSCK thread: rc = %ld\n",
1252                        lfsck_lfsck2name(lfsck), rc);
1253                 lfsck_thread_args_fini(lta);
1254         } else {
1255                 rc = 0;
1256                 l_wait_event(thread->t_ctl_waitq,
1257                              thread_is_running(thread) ||
1258                              thread_is_stopped(thread),
1259                              &lwi);
1260         }
1261
1262         GOTO(out, rc);
1263
1264 out:
1265         mutex_unlock(&lfsck->li_mutex);
1266 put:
1267         lfsck_instance_put(env, lfsck);
1268         return (rc < 0 ? rc : 0);
1269 }
1270 EXPORT_SYMBOL(lfsck_start);
1271
1272 int lfsck_stop(const struct lu_env *env, struct dt_device *key, bool pause)
1273 {
1274         struct lfsck_instance   *lfsck;
1275         struct ptlrpc_thread    *thread;
1276         struct l_wait_info       lwi    = { 0 };
1277         ENTRY;
1278
1279         lfsck = lfsck_instance_find(key, true, false);
1280         if (unlikely(lfsck == NULL))
1281                 RETURN(-ENODEV);
1282
1283         thread = &lfsck->li_thread;
1284         mutex_lock(&lfsck->li_mutex);
1285         spin_lock(&lfsck->li_lock);
1286         if (thread_is_init(thread) || thread_is_stopped(thread)) {
1287                 spin_unlock(&lfsck->li_lock);
1288                 mutex_unlock(&lfsck->li_mutex);
1289                 lfsck_instance_put(env, lfsck);
1290                 RETURN(-EALREADY);
1291         }
1292
1293         if (pause)
1294                 lfsck->li_paused = 1;
1295         thread_set_flags(thread, SVC_STOPPING);
1296         spin_unlock(&lfsck->li_lock);
1297
1298         wake_up_all(&thread->t_ctl_waitq);
1299         l_wait_event(thread->t_ctl_waitq,
1300                      thread_is_stopped(thread),
1301                      &lwi);
1302         mutex_unlock(&lfsck->li_mutex);
1303         lfsck_instance_put(env, lfsck);
1304
1305         RETURN(0);
1306 }
1307 EXPORT_SYMBOL(lfsck_stop);
1308
1309 int lfsck_register(const struct lu_env *env, struct dt_device *key,
1310                    struct dt_device *next, lfsck_out_notify notify,
1311                    void *notify_data, bool master)
1312 {
1313         struct lfsck_instance   *lfsck;
1314         struct dt_object        *root  = NULL;
1315         struct dt_object        *obj;
1316         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
1317         int                      rc;
1318         ENTRY;
1319
1320         lfsck = lfsck_instance_find(key, false, false);
1321         if (unlikely(lfsck != NULL))
1322                 RETURN(-EEXIST);
1323
1324         OBD_ALLOC_PTR(lfsck);
1325         if (lfsck == NULL)
1326                 RETURN(-ENOMEM);
1327
1328         mutex_init(&lfsck->li_mutex);
1329         spin_lock_init(&lfsck->li_lock);
1330         CFS_INIT_LIST_HEAD(&lfsck->li_link);
1331         CFS_INIT_LIST_HEAD(&lfsck->li_list_scan);
1332         CFS_INIT_LIST_HEAD(&lfsck->li_list_dir);
1333         CFS_INIT_LIST_HEAD(&lfsck->li_list_double_scan);
1334         CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
1335         atomic_set(&lfsck->li_ref, 1);
1336         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
1337         lfsck->li_out_notify = notify;
1338         lfsck->li_out_notify_data = notify_data;
1339         lfsck->li_next = next;
1340         lfsck->li_bottom = key;
1341
1342         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
1343         if (rc != 0)
1344                 GOTO(out, rc);
1345
1346         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
1347         if (rc != 0)
1348                 GOTO(out, rc);
1349
1350         fid->f_seq = FID_SEQ_LOCAL_NAME;
1351         fid->f_oid = 1;
1352         fid->f_ver = 0;
1353         rc = local_oid_storage_init(env, lfsck->li_bottom, fid, &lfsck->li_los);
1354         if (rc != 0)
1355                 GOTO(out, rc);
1356
1357         rc = dt_root_get(env, key, fid);
1358         if (rc != 0)
1359                 GOTO(out, rc);
1360
1361         root = dt_locate(env, lfsck->li_bottom, fid);
1362         if (IS_ERR(root))
1363                 GOTO(out, rc = PTR_ERR(root));
1364
1365         if (unlikely(!dt_try_as_dir(env, root)))
1366                 GOTO(out, rc = -ENOTDIR);
1367
1368         lfsck->li_local_root_fid = *fid;
1369         if (master) {
1370                 lfsck->li_master = 1;
1371                 if (lfsck_dev_idx(lfsck->li_bottom) == 0) {
1372                         rc = dt_lookup(env, root,
1373                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
1374                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
1375                         if (rc != 0)
1376                                 GOTO(out, rc);
1377                 }
1378         }
1379
1380         fid->f_seq = FID_SEQ_LOCAL_FILE;
1381         fid->f_oid = OTABLE_IT_OID;
1382         fid->f_ver = 0;
1383         obj = dt_locate(env, lfsck->li_bottom, fid);
1384         if (IS_ERR(obj))
1385                 GOTO(out, rc = PTR_ERR(obj));
1386
1387         lfsck->li_obj_oit = obj;
1388         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
1389         if (rc != 0) {
1390                 if (rc == -ENOTSUPP)
1391                         GOTO(add, rc = 0);
1392
1393                 GOTO(out, rc);
1394         }
1395
1396         rc = lfsck_bookmark_setup(env, lfsck);
1397         if (rc != 0)
1398                 GOTO(out, rc);
1399
1400         if (master) {
1401                 rc = lfsck_namespace_setup(env, lfsck);
1402                 if (rc < 0)
1403                         GOTO(out, rc);
1404         }
1405
1406         rc = lfsck_layout_setup(env, lfsck);
1407         if (rc < 0)
1408                 GOTO(out, rc);
1409
1410         /* XXX: more LFSCK components initialization to be added here. */
1411
1412 add:
1413         rc = lfsck_instance_add(lfsck);
1414         if (rc == 0)
1415                 rc = lfsck_add_target_from_orphan(env, lfsck);
1416 out:
1417         if (root != NULL && !IS_ERR(root))
1418                 lu_object_put(env, &root->do_lu);
1419         if (rc != 0)
1420                 lfsck_instance_cleanup(env, lfsck);
1421         return rc;
1422 }
1423 EXPORT_SYMBOL(lfsck_register);
1424
1425 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
1426 {
1427         struct lfsck_instance *lfsck;
1428
1429         lfsck = lfsck_instance_find(key, false, true);
1430         if (lfsck != NULL)
1431                 lfsck_instance_put(env, lfsck);
1432 }
1433 EXPORT_SYMBOL(lfsck_degister);
1434
1435 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
1436                      struct dt_device *tgt, struct obd_export *exp,
1437                      __u32 index, bool for_ost)
1438 {
1439         struct lfsck_instance   *lfsck;
1440         struct lfsck_tgt_desc   *ltd;
1441         int                      rc;
1442         ENTRY;
1443
1444         OBD_ALLOC_PTR(ltd);
1445         if (ltd == NULL)
1446                 RETURN(-ENOMEM);
1447
1448         ltd->ltd_tgt = tgt;
1449         ltd->ltd_key = key;
1450         ltd->ltd_exp = exp;
1451         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
1452         INIT_LIST_HEAD(&ltd->ltd_layout_list);
1453         atomic_set(&ltd->ltd_ref, 1);
1454         ltd->ltd_index = index;
1455
1456         spin_lock(&lfsck_instance_lock);
1457         lfsck = __lfsck_instance_find(key, true, false);
1458         if (lfsck == NULL) {
1459                 if (for_ost)
1460                         list_add_tail(&ltd->ltd_orphan_list,
1461                                       &lfsck_ost_orphan_list);
1462                 else
1463                         list_add_tail(&ltd->ltd_orphan_list,
1464                                       &lfsck_mdt_orphan_list);
1465                 spin_unlock(&lfsck_instance_lock);
1466
1467                 RETURN(0);
1468         }
1469         spin_unlock(&lfsck_instance_lock);
1470
1471         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
1472         if (rc != 0)
1473                 lfsck_tgt_put(ltd);
1474
1475         lfsck_instance_put(env, lfsck);
1476
1477         RETURN(rc);
1478 }
1479 EXPORT_SYMBOL(lfsck_add_target);
1480
1481 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
1482                       struct dt_device *tgt, __u32 index, bool for_ost)
1483 {
1484         struct lfsck_instance   *lfsck;
1485         struct lfsck_tgt_descs  *ltds;
1486         struct lfsck_tgt_desc   *ltd;
1487         struct list_head        *head;
1488         bool                     found = false;
1489
1490         if (for_ost)
1491                 head = &lfsck_ost_orphan_list;
1492         else
1493                 head = &lfsck_mdt_orphan_list;
1494
1495         spin_lock(&lfsck_instance_lock);
1496         list_for_each_entry(ltd, head, ltd_orphan_list) {
1497                 if (ltd->ltd_tgt == tgt) {
1498                         list_del_init(&ltd->ltd_orphan_list);
1499                         spin_unlock(&lfsck_instance_lock);
1500                         lfsck_tgt_put(ltd);
1501
1502                         return;
1503                 }
1504         }
1505
1506         lfsck = __lfsck_instance_find(key, true, false);
1507         spin_unlock(&lfsck_instance_lock);
1508         if (unlikely(lfsck == NULL))
1509                 return;
1510
1511         if (for_ost)
1512                 ltds = &lfsck->li_ost_descs;
1513         else
1514                 ltds = &lfsck->li_mdt_descs;
1515
1516         down_write(&ltds->ltd_rw_sem);
1517
1518         LASSERT(ltds->ltd_tgts_bitmap != NULL);
1519
1520         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
1521                 goto unlock;
1522
1523         ltd = LTD_TGT(ltds, index);
1524         if (unlikely(ltd == NULL))
1525                 goto unlock;
1526
1527         found = true;
1528         if (!list_empty(&ltd->ltd_layout_list)) {
1529                 spin_lock(&ltds->ltd_lock);
1530                 list_del_init(&ltd->ltd_layout_list);
1531                 spin_unlock(&ltds->ltd_lock);
1532         }
1533
1534         LASSERT(ltds->ltd_tgtnr > 0);
1535
1536         ltds->ltd_tgtnr--;
1537         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
1538         LTD_TGT(ltds, index) = NULL;
1539         lfsck_tgt_put(ltd);
1540
1541 unlock:
1542         if (!found) {
1543                 if (for_ost)
1544                         head = &lfsck->li_ost_descs.ltd_orphan;
1545                 else
1546                         head = &lfsck->li_ost_descs.ltd_orphan;
1547
1548                 list_for_each_entry(ltd, head, ltd_orphan_list) {
1549                         if (ltd->ltd_tgt == tgt) {
1550                                 list_del_init(&ltd->ltd_orphan_list);
1551                                 lfsck_tgt_put(ltd);
1552                                 break;
1553                         }
1554                 }
1555         }
1556
1557         up_write(&ltds->ltd_rw_sem);
1558         lfsck_instance_put(env, lfsck);
1559 }
1560 EXPORT_SYMBOL(lfsck_del_target);
1561
1562 static int __init lfsck_init(void)
1563 {
1564         int rc;
1565
1566         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
1567         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
1568         lfsck_key_init_generic(&lfsck_thread_key, NULL);
1569         rc = lu_context_key_register(&lfsck_thread_key);
1570
1571         return rc;
1572 }
1573
1574 static void __exit lfsck_exit(void)
1575 {
1576         struct lfsck_tgt_desc *ltd;
1577         struct lfsck_tgt_desc *next;
1578
1579         LASSERT(cfs_list_empty(&lfsck_instance_list));
1580
1581         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
1582                                  ltd_orphan_list) {
1583                 list_del_init(&ltd->ltd_orphan_list);
1584                 lfsck_tgt_put(ltd);
1585         }
1586
1587         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
1588                                  ltd_orphan_list) {
1589                 list_del_init(&ltd->ltd_orphan_list);
1590                 lfsck_tgt_put(ltd);
1591         }
1592
1593         lu_context_key_degister(&lfsck_thread_key);
1594 }
1595
1596 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
1597 MODULE_DESCRIPTION("LFSCK");
1598 MODULE_LICENSE("GPL");
1599
1600 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);