Whamcloud - gitweb
6ea74b8764e83fc2649aa4bb66f99534fa67fb3c
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
/* define lfsck thread key; the per-context payload type is
 * struct lfsck_thread_info.
 * NOTE(review): LU_KEY_INIT() presumably generates the lfsck_key_init()
 * allocator for that payload -- confirm against lu_object.h. */
LU_KEY_INIT(lfsck, struct lfsck_thread_info);
47
48 static void lfsck_key_fini(const struct lu_context *ctx,
49                            struct lu_context_key *key, void *data)
50 {
51         struct lfsck_thread_info *info = data;
52
53         lu_buf_free(&info->lti_linkea_buf);
54         lu_buf_free(&info->lti_big_buf);
55         OBD_FREE_PTR(info);
56 }
57
/* Define the lfsck context key with the LCT_MD_THREAD and LCT_DT_THREAD tags,
 * then instantiate the generic key helpers for it.
 * NOTE(review): presumably this is what wires lfsck_key_fini() above in as the
 * key destructor -- confirm against the macro definitions in lu_object.h. */
LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
LU_KEY_INIT_GENERIC(lfsck);
60
/* Registered LFSCK instances, linked through lfsck_instance::li_link. */
static CFS_LIST_HEAD(lfsck_instance_list);
/* OST / MDT target descriptors waiting for their LFSCK instance; they are
 * matched by ltd_key against lfsck_instance::li_bottom and moved to the
 * instance in lfsck_add_target_from_orphan().
 * NOTE(review): unlike lfsck_instance_list these two heads have no static
 * initializer here; presumably INIT_LIST_HEAD() is called on them elsewhere
 * before first use -- confirm. */
static struct list_head lfsck_ost_orphan_list;
static struct list_head lfsck_mdt_orphan_list;
/* Held while walking lfsck_instance_list and the two orphan lists above. */
static DEFINE_SPINLOCK(lfsck_instance_lock);
65
/* Printable name for each enum lfsck_status value, indexed by the status
 * itself. Looked up through lfsck_status2names(), which range-checks the
 * index against LS_MAX first. */
static const char *lfsck_status_names[] = {
        [LS_INIT]               = "init",
        [LS_SCANNING_PHASE1]    = "scanning-phase1",
        [LS_SCANNING_PHASE2]    = "scanning-phase2",
        [LS_COMPLETED]          = "completed",
        [LS_FAILED]             = "failed",
        [LS_STOPPED]            = "stopped",
        [LS_PAUSED]             = "paused",
        [LS_CRASHED]            = "crashed",
        [LS_PARTIAL]            = "partial",
        [LS_CO_FAILED]          = "co-failed",
        [LS_CO_STOPPED]         = "co-stopped",
        [LS_CO_PAUSED]          = "co-paused"
};
80
/* NULL-terminated table of printable LFSCK flag names.
 * NOTE(review): presumably entry i names flag bit (1 << i), as consumed by
 * lfsck_bits_dump() -- confirm against the flag definitions. */
const char *lfsck_flags_names[] = {
        "scanned-once",
        "inconsistent",
        "upgrade",
        "incomplete",
        "crashed_lastid",
        NULL
};
89
/* NULL-terminated table of printable LFSCK parameter names.
 * NOTE(review): the leading NULL presumably marks bit 0 as having no
 * printable name -- confirm against the parameter bit definitions. */
const char *lfsck_param_names[] = {
        NULL,
        "failout",
        "dryrun",
        "all_targets",
        NULL
};
97
98 const char *lfsck_status2names(enum lfsck_status status)
99 {
100         if (unlikely(status < 0 || status >= LS_MAX))
101                 return "unknown";
102
103         return lfsck_status_names[status];
104 }
105
106 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
107 {
108         spin_lock_init(&ltds->ltd_lock);
109         init_rwsem(&ltds->ltd_rw_sem);
110         INIT_LIST_HEAD(&ltds->ltd_orphan);
111         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
112         if (ltds->ltd_tgts_bitmap == NULL)
113                 return -ENOMEM;
114
115         return 0;
116 }
117
118 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
119 {
120         struct lfsck_tgt_desc   *ltd;
121         struct lfsck_tgt_desc   *next;
122         int                      idx;
123
124         down_write(&ltds->ltd_rw_sem);
125
126         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
127                                  ltd_orphan_list) {
128                 list_del_init(&ltd->ltd_orphan_list);
129                 lfsck_tgt_put(ltd);
130         }
131
132         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
133                 up_write(&ltds->ltd_rw_sem);
134
135                 return;
136         }
137
138         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
139                 ltd = LTD_TGT(ltds, idx);
140                 if (likely(ltd != NULL)) {
141                         LASSERT(list_empty(&ltd->ltd_layout_list));
142                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
143
144                         ltds->ltd_tgtnr--;
145                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
146                         LTD_TGT(ltds, idx) = NULL;
147                         lfsck_tgt_put(ltd);
148                 }
149         }
150
151         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
152                  ltds->ltd_tgtnr);
153
154         for (idx = 0; idx < TGT_PTRS; idx++) {
155                 if (ltds->ltd_tgts_idx[idx] != NULL) {
156                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
157                         ltds->ltd_tgts_idx[idx] = NULL;
158                 }
159         }
160
161         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
162         ltds->ltd_tgts_bitmap = NULL;
163         up_write(&ltds->ltd_rw_sem);
164 }
165
166 static int __lfsck_add_target(const struct lu_env *env,
167                               struct lfsck_instance *lfsck,
168                               struct lfsck_tgt_desc *ltd,
169                               bool for_ost, bool locked)
170 {
171         struct lfsck_tgt_descs *ltds;
172         __u32                   index = ltd->ltd_index;
173         int                     rc    = 0;
174         ENTRY;
175
176         if (for_ost)
177                 ltds = &lfsck->li_ost_descs;
178         else
179                 ltds = &lfsck->li_mdt_descs;
180
181         if (!locked)
182                 down_write(&ltds->ltd_rw_sem);
183
184         LASSERT(ltds->ltd_tgts_bitmap != NULL);
185
186         if (index >= ltds->ltd_tgts_bitmap->size) {
187                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
188                                     (__u32)BITS_PER_LONG);
189                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
190                 cfs_bitmap_t *new_bitmap;
191
192                 while (newsize < index + 1)
193                         newsize <<= 1;
194
195                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
196                 if (new_bitmap == NULL)
197                         GOTO(unlock, rc = -ENOMEM);
198
199                 if (ltds->ltd_tgtnr > 0)
200                         cfs_bitmap_copy(new_bitmap, old_bitmap);
201                 ltds->ltd_tgts_bitmap = new_bitmap;
202                 CFS_FREE_BITMAP(old_bitmap);
203         }
204
205         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
206                 CERROR("%s: the device %s (%u) is registered already\n",
207                        lfsck_lfsck2name(lfsck),
208                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
209                 GOTO(unlock, rc = -EEXIST);
210         }
211
212         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
213                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
214                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
215                         GOTO(unlock, rc = -ENOMEM);
216         }
217
218         LTD_TGT(ltds, index) = ltd;
219         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
220         ltds->ltd_tgtnr++;
221
222         GOTO(unlock, rc = 0);
223
224 unlock:
225         if (!locked)
226                 up_write(&ltds->ltd_rw_sem);
227
228         return rc;
229 }
230
231 static int lfsck_add_target_from_orphan(const struct lu_env *env,
232                                         struct lfsck_instance *lfsck)
233 {
234         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
235         struct lfsck_tgt_desc   *ltd;
236         struct lfsck_tgt_desc   *next;
237         struct list_head        *head    = &lfsck_ost_orphan_list;
238         int                      rc;
239         bool                     for_ost = true;
240
241 again:
242         spin_lock(&lfsck_instance_lock);
243         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
244                 if (ltd->ltd_key == lfsck->li_bottom) {
245                         list_del_init(&ltd->ltd_orphan_list);
246                         list_add_tail(&ltd->ltd_orphan_list,
247                                       &ltds->ltd_orphan);
248                 }
249         }
250         spin_unlock(&lfsck_instance_lock);
251
252         down_write(&ltds->ltd_rw_sem);
253         while (!list_empty(&ltds->ltd_orphan)) {
254                 ltd = list_entry(ltds->ltd_orphan.next,
255                                  struct lfsck_tgt_desc,
256                                  ltd_orphan_list);
257                 list_del_init(&ltd->ltd_orphan_list);
258                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
259                 /* Do not hold the semaphore for too long time. */
260                 up_write(&ltds->ltd_rw_sem);
261                 if (rc != 0)
262                         return rc;
263
264                 down_write(&ltds->ltd_rw_sem);
265         }
266         up_write(&ltds->ltd_rw_sem);
267
268         if (for_ost) {
269                 ltds = &lfsck->li_mdt_descs;
270                 head = &lfsck_mdt_orphan_list;
271                 for_ost = false;
272                 goto again;
273         }
274
275         return 0;
276 }
277
278 static inline struct lfsck_component *
279 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type, cfs_list_t *list)
280 {
281         struct lfsck_component *com;
282
283         cfs_list_for_each_entry(com, list, lc_link) {
284                 if (com->lc_type == type)
285                         return com;
286         }
287         return NULL;
288 }
289
290 struct lfsck_component *
291 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
292 {
293         struct lfsck_component *com;
294
295         spin_lock(&lfsck->li_lock);
296         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
297         if (com != NULL)
298                 goto unlock;
299
300         com = __lfsck_component_find(lfsck, type,
301                                      &lfsck->li_list_double_scan);
302         if (com != NULL)
303                 goto unlock;
304
305         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
306
307 unlock:
308         if (com != NULL)
309                 lfsck_component_get(com);
310         spin_unlock(&lfsck->li_lock);
311         return com;
312 }
313
314 void lfsck_component_cleanup(const struct lu_env *env,
315                              struct lfsck_component *com)
316 {
317         if (!cfs_list_empty(&com->lc_link))
318                 cfs_list_del_init(&com->lc_link);
319         if (!cfs_list_empty(&com->lc_link_dir))
320                 cfs_list_del_init(&com->lc_link_dir);
321
322         lfsck_component_put(env, com);
323 }
324
325 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
326                     struct lu_fid *fid, bool locked)
327 {
328         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
329         int                      rc = 0;
330         ENTRY;
331
332         if (!locked)
333                 mutex_lock(&lfsck->li_mutex);
334
335         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
336         if (rc >= 0) {
337                 bk->lb_last_fid = *fid;
338                 /* We do not care about whether the subsequent sub-operations
339                  * failed or not. The worst case is that one FID is lost that
340                  * is not a big issue for the LFSCK since it is relative rare
341                  * for LFSCK create. */
342                 rc = lfsck_bookmark_store(env, lfsck);
343         }
344
345         if (!locked)
346                 mutex_unlock(&lfsck->li_mutex);
347
348         RETURN(rc);
349 }
350
/* Names of the self and parent entries inserted into a newly created
 * directory by the lfsck_create_lpf_*() helpers. */
static const char dot[] = ".";
static const char dotdot[] = "..";
353
354 static int lfsck_create_lpf_local(const struct lu_env *env,
355                                   struct lfsck_instance *lfsck,
356                                   struct dt_object *parent,
357                                   struct dt_object *child,
358                                   struct lu_attr *la,
359                                   struct dt_object_format *dof,
360                                   const char *name)
361 {
362         struct dt_device        *dev    = lfsck->li_bottom;
363         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
364         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
365         const struct lu_fid     *cfid   = lu_object_fid(&child->do_lu);
366         struct thandle          *th     = NULL;
367         loff_t                   pos    = 0;
368         int                      len    = sizeof(struct lfsck_bookmark);
369         int                      rc     = 0;
370         ENTRY;
371
372         th = dt_trans_create(env, dev);
373         if (IS_ERR(th))
374                 RETURN(PTR_ERR(th));
375
376         /* 1a. create child */
377         rc = dt_declare_create(env, child, la, NULL, dof, th);
378         if (rc != 0)
379                 GOTO(stop, rc);
380
381         /* 2a. increase child nlink */
382         rc = dt_declare_ref_add(env, child, th);
383         if (rc != 0)
384                 GOTO(stop, rc);
385
386         /* 3a. insert name into parent dir */
387         rc = dt_declare_insert(env, parent, (const struct dt_rec *)cfid,
388                                (const struct dt_key *)name, th);
389         if (rc != 0)
390                 GOTO(stop, rc);
391
392         /* 4a. increase parent nlink */
393         rc = dt_declare_ref_add(env, parent, th);
394         if (rc != 0)
395                 GOTO(stop, rc);
396
397         /* 5a. update bookmark */
398         rc = dt_declare_record_write(env, bk_obj, len, 0, th);
399         if (rc != 0)
400                 GOTO(stop, rc);
401
402         rc = dt_trans_start_local(env, dev, th);
403         if (rc != 0)
404                 GOTO(stop, rc);
405
406         dt_write_lock(env, child, 0);
407         /* 1b.1 create child */
408         rc = dt_create(env, child, la, NULL, dof, th);
409         if (rc != 0)
410                 GOTO(unlock, rc);
411
412         if (unlikely(!dt_try_as_dir(env, child)))
413                 GOTO(unlock, rc = -ENOTDIR);
414
415         /* 1b.2 insert dot into child dir */
416         rc = dt_insert(env, child, (const struct dt_rec *)cfid,
417                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
418         if (rc != 0)
419                 GOTO(unlock, rc);
420
421         /* 1b.3 insert dotdot into child dir */
422         rc = dt_insert(env, child, (const struct dt_rec *)&LU_LPF_FID,
423                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
424         if (rc != 0)
425                 GOTO(unlock, rc);
426
427         /* 2b. increase child nlink */
428         rc = dt_ref_add(env, child, th);
429         dt_write_unlock(env, child);
430         if (rc != 0)
431                 GOTO(stop, rc);
432
433         /* 3b. insert name into parent dir */
434         rc = dt_insert(env, parent, (const struct dt_rec *)cfid,
435                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
436         if (rc != 0)
437                 GOTO(stop, rc);
438
439         dt_write_lock(env, parent, 0);
440         /* 4b. increase parent nlink */
441         rc = dt_ref_add(env, parent, th);
442         dt_write_unlock(env, parent);
443         if (rc != 0)
444                 GOTO(stop, rc);
445
446         bk->lb_lpf_fid = *cfid;
447         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
448
449         /* 5b. update bookmark */
450         rc = dt_record_write(env, bk_obj,
451                              lfsck_buf_get(env, bk, len), &pos, th);
452
453         GOTO(stop, rc);
454
455 unlock:
456         dt_write_unlock(env, child);
457
458 stop:
459         dt_trans_stop(env, dev, th);
460
461         return rc;
462 }
463
464 static int lfsck_create_lpf_remote(const struct lu_env *env,
465                                    struct lfsck_instance *lfsck,
466                                    struct dt_object *parent,
467                                    struct dt_object *child,
468                                    struct lu_attr *la,
469                                    struct dt_object_format *dof,
470                                    const char *name)
471 {
472         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
473         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
474         const struct lu_fid     *cfid   = lu_object_fid(&child->do_lu);
475         struct thandle          *th     = NULL;
476         struct dt_device        *dev;
477         loff_t                   pos    = 0;
478         int                      len    = sizeof(struct lfsck_bookmark);
479         int                      rc     = 0;
480         ENTRY;
481
482         /* Create .lustre/lost+found/MDTxxxx. */
483
484         /* XXX: Currently, cross-MDT create operation needs to create the child
485          *      object firstly, then insert name into the parent directory. For
486          *      this case, the child object resides on current MDT (local), but
487          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
488          *      easy to contain all the sub-modifications orderly within single
489          *      transaction.
490          *
491          *      To avoid more inconsistency, we split the create operation into
492          *      two transactions:
493          *
494          *      1) create the child locally.
495          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
496          *         remotely and update the lfsck_bookmark::lb_lpf_fid locally.
497          *
498          *      If 1) done but 2) failed, then the worst case is that we lose
499          *      one object locally, which is not a big issue. (can be repaird
500          *      by LFSCK phase III) */
501
502         /* Transaction I: */
503
504         dev = lfsck->li_bottom;
505         th = dt_trans_create(env, dev);
506         if (IS_ERR(th))
507                 RETURN(PTR_ERR(th));
508
509         /* 1a. create child locally. */
510         rc = dt_declare_create(env, child, la, NULL, dof, th);
511         if (rc != 0)
512                 GOTO(stop, rc);
513
514         /* 2a. increase child nlink locally. */
515         rc = dt_declare_ref_add(env, child, th);
516         if (rc != 0)
517                 GOTO(stop, rc);
518
519         rc = dt_trans_start_local(env, dev, th);
520         if (rc != 0)
521                 GOTO(stop, rc);
522
523         dt_write_lock(env, child, 0);
524         /* 1b. create child locally. */
525         rc = dt_create(env, child, la, NULL, dof, th);
526         if (rc != 0)
527                 GOTO(unlock, rc);
528
529         if (unlikely(!dt_try_as_dir(env, child)))
530                 GOTO(unlock, rc = -ENOTDIR);
531
532         /* 2b.1 insert dot into child dir locally. */
533         rc = dt_insert(env, child, (const struct dt_rec *)cfid,
534                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
535         if (rc != 0)
536                 GOTO(unlock, rc);
537
538         /* 2b.2 insert dotdot into child dir locally. */
539         rc = dt_insert(env, child, (const struct dt_rec *)&LU_LPF_FID,
540                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
541         if (rc != 0)
542                 GOTO(unlock, rc);
543
544         /* 2b.3 increase child nlink locally. */
545         rc = dt_ref_add(env, child, th);
546         dt_write_unlock(env, child);
547         dt_trans_stop(env, dev, th);
548         if (rc != 0)
549                 RETURN(rc);
550
551         /* Transaction II: */
552
553         dev = lfsck->li_next;
554         th = dt_trans_create(env, dev);
555         if (IS_ERR(th))
556                 RETURN(PTR_ERR(th));
557
558         /* 3a. insert name into parent dir remotely. */
559         rc = dt_declare_insert(env, parent, (const struct dt_rec *)cfid,
560                                (const struct dt_key *)name, th);
561         if (rc != 0)
562                 GOTO(stop, rc);
563
564         /* 4a. increase parent nlink remotely. */
565         rc = dt_declare_ref_add(env, parent, th);
566         if (rc != 0)
567                 GOTO(stop, rc);
568
569         /* 5a. decrease child nlink for dotdot locally if former remote
570          *     update failed. */
571         rc = dt_declare_ref_del(env, child, th);
572         if (rc != 0)
573                 GOTO(stop, rc);
574
575         /* 6a. decrease child nlink for dot locally if former remote
576          *     update failed. */
577         rc = dt_declare_ref_del(env, child, th);
578         if (rc != 0)
579                 GOTO(stop, rc);
580
581         /* 7a. destroy child locally if former remote update failed. */
582         rc = dt_declare_destroy(env, child, th);
583         if (rc != 0)
584                 GOTO(stop, rc);
585
586         /* 8a. update bookmark locally. */
587         rc = dt_declare_record_write(env, bk_obj, len, 0, th);
588         if (rc != 0)
589                 GOTO(stop, rc);
590
591         rc = dt_trans_start(env, dev, th);
592         if (rc != 0)
593                 GOTO(stop, rc);
594
595         /* 3b. insert name into parent dir remotely. */
596         rc = dt_insert(env, parent, (const struct dt_rec *)cfid,
597                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
598         if (rc == 0) {
599                 dt_write_lock(env, parent, 0);
600                 /* 4b. increase parent nlink remotely. */
601                 rc = dt_ref_add(env, parent, th);
602                 dt_write_unlock(env, parent);
603         }
604         if (rc != 0) {
605                 /* 5b. decrease child nlink for dotdot locally. */
606                 dt_ref_del(env, child, th);
607                 /* 6b. decrease child nlink for dot locally. */
608                 dt_ref_del(env, child, th);
609                 /* 7b. destroy child locally. */
610                 dt_destroy(env, child, th);
611                 GOTO(stop, rc);
612         }
613
614         bk->lb_lpf_fid = *cfid;
615         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
616
617         /* 8b. update bookmark locally. */
618         rc = dt_record_write(env, bk_obj,
619                              lfsck_buf_get(env, bk, len), &pos, th);
620
621         GOTO(stop, rc);
622
623 unlock:
624         dt_write_unlock(env, child);
625 stop:
626         dt_trans_stop(env, dev, th);
627
628         return rc;
629 }
630
631 /* Do NOT create .lustre/lost+found/MDTxxxx when register the lfsck instance,
632  * because the MDT0 maybe not reaady for sequence allocation yet. We do that
633  * only when it is required, such as orphan OST-objects repairing. */
634 int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
635 {
636         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
637         struct lfsck_thread_info *info  = lfsck_env_info(env);
638         struct lu_fid            *cfid  = &info->lti_fid2;
639         struct lu_attr           *la    = &info->lti_la;
640         struct dt_object_format  *dof   = &info->lti_dof;
641         struct dt_object         *parent = NULL;
642         struct dt_object         *child = NULL;
643         char                      name[8];
644         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
645         int                       rc    = 0;
646         ENTRY;
647
648         LASSERT(lfsck->li_master);
649
650         sprintf(name, "MDT%04x", node);
651         if (node == 0) {
652                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
653                                                   &LU_LPF_FID);
654         } else {
655                 struct lfsck_tgt_desc *ltd;
656
657                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
658                 if (unlikely(ltd == NULL))
659                         RETURN(-ENODEV);
660
661                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
662                                                   &LU_LPF_FID);
663                 lfsck_tgt_put(ltd);
664         }
665         if (IS_ERR(parent))
666                 RETURN(PTR_ERR(parent));
667
668         if (unlikely(!dt_try_as_dir(env, parent)))
669                 GOTO(out, rc = -ENOTDIR);
670
671         mutex_lock(&lfsck->li_mutex);
672         if (lfsck->li_lpf_obj != NULL)
673                 GOTO(unlock, rc = 0);
674
675         if (fid_is_zero(&bk->lb_lpf_fid)) {
676                 /* There is corner case that: in former LFSCK scanning we have
677                  * created the .lustre/lost+found/MDTxxxx but failed to update
678                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
679                  * it from MDT0 firstly. */
680                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
681                                (const struct dt_key *)name, BYPASS_CAPA);
682                 if (rc != 0 && rc != -ENOENT)
683                         GOTO(unlock, rc);
684
685                 if (rc == 0) {
686                         bk->lb_lpf_fid = *cfid;
687                         rc = lfsck_bookmark_store(env, lfsck);
688                 } else {
689                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
690                 }
691                 if (rc != 0)
692                         GOTO(unlock, rc);
693         } else {
694                 *cfid = bk->lb_lpf_fid;
695         }
696
697         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
698         if (IS_ERR(child))
699                 GOTO(unlock, rc = PTR_ERR(child));
700
701         if (dt_object_exists(child) != 0) {
702                 if (unlikely(!dt_try_as_dir(env, child)))
703                         GOTO(unlock, rc = -ENOTDIR);
704
705                 lfsck->li_lpf_obj = child;
706                 GOTO(unlock, rc = 0);
707         }
708
709         memset(la, 0, sizeof(*la));
710         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
711         la->la_mode = S_IFDIR | S_IRWXU;
712         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
713                        LA_UID | LA_GID;
714         memset(dof, 0, sizeof(*dof));
715         dof->dof_type = dt_mode_to_dft(S_IFDIR);
716
717         if (node == 0)
718                 rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
719                                             dof, name);
720         else
721                 rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
722                                              dof, name);
723         if (rc == 0)
724                 lfsck->li_lpf_obj = child;
725
726         GOTO(unlock, rc);
727
728 unlock:
729         mutex_unlock(&lfsck->li_mutex);
730         if (rc != 0 && child != NULL && !IS_ERR(child))
731                 lu_object_put(env, &child->do_lu);
732 out:
733         if (parent != NULL && !IS_ERR(parent))
734                 lu_object_put(env, &parent->do_lu);
735
736         return rc;
737 }
738
739 static int lfsck_fid_init(struct lfsck_instance *lfsck)
740 {
741         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
742         struct seq_server_site  *ss;
743         char                    *prefix;
744         int                      rc     = 0;
745         ENTRY;
746
747         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
748         if (unlikely(ss == NULL))
749                 RETURN(-ENODEV);
750
751         OBD_ALLOC_PTR(lfsck->li_seq);
752         if (lfsck->li_seq == NULL)
753                 RETURN(-ENOMEM);
754
755         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
756         if (prefix == NULL)
757                 GOTO(out, rc = -ENOMEM);
758
759         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
760         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
761                              ss->ss_server_seq);
762         OBD_FREE(prefix, MAX_OBD_NAME + 7);
763         if (rc != 0)
764                 GOTO(out, rc);
765
766         if (fid_is_sane(&bk->lb_last_fid))
767                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
768
769         RETURN(0);
770
771 out:
772         OBD_FREE_PTR(lfsck->li_seq);
773         lfsck->li_seq = NULL;
774
775         return rc;
776 }
777
778 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
779 {
780         if (lfsck->li_seq != NULL) {
781                 seq_client_fini(lfsck->li_seq);
782                 OBD_FREE_PTR(lfsck->li_seq);
783                 lfsck->li_seq = NULL;
784         }
785 }
786
787 void lfsck_instance_cleanup(const struct lu_env *env,
788                             struct lfsck_instance *lfsck)
789 {
790         struct ptlrpc_thread    *thread = &lfsck->li_thread;
791         struct lfsck_component  *com;
792         ENTRY;
793
794         LASSERT(list_empty(&lfsck->li_link));
795         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
796
797         if (lfsck->li_obj_oit != NULL) {
798                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
799                 lfsck->li_obj_oit = NULL;
800         }
801
802         LASSERT(lfsck->li_obj_dir == NULL);
803
804         while (!cfs_list_empty(&lfsck->li_list_scan)) {
805                 com = cfs_list_entry(lfsck->li_list_scan.next,
806                                      struct lfsck_component,
807                                      lc_link);
808                 lfsck_component_cleanup(env, com);
809         }
810
811         LASSERT(cfs_list_empty(&lfsck->li_list_dir));
812
813         while (!cfs_list_empty(&lfsck->li_list_double_scan)) {
814                 com = cfs_list_entry(lfsck->li_list_double_scan.next,
815                                      struct lfsck_component,
816                                      lc_link);
817                 lfsck_component_cleanup(env, com);
818         }
819
820         while (!cfs_list_empty(&lfsck->li_list_idle)) {
821                 com = cfs_list_entry(lfsck->li_list_idle.next,
822                                      struct lfsck_component,
823                                      lc_link);
824                 lfsck_component_cleanup(env, com);
825         }
826
827         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
828         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
829
830         if (lfsck->li_bookmark_obj != NULL) {
831                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
832                 lfsck->li_bookmark_obj = NULL;
833         }
834
835         if (lfsck->li_lpf_obj != NULL) {
836                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
837                 lfsck->li_lpf_obj = NULL;
838         }
839
840         if (lfsck->li_los != NULL) {
841                 local_oid_storage_fini(env, lfsck->li_los);
842                 lfsck->li_los = NULL;
843         }
844
845         lfsck_fid_fini(lfsck);
846
847         OBD_FREE_PTR(lfsck);
848 }
849
850 static inline struct lfsck_instance *
851 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
852 {
853         struct lfsck_instance *lfsck;
854
855         cfs_list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
856                 if (lfsck->li_bottom == key) {
857                         if (ref)
858                                 lfsck_instance_get(lfsck);
859                         if (unlink)
860                                 list_del_init(&lfsck->li_link);
861
862                         return lfsck;
863                 }
864         }
865
866         return NULL;
867 }
868
869 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
870                                            bool unlink)
871 {
872         struct lfsck_instance *lfsck;
873
874         spin_lock(&lfsck_instance_lock);
875         lfsck = __lfsck_instance_find(key, ref, unlink);
876         spin_unlock(&lfsck_instance_lock);
877
878         return lfsck;
879 }
880
881 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
882 {
883         struct lfsck_instance *tmp;
884
885         spin_lock(&lfsck_instance_lock);
886         cfs_list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
887                 if (lfsck->li_bottom == tmp->li_bottom) {
888                         spin_unlock(&lfsck_instance_lock);
889                         return -EEXIST;
890                 }
891         }
892
893         cfs_list_add_tail(&lfsck->li_link, &lfsck_instance_list);
894         spin_unlock(&lfsck_instance_lock);
895         return 0;
896 }
897
/**
 * Print "<prefix>: name1,name2,...\n" for the bits set in \a bits.
 *
 * Bit i is printed as names[i]; set bits whose names[] slot is NULL are
 * silently skipped. When \a bits is zero only "<prefix>:\n" is printed.
 * On success *buf is advanced past the text and *len is reduced by the
 * number of characters written.
 *
 * snprintf() reports the length the output would have had, so a result
 * that does not fit in the remaining space is treated as a failure
 * instead of moving *buf beyond the buffer and driving *len negative.
 *
 * NOTE: if the highest set bit has a NULL name, no trailing '\n' is
 * emitted for the name list.
 *
 * \param[in,out] buf   current output position
 * \param[in,out] len   space remaining at *buf
 * \param[in] bits      flags to be printed
 * \param[in] names     names indexed by bit number
 * \param[in] prefix    label printed in front of the list
 *
 * \retval              number of characters written
 * \retval -ENOSPC      the output does not fit in the buffer
 */
int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
                    const char *prefix)
{
        /* Work on an unsigned copy so that testing bit 31 is well defined. */
        unsigned int    left = bits;
        int             save = *len;
        int             rc;
        int             i;

        if (*len <= 0)
                return -ENOSPC;

        rc = snprintf(*buf, *len, "%s:%c", prefix, left != 0 ? ' ' : '\n');
        if (rc <= 0 || rc >= *len)
                return -ENOSPC;

        *buf += rc;
        *len -= rc;
        for (i = 0; left != 0; i++) {
                unsigned int flag = 1U << i;

                if (!(left & flag))
                        continue;

                left &= ~flag;
                if (names[i] == NULL)
                        continue;

                rc = snprintf(*buf, *len, "%s%c", names[i],
                              left != 0 ? ',' : '\n');
                if (rc <= 0 || rc >= *len)
                        return -ENOSPC;

                *buf += rc;
                *len -= rc;
        }
        return save - *len;
}
928
929 int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix)
930 {
931         int rc;
932
933         if (time != 0)
934                 rc = snprintf(*buf, *len, "%s: "LPU64" seconds\n", prefix,
935                               cfs_time_current_sec() - time);
936         else
937                 rc = snprintf(*buf, *len, "%s: N/A\n", prefix);
938         if (rc <= 0)
939                 return -ENOSPC;
940
941         *buf += rc;
942         *len -= rc;
943         return rc;
944 }
945
946 int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
947                    const char *prefix)
948 {
949         int rc;
950
951         if (fid_is_zero(&pos->lp_dir_parent)) {
952                 if (pos->lp_oit_cookie == 0)
953                         rc = snprintf(*buf, *len, "%s: N/A, N/A, N/A\n",
954                                       prefix);
955                 else
956                         rc = snprintf(*buf, *len, "%s: "LPU64", N/A, N/A\n",
957                                       prefix, pos->lp_oit_cookie);
958         } else {
959                 rc = snprintf(*buf, *len, "%s: "LPU64", "DFID", "LPU64"\n",
960                               prefix, pos->lp_oit_cookie,
961                               PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
962         }
963         if (rc <= 0)
964                 return -ENOSPC;
965
966         *buf += rc;
967         *len -= rc;
968         return rc;
969 }
970
971 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
972                     struct lfsck_position *pos, bool init)
973 {
974         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
975
976         if (unlikely(lfsck->li_di_oit == NULL)) {
977                 memset(pos, 0, sizeof(*pos));
978                 return;
979         }
980
981         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
982         if (!lfsck->li_current_oit_processed && !init)
983                 pos->lp_oit_cookie--;
984
985         LASSERT(pos->lp_oit_cookie > 0);
986
987         if (lfsck->li_di_dir != NULL) {
988                 struct dt_object *dto = lfsck->li_obj_dir;
989
990                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
991                                                         lfsck->li_di_dir);
992
993                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
994                         fid_zero(&pos->lp_dir_parent);
995                         pos->lp_dir_cookie = 0;
996                 } else {
997                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
998                 }
999         } else {
1000                 fid_zero(&pos->lp_dir_parent);
1001                 pos->lp_dir_cookie = 0;
1002         }
1003 }
1004
1005 static void __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1006 {
1007         lfsck->li_bookmark_ram.lb_speed_limit = limit;
1008         if (limit != LFSCK_SPEED_NO_LIMIT) {
1009                 if (limit > HZ) {
1010                         lfsck->li_sleep_rate = limit / HZ;
1011                         lfsck->li_sleep_jif = 1;
1012                 } else {
1013                         lfsck->li_sleep_rate = 1;
1014                         lfsck->li_sleep_jif = HZ / limit;
1015                 }
1016         } else {
1017                 lfsck->li_sleep_jif = 0;
1018                 lfsck->li_sleep_rate = 0;
1019         }
1020 }
1021
1022 void lfsck_control_speed(struct lfsck_instance *lfsck)
1023 {
1024         struct ptlrpc_thread *thread = &lfsck->li_thread;
1025         struct l_wait_info    lwi;
1026
1027         if (lfsck->li_sleep_jif > 0 &&
1028             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1029                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1030                                        LWI_ON_SIGNAL_NOOP, NULL);
1031
1032                 l_wait_event(thread->t_ctl_waitq,
1033                              !thread_is_running(thread),
1034                              &lwi);
1035                 lfsck->li_new_scanned = 0;
1036         }
1037 }
1038
1039 void lfsck_control_speed_by_self(struct lfsck_component *com)
1040 {
1041         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1042         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1043         struct l_wait_info       lwi;
1044
1045         if (lfsck->li_sleep_jif > 0 &&
1046             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1047                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1048                                        LWI_ON_SIGNAL_NOOP, NULL);
1049
1050                 l_wait_event(thread->t_ctl_waitq,
1051                              !thread_is_running(thread),
1052                              &lwi);
1053                 com->lc_new_scanned = 0;
1054         }
1055 }
1056
1057 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
1058                             struct lu_fid *fid)
1059 {
1060         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
1061                      !dt_try_as_dir(env, obj)))
1062                 return -ENOTDIR;
1063
1064         return dt_lookup(env, obj, (struct dt_rec *)fid,
1065                          (const struct dt_key *)"..", BYPASS_CAPA);
1066 }
1067
1068 static int lfsck_needs_scan_dir(const struct lu_env *env,
1069                                 struct lfsck_instance *lfsck,
1070                                 struct dt_object *obj)
1071 {
1072         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
1073         int            depth = 0;
1074         int            rc;
1075
1076         if (!lfsck->li_master || !S_ISDIR(lfsck_object_type(obj)) ||
1077             cfs_list_empty(&lfsck->li_list_dir))
1078                RETURN(0);
1079
1080         while (1) {
1081                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
1082                  *      which is the agent directory to manage the objects
1083                  *      which name entries reside on remote MDTs. Related
1084                  *      consistency verification will be processed in LFSCK
1085                  *      phase III. */
1086                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
1087                         if (depth > 0)
1088                                 lfsck_object_put(env, obj);
1089                         return 1;
1090                 }
1091
1092                 /* .lustre doesn't contain "real" user objects, no need lfsck */
1093                 if (fid_is_dot_lustre(lfsck_dto2fid(obj))) {
1094                         if (depth > 0)
1095                                 lfsck_object_put(env, obj);
1096                         return 0;
1097                 }
1098
1099                 dt_read_lock(env, obj, MOR_TGT_CHILD);
1100                 if (unlikely(lfsck_is_dead_obj(obj))) {
1101                         dt_read_unlock(env, obj);
1102                         if (depth > 0)
1103                                 lfsck_object_put(env, obj);
1104                         return 0;
1105                 }
1106
1107                 rc = dt_xattr_get(env, obj,
1108                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
1109                                   BYPASS_CAPA);
1110                 dt_read_unlock(env, obj);
1111                 if (rc >= 0) {
1112                         if (depth > 0)
1113                                 lfsck_object_put(env, obj);
1114                         return 1;
1115                 }
1116
1117                 if (rc < 0 && rc != -ENODATA) {
1118                         if (depth > 0)
1119                                 lfsck_object_put(env, obj);
1120                         return rc;
1121                 }
1122
1123                 rc = lfsck_parent_fid(env, obj, fid);
1124                 if (depth > 0)
1125                         lfsck_object_put(env, obj);
1126                 if (rc != 0)
1127                         return rc;
1128
1129                 if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
1130                         return 0;
1131
1132                 obj = lfsck_object_find(env, lfsck, fid);
1133                 if (obj == NULL)
1134                         return 0;
1135                 else if (IS_ERR(obj))
1136                         return PTR_ERR(obj);
1137
1138                 if (!dt_object_exists(obj)) {
1139                         lfsck_object_put(env, obj);
1140                         return 0;
1141                 }
1142
1143                 /* Currently, only client visible directory can be remote. */
1144                 if (dt_object_remote(obj)) {
1145                         lfsck_object_put(env, obj);
1146                         return 1;
1147                 }
1148
1149                 depth++;
1150         }
1151         return 0;
1152 }
1153
1154 struct lfsck_thread_args *lfsck_thread_args_init(struct lfsck_instance *lfsck,
1155                                                  struct lfsck_component *com,
1156                                                  struct lfsck_start_param *lsp)
1157 {
1158         struct lfsck_thread_args *lta;
1159         int                       rc;
1160
1161         OBD_ALLOC_PTR(lta);
1162         if (lta == NULL)
1163                 return ERR_PTR(-ENOMEM);
1164
1165         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1166         if (rc != 0) {
1167                 OBD_FREE_PTR(lta);
1168                 return ERR_PTR(rc);
1169         }
1170
1171         lta->lta_lfsck = lfsck_instance_get(lfsck);
1172         if (com != NULL)
1173                 lta->lta_com = lfsck_component_get(com);
1174
1175         lta->lta_lsp = lsp;
1176
1177         return lta;
1178 }
1179
1180 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1181 {
1182         if (lta->lta_com != NULL)
1183                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1184         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1185         lu_env_fini(&lta->lta_env);
1186         OBD_FREE_PTR(lta);
1187 }
1188
1189 /* LFSCK wrap functions */
1190
1191 void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
1192                 bool new_checked)
1193 {
1194         struct lfsck_component *com;
1195
1196         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1197                 com->lc_ops->lfsck_fail(env, com, new_checked);
1198         }
1199 }
1200
1201 int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck)
1202 {
1203         struct lfsck_component *com;
1204         int                     rc  = 0;
1205         int                     rc1 = 0;
1206
1207         if (likely(cfs_time_beforeq(cfs_time_current(),
1208                                     lfsck->li_time_next_checkpoint)))
1209                 return 0;
1210
1211         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
1212         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1213                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
1214                 if (rc != 0)
1215                         rc1 = rc;
1216         }
1217
1218         lfsck->li_time_last_checkpoint = cfs_time_current();
1219         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1220                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1221         return rc1 != 0 ? rc1 : rc;
1222 }
1223
1224 int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck,
1225                struct lfsck_start_param *lsp)
1226 {
1227         struct dt_object       *obj     = NULL;
1228         struct lfsck_component *com;
1229         struct lfsck_component *next;
1230         struct lfsck_position  *pos     = NULL;
1231         const struct dt_it_ops *iops    =
1232                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
1233         struct dt_it           *di;
1234         int                     rc;
1235         ENTRY;
1236
1237         LASSERT(lfsck->li_obj_dir == NULL);
1238         LASSERT(lfsck->li_di_dir == NULL);
1239
1240         lfsck->li_current_oit_processed = 0;
1241         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1242                 com->lc_new_checked = 0;
1243                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1244                         com->lc_journal = 0;
1245
1246                 rc = com->lc_ops->lfsck_prep(env, com, lsp);
1247                 if (rc != 0)
1248                         GOTO(out, rc);
1249
1250                 if ((pos == NULL) ||
1251                     (!lfsck_pos_is_zero(&com->lc_pos_start) &&
1252                      lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
1253                         pos = &com->lc_pos_start;
1254         }
1255
1256         /* Init otable-based iterator. */
1257         if (pos == NULL) {
1258                 rc = iops->load(env, lfsck->li_di_oit, 0);
1259                 if (rc > 0) {
1260                         lfsck->li_oit_over = 1;
1261                         rc = 0;
1262                 }
1263
1264                 GOTO(out, rc);
1265         }
1266
1267         rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
1268         if (rc < 0)
1269                 GOTO(out, rc);
1270         else if (rc > 0)
1271                 lfsck->li_oit_over = 1;
1272
1273         if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
1274                 GOTO(out, rc = 0);
1275
1276         /* Find the directory for namespace-based traverse. */
1277         obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
1278         if (obj == NULL)
1279                 GOTO(out, rc = 0);
1280         else if (IS_ERR(obj))
1281                 RETURN(PTR_ERR(obj));
1282
1283         /* XXX: Currently, skip remote object, the consistency for
1284          *      remote object will be processed in LFSCK phase III. */
1285         if (!dt_object_exists(obj) || dt_object_remote(obj) ||
1286             unlikely(!S_ISDIR(lfsck_object_type(obj))))
1287                 GOTO(out, rc = 0);
1288
1289         if (unlikely(!dt_try_as_dir(env, obj)))
1290                 GOTO(out, rc = -ENOTDIR);
1291
1292         /* Init the namespace-based directory traverse. */
1293         iops = &obj->do_index_ops->dio_it;
1294         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
1295         if (IS_ERR(di))
1296                 GOTO(out, rc = PTR_ERR(di));
1297
1298         LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
1299
1300         rc = iops->load(env, di, pos->lp_dir_cookie);
1301         if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
1302                 rc = iops->next(env, di);
1303         else if (rc > 0)
1304                 rc = 0;
1305
1306         if (rc != 0) {
1307                 iops->put(env, di);
1308                 iops->fini(env, di);
1309                 GOTO(out, rc);
1310         }
1311
1312         lfsck->li_obj_dir = lfsck_object_get(obj);
1313         lfsck->li_cookie_dir = iops->store(env, di);
1314         spin_lock(&lfsck->li_lock);
1315         lfsck->li_di_dir = di;
1316         spin_unlock(&lfsck->li_lock);
1317
1318         GOTO(out, rc = 0);
1319
1320 out:
1321         if (obj != NULL)
1322                 lfsck_object_put(env, obj);
1323
1324         if (rc < 0) {
1325                 cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
1326                                              lc_link)
1327                         com->lc_ops->lfsck_post(env, com, rc, true);
1328
1329                 return rc;
1330         }
1331
1332         rc = 0;
1333         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
1334         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1335                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
1336                 if (rc != 0)
1337                         break;
1338         }
1339
1340         lfsck->li_time_last_checkpoint = cfs_time_current();
1341         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1342                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1343         return rc;
1344 }
1345
1346 int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
1347                    struct dt_object *obj)
1348 {
1349         struct lfsck_component *com;
1350         const struct dt_it_ops *iops;
1351         struct dt_it           *di;
1352         int                     rc;
1353         ENTRY;
1354
1355         LASSERT(lfsck->li_obj_dir == NULL);
1356
1357         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1358                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
1359                 if (rc != 0)
1360                         RETURN(rc);
1361         }
1362
1363         rc = lfsck_needs_scan_dir(env, lfsck, obj);
1364         if (rc <= 0)
1365                 GOTO(out, rc);
1366
1367         if (unlikely(!dt_try_as_dir(env, obj)))
1368                 GOTO(out, rc = -ENOTDIR);
1369
1370         iops = &obj->do_index_ops->dio_it;
1371         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
1372         if (IS_ERR(di))
1373                 GOTO(out, rc = PTR_ERR(di));
1374
1375         rc = iops->load(env, di, 0);
1376         if (rc == 0)
1377                 rc = iops->next(env, di);
1378         else if (rc > 0)
1379                 rc = 0;
1380
1381         if (rc != 0) {
1382                 iops->put(env, di);
1383                 iops->fini(env, di);
1384                 GOTO(out, rc);
1385         }
1386
1387         lfsck->li_obj_dir = lfsck_object_get(obj);
1388         lfsck->li_cookie_dir = iops->store(env, di);
1389         spin_lock(&lfsck->li_lock);
1390         lfsck->li_di_dir = di;
1391         spin_unlock(&lfsck->li_lock);
1392
1393         GOTO(out, rc = 0);
1394
1395 out:
1396         if (rc < 0)
1397                 lfsck_fail(env, lfsck, false);
1398         return (rc > 0 ? 0 : rc);
1399 }
1400
1401 int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
1402                    struct dt_object *obj, struct lu_dirent *ent)
1403 {
1404         struct lfsck_component *com;
1405         int                     rc;
1406
1407         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1408                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
1409                 if (rc != 0)
1410                         return rc;
1411         }
1412         return 0;
1413 }
1414
1415 int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
1416                int result)
1417 {
1418         struct lfsck_component *com;
1419         struct lfsck_component *next;
1420         int                     rc  = 0;
1421         int                     rc1 = 0;
1422
1423         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
1424         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1425                 rc = com->lc_ops->lfsck_post(env, com, result, false);
1426                 if (rc != 0)
1427                         rc1 = rc;
1428         }
1429
1430         lfsck->li_time_last_checkpoint = cfs_time_current();
1431         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1432                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1433
1434         /* Ignore some component post failure to make other can go ahead. */
1435         return result;
1436 }
1437
1438 static void lfsck_interpret(const struct lu_env *env,
1439                             struct lfsck_instance *lfsck,
1440                             struct ptlrpc_request *req, void *args, int result)
1441 {
1442         struct lfsck_async_interpret_args *laia = args;
1443         struct lfsck_component            *com;
1444
1445         LASSERT(laia->laia_shared);
1446
1447         spin_lock(&lfsck->li_lock);
1448         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1449                 if (com->lc_ops->lfsck_interpret != NULL) {
1450                         laia->laia_com = com;
1451                         com->lc_ops->lfsck_interpret(env, req, laia, result);
1452                 }
1453         }
1454
1455         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1456                 if (com->lc_ops->lfsck_interpret != NULL) {
1457                         laia->laia_com = com;
1458                         com->lc_ops->lfsck_interpret(env, req, laia, result);
1459                 }
1460         }
1461         spin_unlock(&lfsck->li_lock);
1462 }
1463
1464 int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck)
1465 {
1466         struct lfsck_component *com;
1467         struct lfsck_component *next;
1468         struct l_wait_info      lwi = { 0 };
1469         int                     rc  = 0;
1470         int                     rc1 = 0;
1471
1472         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1473                                      lc_link) {
1474                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1475                         com->lc_journal = 0;
1476
1477                 rc = com->lc_ops->lfsck_double_scan(env, com);
1478                 if (rc != 0)
1479                         rc1 = rc;
1480         }
1481
1482         l_wait_event(lfsck->li_thread.t_ctl_waitq,
1483                      atomic_read(&lfsck->li_double_scan_count) == 0,
1484                      &lwi);
1485
1486         return rc1 != 0 ? rc1 : rc;
1487 }
1488
1489 static int lfsck_stop_notify(const struct lu_env *env,
1490                              struct lfsck_instance *lfsck,
1491                              struct lfsck_tgt_descs *ltds,
1492                              struct lfsck_tgt_desc *ltd, __u16 type)
1493 {
1494         struct ptlrpc_request_set *set;
1495         struct lfsck_component    *com;
1496         int                        rc  = 0;
1497         ENTRY;
1498
1499         spin_lock(&lfsck->li_lock);
1500         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
1501         if (com == NULL)
1502                 com = __lfsck_component_find(lfsck, type,
1503                                              &lfsck->li_list_double_scan);
1504         if (com != NULL)
1505                 lfsck_component_get(com);
1506         spin_lock(&lfsck->li_lock);
1507
1508         if (com != NULL) {
1509                 if (com->lc_ops->lfsck_stop_notify != NULL) {
1510                         set = ptlrpc_prep_set();
1511                         if (set == NULL) {
1512                                 lfsck_component_put(env, com);
1513
1514                                 RETURN(-ENOMEM);
1515                         }
1516
1517                         rc = com->lc_ops->lfsck_stop_notify(env, com, ltds,
1518                                                             ltd, set);
1519                         if (rc == 0)
1520                                 rc = ptlrpc_set_wait(set);
1521
1522                         ptlrpc_set_destroy(set);
1523                 }
1524
1525                 lfsck_component_put(env, com);
1526         }
1527
1528         RETURN(rc);
1529 }
1530
1531 void lfsck_quit(const struct lu_env *env, struct lfsck_instance *lfsck)
1532 {
1533         struct lfsck_component *com;
1534         struct lfsck_component *next;
1535
1536         list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
1537                                  lc_link) {
1538                 if (com->lc_ops->lfsck_quit != NULL)
1539                         com->lc_ops->lfsck_quit(env, com);
1540         }
1541
1542         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1543                                  lc_link) {
1544                 if (com->lc_ops->lfsck_quit != NULL)
1545                         com->lc_ops->lfsck_quit(env, com);
1546         }
1547 }
1548
1549 static int lfsck_async_interpret(const struct lu_env *env,
1550                                  struct ptlrpc_request *req,
1551                                  void *args, int rc)
1552 {
1553         struct lfsck_async_interpret_args *laia = args;
1554         struct lfsck_instance             *lfsck;
1555
1556         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
1557                               li_mdt_descs);
1558         lfsck_interpret(env, lfsck, req, laia, rc);
1559         lfsck_tgt_put(laia->laia_ltd);
1560         if (rc != 0 && laia->laia_result != -EALREADY)
1561                 laia->laia_result = rc;
1562
1563         return 0;
1564 }
1565
1566 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
1567                         struct lfsck_request *lr,
1568                         struct ptlrpc_request_set *set,
1569                         ptlrpc_interpterer_t interpreter,
1570                         void *args, int request)
1571 {
1572         struct lfsck_async_interpret_args *laia;
1573         struct ptlrpc_request             *req;
1574         struct lfsck_request              *tmp;
1575         struct req_format                 *format;
1576         int                                rc;
1577
1578         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
1579                 return -EOPNOTSUPP;
1580
1581         switch (request) {
1582         case LFSCK_NOTIFY:
1583                 format = &RQF_LFSCK_NOTIFY;
1584                 break;
1585         case LFSCK_QUERY:
1586                 format = &RQF_LFSCK_QUERY;
1587                 break;
1588         default:
1589                 CERROR("%s: unknown async request: opc = %d\n",
1590                        exp->exp_obd->obd_name, request);
1591                 return -EINVAL;
1592         }
1593
1594         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
1595         if (req == NULL)
1596                 return -ENOMEM;
1597
1598         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
1599         if (rc != 0) {
1600                 ptlrpc_request_free(req);
1601
1602                 return rc;
1603         }
1604
1605         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1606         *tmp = *lr;
1607         ptlrpc_request_set_replen(req);
1608
1609         laia = ptlrpc_req_async_args(req);
1610         *laia = *(struct lfsck_async_interpret_args *)args;
1611         if (laia->laia_com != NULL)
1612                 lfsck_component_get(laia->laia_com);
1613         req->rq_interpret_reply = interpreter;
1614         ptlrpc_set_add_req(set, req);
1615
1616         return 0;
1617 }
1618
1619 /* external interfaces */
1620
1621 int lfsck_get_speed(struct dt_device *key, void *buf, int len)
1622 {
1623         struct lu_env           env;
1624         struct lfsck_instance  *lfsck;
1625         int                     rc;
1626         ENTRY;
1627
1628         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1629         if (rc != 0)
1630                 RETURN(rc);
1631
1632         lfsck = lfsck_instance_find(key, true, false);
1633         if (likely(lfsck != NULL)) {
1634                 rc = snprintf(buf, len, "%u\n",
1635                               lfsck->li_bookmark_ram.lb_speed_limit);
1636                 lfsck_instance_put(&env, lfsck);
1637         } else {
1638                 rc = -ENODEV;
1639         }
1640
1641         lu_env_fini(&env);
1642
1643         RETURN(rc);
1644 }
1645 EXPORT_SYMBOL(lfsck_get_speed);
1646
1647 int lfsck_set_speed(struct dt_device *key, int val)
1648 {
1649         struct lu_env           env;
1650         struct lfsck_instance  *lfsck;
1651         int                     rc;
1652         ENTRY;
1653
1654         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1655         if (rc != 0)
1656                 RETURN(rc);
1657
1658         lfsck = lfsck_instance_find(key, true, false);
1659         if (likely(lfsck != NULL)) {
1660                 mutex_lock(&lfsck->li_mutex);
1661                 __lfsck_set_speed(lfsck, val);
1662                 rc = lfsck_bookmark_store(&env, lfsck);
1663                 mutex_unlock(&lfsck->li_mutex);
1664                 lfsck_instance_put(&env, lfsck);
1665         } else {
1666                 rc = -ENODEV;
1667         }
1668
1669         lu_env_fini(&env);
1670
1671         RETURN(rc);
1672 }
1673 EXPORT_SYMBOL(lfsck_set_speed);
1674
1675 int lfsck_get_windows(struct dt_device *key, void *buf, int len)
1676 {
1677         struct lu_env           env;
1678         struct lfsck_instance  *lfsck;
1679         int                     rc;
1680         ENTRY;
1681
1682         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1683         if (rc != 0)
1684                 RETURN(rc);
1685
1686         lfsck = lfsck_instance_find(key, true, false);
1687         if (likely(lfsck != NULL)) {
1688                 rc = snprintf(buf, len, "%u\n",
1689                               lfsck->li_bookmark_ram.lb_async_windows);
1690                 lfsck_instance_put(&env, lfsck);
1691         } else {
1692                 rc = -ENODEV;
1693         }
1694
1695         lu_env_fini(&env);
1696
1697         RETURN(rc);
1698 }
1699 EXPORT_SYMBOL(lfsck_get_windows);
1700
1701 int lfsck_set_windows(struct dt_device *key, int val)
1702 {
1703         struct lu_env           env;
1704         struct lfsck_instance  *lfsck;
1705         int                     rc;
1706         ENTRY;
1707
1708         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1709         if (rc != 0)
1710                 RETURN(rc);
1711
1712         lfsck = lfsck_instance_find(key, true, false);
1713         if (likely(lfsck != NULL)) {
1714                 if (val > LFSCK_ASYNC_WIN_MAX) {
1715                         CERROR("%s: Too large async windows size, which "
1716                                "may cause memory issues. The valid range "
1717                                "is [0 - %u]. If you do not want to restrict "
1718                                "the windows size for async requests pipeline, "
1719                                "just set it as 0.\n",
1720                                lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
1721                         rc = -EINVAL;
1722                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
1723                         mutex_lock(&lfsck->li_mutex);
1724                         lfsck->li_bookmark_ram.lb_async_windows = val;
1725                         rc = lfsck_bookmark_store(&env, lfsck);
1726                         mutex_unlock(&lfsck->li_mutex);
1727                 }
1728                 lfsck_instance_put(&env, lfsck);
1729         } else {
1730                 rc = -ENODEV;
1731         }
1732
1733         lu_env_fini(&env);
1734
1735         RETURN(rc);
1736 }
1737 EXPORT_SYMBOL(lfsck_set_windows);
1738
1739 int lfsck_dump(struct dt_device *key, void *buf, int len, enum lfsck_type type)
1740 {
1741         struct lu_env           env;
1742         struct lfsck_instance  *lfsck;
1743         struct lfsck_component *com;
1744         int                     rc;
1745         ENTRY;
1746
1747         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1748         if (rc != 0)
1749                 RETURN(rc);
1750
1751         lfsck = lfsck_instance_find(key, true, false);
1752         if (likely(lfsck != NULL)) {
1753                 com = lfsck_component_find(lfsck, type);
1754                 if (likely(com != NULL)) {
1755                         rc = com->lc_ops->lfsck_dump(&env, com, buf, len);
1756                         lfsck_component_put(&env, com);
1757                 } else {
1758                         rc = -ENOTSUPP;
1759                 }
1760
1761                 lfsck_instance_put(&env, lfsck);
1762         } else {
1763                 rc = -ENODEV;
1764         }
1765
1766         lu_env_fini(&env);
1767
1768         RETURN(rc);
1769 }
1770 EXPORT_SYMBOL(lfsck_dump);
1771
1772 static int lfsck_stop_all(const struct lu_env *env,
1773                           struct lfsck_instance *lfsck,
1774                           struct lfsck_stop *stop)
1775 {
1776         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1777         struct lfsck_request              *lr     = &info->lti_lr;
1778         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1779         struct ptlrpc_request_set         *set;
1780         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1781         struct lfsck_tgt_desc             *ltd;
1782         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1783         __u32                              idx;
1784         int                                rc     = 0;
1785         int                                rc1    = 0;
1786         ENTRY;
1787
1788         LASSERT(stop->ls_flags & LPF_BROADCAST);
1789
1790         set = ptlrpc_prep_set();
1791         if (unlikely(set == NULL)) {
1792                 CERROR("%s: cannot allocate memory for stop LFSCK on "
1793                        "all targets\n", lfsck_lfsck2name(lfsck));
1794
1795                 RETURN(-ENOMEM);
1796         }
1797
1798         memset(lr, 0, sizeof(*lr));
1799         lr->lr_event = LE_STOP;
1800         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1801         lr->lr_status = stop->ls_status;
1802         lr->lr_version = bk->lb_version;
1803         lr->lr_active = LFSCK_TYPES_ALL;
1804         lr->lr_param = stop->ls_flags;
1805
1806         laia->laia_com = NULL;
1807         laia->laia_ltds = ltds;
1808         laia->laia_lr = lr;
1809         laia->laia_result = 0;
1810         laia->laia_shared = 1;
1811
1812         down_read(&ltds->ltd_rw_sem);
1813         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1814                 ltd = lfsck_tgt_get(ltds, idx);
1815                 LASSERT(ltd != NULL);
1816
1817                 laia->laia_ltd = ltd;
1818                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1819                                          lfsck_async_interpret, laia,
1820                                          LFSCK_NOTIFY);
1821                 if (rc != 0) {
1822                         lfsck_interpret(env, lfsck, NULL, laia, rc);
1823                         lfsck_tgt_put(ltd);
1824                         CWARN("%s: cannot notify MDT %x for LFSCK stop: "
1825                               "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
1826                         rc1 = rc;
1827                 }
1828         }
1829         up_read(&ltds->ltd_rw_sem);
1830
1831         rc = ptlrpc_set_wait(set);
1832         ptlrpc_set_destroy(set);
1833
1834         if (rc == 0)
1835                 rc = laia->laia_result;
1836
1837         if (rc == -EALREADY)
1838                 rc = 0;
1839
1840         if (rc != 0)
1841                 CWARN("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
1842                       lfsck_lfsck2name(lfsck), rc);
1843
1844         RETURN(rc != 0 ? rc : rc1);
1845 }
1846
1847 static int lfsck_start_all(const struct lu_env *env,
1848                            struct lfsck_instance *lfsck,
1849                            struct lfsck_start *start)
1850 {
1851         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1852         struct lfsck_request              *lr     = &info->lti_lr;
1853         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1854         struct ptlrpc_request_set         *set;
1855         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1856         struct lfsck_tgt_desc             *ltd;
1857         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1858         __u32                              idx;
1859         int                                rc     = 0;
1860         ENTRY;
1861
1862         LASSERT(start->ls_flags & LPF_BROADCAST);
1863
1864         set = ptlrpc_prep_set();
1865         if (unlikely(set == NULL)) {
1866                 if (bk->lb_param & LPF_FAILOUT) {
1867                         CERROR("%s: cannot allocate memory for start LFSCK on "
1868                                "all targets, failout.\n",
1869                                lfsck_lfsck2name(lfsck));
1870
1871                         RETURN(-ENOMEM);
1872                 } else {
1873                         CWARN("%s: cannot allocate memory for start LFSCK on "
1874                               "all targets, partly scan.\n",
1875                               lfsck_lfsck2name(lfsck));
1876
1877                         RETURN(0);
1878                 }
1879         }
1880
1881         memset(lr, 0, sizeof(*lr));
1882         lr->lr_event = LE_START;
1883         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1884         lr->lr_speed = bk->lb_speed_limit;
1885         lr->lr_version = bk->lb_version;
1886         lr->lr_active = start->ls_active;
1887         lr->lr_param = start->ls_flags;
1888         lr->lr_async_windows = bk->lb_async_windows;
1889         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
1890                        LSV_ASYNC_WINDOWS;
1891
1892         laia->laia_com = NULL;
1893         laia->laia_ltds = ltds;
1894         laia->laia_lr = lr;
1895         laia->laia_result = 0;
1896         laia->laia_shared = 1;
1897
1898         down_read(&ltds->ltd_rw_sem);
1899         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1900                 ltd = lfsck_tgt_get(ltds, idx);
1901                 LASSERT(ltd != NULL);
1902
1903                 laia->laia_ltd = ltd;
1904                 ltd->ltd_layout_done = 0;
1905                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1906                                          lfsck_async_interpret, laia,
1907                                          LFSCK_NOTIFY);
1908                 if (rc != 0) {
1909                         lfsck_interpret(env, lfsck, NULL, laia, rc);
1910                         lfsck_tgt_put(ltd);
1911                         if (bk->lb_param & LPF_FAILOUT) {
1912                                 CERROR("%s: cannot notify MDT %x for LFSCK "
1913                                        "start, failout: rc = %d\n",
1914                                        lfsck_lfsck2name(lfsck), idx, rc);
1915                                 break;
1916                         } else {
1917                                 CWARN("%s: cannot notify MDT %x for LFSCK "
1918                                       "start, partly scan: rc = %d\n",
1919                                       lfsck_lfsck2name(lfsck), idx, rc);
1920                                 rc = 0;
1921                         }
1922                 }
1923         }
1924         up_read(&ltds->ltd_rw_sem);
1925
1926         if (rc != 0) {
1927                 ptlrpc_set_destroy(set);
1928
1929                 RETURN(rc);
1930         }
1931
1932         rc = ptlrpc_set_wait(set);
1933         ptlrpc_set_destroy(set);
1934
1935         if (rc == 0)
1936                 rc = laia->laia_result;
1937
1938         if (rc != 0) {
1939                 if (bk->lb_param & LPF_FAILOUT) {
1940                         struct lfsck_stop *stop = &info->lti_stop;
1941
1942                         CERROR("%s: cannot start LFSCK on some MDTs, "
1943                                "stop all: rc = %d\n",
1944                                lfsck_lfsck2name(lfsck), rc);
1945                         if (rc != -EALREADY) {
1946                                 stop->ls_status = LS_FAILED;
1947                                 stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
1948                                 lfsck_stop_all(env, lfsck, stop);
1949                         }
1950                 } else {
1951                         CWARN("%s: cannot start LFSCK on some MDTs, "
1952                               "partly scan: rc = %d\n",
1953                               lfsck_lfsck2name(lfsck), rc);
1954                         rc = 0;
1955                 }
1956         }
1957
1958         RETURN(rc);
1959 }
1960
1961 int lfsck_start(const struct lu_env *env, struct dt_device *key,
1962                 struct lfsck_start_param *lsp)
1963 {
1964         struct lfsck_start              *start  = lsp->lsp_start;
1965         struct lfsck_instance           *lfsck;
1966         struct lfsck_bookmark           *bk;
1967         struct ptlrpc_thread            *thread;
1968         struct lfsck_component          *com;
1969         struct l_wait_info               lwi    = { 0 };
1970         struct lfsck_thread_args        *lta;
1971         bool                             dirty  = false;
1972         long                             rc     = 0;
1973         __u16                            valid  = 0;
1974         __u16                            flags  = 0;
1975         __u16                            type   = 1;
1976         ENTRY;
1977
1978         lfsck = lfsck_instance_find(key, true, false);
1979         if (unlikely(lfsck == NULL))
1980                 RETURN(-ENODEV);
1981
1982         /* System is not ready, try again later. */
1983         if (unlikely(lfsck->li_namespace == NULL))
1984                 GOTO(put, rc = -EAGAIN);
1985
1986         /* start == NULL means auto trigger paused LFSCK. */
1987         if ((start == NULL) &&
1988             (cfs_list_empty(&lfsck->li_list_scan) ||
1989              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
1990                 GOTO(put, rc = 0);
1991
1992         bk = &lfsck->li_bookmark_ram;
1993         thread = &lfsck->li_thread;
1994         mutex_lock(&lfsck->li_mutex);
1995         spin_lock(&lfsck->li_lock);
1996         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
1997                 rc = -EALREADY;
1998                 while (start->ls_active != 0) {
1999                         if (!(type & start->ls_active)) {
2000                                 type <<= 1;
2001                                 continue;
2002                         }
2003
2004                         com = __lfsck_component_find(lfsck, type,
2005                                                      &lfsck->li_list_scan);
2006                         if (com == NULL)
2007                                 com = __lfsck_component_find(lfsck, type,
2008                                                 &lfsck->li_list_double_scan);
2009                         if (com == NULL) {
2010                                 rc = -EOPNOTSUPP;
2011                                 break;
2012                         }
2013
2014                         if (com->lc_ops->lfsck_join != NULL) {
2015                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2016                                 if (rc != 0 && rc != -EALREADY)
2017                                         break;
2018                         }
2019                         start->ls_active &= ~type;
2020                         type <<= 1;
2021                 }
2022                 spin_unlock(&lfsck->li_lock);
2023                 GOTO(out, rc);
2024         }
2025         spin_unlock(&lfsck->li_lock);
2026
2027         lfsck->li_status = 0;
2028         lfsck->li_oit_over = 0;
2029         lfsck->li_start_unplug = 0;
2030         lfsck->li_drop_dryrun = 0;
2031         lfsck->li_new_scanned = 0;
2032
2033         /* For auto trigger. */
2034         if (start == NULL)
2035                 goto trigger;
2036
2037         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2038                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2039                        lfsck_lfsck2name(lfsck));
2040
2041                 GOTO(out, rc = -EPERM);
2042         }
2043
2044         start->ls_version = bk->lb_version;
2045         if (start->ls_valid & LSV_SPEED_LIMIT) {
2046                 __lfsck_set_speed(lfsck, start->ls_speed_limit);
2047                 dirty = true;
2048         }
2049
2050         if (start->ls_valid & LSV_ASYNC_WINDOWS &&
2051             bk->lb_async_windows != start->ls_async_windows) {
2052                 bk->lb_async_windows = start->ls_async_windows;
2053                 dirty = true;
2054         }
2055
2056         if (start->ls_valid & LSV_ERROR_HANDLE) {
2057                 valid |= DOIV_ERROR_HANDLE;
2058                 if (start->ls_flags & LPF_FAILOUT)
2059                         flags |= DOIF_FAILOUT;
2060
2061                 if ((start->ls_flags & LPF_FAILOUT) &&
2062                     !(bk->lb_param & LPF_FAILOUT)) {
2063                         bk->lb_param |= LPF_FAILOUT;
2064                         dirty = true;
2065                 } else if (!(start->ls_flags & LPF_FAILOUT) &&
2066                            (bk->lb_param & LPF_FAILOUT)) {
2067                         bk->lb_param &= ~LPF_FAILOUT;
2068                         dirty = true;
2069                 }
2070         }
2071
2072         if (start->ls_valid & LSV_DRYRUN) {
2073                 valid |= DOIV_DRYRUN;
2074                 if (start->ls_flags & LPF_DRYRUN)
2075                         flags |= DOIF_DRYRUN;
2076
2077                 if ((start->ls_flags & LPF_DRYRUN) &&
2078                     !(bk->lb_param & LPF_DRYRUN)) {
2079                         bk->lb_param |= LPF_DRYRUN;
2080                         dirty = true;
2081                 } else if (!(start->ls_flags & LPF_DRYRUN) &&
2082                            (bk->lb_param & LPF_DRYRUN)) {
2083                         bk->lb_param &= ~LPF_DRYRUN;
2084                         lfsck->li_drop_dryrun = 1;
2085                         dirty = true;
2086                 }
2087         }
2088
2089         if (bk->lb_param & LPF_ALL_TGT &&
2090             !(start->ls_flags & LPF_ALL_TGT)) {
2091                 bk->lb_param &= ~LPF_ALL_TGT;
2092                 dirty = true;
2093         } else if (!(bk->lb_param & LPF_ALL_TGT) &&
2094                    start->ls_flags & LPF_ALL_TGT) {
2095                 bk->lb_param |= LPF_ALL_TGT;
2096                 dirty = true;
2097         }
2098
2099         if (bk->lb_param & LPF_ORPHAN &&
2100             !(start->ls_flags & LPF_ORPHAN)) {
2101                 bk->lb_param &= ~LPF_ORPHAN;
2102                 dirty = true;
2103         } else if (!(bk->lb_param & LPF_ORPHAN) &&
2104                    start->ls_flags & LPF_ORPHAN) {
2105                 bk->lb_param |= LPF_ORPHAN;
2106                 dirty = true;
2107         }
2108
2109         if (dirty) {
2110                 rc = lfsck_bookmark_store(env, lfsck);
2111                 if (rc != 0)
2112                         GOTO(out, rc);
2113         }
2114
2115         if (start->ls_flags & LPF_RESET)
2116                 flags |= DOIF_RESET;
2117
2118         if (start->ls_active != 0) {
2119                 struct lfsck_component *next;
2120
2121                 if (start->ls_active == LFSCK_TYPES_ALL)
2122                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2123
2124                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2125                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2126                         GOTO(out, rc = -ENOTSUPP);
2127                 }
2128
2129                 cfs_list_for_each_entry_safe(com, next,
2130                                              &lfsck->li_list_scan, lc_link) {
2131                         if (!(com->lc_type & start->ls_active)) {
2132                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2133                                                              false);
2134                                 if (rc != 0)
2135                                         GOTO(out, rc);
2136                         }
2137                 }
2138
2139                 while (start->ls_active != 0) {
2140                         if (type & start->ls_active) {
2141                                 com = __lfsck_component_find(lfsck, type,
2142                                                         &lfsck->li_list_idle);
2143                                 if (com != NULL) {
2144                                         /* The component status will be updated
2145                                          * when its prep() is called later by
2146                                          * the LFSCK main engine. */
2147                                         cfs_list_del_init(&com->lc_link);
2148                                         cfs_list_add_tail(&com->lc_link,
2149                                                           &lfsck->li_list_scan);
2150                                 }
2151                                 start->ls_active &= ~type;
2152                         }
2153                         type <<= 1;
2154                 }
2155         }
2156
2157         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2158                 start->ls_active |= com->lc_type;
2159                 if (flags & DOIF_RESET) {
2160                         rc = com->lc_ops->lfsck_reset(env, com, false);
2161                         if (rc != 0)
2162                                 GOTO(out, rc);
2163                 }
2164         }
2165
2166 trigger:
2167         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
2168         if (bk->lb_param & LPF_DRYRUN) {
2169                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
2170                 valid |= DOIV_DRYRUN;
2171                 flags |= DOIF_DRYRUN;
2172         }
2173
2174         if (bk->lb_param & LPF_FAILOUT) {
2175                 valid |= DOIV_ERROR_HANDLE;
2176                 flags |= DOIF_FAILOUT;
2177         }
2178
2179         if (!cfs_list_empty(&lfsck->li_list_scan))
2180                 flags |= DOIF_OUTUSED;
2181
2182         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2183         thread_set_flags(thread, 0);
2184         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
2185         if (IS_ERR(lta))
2186                 GOTO(out, rc = PTR_ERR(lta));
2187
2188         rc = PTR_ERR(kthread_run(lfsck_master_engine, lta, "lfsck"));
2189         if (IS_ERR_VALUE(rc)) {
2190                 CERROR("%s: cannot start LFSCK thread: rc = %ld\n",
2191                        lfsck_lfsck2name(lfsck), rc);
2192                 lfsck_thread_args_fini(lta);
2193
2194                 GOTO(out, rc);
2195         }
2196
2197         l_wait_event(thread->t_ctl_waitq,
2198                      thread_is_running(thread) ||
2199                      thread_is_stopped(thread),
2200                      &lwi);
2201         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
2202                 lfsck->li_start_unplug = 1;
2203                 wake_up_all(&thread->t_ctl_waitq);
2204
2205                 GOTO(out, rc = 0);
2206         }
2207
2208         /* release lfsck::li_mutex to avoid deadlock. */
2209         mutex_unlock(&lfsck->li_mutex);
2210         rc = lfsck_start_all(env, lfsck, start);
2211         if (rc != 0) {
2212                 spin_lock(&lfsck->li_lock);
2213                 if (thread_is_stopped(thread)) {
2214                         spin_unlock(&lfsck->li_lock);
2215                 } else {
2216                         lfsck->li_status = LS_FAILED;
2217                         lfsck->li_flags = 0;
2218                         thread_set_flags(thread, SVC_STOPPING);
2219                         spin_unlock(&lfsck->li_lock);
2220
2221                         lfsck->li_start_unplug = 1;
2222                         wake_up_all(&thread->t_ctl_waitq);
2223                         l_wait_event(thread->t_ctl_waitq,
2224                                      thread_is_stopped(thread),
2225                                      &lwi);
2226                 }
2227         } else {
2228                 lfsck->li_start_unplug = 1;
2229                 wake_up_all(&thread->t_ctl_waitq);
2230         }
2231
2232         GOTO(put, rc);
2233
2234 out:
2235         mutex_unlock(&lfsck->li_mutex);
2236
2237 put:
2238         lfsck_instance_put(env, lfsck);
2239
2240         return rc < 0 ? rc : 0;
2241 }
2242 EXPORT_SYMBOL(lfsck_start);
2243
2244 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2245                struct lfsck_stop *stop)
2246 {
2247         struct lfsck_instance   *lfsck;
2248         struct ptlrpc_thread    *thread;
2249         struct l_wait_info       lwi    = { 0 };
2250         int                      rc     = 0;
2251         int                      rc1    = 0;
2252         ENTRY;
2253
2254         lfsck = lfsck_instance_find(key, true, false);
2255         if (unlikely(lfsck == NULL))
2256                 RETURN(-ENODEV);
2257
2258         thread = &lfsck->li_thread;
2259         /* release lfsck::li_mutex to avoid deadlock. */
2260         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2261                 if (!lfsck->li_master) {
2262                         CERROR("%s: only allow to specify '-A' via MDS\n",
2263                                lfsck_lfsck2name(lfsck));
2264
2265                         GOTO(out, rc = -EPERM);
2266                 }
2267
2268                 rc1 = lfsck_stop_all(env, lfsck, stop);
2269         }
2270
2271         mutex_lock(&lfsck->li_mutex);
2272         spin_lock(&lfsck->li_lock);
2273         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2274                 spin_unlock(&lfsck->li_lock);
2275                 GOTO(out, rc = -EALREADY);
2276         }
2277
2278         if (stop != NULL) {
2279                 lfsck->li_status = stop->ls_status;
2280                 lfsck->li_flags = stop->ls_flags;
2281         } else {
2282                 lfsck->li_status = LS_STOPPED;
2283                 lfsck->li_flags = 0;
2284         }
2285
2286         thread_set_flags(thread, SVC_STOPPING);
2287         spin_unlock(&lfsck->li_lock);
2288
2289         wake_up_all(&thread->t_ctl_waitq);
2290         l_wait_event(thread->t_ctl_waitq,
2291                      thread_is_stopped(thread),
2292                      &lwi);
2293
2294         GOTO(out, rc = 0);
2295
2296 out:
2297         mutex_unlock(&lfsck->li_mutex);
2298         lfsck_instance_put(env, lfsck);
2299
2300         return rc != 0 ? rc : rc1;
2301 }
2302 EXPORT_SYMBOL(lfsck_stop);
2303
2304 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2305                     struct lfsck_request *lr)
2306 {
2307         int rc = -EOPNOTSUPP;
2308         ENTRY;
2309
2310         switch (lr->lr_event) {
2311         case LE_START: {
2312                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
2313                 struct lfsck_start_param  lsp;
2314
2315                 memset(start, 0, sizeof(*start));
2316                 start->ls_valid = lr->lr_valid;
2317                 start->ls_speed_limit = lr->lr_speed;
2318                 start->ls_version = lr->lr_version;
2319                 start->ls_active = lr->lr_active;
2320                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2321                 start->ls_async_windows = lr->lr_async_windows;
2322
2323                 lsp.lsp_start = start;
2324                 lsp.lsp_index = lr->lr_index;
2325                 lsp.lsp_index_valid = 1;
2326                 rc = lfsck_start(env, key, &lsp);
2327                 break;
2328         }
2329         case LE_STOP: {
2330                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2331
2332                 memset(stop, 0, sizeof(*stop));
2333                 stop->ls_status = lr->lr_status;
2334                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2335                 rc = lfsck_stop(env, key, stop);
2336                 break;
2337         }
2338         case LE_PHASE1_DONE:
2339         case LE_PHASE2_DONE:
2340         case LE_FID_ACCESSED:
2341         case LE_PEER_EXIT: {
2342                 struct lfsck_instance  *lfsck;
2343                 struct lfsck_component *com;
2344
2345                 lfsck = lfsck_instance_find(key, true, false);
2346                 if (unlikely(lfsck == NULL))
2347                         RETURN(-ENODEV);
2348
2349                 com = lfsck_component_find(lfsck, lr->lr_active);
2350                 if (likely(com != NULL)) {
2351                         rc = com->lc_ops->lfsck_in_notify(env, com, lr);
2352                         lfsck_component_put(env, com);
2353                 }
2354
2355                 lfsck_instance_put(env, lfsck);
2356                 break;
2357         }
2358         default:
2359                 break;
2360         }
2361
2362         RETURN(rc);
2363 }
2364 EXPORT_SYMBOL(lfsck_in_notify);
2365
2366 int lfsck_query(const struct lu_env *env, struct dt_device *key,
2367                 struct lfsck_request *lr)
2368 {
2369         struct lfsck_instance  *lfsck;
2370         struct lfsck_component *com;
2371         int                     rc;
2372         ENTRY;
2373
2374         lfsck = lfsck_instance_find(key, true, false);
2375         if (unlikely(lfsck == NULL))
2376                 RETURN(-ENODEV);
2377
2378         com = lfsck_component_find(lfsck, lr->lr_active);
2379         if (likely(com != NULL)) {
2380                 rc = com->lc_ops->lfsck_query(env, com);
2381                 lfsck_component_put(env, com);
2382         } else {
2383                 rc = -ENOTSUPP;
2384         }
2385
2386         lfsck_instance_put(env, lfsck);
2387
2388         RETURN(rc);
2389 }
2390 EXPORT_SYMBOL(lfsck_query);
2391
2392 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
2393                              struct ldlm_namespace *ns)
2394 {
2395         struct lfsck_instance  *lfsck;
2396         int                     rc      = -ENODEV;
2397
2398         lfsck = lfsck_instance_find(key, true, false);
2399         if (likely(lfsck != NULL)) {
2400                 lfsck->li_namespace = ns;
2401                 lfsck_instance_put(env, lfsck);
2402                 rc = 0;
2403         }
2404
2405         return rc;
2406 }
2407 EXPORT_SYMBOL(lfsck_register_namespace);
2408
2409 int lfsck_register(const struct lu_env *env, struct dt_device *key,
2410                    struct dt_device *next, struct obd_device *obd,
2411                    lfsck_out_notify notify, void *notify_data, bool master)
2412 {
2413         struct lfsck_instance   *lfsck;
2414         struct dt_object        *root  = NULL;
2415         struct dt_object        *obj;
2416         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
2417         int                      rc;
2418         ENTRY;
2419
2420         lfsck = lfsck_instance_find(key, false, false);
2421         if (unlikely(lfsck != NULL))
2422                 RETURN(-EEXIST);
2423
2424         OBD_ALLOC_PTR(lfsck);
2425         if (lfsck == NULL)
2426                 RETURN(-ENOMEM);
2427
2428         mutex_init(&lfsck->li_mutex);
2429         spin_lock_init(&lfsck->li_lock);
2430         CFS_INIT_LIST_HEAD(&lfsck->li_link);
2431         CFS_INIT_LIST_HEAD(&lfsck->li_list_scan);
2432         CFS_INIT_LIST_HEAD(&lfsck->li_list_dir);
2433         CFS_INIT_LIST_HEAD(&lfsck->li_list_double_scan);
2434         CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
2435         atomic_set(&lfsck->li_ref, 1);
2436         atomic_set(&lfsck->li_double_scan_count, 0);
2437         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
2438         lfsck->li_out_notify = notify;
2439         lfsck->li_out_notify_data = notify_data;
2440         lfsck->li_next = next;
2441         lfsck->li_bottom = key;
2442         lfsck->li_obd = obd;
2443
2444         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
2445         if (rc != 0)
2446                 GOTO(out, rc);
2447
2448         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
2449         if (rc != 0)
2450                 GOTO(out, rc);
2451
2452         fid->f_seq = FID_SEQ_LOCAL_NAME;
2453         fid->f_oid = 1;
2454         fid->f_ver = 0;
2455         rc = local_oid_storage_init(env, lfsck->li_bottom, fid, &lfsck->li_los);
2456         if (rc != 0)
2457                 GOTO(out, rc);
2458
2459         rc = dt_root_get(env, key, fid);
2460         if (rc != 0)
2461                 GOTO(out, rc);
2462
2463         root = dt_locate(env, lfsck->li_bottom, fid);
2464         if (IS_ERR(root))
2465                 GOTO(out, rc = PTR_ERR(root));
2466
2467         if (unlikely(!dt_try_as_dir(env, root)))
2468                 GOTO(out, rc = -ENOTDIR);
2469
2470         lfsck->li_local_root_fid = *fid;
2471         if (master) {
2472                 lfsck->li_master = 1;
2473                 if (lfsck_dev_idx(lfsck->li_bottom) == 0) {
2474                         rc = dt_lookup(env, root,
2475                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
2476                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
2477                         if (rc != 0)
2478                                 GOTO(out, rc);
2479                 }
2480         }
2481
2482         fid->f_seq = FID_SEQ_LOCAL_FILE;
2483         fid->f_oid = OTABLE_IT_OID;
2484         fid->f_ver = 0;
2485         obj = dt_locate(env, lfsck->li_bottom, fid);
2486         if (IS_ERR(obj))
2487                 GOTO(out, rc = PTR_ERR(obj));
2488
2489         lfsck->li_obj_oit = obj;
2490         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
2491         if (rc != 0) {
2492                 if (rc == -ENOTSUPP)
2493                         GOTO(add, rc = 0);
2494
2495                 GOTO(out, rc);
2496         }
2497
2498         rc = lfsck_bookmark_setup(env, lfsck);
2499         if (rc != 0)
2500                 GOTO(out, rc);
2501
2502         if (master) {
2503                 rc = lfsck_fid_init(lfsck);
2504                 if (rc < 0)
2505                         GOTO(out, rc);
2506
2507                 rc = lfsck_namespace_setup(env, lfsck);
2508                 if (rc < 0)
2509                         GOTO(out, rc);
2510         }
2511
2512         rc = lfsck_layout_setup(env, lfsck);
2513         if (rc < 0)
2514                 GOTO(out, rc);
2515
2516         /* XXX: more LFSCK components initialization to be added here. */
2517
2518 add:
2519         rc = lfsck_instance_add(lfsck);
2520         if (rc == 0)
2521                 rc = lfsck_add_target_from_orphan(env, lfsck);
2522 out:
2523         if (root != NULL && !IS_ERR(root))
2524                 lu_object_put(env, &root->do_lu);
2525         if (rc != 0)
2526                 lfsck_instance_cleanup(env, lfsck);
2527         return rc;
2528 }
2529 EXPORT_SYMBOL(lfsck_register);
2530
/**
 * Deregister the LFSCK instance associated with the device \a key.
 *
 * Looks the instance up via lfsck_instance_find() and, when one exists,
 * releases it with lfsck_instance_put().  Nothing is done if no instance
 * is found for \a key.
 *
 * NOTE(review): the (false, true) arguments presumably mean "do not take an
 * extra reference" and "unlink from the instance list" - confirm against
 * lfsck_instance_find().
 *
 * \param[in] env       execution environment handed to lfsck_instance_put()
 * \param[in] key       device used as the lookup key for the instance
 */
void lfsck_degister(const struct lu_env *env, struct dt_device *key)
{
        struct lfsck_instance *instance;

        instance = lfsck_instance_find(key, false, true);
        if (instance == NULL)
                return;

        lfsck_instance_put(env, instance);
}
EXPORT_SYMBOL(lfsck_degister);
2540
2541 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
2542                      struct dt_device *tgt, struct obd_export *exp,
2543                      __u32 index, bool for_ost)
2544 {
2545         struct lfsck_instance   *lfsck;
2546         struct lfsck_tgt_desc   *ltd;
2547         int                      rc;
2548         ENTRY;
2549
2550         OBD_ALLOC_PTR(ltd);
2551         if (ltd == NULL)
2552                 RETURN(-ENOMEM);
2553
2554         ltd->ltd_tgt = tgt;
2555         ltd->ltd_key = key;
2556         ltd->ltd_exp = exp;
2557         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
2558         INIT_LIST_HEAD(&ltd->ltd_layout_list);
2559         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
2560         atomic_set(&ltd->ltd_ref, 1);
2561         ltd->ltd_index = index;
2562
2563         spin_lock(&lfsck_instance_lock);
2564         lfsck = __lfsck_instance_find(key, true, false);
2565         if (lfsck == NULL) {
2566                 if (for_ost)
2567                         list_add_tail(&ltd->ltd_orphan_list,
2568                                       &lfsck_ost_orphan_list);
2569                 else
2570                         list_add_tail(&ltd->ltd_orphan_list,
2571                                       &lfsck_mdt_orphan_list);
2572                 spin_unlock(&lfsck_instance_lock);
2573
2574                 RETURN(0);
2575         }
2576         spin_unlock(&lfsck_instance_lock);
2577
2578         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
2579         if (rc != 0)
2580                 lfsck_tgt_put(ltd);
2581
2582         lfsck_instance_put(env, lfsck);
2583
2584         RETURN(rc);
2585 }
2586 EXPORT_SYMBOL(lfsck_add_target);
2587
2588 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
2589                       struct dt_device *tgt, __u32 index, bool for_ost)
2590 {
2591         struct lfsck_instance   *lfsck;
2592         struct lfsck_tgt_descs  *ltds;
2593         struct lfsck_tgt_desc   *ltd    = NULL;
2594         struct list_head        *head;
2595
2596         if (for_ost)
2597                 head = &lfsck_ost_orphan_list;
2598         else
2599                 head = &lfsck_mdt_orphan_list;
2600
2601         spin_lock(&lfsck_instance_lock);
2602         list_for_each_entry(ltd, head, ltd_orphan_list) {
2603                 if (ltd->ltd_tgt == tgt) {
2604                         list_del_init(&ltd->ltd_orphan_list);
2605                         spin_unlock(&lfsck_instance_lock);
2606                         lfsck_tgt_put(ltd);
2607
2608                         return;
2609                 }
2610         }
2611
2612         lfsck = __lfsck_instance_find(key, true, false);
2613         spin_unlock(&lfsck_instance_lock);
2614         if (unlikely(lfsck == NULL))
2615                 return;
2616
2617         if (for_ost)
2618                 ltds = &lfsck->li_ost_descs;
2619         else
2620                 ltds = &lfsck->li_mdt_descs;
2621
2622         down_write(&ltds->ltd_rw_sem);
2623         LASSERT(ltds->ltd_tgts_bitmap != NULL);
2624
2625         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
2626                 goto unlock;
2627
2628         ltd = LTD_TGT(ltds, index);
2629         if (unlikely(ltd == NULL))
2630                 goto unlock;
2631
2632         LASSERT(ltds->ltd_tgtnr > 0);
2633
2634         ltds->ltd_tgtnr--;
2635         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
2636         LTD_TGT(ltds, index) = NULL;
2637
2638 unlock:
2639         if (ltd == NULL) {
2640                 if (for_ost)
2641                         head = &lfsck->li_ost_descs.ltd_orphan;
2642                 else
2643                         head = &lfsck->li_ost_descs.ltd_orphan;
2644
2645                 list_for_each_entry(ltd, head, ltd_orphan_list) {
2646                         if (ltd->ltd_tgt == tgt) {
2647                                 list_del_init(&ltd->ltd_orphan_list);
2648                                 break;
2649                         }
2650                 }
2651         }
2652
2653         up_write(&ltds->ltd_rw_sem);
2654         if (ltd != NULL) {
2655                 spin_lock(&ltds->ltd_lock);
2656                 ltd->ltd_dead = 1;
2657                 spin_unlock(&ltds->ltd_lock);
2658                 lfsck_stop_notify(env, lfsck, ltds, ltd, LT_LAYOUT);
2659                 lfsck_tgt_put(ltd);
2660         }
2661
2662         lfsck_instance_put(env, lfsck);
2663 }
2664 EXPORT_SYMBOL(lfsck_del_target);
2665
2666 static int __init lfsck_init(void)
2667 {
2668         int rc;
2669
2670         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
2671         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
2672         lfsck_key_init_generic(&lfsck_thread_key, NULL);
2673         rc = lu_context_key_register(&lfsck_thread_key);
2674         if (rc == 0) {
2675                 tgt_register_lfsck_in_notify(lfsck_in_notify);
2676                 tgt_register_lfsck_query(lfsck_query);
2677         }
2678
2679         return rc;
2680 }
2681
2682 static void __exit lfsck_exit(void)
2683 {
2684         struct lfsck_tgt_desc *ltd;
2685         struct lfsck_tgt_desc *next;
2686
2687         LASSERT(cfs_list_empty(&lfsck_instance_list));
2688
2689         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
2690                                  ltd_orphan_list) {
2691                 list_del_init(&ltd->ltd_orphan_list);
2692                 lfsck_tgt_put(ltd);
2693         }
2694
2695         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
2696                                  ltd_orphan_list) {
2697                 list_del_init(&ltd->ltd_orphan_list);
2698                 lfsck_tgt_put(ltd);
2699         }
2700
2701         lu_context_key_degister(&lfsck_thread_key);
2702 }
2703
2704 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
2705 MODULE_DESCRIPTION("LFSCK");
2706 MODULE_LICENSE("GPL");
2707
2708 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);