Whamcloud - gitweb
LU-4879 lfsck: avoid check exp_connect_flags before connected
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 /* define lfsck thread key */
46 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
47
48 static void lfsck_key_fini(const struct lu_context *ctx,
49                            struct lu_context_key *key, void *data)
50 {
51         struct lfsck_thread_info *info = data;
52
53         lu_buf_free(&info->lti_linkea_buf);
54         lu_buf_free(&info->lti_big_buf);
55         OBD_FREE_PTR(info);
56 }
57
58 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
59 LU_KEY_INIT_GENERIC(lfsck);
60
61 static CFS_LIST_HEAD(lfsck_instance_list);
62 static struct list_head lfsck_ost_orphan_list;
63 static struct list_head lfsck_mdt_orphan_list;
64 static DEFINE_SPINLOCK(lfsck_instance_lock);
65
66 static const char *lfsck_status_names[] = {
67         [LS_INIT]               = "init",
68         [LS_SCANNING_PHASE1]    = "scanning-phase1",
69         [LS_SCANNING_PHASE2]    = "scanning-phase2",
70         [LS_COMPLETED]          = "completed",
71         [LS_FAILED]             = "failed",
72         [LS_STOPPED]            = "stopped",
73         [LS_PAUSED]             = "paused",
74         [LS_CRASHED]            = "crashed",
75         [LS_PARTIAL]            = "partial",
76         [LS_CO_FAILED]          = "co-failed",
77         [LS_CO_STOPPED]         = "co-stopped",
78         [LS_CO_PAUSED]          = "co-paused"
79 };
80
81 const char *lfsck_flags_names[] = {
82         "scanned-once",
83         "inconsistent",
84         "upgrade",
85         "incomplete",
86         "crashed_lastid",
87         NULL
88 };
89
90 const char *lfsck_param_names[] = {
91         NULL,
92         "failout",
93         "dryrun",
94         "all_targets",
95         "broadcast",
96         "orphan",
97         NULL
98 };
99
100 const char *lfsck_status2names(enum lfsck_status status)
101 {
102         if (unlikely(status < 0 || status >= LS_MAX))
103                 return "unknown";
104
105         return lfsck_status_names[status];
106 }
107
108 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
109 {
110         spin_lock_init(&ltds->ltd_lock);
111         init_rwsem(&ltds->ltd_rw_sem);
112         INIT_LIST_HEAD(&ltds->ltd_orphan);
113         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
114         if (ltds->ltd_tgts_bitmap == NULL)
115                 return -ENOMEM;
116
117         return 0;
118 }
119
120 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
121 {
122         struct lfsck_tgt_desc   *ltd;
123         struct lfsck_tgt_desc   *next;
124         int                      idx;
125
126         down_write(&ltds->ltd_rw_sem);
127
128         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
129                                  ltd_orphan_list) {
130                 list_del_init(&ltd->ltd_orphan_list);
131                 lfsck_tgt_put(ltd);
132         }
133
134         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
135                 up_write(&ltds->ltd_rw_sem);
136
137                 return;
138         }
139
140         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
141                 ltd = LTD_TGT(ltds, idx);
142                 if (likely(ltd != NULL)) {
143                         LASSERT(list_empty(&ltd->ltd_layout_list));
144                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
145
146                         ltds->ltd_tgtnr--;
147                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
148                         LTD_TGT(ltds, idx) = NULL;
149                         lfsck_tgt_put(ltd);
150                 }
151         }
152
153         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
154                  ltds->ltd_tgtnr);
155
156         for (idx = 0; idx < TGT_PTRS; idx++) {
157                 if (ltds->ltd_tgts_idx[idx] != NULL) {
158                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
159                         ltds->ltd_tgts_idx[idx] = NULL;
160                 }
161         }
162
163         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
164         ltds->ltd_tgts_bitmap = NULL;
165         up_write(&ltds->ltd_rw_sem);
166 }
167
168 static int __lfsck_add_target(const struct lu_env *env,
169                               struct lfsck_instance *lfsck,
170                               struct lfsck_tgt_desc *ltd,
171                               bool for_ost, bool locked)
172 {
173         struct lfsck_tgt_descs *ltds;
174         __u32                   index = ltd->ltd_index;
175         int                     rc    = 0;
176         ENTRY;
177
178         if (for_ost)
179                 ltds = &lfsck->li_ost_descs;
180         else
181                 ltds = &lfsck->li_mdt_descs;
182
183         if (!locked)
184                 down_write(&ltds->ltd_rw_sem);
185
186         LASSERT(ltds->ltd_tgts_bitmap != NULL);
187
188         if (index >= ltds->ltd_tgts_bitmap->size) {
189                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
190                                     (__u32)BITS_PER_LONG);
191                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
192                 cfs_bitmap_t *new_bitmap;
193
194                 while (newsize < index + 1)
195                         newsize <<= 1;
196
197                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
198                 if (new_bitmap == NULL)
199                         GOTO(unlock, rc = -ENOMEM);
200
201                 if (ltds->ltd_tgtnr > 0)
202                         cfs_bitmap_copy(new_bitmap, old_bitmap);
203                 ltds->ltd_tgts_bitmap = new_bitmap;
204                 CFS_FREE_BITMAP(old_bitmap);
205         }
206
207         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
208                 CERROR("%s: the device %s (%u) is registered already\n",
209                        lfsck_lfsck2name(lfsck),
210                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
211                 GOTO(unlock, rc = -EEXIST);
212         }
213
214         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
215                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
216                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
217                         GOTO(unlock, rc = -ENOMEM);
218         }
219
220         LTD_TGT(ltds, index) = ltd;
221         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
222         ltds->ltd_tgtnr++;
223
224         GOTO(unlock, rc = 0);
225
226 unlock:
227         if (!locked)
228                 up_write(&ltds->ltd_rw_sem);
229
230         return rc;
231 }
232
233 static int lfsck_add_target_from_orphan(const struct lu_env *env,
234                                         struct lfsck_instance *lfsck)
235 {
236         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
237         struct lfsck_tgt_desc   *ltd;
238         struct lfsck_tgt_desc   *next;
239         struct list_head        *head    = &lfsck_ost_orphan_list;
240         int                      rc;
241         bool                     for_ost = true;
242
243 again:
244         spin_lock(&lfsck_instance_lock);
245         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
246                 if (ltd->ltd_key == lfsck->li_bottom) {
247                         list_del_init(&ltd->ltd_orphan_list);
248                         list_add_tail(&ltd->ltd_orphan_list,
249                                       &ltds->ltd_orphan);
250                 }
251         }
252         spin_unlock(&lfsck_instance_lock);
253
254         down_write(&ltds->ltd_rw_sem);
255         while (!list_empty(&ltds->ltd_orphan)) {
256                 ltd = list_entry(ltds->ltd_orphan.next,
257                                  struct lfsck_tgt_desc,
258                                  ltd_orphan_list);
259                 list_del_init(&ltd->ltd_orphan_list);
260                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
261                 /* Do not hold the semaphore for too long time. */
262                 up_write(&ltds->ltd_rw_sem);
263                 if (rc != 0)
264                         return rc;
265
266                 down_write(&ltds->ltd_rw_sem);
267         }
268         up_write(&ltds->ltd_rw_sem);
269
270         if (for_ost) {
271                 ltds = &lfsck->li_mdt_descs;
272                 head = &lfsck_mdt_orphan_list;
273                 for_ost = false;
274                 goto again;
275         }
276
277         return 0;
278 }
279
280 static inline struct lfsck_component *
281 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type, cfs_list_t *list)
282 {
283         struct lfsck_component *com;
284
285         cfs_list_for_each_entry(com, list, lc_link) {
286                 if (com->lc_type == type)
287                         return com;
288         }
289         return NULL;
290 }
291
292 struct lfsck_component *
293 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
294 {
295         struct lfsck_component *com;
296
297         spin_lock(&lfsck->li_lock);
298         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
299         if (com != NULL)
300                 goto unlock;
301
302         com = __lfsck_component_find(lfsck, type,
303                                      &lfsck->li_list_double_scan);
304         if (com != NULL)
305                 goto unlock;
306
307         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
308
309 unlock:
310         if (com != NULL)
311                 lfsck_component_get(com);
312         spin_unlock(&lfsck->li_lock);
313         return com;
314 }
315
316 void lfsck_component_cleanup(const struct lu_env *env,
317                              struct lfsck_component *com)
318 {
319         if (!cfs_list_empty(&com->lc_link))
320                 cfs_list_del_init(&com->lc_link);
321         if (!cfs_list_empty(&com->lc_link_dir))
322                 cfs_list_del_init(&com->lc_link_dir);
323
324         lfsck_component_put(env, com);
325 }
326
327 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
328                     struct lu_fid *fid, bool locked)
329 {
330         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
331         int                      rc = 0;
332         ENTRY;
333
334         if (!locked)
335                 mutex_lock(&lfsck->li_mutex);
336
337         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
338         if (rc >= 0) {
339                 bk->lb_last_fid = *fid;
340                 /* We do not care about whether the subsequent sub-operations
341                  * failed or not. The worst case is that one FID is lost that
342                  * is not a big issue for the LFSCK since it is relative rare
343                  * for LFSCK create. */
344                 rc = lfsck_bookmark_store(env, lfsck);
345         }
346
347         if (!locked)
348                 mutex_unlock(&lfsck->li_mutex);
349
350         RETURN(rc);
351 }
352
/* Entry names inserted into a freshly created lost+found directory:
 * the self entry and the parent entry. */
static const char dot[] = ".";
static const char dotdot[] = "..";
355
356 static int lfsck_create_lpf_local(const struct lu_env *env,
357                                   struct lfsck_instance *lfsck,
358                                   struct dt_object *parent,
359                                   struct dt_object *child,
360                                   struct lu_attr *la,
361                                   struct dt_object_format *dof,
362                                   const char *name)
363 {
364         struct dt_device        *dev    = lfsck->li_bottom;
365         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
366         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
367         const struct lu_fid     *cfid   = lu_object_fid(&child->do_lu);
368         struct thandle          *th     = NULL;
369         loff_t                   pos    = 0;
370         int                      len    = sizeof(struct lfsck_bookmark);
371         int                      rc     = 0;
372         ENTRY;
373
374         th = dt_trans_create(env, dev);
375         if (IS_ERR(th))
376                 RETURN(PTR_ERR(th));
377
378         /* 1a. create child */
379         rc = dt_declare_create(env, child, la, NULL, dof, th);
380         if (rc != 0)
381                 GOTO(stop, rc);
382
383         /* 2a. increase child nlink */
384         rc = dt_declare_ref_add(env, child, th);
385         if (rc != 0)
386                 GOTO(stop, rc);
387
388         /* 3a. insert name into parent dir */
389         rc = dt_declare_insert(env, parent, (const struct dt_rec *)cfid,
390                                (const struct dt_key *)name, th);
391         if (rc != 0)
392                 GOTO(stop, rc);
393
394         /* 4a. increase parent nlink */
395         rc = dt_declare_ref_add(env, parent, th);
396         if (rc != 0)
397                 GOTO(stop, rc);
398
399         /* 5a. update bookmark */
400         rc = dt_declare_record_write(env, bk_obj,
401                                      lfsck_buf_get(env, bk, len), 0, th);
402         if (rc != 0)
403                 GOTO(stop, rc);
404
405         rc = dt_trans_start_local(env, dev, th);
406         if (rc != 0)
407                 GOTO(stop, rc);
408
409         dt_write_lock(env, child, 0);
410         /* 1b.1 create child */
411         rc = dt_create(env, child, la, NULL, dof, th);
412         if (rc != 0)
413                 GOTO(unlock, rc);
414
415         if (unlikely(!dt_try_as_dir(env, child)))
416                 GOTO(unlock, rc = -ENOTDIR);
417
418         /* 1b.2 insert dot into child dir */
419         rc = dt_insert(env, child, (const struct dt_rec *)cfid,
420                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
421         if (rc != 0)
422                 GOTO(unlock, rc);
423
424         /* 1b.3 insert dotdot into child dir */
425         rc = dt_insert(env, child, (const struct dt_rec *)&LU_LPF_FID,
426                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
427         if (rc != 0)
428                 GOTO(unlock, rc);
429
430         /* 2b. increase child nlink */
431         rc = dt_ref_add(env, child, th);
432         dt_write_unlock(env, child);
433         if (rc != 0)
434                 GOTO(stop, rc);
435
436         /* 3b. insert name into parent dir */
437         rc = dt_insert(env, parent, (const struct dt_rec *)cfid,
438                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
439         if (rc != 0)
440                 GOTO(stop, rc);
441
442         dt_write_lock(env, parent, 0);
443         /* 4b. increase parent nlink */
444         rc = dt_ref_add(env, parent, th);
445         dt_write_unlock(env, parent);
446         if (rc != 0)
447                 GOTO(stop, rc);
448
449         bk->lb_lpf_fid = *cfid;
450         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
451
452         /* 5b. update bookmark */
453         rc = dt_record_write(env, bk_obj,
454                              lfsck_buf_get(env, bk, len), &pos, th);
455
456         GOTO(stop, rc);
457
458 unlock:
459         dt_write_unlock(env, child);
460
461 stop:
462         dt_trans_stop(env, dev, th);
463
464         return rc;
465 }
466
467 static int lfsck_create_lpf_remote(const struct lu_env *env,
468                                    struct lfsck_instance *lfsck,
469                                    struct dt_object *parent,
470                                    struct dt_object *child,
471                                    struct lu_attr *la,
472                                    struct dt_object_format *dof,
473                                    const char *name)
474 {
475         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
476         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
477         const struct lu_fid     *cfid   = lu_object_fid(&child->do_lu);
478         struct thandle          *th     = NULL;
479         struct dt_device        *dev;
480         loff_t                   pos    = 0;
481         int                      len    = sizeof(struct lfsck_bookmark);
482         int                      rc     = 0;
483         ENTRY;
484
485         /* Create .lustre/lost+found/MDTxxxx. */
486
487         /* XXX: Currently, cross-MDT create operation needs to create the child
488          *      object firstly, then insert name into the parent directory. For
489          *      this case, the child object resides on current MDT (local), but
490          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
491          *      easy to contain all the sub-modifications orderly within single
492          *      transaction.
493          *
494          *      To avoid more inconsistency, we split the create operation into
495          *      two transactions:
496          *
497          *      1) create the child locally.
498          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
499          *         remotely and update the lfsck_bookmark::lb_lpf_fid locally.
500          *
501          *      If 1) done but 2) failed, then the worst case is that we lose
502          *      one object locally, which is not a big issue. (can be repaird
503          *      by LFSCK phase III) */
504
505         /* Transaction I: */
506
507         dev = lfsck->li_bottom;
508         th = dt_trans_create(env, dev);
509         if (IS_ERR(th))
510                 RETURN(PTR_ERR(th));
511
512         /* 1a. create child locally. */
513         rc = dt_declare_create(env, child, la, NULL, dof, th);
514         if (rc != 0)
515                 GOTO(stop, rc);
516
517         /* 2a. increase child nlink locally. */
518         rc = dt_declare_ref_add(env, child, th);
519         if (rc != 0)
520                 GOTO(stop, rc);
521
522         rc = dt_trans_start_local(env, dev, th);
523         if (rc != 0)
524                 GOTO(stop, rc);
525
526         dt_write_lock(env, child, 0);
527         /* 1b. create child locally. */
528         rc = dt_create(env, child, la, NULL, dof, th);
529         if (rc != 0)
530                 GOTO(unlock, rc);
531
532         if (unlikely(!dt_try_as_dir(env, child)))
533                 GOTO(unlock, rc = -ENOTDIR);
534
535         /* 2b.1 insert dot into child dir locally. */
536         rc = dt_insert(env, child, (const struct dt_rec *)cfid,
537                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
538         if (rc != 0)
539                 GOTO(unlock, rc);
540
541         /* 2b.2 insert dotdot into child dir locally. */
542         rc = dt_insert(env, child, (const struct dt_rec *)&LU_LPF_FID,
543                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
544         if (rc != 0)
545                 GOTO(unlock, rc);
546
547         /* 2b.3 increase child nlink locally. */
548         rc = dt_ref_add(env, child, th);
549         dt_write_unlock(env, child);
550         dt_trans_stop(env, dev, th);
551         if (rc != 0)
552                 RETURN(rc);
553
554         /* Transaction II: */
555
556         dev = lfsck->li_next;
557         th = dt_trans_create(env, dev);
558         if (IS_ERR(th))
559                 RETURN(PTR_ERR(th));
560
561         /* 3a. insert name into parent dir remotely. */
562         rc = dt_declare_insert(env, parent, (const struct dt_rec *)cfid,
563                                (const struct dt_key *)name, th);
564         if (rc != 0)
565                 GOTO(stop, rc);
566
567         /* 4a. increase parent nlink remotely. */
568         rc = dt_declare_ref_add(env, parent, th);
569         if (rc != 0)
570                 GOTO(stop, rc);
571
572         /* 5a. decrease child nlink for dotdot locally if former remote
573          *     update failed. */
574         rc = dt_declare_ref_del(env, child, th);
575         if (rc != 0)
576                 GOTO(stop, rc);
577
578         /* 6a. decrease child nlink for dot locally if former remote
579          *     update failed. */
580         rc = dt_declare_ref_del(env, child, th);
581         if (rc != 0)
582                 GOTO(stop, rc);
583
584         /* 7a. destroy child locally if former remote update failed. */
585         rc = dt_declare_destroy(env, child, th);
586         if (rc != 0)
587                 GOTO(stop, rc);
588
589         /* 8a. update bookmark locally. */
590         rc = dt_declare_record_write(env, bk_obj,
591                                      lfsck_buf_get(env, bk, len), 0, th);
592         if (rc != 0)
593                 GOTO(stop, rc);
594
595         rc = dt_trans_start(env, dev, th);
596         if (rc != 0)
597                 GOTO(stop, rc);
598
599         /* 3b. insert name into parent dir remotely. */
600         rc = dt_insert(env, parent, (const struct dt_rec *)cfid,
601                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
602         if (rc == 0) {
603                 dt_write_lock(env, parent, 0);
604                 /* 4b. increase parent nlink remotely. */
605                 rc = dt_ref_add(env, parent, th);
606                 dt_write_unlock(env, parent);
607         }
608         if (rc != 0) {
609                 /* 5b. decrease child nlink for dotdot locally. */
610                 dt_ref_del(env, child, th);
611                 /* 6b. decrease child nlink for dot locally. */
612                 dt_ref_del(env, child, th);
613                 /* 7b. destroy child locally. */
614                 dt_destroy(env, child, th);
615                 GOTO(stop, rc);
616         }
617
618         bk->lb_lpf_fid = *cfid;
619         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
620
621         /* 8b. update bookmark locally. */
622         rc = dt_record_write(env, bk_obj,
623                              lfsck_buf_get(env, bk, len), &pos, th);
624
625         GOTO(stop, rc);
626
627 unlock:
628         dt_write_unlock(env, child);
629 stop:
630         dt_trans_stop(env, dev, th);
631
632         return rc;
633 }
634
635 /* Do NOT create .lustre/lost+found/MDTxxxx when register the lfsck instance,
636  * because the MDT0 maybe not reaady for sequence allocation yet. We do that
637  * only when it is required, such as orphan OST-objects repairing. */
638 int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
639 {
640         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
641         struct lfsck_thread_info *info  = lfsck_env_info(env);
642         struct lu_fid            *cfid  = &info->lti_fid2;
643         struct lu_attr           *la    = &info->lti_la;
644         struct dt_object_format  *dof   = &info->lti_dof;
645         struct dt_object         *parent = NULL;
646         struct dt_object         *child = NULL;
647         char                      name[8];
648         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
649         int                       rc    = 0;
650         ENTRY;
651
652         LASSERT(lfsck->li_master);
653
654         sprintf(name, "MDT%04x", node);
655         if (node == 0) {
656                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
657                                                   &LU_LPF_FID);
658         } else {
659                 struct lfsck_tgt_desc *ltd;
660
661                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
662                 if (unlikely(ltd == NULL))
663                         RETURN(-ENXIO);
664
665                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
666                                                   &LU_LPF_FID);
667                 lfsck_tgt_put(ltd);
668         }
669         if (IS_ERR(parent))
670                 RETURN(PTR_ERR(parent));
671
672         if (unlikely(!dt_try_as_dir(env, parent)))
673                 GOTO(out, rc = -ENOTDIR);
674
675         mutex_lock(&lfsck->li_mutex);
676         if (lfsck->li_lpf_obj != NULL)
677                 GOTO(unlock, rc = 0);
678
679         if (fid_is_zero(&bk->lb_lpf_fid)) {
680                 /* There is corner case that: in former LFSCK scanning we have
681                  * created the .lustre/lost+found/MDTxxxx but failed to update
682                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
683                  * it from MDT0 firstly. */
684                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
685                                (const struct dt_key *)name, BYPASS_CAPA);
686                 if (rc != 0 && rc != -ENOENT)
687                         GOTO(unlock, rc);
688
689                 if (rc == 0) {
690                         bk->lb_lpf_fid = *cfid;
691                         rc = lfsck_bookmark_store(env, lfsck);
692                 } else {
693                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
694                 }
695                 if (rc != 0)
696                         GOTO(unlock, rc);
697         } else {
698                 *cfid = bk->lb_lpf_fid;
699         }
700
701         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
702         if (IS_ERR(child))
703                 GOTO(unlock, rc = PTR_ERR(child));
704
705         if (dt_object_exists(child) != 0) {
706                 if (unlikely(!dt_try_as_dir(env, child)))
707                         GOTO(unlock, rc = -ENOTDIR);
708
709                 lfsck->li_lpf_obj = child;
710                 GOTO(unlock, rc = 0);
711         }
712
713         memset(la, 0, sizeof(*la));
714         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
715         la->la_mode = S_IFDIR | S_IRWXU;
716         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
717                        LA_UID | LA_GID;
718         memset(dof, 0, sizeof(*dof));
719         dof->dof_type = dt_mode_to_dft(S_IFDIR);
720
721         if (node == 0)
722                 rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
723                                             dof, name);
724         else
725                 rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
726                                              dof, name);
727         if (rc == 0)
728                 lfsck->li_lpf_obj = child;
729
730         GOTO(unlock, rc);
731
732 unlock:
733         mutex_unlock(&lfsck->li_mutex);
734         if (rc != 0 && child != NULL && !IS_ERR(child))
735                 lu_object_put(env, &child->do_lu);
736 out:
737         if (parent != NULL && !IS_ERR(parent))
738                 lu_object_put(env, &parent->do_lu);
739
740         return rc;
741 }
742
743 static int lfsck_fid_init(struct lfsck_instance *lfsck)
744 {
745         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
746         struct seq_server_site  *ss;
747         char                    *prefix;
748         int                      rc     = 0;
749         ENTRY;
750
751         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
752         if (unlikely(ss == NULL))
753                 RETURN(-ENXIO);
754
755         OBD_ALLOC_PTR(lfsck->li_seq);
756         if (lfsck->li_seq == NULL)
757                 RETURN(-ENOMEM);
758
759         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
760         if (prefix == NULL)
761                 GOTO(out, rc = -ENOMEM);
762
763         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
764         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
765                              ss->ss_server_seq);
766         OBD_FREE(prefix, MAX_OBD_NAME + 7);
767         if (rc != 0)
768                 GOTO(out, rc);
769
770         if (fid_is_sane(&bk->lb_last_fid))
771                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
772
773         RETURN(0);
774
775 out:
776         OBD_FREE_PTR(lfsck->li_seq);
777         lfsck->li_seq = NULL;
778
779         return rc;
780 }
781
782 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
783 {
784         if (lfsck->li_seq != NULL) {
785                 seq_client_fini(lfsck->li_seq);
786                 OBD_FREE_PTR(lfsck->li_seq);
787                 lfsck->li_seq = NULL;
788         }
789 }
790
791 void lfsck_instance_cleanup(const struct lu_env *env,
792                             struct lfsck_instance *lfsck)
793 {
794         struct ptlrpc_thread    *thread = &lfsck->li_thread;
795         struct lfsck_component  *com;
796         ENTRY;
797
798         LASSERT(list_empty(&lfsck->li_link));
799         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
800
801         if (lfsck->li_obj_oit != NULL) {
802                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
803                 lfsck->li_obj_oit = NULL;
804         }
805
806         LASSERT(lfsck->li_obj_dir == NULL);
807
808         while (!cfs_list_empty(&lfsck->li_list_scan)) {
809                 com = cfs_list_entry(lfsck->li_list_scan.next,
810                                      struct lfsck_component,
811                                      lc_link);
812                 lfsck_component_cleanup(env, com);
813         }
814
815         LASSERT(cfs_list_empty(&lfsck->li_list_dir));
816
817         while (!cfs_list_empty(&lfsck->li_list_double_scan)) {
818                 com = cfs_list_entry(lfsck->li_list_double_scan.next,
819                                      struct lfsck_component,
820                                      lc_link);
821                 lfsck_component_cleanup(env, com);
822         }
823
824         while (!cfs_list_empty(&lfsck->li_list_idle)) {
825                 com = cfs_list_entry(lfsck->li_list_idle.next,
826                                      struct lfsck_component,
827                                      lc_link);
828                 lfsck_component_cleanup(env, com);
829         }
830
831         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
832         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
833
834         if (lfsck->li_bookmark_obj != NULL) {
835                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
836                 lfsck->li_bookmark_obj = NULL;
837         }
838
839         if (lfsck->li_lpf_obj != NULL) {
840                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
841                 lfsck->li_lpf_obj = NULL;
842         }
843
844         if (lfsck->li_los != NULL) {
845                 local_oid_storage_fini(env, lfsck->li_los);
846                 lfsck->li_los = NULL;
847         }
848
849         lfsck_fid_fini(lfsck);
850
851         OBD_FREE_PTR(lfsck);
852 }
853
854 static inline struct lfsck_instance *
855 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
856 {
857         struct lfsck_instance *lfsck;
858
859         cfs_list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
860                 if (lfsck->li_bottom == key) {
861                         if (ref)
862                                 lfsck_instance_get(lfsck);
863                         if (unlink)
864                                 list_del_init(&lfsck->li_link);
865
866                         return lfsck;
867                 }
868         }
869
870         return NULL;
871 }
872
873 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
874                                            bool unlink)
875 {
876         struct lfsck_instance *lfsck;
877
878         spin_lock(&lfsck_instance_lock);
879         lfsck = __lfsck_instance_find(key, ref, unlink);
880         spin_unlock(&lfsck_instance_lock);
881
882         return lfsck;
883 }
884
885 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
886 {
887         struct lfsck_instance *tmp;
888
889         spin_lock(&lfsck_instance_lock);
890         cfs_list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
891                 if (lfsck->li_bottom == tmp->li_bottom) {
892                         spin_unlock(&lfsck_instance_lock);
893                         return -EEXIST;
894                 }
895         }
896
897         cfs_list_add_tail(&lfsck->li_link, &lfsck_instance_list);
898         spin_unlock(&lfsck_instance_lock);
899         return 0;
900 }
901
/**
 * Format the names of the bits set in \a bits as "<prefix>: a,b,c\n".
 *
 * With no bit set the output is "<prefix>:\n". Bits whose entry in
 * \a names is NULL are consumed silently. On success *buf is advanced
 * past the text written and *len is reduced by the same amount.
 *
 * snprintf() returns the length the output would have had, so a result
 * that is >= the remaining space means the text was truncated; that case
 * is reported as -ENOSPC instead of advancing *buf past the buffer end.
 *
 * \param[in,out] buf	current write position in the output buffer
 * \param[in,out] len	space remaining at *buf, including the trailing NUL
 * \param[in] bits	bitmask to describe
 * \param[in] names	name of bit i at index i; must cover every set bit
 * \param[in] prefix	label printed in front of the names
 *
 * \retval		number of bytes written on success
 * \retval -ENOSPC	the buffer is too small for the complete output
 */
int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
                    const char *prefix)
{
        /* Work on an unsigned copy so that testing bit 31 is well defined. */
        unsigned int left = (unsigned int)bits;
        int          save = *len;
        int          rc;
        int          i;
        bool         newline = (left == 0);

        if (*len <= 0)
                return -ENOSPC;

        rc = snprintf(*buf, *len, "%s:%c", prefix, newline ? '\n' : ' ');
        if (rc <= 0 || rc >= *len)
                return -ENOSPC;

        *buf += rc;
        *len -= rc;
        for (i = 0; left != 0; i++) {
                unsigned int flag = 1U << i;

                if (!(left & flag))
                        continue;

                left &= ~flag;
                if (names[i] == NULL)
                        continue;

                /* The last named bit terminates the line itself. */
                if (left == 0)
                        newline = true;

                rc = snprintf(*buf, *len, "%s%c", names[i],
                              newline ? '\n' : ',');
                if (rc <= 0 || rc >= *len)
                        return -ENOSPC;

                *buf += rc;
                *len -= rc;
        }

        /* Highest set bit had no name: the line is still open. */
        if (!newline) {
                rc = snprintf(*buf, *len, "\n");
                if (rc <= 0 || rc >= *len)
                        return -ENOSPC;

                *buf += rc;
                *len -= rc;
        }

        return save - *len;
}
946
947 int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix)
948 {
949         int rc;
950
951         if (time != 0)
952                 rc = snprintf(*buf, *len, "%s: "LPU64" seconds\n", prefix,
953                               cfs_time_current_sec() - time);
954         else
955                 rc = snprintf(*buf, *len, "%s: N/A\n", prefix);
956         if (rc <= 0)
957                 return -ENOSPC;
958
959         *buf += rc;
960         *len -= rc;
961         return rc;
962 }
963
964 int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
965                    const char *prefix)
966 {
967         int rc;
968
969         if (fid_is_zero(&pos->lp_dir_parent)) {
970                 if (pos->lp_oit_cookie == 0)
971                         rc = snprintf(*buf, *len, "%s: N/A, N/A, N/A\n",
972                                       prefix);
973                 else
974                         rc = snprintf(*buf, *len, "%s: "LPU64", N/A, N/A\n",
975                                       prefix, pos->lp_oit_cookie);
976         } else {
977                 rc = snprintf(*buf, *len, "%s: "LPU64", "DFID", "LPU64"\n",
978                               prefix, pos->lp_oit_cookie,
979                               PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
980         }
981         if (rc <= 0)
982                 return -ENOSPC;
983
984         *buf += rc;
985         *len -= rc;
986         return rc;
987 }
988
989 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
990                     struct lfsck_position *pos, bool init)
991 {
992         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
993
994         if (unlikely(lfsck->li_di_oit == NULL)) {
995                 memset(pos, 0, sizeof(*pos));
996                 return;
997         }
998
999         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1000         if (!lfsck->li_current_oit_processed && !init)
1001                 pos->lp_oit_cookie--;
1002
1003         LASSERT(pos->lp_oit_cookie > 0);
1004
1005         if (lfsck->li_di_dir != NULL) {
1006                 struct dt_object *dto = lfsck->li_obj_dir;
1007
1008                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1009                                                         lfsck->li_di_dir);
1010
1011                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1012                         fid_zero(&pos->lp_dir_parent);
1013                         pos->lp_dir_cookie = 0;
1014                 } else {
1015                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1016                 }
1017         } else {
1018                 fid_zero(&pos->lp_dir_parent);
1019                 pos->lp_dir_cookie = 0;
1020         }
1021 }
1022
1023 static void __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1024 {
1025         lfsck->li_bookmark_ram.lb_speed_limit = limit;
1026         if (limit != LFSCK_SPEED_NO_LIMIT) {
1027                 if (limit > HZ) {
1028                         lfsck->li_sleep_rate = limit / HZ;
1029                         lfsck->li_sleep_jif = 1;
1030                 } else {
1031                         lfsck->li_sleep_rate = 1;
1032                         lfsck->li_sleep_jif = HZ / limit;
1033                 }
1034         } else {
1035                 lfsck->li_sleep_jif = 0;
1036                 lfsck->li_sleep_rate = 0;
1037         }
1038 }
1039
1040 void lfsck_control_speed(struct lfsck_instance *lfsck)
1041 {
1042         struct ptlrpc_thread *thread = &lfsck->li_thread;
1043         struct l_wait_info    lwi;
1044
1045         if (lfsck->li_sleep_jif > 0 &&
1046             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1047                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1048                                        LWI_ON_SIGNAL_NOOP, NULL);
1049
1050                 l_wait_event(thread->t_ctl_waitq,
1051                              !thread_is_running(thread),
1052                              &lwi);
1053                 lfsck->li_new_scanned = 0;
1054         }
1055 }
1056
1057 void lfsck_control_speed_by_self(struct lfsck_component *com)
1058 {
1059         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1060         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1061         struct l_wait_info       lwi;
1062
1063         if (lfsck->li_sleep_jif > 0 &&
1064             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1065                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1066                                        LWI_ON_SIGNAL_NOOP, NULL);
1067
1068                 l_wait_event(thread->t_ctl_waitq,
1069                              !thread_is_running(thread),
1070                              &lwi);
1071                 com->lc_new_scanned = 0;
1072         }
1073 }
1074
1075 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
1076                             struct lu_fid *fid)
1077 {
1078         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
1079                      !dt_try_as_dir(env, obj)))
1080                 return -ENOTDIR;
1081
1082         return dt_lookup(env, obj, (struct dt_rec *)fid,
1083                          (const struct dt_key *)"..", BYPASS_CAPA);
1084 }
1085
1086 static int lfsck_needs_scan_dir(const struct lu_env *env,
1087                                 struct lfsck_instance *lfsck,
1088                                 struct dt_object *obj)
1089 {
1090         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
1091         int            depth = 0;
1092         int            rc;
1093
1094         if (!lfsck->li_master || !S_ISDIR(lfsck_object_type(obj)) ||
1095             cfs_list_empty(&lfsck->li_list_dir))
1096                RETURN(0);
1097
1098         while (1) {
1099                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
1100                  *      which is the agent directory to manage the objects
1101                  *      which name entries reside on remote MDTs. Related
1102                  *      consistency verification will be processed in LFSCK
1103                  *      phase III. */
1104                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
1105                         if (depth > 0)
1106                                 lfsck_object_put(env, obj);
1107                         return 1;
1108                 }
1109
1110                 /* .lustre doesn't contain "real" user objects, no need lfsck */
1111                 if (fid_is_dot_lustre(lfsck_dto2fid(obj))) {
1112                         if (depth > 0)
1113                                 lfsck_object_put(env, obj);
1114                         return 0;
1115                 }
1116
1117                 dt_read_lock(env, obj, MOR_TGT_CHILD);
1118                 if (unlikely(lfsck_is_dead_obj(obj))) {
1119                         dt_read_unlock(env, obj);
1120                         if (depth > 0)
1121                                 lfsck_object_put(env, obj);
1122                         return 0;
1123                 }
1124
1125                 rc = dt_xattr_get(env, obj,
1126                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
1127                                   BYPASS_CAPA);
1128                 dt_read_unlock(env, obj);
1129                 if (rc >= 0) {
1130                         if (depth > 0)
1131                                 lfsck_object_put(env, obj);
1132                         return 1;
1133                 }
1134
1135                 if (rc < 0 && rc != -ENODATA) {
1136                         if (depth > 0)
1137                                 lfsck_object_put(env, obj);
1138                         return rc;
1139                 }
1140
1141                 rc = lfsck_parent_fid(env, obj, fid);
1142                 if (depth > 0)
1143                         lfsck_object_put(env, obj);
1144                 if (rc != 0)
1145                         return rc;
1146
1147                 if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
1148                         return 0;
1149
1150                 obj = lfsck_object_find(env, lfsck, fid);
1151                 if (obj == NULL)
1152                         return 0;
1153                 else if (IS_ERR(obj))
1154                         return PTR_ERR(obj);
1155
1156                 if (!dt_object_exists(obj)) {
1157                         lfsck_object_put(env, obj);
1158                         return 0;
1159                 }
1160
1161                 /* Currently, only client visible directory can be remote. */
1162                 if (dt_object_remote(obj)) {
1163                         lfsck_object_put(env, obj);
1164                         return 1;
1165                 }
1166
1167                 depth++;
1168         }
1169         return 0;
1170 }
1171
1172 struct lfsck_thread_args *lfsck_thread_args_init(struct lfsck_instance *lfsck,
1173                                                  struct lfsck_component *com,
1174                                                  struct lfsck_start_param *lsp)
1175 {
1176         struct lfsck_thread_args *lta;
1177         int                       rc;
1178
1179         OBD_ALLOC_PTR(lta);
1180         if (lta == NULL)
1181                 return ERR_PTR(-ENOMEM);
1182
1183         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1184         if (rc != 0) {
1185                 OBD_FREE_PTR(lta);
1186                 return ERR_PTR(rc);
1187         }
1188
1189         lta->lta_lfsck = lfsck_instance_get(lfsck);
1190         if (com != NULL)
1191                 lta->lta_com = lfsck_component_get(com);
1192
1193         lta->lta_lsp = lsp;
1194
1195         return lta;
1196 }
1197
1198 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1199 {
1200         if (lta->lta_com != NULL)
1201                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1202         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1203         lu_env_fini(&lta->lta_env);
1204         OBD_FREE_PTR(lta);
1205 }
1206
1207 /* LFSCK wrap functions */
1208
1209 void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
1210                 bool new_checked)
1211 {
1212         struct lfsck_component *com;
1213
1214         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1215                 com->lc_ops->lfsck_fail(env, com, new_checked);
1216         }
1217 }
1218
1219 int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck)
1220 {
1221         struct lfsck_component *com;
1222         int                     rc  = 0;
1223         int                     rc1 = 0;
1224
1225         if (likely(cfs_time_beforeq(cfs_time_current(),
1226                                     lfsck->li_time_next_checkpoint)))
1227                 return 0;
1228
1229         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
1230         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1231                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
1232                 if (rc != 0)
1233                         rc1 = rc;
1234         }
1235
1236         lfsck->li_time_last_checkpoint = cfs_time_current();
1237         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1238                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1239         return rc1 != 0 ? rc1 : rc;
1240 }
1241
1242 int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck,
1243                struct lfsck_start_param *lsp)
1244 {
1245         struct dt_object       *obj     = NULL;
1246         struct lfsck_component *com;
1247         struct lfsck_component *next;
1248         struct lfsck_position  *pos     = NULL;
1249         const struct dt_it_ops *iops    =
1250                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
1251         struct dt_it           *di;
1252         int                     rc;
1253         ENTRY;
1254
1255         LASSERT(lfsck->li_obj_dir == NULL);
1256         LASSERT(lfsck->li_di_dir == NULL);
1257
1258         lfsck->li_current_oit_processed = 0;
1259         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1260                 com->lc_new_checked = 0;
1261                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1262                         com->lc_journal = 0;
1263
1264                 rc = com->lc_ops->lfsck_prep(env, com, lsp);
1265                 if (rc != 0)
1266                         GOTO(out, rc);
1267
1268                 if ((pos == NULL) ||
1269                     (!lfsck_pos_is_zero(&com->lc_pos_start) &&
1270                      lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
1271                         pos = &com->lc_pos_start;
1272         }
1273
1274         /* Init otable-based iterator. */
1275         if (pos == NULL) {
1276                 rc = iops->load(env, lfsck->li_di_oit, 0);
1277                 if (rc > 0) {
1278                         lfsck->li_oit_over = 1;
1279                         rc = 0;
1280                 }
1281
1282                 GOTO(out, rc);
1283         }
1284
1285         rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
1286         if (rc < 0)
1287                 GOTO(out, rc);
1288         else if (rc > 0)
1289                 lfsck->li_oit_over = 1;
1290
1291         if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
1292                 GOTO(out, rc = 0);
1293
1294         /* Find the directory for namespace-based traverse. */
1295         obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
1296         if (obj == NULL)
1297                 GOTO(out, rc = 0);
1298         else if (IS_ERR(obj))
1299                 RETURN(PTR_ERR(obj));
1300
1301         /* XXX: Currently, skip remote object, the consistency for
1302          *      remote object will be processed in LFSCK phase III. */
1303         if (!dt_object_exists(obj) || dt_object_remote(obj) ||
1304             unlikely(!S_ISDIR(lfsck_object_type(obj))))
1305                 GOTO(out, rc = 0);
1306
1307         if (unlikely(!dt_try_as_dir(env, obj)))
1308                 GOTO(out, rc = -ENOTDIR);
1309
1310         /* Init the namespace-based directory traverse. */
1311         iops = &obj->do_index_ops->dio_it;
1312         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
1313         if (IS_ERR(di))
1314                 GOTO(out, rc = PTR_ERR(di));
1315
1316         LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
1317
1318         rc = iops->load(env, di, pos->lp_dir_cookie);
1319         if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
1320                 rc = iops->next(env, di);
1321         else if (rc > 0)
1322                 rc = 0;
1323
1324         if (rc != 0) {
1325                 iops->put(env, di);
1326                 iops->fini(env, di);
1327                 GOTO(out, rc);
1328         }
1329
1330         lfsck->li_obj_dir = lfsck_object_get(obj);
1331         lfsck->li_cookie_dir = iops->store(env, di);
1332         spin_lock(&lfsck->li_lock);
1333         lfsck->li_di_dir = di;
1334         spin_unlock(&lfsck->li_lock);
1335
1336         GOTO(out, rc = 0);
1337
1338 out:
1339         if (obj != NULL)
1340                 lfsck_object_put(env, obj);
1341
1342         if (rc < 0) {
1343                 cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
1344                                              lc_link)
1345                         com->lc_ops->lfsck_post(env, com, rc, true);
1346
1347                 return rc;
1348         }
1349
1350         rc = 0;
1351         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
1352         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1353                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
1354                 if (rc != 0)
1355                         break;
1356         }
1357
1358         lfsck->li_time_last_checkpoint = cfs_time_current();
1359         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1360                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1361         return rc;
1362 }
1363
1364 int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
1365                    struct dt_object *obj)
1366 {
1367         struct lfsck_component *com;
1368         const struct dt_it_ops *iops;
1369         struct dt_it           *di;
1370         int                     rc;
1371         ENTRY;
1372
1373         LASSERT(lfsck->li_obj_dir == NULL);
1374
1375         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1376                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
1377                 if (rc != 0)
1378                         RETURN(rc);
1379         }
1380
1381         rc = lfsck_needs_scan_dir(env, lfsck, obj);
1382         if (rc <= 0)
1383                 GOTO(out, rc);
1384
1385         if (unlikely(!dt_try_as_dir(env, obj)))
1386                 GOTO(out, rc = -ENOTDIR);
1387
1388         iops = &obj->do_index_ops->dio_it;
1389         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
1390         if (IS_ERR(di))
1391                 GOTO(out, rc = PTR_ERR(di));
1392
1393         rc = iops->load(env, di, 0);
1394         if (rc == 0)
1395                 rc = iops->next(env, di);
1396         else if (rc > 0)
1397                 rc = 0;
1398
1399         if (rc != 0) {
1400                 iops->put(env, di);
1401                 iops->fini(env, di);
1402                 GOTO(out, rc);
1403         }
1404
1405         lfsck->li_obj_dir = lfsck_object_get(obj);
1406         lfsck->li_cookie_dir = iops->store(env, di);
1407         spin_lock(&lfsck->li_lock);
1408         lfsck->li_di_dir = di;
1409         spin_unlock(&lfsck->li_lock);
1410
1411         GOTO(out, rc = 0);
1412
1413 out:
1414         if (rc < 0)
1415                 lfsck_fail(env, lfsck, false);
1416         return (rc > 0 ? 0 : rc);
1417 }
1418
1419 int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
1420                    struct dt_object *obj, struct lu_dirent *ent)
1421 {
1422         struct lfsck_component *com;
1423         int                     rc;
1424
1425         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1426                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
1427                 if (rc != 0)
1428                         return rc;
1429         }
1430         return 0;
1431 }
1432
1433 int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
1434                int result)
1435 {
1436         struct lfsck_component *com;
1437         struct lfsck_component *next;
1438         int                     rc  = 0;
1439         int                     rc1 = 0;
1440
1441         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
1442         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1443                 rc = com->lc_ops->lfsck_post(env, com, result, false);
1444                 if (rc != 0)
1445                         rc1 = rc;
1446         }
1447
1448         lfsck->li_time_last_checkpoint = cfs_time_current();
1449         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1450                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1451
1452         /* Ignore some component post failure to make other can go ahead. */
1453         return result;
1454 }
1455
1456 static void lfsck_interpret(const struct lu_env *env,
1457                             struct lfsck_instance *lfsck,
1458                             struct ptlrpc_request *req, void *args, int result)
1459 {
1460         struct lfsck_async_interpret_args *laia = args;
1461         struct lfsck_component            *com;
1462
1463         LASSERT(laia->laia_com == NULL);
1464         LASSERT(laia->laia_shared);
1465
1466         spin_lock(&lfsck->li_lock);
1467         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1468                 if (com->lc_ops->lfsck_interpret != NULL) {
1469                         laia->laia_com = com;
1470                         com->lc_ops->lfsck_interpret(env, req, laia, result);
1471                 }
1472         }
1473
1474         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1475                 if (com->lc_ops->lfsck_interpret != NULL) {
1476                         laia->laia_com = com;
1477                         com->lc_ops->lfsck_interpret(env, req, laia, result);
1478                 }
1479         }
1480         spin_unlock(&lfsck->li_lock);
1481 }
1482
1483 int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck)
1484 {
1485         struct lfsck_component *com;
1486         struct lfsck_component *next;
1487         struct l_wait_info      lwi = { 0 };
1488         int                     rc  = 0;
1489         int                     rc1 = 0;
1490
1491         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1492                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1493                         com->lc_journal = 0;
1494
1495                 rc = com->lc_ops->lfsck_double_scan(env, com);
1496                 if (rc != 0)
1497                         rc1 = rc;
1498         }
1499
1500         l_wait_event(lfsck->li_thread.t_ctl_waitq,
1501                      atomic_read(&lfsck->li_double_scan_count) == 0,
1502                      &lwi);
1503
1504         if (lfsck->li_status != LS_PAUSED &&
1505             lfsck->li_status != LS_CO_PAUSED) {
1506                 list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1507                                          lc_link) {
1508                         spin_lock(&lfsck->li_lock);
1509                         list_del_init(&com->lc_link);
1510                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1511                         spin_unlock(&lfsck->li_lock);
1512                 }
1513         }
1514
1515         return rc1 != 0 ? rc1 : rc;
1516 }
1517
1518 static int lfsck_stop_notify(const struct lu_env *env,
1519                              struct lfsck_instance *lfsck,
1520                              struct lfsck_tgt_descs *ltds,
1521                              struct lfsck_tgt_desc *ltd, __u16 type)
1522 {
1523         struct ptlrpc_request_set *set;
1524         struct lfsck_component    *com;
1525         int                        rc  = 0;
1526         ENTRY;
1527
1528         spin_lock(&lfsck->li_lock);
1529         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
1530         if (com == NULL)
1531                 com = __lfsck_component_find(lfsck, type,
1532                                              &lfsck->li_list_double_scan);
1533         if (com != NULL)
1534                 lfsck_component_get(com);
1535         spin_lock(&lfsck->li_lock);
1536
1537         if (com != NULL) {
1538                 if (com->lc_ops->lfsck_stop_notify != NULL) {
1539                         set = ptlrpc_prep_set();
1540                         if (set == NULL) {
1541                                 lfsck_component_put(env, com);
1542
1543                                 RETURN(-ENOMEM);
1544                         }
1545
1546                         rc = com->lc_ops->lfsck_stop_notify(env, com, ltds,
1547                                                             ltd, set);
1548                         if (rc == 0)
1549                                 rc = ptlrpc_set_wait(set);
1550
1551                         ptlrpc_set_destroy(set);
1552                 }
1553
1554                 lfsck_component_put(env, com);
1555         }
1556
1557         RETURN(rc);
1558 }
1559
1560 void lfsck_quit(const struct lu_env *env, struct lfsck_instance *lfsck)
1561 {
1562         struct lfsck_component *com;
1563         struct lfsck_component *next;
1564
1565         list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
1566                                  lc_link) {
1567                 if (com->lc_ops->lfsck_quit != NULL)
1568                         com->lc_ops->lfsck_quit(env, com);
1569
1570                 spin_lock(&lfsck->li_lock);
1571                 list_del_init(&com->lc_link);
1572                 list_del_init(&com->lc_link_dir);
1573                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1574                 spin_unlock(&lfsck->li_lock);
1575         }
1576
1577         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1578                                  lc_link) {
1579                 if (com->lc_ops->lfsck_quit != NULL)
1580                         com->lc_ops->lfsck_quit(env, com);
1581
1582                 spin_lock(&lfsck->li_lock);
1583                 list_del_init(&com->lc_link);
1584                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1585                 spin_unlock(&lfsck->li_lock);
1586         }
1587 }
1588
1589 static int lfsck_async_interpret(const struct lu_env *env,
1590                                  struct ptlrpc_request *req,
1591                                  void *args, int rc)
1592 {
1593         struct lfsck_async_interpret_args *laia = args;
1594         struct lfsck_instance             *lfsck;
1595
1596         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
1597                               li_mdt_descs);
1598         lfsck_interpret(env, lfsck, req, laia, rc);
1599         lfsck_tgt_put(laia->laia_ltd);
1600         if (rc != 0 && laia->laia_result != -EALREADY)
1601                 laia->laia_result = rc;
1602
1603         return 0;
1604 }
1605
1606 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
1607                         struct lfsck_request *lr,
1608                         struct ptlrpc_request_set *set,
1609                         ptlrpc_interpterer_t interpreter,
1610                         void *args, int request)
1611 {
1612         struct lfsck_async_interpret_args *laia;
1613         struct ptlrpc_request             *req;
1614         struct lfsck_request              *tmp;
1615         struct req_format                 *format;
1616         int                                rc;
1617
1618         switch (request) {
1619         case LFSCK_NOTIFY:
1620                 format = &RQF_LFSCK_NOTIFY;
1621                 break;
1622         case LFSCK_QUERY:
1623                 format = &RQF_LFSCK_QUERY;
1624                 break;
1625         default:
1626                 CERROR("%s: unknown async request: opc = %d\n",
1627                        exp->exp_obd->obd_name, request);
1628                 return -EINVAL;
1629         }
1630
1631         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
1632         if (req == NULL)
1633                 return -ENOMEM;
1634
1635         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
1636         if (rc != 0) {
1637                 ptlrpc_request_free(req);
1638
1639                 return rc;
1640         }
1641
1642         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1643         *tmp = *lr;
1644         ptlrpc_request_set_replen(req);
1645
1646         laia = ptlrpc_req_async_args(req);
1647         *laia = *(struct lfsck_async_interpret_args *)args;
1648         if (laia->laia_com != NULL)
1649                 lfsck_component_get(laia->laia_com);
1650         req->rq_interpret_reply = interpreter;
1651         ptlrpc_set_add_req(set, req);
1652
1653         return 0;
1654 }
1655
1656 /* external interfaces */
1657
1658 int lfsck_get_speed(struct dt_device *key, void *buf, int len)
1659 {
1660         struct lu_env           env;
1661         struct lfsck_instance  *lfsck;
1662         int                     rc;
1663         ENTRY;
1664
1665         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1666         if (rc != 0)
1667                 RETURN(rc);
1668
1669         lfsck = lfsck_instance_find(key, true, false);
1670         if (likely(lfsck != NULL)) {
1671                 rc = snprintf(buf, len, "%u\n",
1672                               lfsck->li_bookmark_ram.lb_speed_limit);
1673                 lfsck_instance_put(&env, lfsck);
1674         } else {
1675                 rc = -ENXIO;
1676         }
1677
1678         lu_env_fini(&env);
1679
1680         RETURN(rc);
1681 }
1682 EXPORT_SYMBOL(lfsck_get_speed);
1683
1684 int lfsck_set_speed(struct dt_device *key, int val)
1685 {
1686         struct lu_env           env;
1687         struct lfsck_instance  *lfsck;
1688         int                     rc;
1689         ENTRY;
1690
1691         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1692         if (rc != 0)
1693                 RETURN(rc);
1694
1695         lfsck = lfsck_instance_find(key, true, false);
1696         if (likely(lfsck != NULL)) {
1697                 mutex_lock(&lfsck->li_mutex);
1698                 __lfsck_set_speed(lfsck, val);
1699                 rc = lfsck_bookmark_store(&env, lfsck);
1700                 mutex_unlock(&lfsck->li_mutex);
1701                 lfsck_instance_put(&env, lfsck);
1702         } else {
1703                 rc = -ENXIO;
1704         }
1705
1706         lu_env_fini(&env);
1707
1708         RETURN(rc);
1709 }
1710 EXPORT_SYMBOL(lfsck_set_speed);
1711
1712 int lfsck_get_windows(struct dt_device *key, void *buf, int len)
1713 {
1714         struct lu_env           env;
1715         struct lfsck_instance  *lfsck;
1716         int                     rc;
1717         ENTRY;
1718
1719         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1720         if (rc != 0)
1721                 RETURN(rc);
1722
1723         lfsck = lfsck_instance_find(key, true, false);
1724         if (likely(lfsck != NULL)) {
1725                 rc = snprintf(buf, len, "%u\n",
1726                               lfsck->li_bookmark_ram.lb_async_windows);
1727                 lfsck_instance_put(&env, lfsck);
1728         } else {
1729                 rc = -ENXIO;
1730         }
1731
1732         lu_env_fini(&env);
1733
1734         RETURN(rc);
1735 }
1736 EXPORT_SYMBOL(lfsck_get_windows);
1737
1738 int lfsck_set_windows(struct dt_device *key, int val)
1739 {
1740         struct lu_env           env;
1741         struct lfsck_instance  *lfsck;
1742         int                     rc;
1743         ENTRY;
1744
1745         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1746         if (rc != 0)
1747                 RETURN(rc);
1748
1749         lfsck = lfsck_instance_find(key, true, false);
1750         if (likely(lfsck != NULL)) {
1751                 if (val > LFSCK_ASYNC_WIN_MAX) {
1752                         CERROR("%s: Too large async windows size, which "
1753                                "may cause memory issues. The valid range "
1754                                "is [0 - %u]. If you do not want to restrict "
1755                                "the windows size for async requests pipeline, "
1756                                "just set it as 0.\n",
1757                                lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
1758                         rc = -EINVAL;
1759                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
1760                         mutex_lock(&lfsck->li_mutex);
1761                         lfsck->li_bookmark_ram.lb_async_windows = val;
1762                         rc = lfsck_bookmark_store(&env, lfsck);
1763                         mutex_unlock(&lfsck->li_mutex);
1764                 }
1765                 lfsck_instance_put(&env, lfsck);
1766         } else {
1767                 rc = -ENXIO;
1768         }
1769
1770         lu_env_fini(&env);
1771
1772         RETURN(rc);
1773 }
1774 EXPORT_SYMBOL(lfsck_set_windows);
1775
1776 int lfsck_dump(struct dt_device *key, void *buf, int len, enum lfsck_type type)
1777 {
1778         struct lu_env           env;
1779         struct lfsck_instance  *lfsck;
1780         struct lfsck_component *com;
1781         int                     rc;
1782         ENTRY;
1783
1784         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1785         if (rc != 0)
1786                 RETURN(rc);
1787
1788         lfsck = lfsck_instance_find(key, true, false);
1789         if (likely(lfsck != NULL)) {
1790                 com = lfsck_component_find(lfsck, type);
1791                 if (likely(com != NULL)) {
1792                         rc = com->lc_ops->lfsck_dump(&env, com, buf, len);
1793                         lfsck_component_put(&env, com);
1794                 } else {
1795                         rc = -ENOTSUPP;
1796                 }
1797
1798                 lfsck_instance_put(&env, lfsck);
1799         } else {
1800                 rc = -ENXIO;
1801         }
1802
1803         lu_env_fini(&env);
1804
1805         RETURN(rc);
1806 }
1807 EXPORT_SYMBOL(lfsck_dump);
1808
1809 static int lfsck_stop_all(const struct lu_env *env,
1810                           struct lfsck_instance *lfsck,
1811                           struct lfsck_stop *stop)
1812 {
1813         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1814         struct lfsck_request              *lr     = &info->lti_lr;
1815         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1816         struct ptlrpc_request_set         *set;
1817         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1818         struct lfsck_tgt_desc             *ltd;
1819         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1820         __u32                              idx;
1821         int                                rc     = 0;
1822         int                                rc1    = 0;
1823         ENTRY;
1824
1825         LASSERT(stop->ls_flags & LPF_BROADCAST);
1826
1827         set = ptlrpc_prep_set();
1828         if (unlikely(set == NULL)) {
1829                 CERROR("%s: cannot allocate memory for stop LFSCK on "
1830                        "all targets\n", lfsck_lfsck2name(lfsck));
1831
1832                 RETURN(-ENOMEM);
1833         }
1834
1835         memset(lr, 0, sizeof(*lr));
1836         lr->lr_event = LE_STOP;
1837         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1838         lr->lr_status = stop->ls_status;
1839         lr->lr_version = bk->lb_version;
1840         lr->lr_active = LFSCK_TYPES_ALL;
1841         lr->lr_param = stop->ls_flags;
1842
1843         laia->laia_com = NULL;
1844         laia->laia_ltds = ltds;
1845         laia->laia_lr = lr;
1846         laia->laia_result = 0;
1847         laia->laia_shared = 1;
1848
1849         down_read(&ltds->ltd_rw_sem);
1850         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1851                 ltd = lfsck_tgt_get(ltds, idx);
1852                 LASSERT(ltd != NULL);
1853
1854                 laia->laia_ltd = ltd;
1855                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1856                                          lfsck_async_interpret, laia,
1857                                          LFSCK_NOTIFY);
1858                 if (rc != 0) {
1859                         lfsck_interpret(env, lfsck, NULL, laia, rc);
1860                         lfsck_tgt_put(ltd);
1861                         CWARN("%s: cannot notify MDT %x for LFSCK stop: "
1862                               "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
1863                         rc1 = rc;
1864                 }
1865         }
1866         up_read(&ltds->ltd_rw_sem);
1867
1868         rc = ptlrpc_set_wait(set);
1869         ptlrpc_set_destroy(set);
1870
1871         if (rc == 0)
1872                 rc = laia->laia_result;
1873
1874         if (rc == -EALREADY)
1875                 rc = 0;
1876
1877         if (rc != 0)
1878                 CWARN("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
1879                       lfsck_lfsck2name(lfsck), rc);
1880
1881         RETURN(rc != 0 ? rc : rc1);
1882 }
1883
1884 static int lfsck_start_all(const struct lu_env *env,
1885                            struct lfsck_instance *lfsck,
1886                            struct lfsck_start *start)
1887 {
1888         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1889         struct lfsck_request              *lr     = &info->lti_lr;
1890         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1891         struct ptlrpc_request_set         *set;
1892         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1893         struct lfsck_tgt_desc             *ltd;
1894         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1895         __u32                              idx;
1896         int                                rc     = 0;
1897         ENTRY;
1898
1899         LASSERT(start->ls_flags & LPF_BROADCAST);
1900
1901         set = ptlrpc_prep_set();
1902         if (unlikely(set == NULL)) {
1903                 if (bk->lb_param & LPF_FAILOUT) {
1904                         CERROR("%s: cannot allocate memory for start LFSCK on "
1905                                "all targets, failout.\n",
1906                                lfsck_lfsck2name(lfsck));
1907
1908                         RETURN(-ENOMEM);
1909                 } else {
1910                         CWARN("%s: cannot allocate memory for start LFSCK on "
1911                               "all targets, partly scan.\n",
1912                               lfsck_lfsck2name(lfsck));
1913
1914                         RETURN(0);
1915                 }
1916         }
1917
1918         memset(lr, 0, sizeof(*lr));
1919         lr->lr_event = LE_START;
1920         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1921         lr->lr_speed = bk->lb_speed_limit;
1922         lr->lr_version = bk->lb_version;
1923         lr->lr_active = start->ls_active;
1924         lr->lr_param = start->ls_flags;
1925         lr->lr_async_windows = bk->lb_async_windows;
1926         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
1927                        LSV_ASYNC_WINDOWS;
1928
1929         laia->laia_com = NULL;
1930         laia->laia_ltds = ltds;
1931         laia->laia_lr = lr;
1932         laia->laia_result = 0;
1933         laia->laia_shared = 1;
1934
1935         down_read(&ltds->ltd_rw_sem);
1936         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1937                 ltd = lfsck_tgt_get(ltds, idx);
1938                 LASSERT(ltd != NULL);
1939
1940                 laia->laia_ltd = ltd;
1941                 ltd->ltd_layout_done = 0;
1942                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1943                                          lfsck_async_interpret, laia,
1944                                          LFSCK_NOTIFY);
1945                 if (rc != 0) {
1946                         lfsck_interpret(env, lfsck, NULL, laia, rc);
1947                         lfsck_tgt_put(ltd);
1948                         if (bk->lb_param & LPF_FAILOUT) {
1949                                 CERROR("%s: cannot notify MDT %x for LFSCK "
1950                                        "start, failout: rc = %d\n",
1951                                        lfsck_lfsck2name(lfsck), idx, rc);
1952                                 break;
1953                         } else {
1954                                 CWARN("%s: cannot notify MDT %x for LFSCK "
1955                                       "start, partly scan: rc = %d\n",
1956                                       lfsck_lfsck2name(lfsck), idx, rc);
1957                                 rc = 0;
1958                         }
1959                 }
1960         }
1961         up_read(&ltds->ltd_rw_sem);
1962
1963         if (rc != 0) {
1964                 ptlrpc_set_destroy(set);
1965
1966                 RETURN(rc);
1967         }
1968
1969         rc = ptlrpc_set_wait(set);
1970         ptlrpc_set_destroy(set);
1971
1972         if (rc == 0)
1973                 rc = laia->laia_result;
1974
1975         if (rc != 0) {
1976                 if (bk->lb_param & LPF_FAILOUT) {
1977                         struct lfsck_stop *stop = &info->lti_stop;
1978
1979                         CERROR("%s: cannot start LFSCK on some MDTs, "
1980                                "stop all: rc = %d\n",
1981                                lfsck_lfsck2name(lfsck), rc);
1982                         if (rc != -EALREADY) {
1983                                 stop->ls_status = LS_FAILED;
1984                                 stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
1985                                 lfsck_stop_all(env, lfsck, stop);
1986                         }
1987                 } else {
1988                         CWARN("%s: cannot start LFSCK on some MDTs, "
1989                               "partly scan: rc = %d\n",
1990                               lfsck_lfsck2name(lfsck), rc);
1991                         rc = 0;
1992                 }
1993         }
1994
1995         RETURN(rc);
1996 }
1997
1998 int lfsck_start(const struct lu_env *env, struct dt_device *key,
1999                 struct lfsck_start_param *lsp)
2000 {
2001         struct lfsck_start              *start  = lsp->lsp_start;
2002         struct lfsck_instance           *lfsck;
2003         struct lfsck_bookmark           *bk;
2004         struct ptlrpc_thread            *thread;
2005         struct lfsck_component          *com;
2006         struct l_wait_info               lwi    = { 0 };
2007         struct lfsck_thread_args        *lta;
2008         bool                             dirty  = false;
2009         long                             rc     = 0;
2010         __u16                            valid  = 0;
2011         __u16                            flags  = 0;
2012         __u16                            type   = 1;
2013         ENTRY;
2014
2015         lfsck = lfsck_instance_find(key, true, false);
2016         if (unlikely(lfsck == NULL))
2017                 RETURN(-ENXIO);
2018
2019         /* System is not ready, try again later. */
2020         if (unlikely(lfsck->li_namespace == NULL))
2021                 GOTO(put, rc = -EAGAIN);
2022
2023         /* start == NULL means auto trigger paused LFSCK. */
2024         if ((start == NULL) &&
2025             (cfs_list_empty(&lfsck->li_list_scan) ||
2026              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2027                 GOTO(put, rc = 0);
2028
2029         bk = &lfsck->li_bookmark_ram;
2030         thread = &lfsck->li_thread;
2031         mutex_lock(&lfsck->li_mutex);
2032         spin_lock(&lfsck->li_lock);
2033         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2034                 rc = -EALREADY;
2035                 while (start->ls_active != 0) {
2036                         if (!(type & start->ls_active)) {
2037                                 type <<= 1;
2038                                 continue;
2039                         }
2040
2041                         com = __lfsck_component_find(lfsck, type,
2042                                                      &lfsck->li_list_scan);
2043                         if (com == NULL)
2044                                 com = __lfsck_component_find(lfsck, type,
2045                                                 &lfsck->li_list_double_scan);
2046                         if (com == NULL) {
2047                                 rc = -EOPNOTSUPP;
2048                                 break;
2049                         }
2050
2051                         if (com->lc_ops->lfsck_join != NULL) {
2052                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2053                                 if (rc != 0 && rc != -EALREADY)
2054                                         break;
2055                         }
2056                         start->ls_active &= ~type;
2057                         type <<= 1;
2058                 }
2059                 spin_unlock(&lfsck->li_lock);
2060                 GOTO(out, rc);
2061         }
2062         spin_unlock(&lfsck->li_lock);
2063
2064         lfsck->li_status = 0;
2065         lfsck->li_oit_over = 0;
2066         lfsck->li_start_unplug = 0;
2067         lfsck->li_drop_dryrun = 0;
2068         lfsck->li_new_scanned = 0;
2069
2070         /* For auto trigger. */
2071         if (start == NULL)
2072                 goto trigger;
2073
2074         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2075                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2076                        lfsck_lfsck2name(lfsck));
2077
2078                 GOTO(out, rc = -EPERM);
2079         }
2080
2081         start->ls_version = bk->lb_version;
2082         if (start->ls_valid & LSV_SPEED_LIMIT) {
2083                 __lfsck_set_speed(lfsck, start->ls_speed_limit);
2084                 dirty = true;
2085         }
2086
2087         if (start->ls_valid & LSV_ASYNC_WINDOWS &&
2088             bk->lb_async_windows != start->ls_async_windows) {
2089                 bk->lb_async_windows = start->ls_async_windows;
2090                 dirty = true;
2091         }
2092
2093         if (start->ls_valid & LSV_ERROR_HANDLE) {
2094                 valid |= DOIV_ERROR_HANDLE;
2095                 if (start->ls_flags & LPF_FAILOUT)
2096                         flags |= DOIF_FAILOUT;
2097
2098                 if ((start->ls_flags & LPF_FAILOUT) &&
2099                     !(bk->lb_param & LPF_FAILOUT)) {
2100                         bk->lb_param |= LPF_FAILOUT;
2101                         dirty = true;
2102                 } else if (!(start->ls_flags & LPF_FAILOUT) &&
2103                            (bk->lb_param & LPF_FAILOUT)) {
2104                         bk->lb_param &= ~LPF_FAILOUT;
2105                         dirty = true;
2106                 }
2107         }
2108
2109         if (start->ls_valid & LSV_DRYRUN) {
2110                 valid |= DOIV_DRYRUN;
2111                 if (start->ls_flags & LPF_DRYRUN)
2112                         flags |= DOIF_DRYRUN;
2113
2114                 if ((start->ls_flags & LPF_DRYRUN) &&
2115                     !(bk->lb_param & LPF_DRYRUN)) {
2116                         bk->lb_param |= LPF_DRYRUN;
2117                         dirty = true;
2118                 } else if (!(start->ls_flags & LPF_DRYRUN) &&
2119                            (bk->lb_param & LPF_DRYRUN)) {
2120                         bk->lb_param &= ~LPF_DRYRUN;
2121                         lfsck->li_drop_dryrun = 1;
2122                         dirty = true;
2123                 }
2124         }
2125
2126         if (bk->lb_param & LPF_ALL_TGT &&
2127             !(start->ls_flags & LPF_ALL_TGT)) {
2128                 bk->lb_param &= ~LPF_ALL_TGT;
2129                 dirty = true;
2130         } else if (!(bk->lb_param & LPF_ALL_TGT) &&
2131                    start->ls_flags & LPF_ALL_TGT) {
2132                 bk->lb_param |= LPF_ALL_TGT;
2133                 dirty = true;
2134         }
2135
2136         if (bk->lb_param & LPF_ORPHAN &&
2137             !(start->ls_flags & LPF_ORPHAN)) {
2138                 bk->lb_param &= ~LPF_ORPHAN;
2139                 dirty = true;
2140         } else if (!(bk->lb_param & LPF_ORPHAN) &&
2141                    start->ls_flags & LPF_ORPHAN) {
2142                 bk->lb_param |= LPF_ORPHAN;
2143                 dirty = true;
2144         }
2145
2146         if (dirty) {
2147                 rc = lfsck_bookmark_store(env, lfsck);
2148                 if (rc != 0)
2149                         GOTO(out, rc);
2150         }
2151
2152         if (start->ls_flags & LPF_RESET)
2153                 flags |= DOIF_RESET;
2154
2155         if (start->ls_active != 0) {
2156                 struct lfsck_component *next;
2157
2158                 if (start->ls_active == LFSCK_TYPES_ALL)
2159                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2160
2161                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2162                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2163                         GOTO(out, rc = -ENOTSUPP);
2164                 }
2165
2166                 cfs_list_for_each_entry_safe(com, next,
2167                                              &lfsck->li_list_scan, lc_link) {
2168                         if (!(com->lc_type & start->ls_active)) {
2169                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2170                                                              false);
2171                                 if (rc != 0)
2172                                         GOTO(out, rc);
2173                         }
2174                 }
2175
2176                 while (start->ls_active != 0) {
2177                         if (type & start->ls_active) {
2178                                 com = __lfsck_component_find(lfsck, type,
2179                                                         &lfsck->li_list_idle);
2180                                 if (com != NULL) {
2181                                         /* The component status will be updated
2182                                          * when its prep() is called later by
2183                                          * the LFSCK main engine. */
2184                                         cfs_list_del_init(&com->lc_link);
2185                                         cfs_list_add_tail(&com->lc_link,
2186                                                           &lfsck->li_list_scan);
2187                                 }
2188                                 start->ls_active &= ~type;
2189                         }
2190                         type <<= 1;
2191                 }
2192         }
2193
2194         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2195                 start->ls_active |= com->lc_type;
2196                 if (flags & DOIF_RESET) {
2197                         rc = com->lc_ops->lfsck_reset(env, com, false);
2198                         if (rc != 0)
2199                                 GOTO(out, rc);
2200                 }
2201         }
2202
2203 trigger:
2204         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
2205         if (bk->lb_param & LPF_DRYRUN) {
2206                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
2207                 valid |= DOIV_DRYRUN;
2208                 flags |= DOIF_DRYRUN;
2209         }
2210
2211         if (bk->lb_param & LPF_FAILOUT) {
2212                 valid |= DOIV_ERROR_HANDLE;
2213                 flags |= DOIF_FAILOUT;
2214         }
2215
2216         if (!cfs_list_empty(&lfsck->li_list_scan))
2217                 flags |= DOIF_OUTUSED;
2218
2219         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2220         thread_set_flags(thread, 0);
2221         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
2222         if (IS_ERR(lta))
2223                 GOTO(out, rc = PTR_ERR(lta));
2224
2225         rc = PTR_ERR(kthread_run(lfsck_master_engine, lta, "lfsck"));
2226         if (IS_ERR_VALUE(rc)) {
2227                 CERROR("%s: cannot start LFSCK thread: rc = %ld\n",
2228                        lfsck_lfsck2name(lfsck), rc);
2229                 lfsck_thread_args_fini(lta);
2230
2231                 GOTO(out, rc);
2232         }
2233
2234         l_wait_event(thread->t_ctl_waitq,
2235                      thread_is_running(thread) ||
2236                      thread_is_stopped(thread),
2237                      &lwi);
2238         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
2239                 lfsck->li_start_unplug = 1;
2240                 wake_up_all(&thread->t_ctl_waitq);
2241
2242                 GOTO(out, rc = 0);
2243         }
2244
2245         /* release lfsck::li_mutex to avoid deadlock. */
2246         mutex_unlock(&lfsck->li_mutex);
2247         rc = lfsck_start_all(env, lfsck, start);
2248         if (rc != 0) {
2249                 spin_lock(&lfsck->li_lock);
2250                 if (thread_is_stopped(thread)) {
2251                         spin_unlock(&lfsck->li_lock);
2252                 } else {
2253                         lfsck->li_status = LS_FAILED;
2254                         lfsck->li_flags = 0;
2255                         thread_set_flags(thread, SVC_STOPPING);
2256                         spin_unlock(&lfsck->li_lock);
2257
2258                         lfsck->li_start_unplug = 1;
2259                         wake_up_all(&thread->t_ctl_waitq);
2260                         l_wait_event(thread->t_ctl_waitq,
2261                                      thread_is_stopped(thread),
2262                                      &lwi);
2263                 }
2264         } else {
2265                 lfsck->li_start_unplug = 1;
2266                 wake_up_all(&thread->t_ctl_waitq);
2267         }
2268
2269         GOTO(put, rc);
2270
2271 out:
2272         mutex_unlock(&lfsck->li_mutex);
2273
2274 put:
2275         lfsck_instance_put(env, lfsck);
2276
2277         return rc < 0 ? rc : 0;
2278 }
2279 EXPORT_SYMBOL(lfsck_start);
2280
2281 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2282                struct lfsck_stop *stop)
2283 {
2284         struct lfsck_instance   *lfsck;
2285         struct ptlrpc_thread    *thread;
2286         struct l_wait_info       lwi    = { 0 };
2287         int                      rc     = 0;
2288         int                      rc1    = 0;
2289         ENTRY;
2290
2291         lfsck = lfsck_instance_find(key, true, false);
2292         if (unlikely(lfsck == NULL))
2293                 RETURN(-ENXIO);
2294
2295         thread = &lfsck->li_thread;
2296         /* release lfsck::li_mutex to avoid deadlock. */
2297         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2298                 if (!lfsck->li_master) {
2299                         CERROR("%s: only allow to specify '-A' via MDS\n",
2300                                lfsck_lfsck2name(lfsck));
2301
2302                         GOTO(out, rc = -EPERM);
2303                 }
2304
2305                 rc1 = lfsck_stop_all(env, lfsck, stop);
2306         }
2307
2308         mutex_lock(&lfsck->li_mutex);
2309         spin_lock(&lfsck->li_lock);
2310         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2311                 spin_unlock(&lfsck->li_lock);
2312                 GOTO(out, rc = -EALREADY);
2313         }
2314
2315         if (stop != NULL) {
2316                 lfsck->li_status = stop->ls_status;
2317                 lfsck->li_flags = stop->ls_flags;
2318         } else {
2319                 lfsck->li_status = LS_STOPPED;
2320                 lfsck->li_flags = 0;
2321         }
2322
2323         thread_set_flags(thread, SVC_STOPPING);
2324         spin_unlock(&lfsck->li_lock);
2325
2326         wake_up_all(&thread->t_ctl_waitq);
2327         l_wait_event(thread->t_ctl_waitq,
2328                      thread_is_stopped(thread),
2329                      &lwi);
2330
2331         GOTO(out, rc = 0);
2332
2333 out:
2334         mutex_unlock(&lfsck->li_mutex);
2335         lfsck_instance_put(env, lfsck);
2336
2337         return rc != 0 ? rc : rc1;
2338 }
2339 EXPORT_SYMBOL(lfsck_stop);
2340
2341 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2342                     struct lfsck_request *lr)
2343 {
2344         int rc = -EOPNOTSUPP;
2345         ENTRY;
2346
2347         switch (lr->lr_event) {
2348         case LE_START: {
2349                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
2350                 struct lfsck_start_param  lsp;
2351
2352                 memset(start, 0, sizeof(*start));
2353                 start->ls_valid = lr->lr_valid;
2354                 start->ls_speed_limit = lr->lr_speed;
2355                 start->ls_version = lr->lr_version;
2356                 start->ls_active = lr->lr_active;
2357                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2358                 start->ls_async_windows = lr->lr_async_windows;
2359
2360                 lsp.lsp_start = start;
2361                 lsp.lsp_index = lr->lr_index;
2362                 lsp.lsp_index_valid = 1;
2363                 rc = lfsck_start(env, key, &lsp);
2364                 break;
2365         }
2366         case LE_STOP: {
2367                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2368
2369                 memset(stop, 0, sizeof(*stop));
2370                 stop->ls_status = lr->lr_status;
2371                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2372                 rc = lfsck_stop(env, key, stop);
2373                 break;
2374         }
2375         case LE_PHASE1_DONE:
2376         case LE_PHASE2_DONE:
2377         case LE_FID_ACCESSED:
2378         case LE_PEER_EXIT:
2379         case LE_CONDITIONAL_DESTROY:
2380         case LE_PAIRS_VERIFY: {
2381                 struct lfsck_instance  *lfsck;
2382                 struct lfsck_component *com;
2383
2384                 lfsck = lfsck_instance_find(key, true, false);
2385                 if (unlikely(lfsck == NULL))
2386                         RETURN(-ENXIO);
2387
2388                 com = lfsck_component_find(lfsck, lr->lr_active);
2389                 if (likely(com != NULL)) {
2390                         rc = com->lc_ops->lfsck_in_notify(env, com, lr);
2391                         lfsck_component_put(env, com);
2392                 }
2393
2394                 lfsck_instance_put(env, lfsck);
2395                 break;
2396         }
2397         default:
2398                 break;
2399         }
2400
2401         RETURN(rc);
2402 }
2403 EXPORT_SYMBOL(lfsck_in_notify);
2404
2405 int lfsck_query(const struct lu_env *env, struct dt_device *key,
2406                 struct lfsck_request *lr)
2407 {
2408         struct lfsck_instance  *lfsck;
2409         struct lfsck_component *com;
2410         int                     rc;
2411         ENTRY;
2412
2413         lfsck = lfsck_instance_find(key, true, false);
2414         if (unlikely(lfsck == NULL))
2415                 RETURN(-ENXIO);
2416
2417         com = lfsck_component_find(lfsck, lr->lr_active);
2418         if (likely(com != NULL)) {
2419                 rc = com->lc_ops->lfsck_query(env, com);
2420                 lfsck_component_put(env, com);
2421         } else {
2422                 rc = -ENOTSUPP;
2423         }
2424
2425         lfsck_instance_put(env, lfsck);
2426
2427         RETURN(rc);
2428 }
2429 EXPORT_SYMBOL(lfsck_query);
2430
2431 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
2432                              struct ldlm_namespace *ns)
2433 {
2434         struct lfsck_instance  *lfsck;
2435         int                     rc      = -ENXIO;
2436
2437         lfsck = lfsck_instance_find(key, true, false);
2438         if (likely(lfsck != NULL)) {
2439                 lfsck->li_namespace = ns;
2440                 lfsck_instance_put(env, lfsck);
2441                 rc = 0;
2442         }
2443
2444         return rc;
2445 }
2446 EXPORT_SYMBOL(lfsck_register_namespace);
2447
2448 int lfsck_register(const struct lu_env *env, struct dt_device *key,
2449                    struct dt_device *next, struct obd_device *obd,
2450                    lfsck_out_notify notify, void *notify_data, bool master)
2451 {
2452         struct lfsck_instance   *lfsck;
2453         struct dt_object        *root  = NULL;
2454         struct dt_object        *obj;
2455         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
2456         int                      rc;
2457         ENTRY;
2458
2459         lfsck = lfsck_instance_find(key, false, false);
2460         if (unlikely(lfsck != NULL))
2461                 RETURN(-EEXIST);
2462
2463         OBD_ALLOC_PTR(lfsck);
2464         if (lfsck == NULL)
2465                 RETURN(-ENOMEM);
2466
2467         mutex_init(&lfsck->li_mutex);
2468         spin_lock_init(&lfsck->li_lock);
2469         CFS_INIT_LIST_HEAD(&lfsck->li_link);
2470         CFS_INIT_LIST_HEAD(&lfsck->li_list_scan);
2471         CFS_INIT_LIST_HEAD(&lfsck->li_list_dir);
2472         CFS_INIT_LIST_HEAD(&lfsck->li_list_double_scan);
2473         CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
2474         atomic_set(&lfsck->li_ref, 1);
2475         atomic_set(&lfsck->li_double_scan_count, 0);
2476         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
2477         lfsck->li_out_notify = notify;
2478         lfsck->li_out_notify_data = notify_data;
2479         lfsck->li_next = next;
2480         lfsck->li_bottom = key;
2481         lfsck->li_obd = obd;
2482
2483         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
2484         if (rc != 0)
2485                 GOTO(out, rc);
2486
2487         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
2488         if (rc != 0)
2489                 GOTO(out, rc);
2490
2491         fid->f_seq = FID_SEQ_LOCAL_NAME;
2492         fid->f_oid = 1;
2493         fid->f_ver = 0;
2494         rc = local_oid_storage_init(env, lfsck->li_bottom, fid, &lfsck->li_los);
2495         if (rc != 0)
2496                 GOTO(out, rc);
2497
2498         rc = dt_root_get(env, key, fid);
2499         if (rc != 0)
2500                 GOTO(out, rc);
2501
2502         root = dt_locate(env, lfsck->li_bottom, fid);
2503         if (IS_ERR(root))
2504                 GOTO(out, rc = PTR_ERR(root));
2505
2506         if (unlikely(!dt_try_as_dir(env, root)))
2507                 GOTO(out, rc = -ENOTDIR);
2508
2509         lfsck->li_local_root_fid = *fid;
2510         if (master) {
2511                 lfsck->li_master = 1;
2512                 if (lfsck_dev_idx(lfsck->li_bottom) == 0) {
2513                         rc = dt_lookup(env, root,
2514                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
2515                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
2516                         if (rc != 0)
2517                                 GOTO(out, rc);
2518                 }
2519         }
2520
2521         fid->f_seq = FID_SEQ_LOCAL_FILE;
2522         fid->f_oid = OTABLE_IT_OID;
2523         fid->f_ver = 0;
2524         obj = dt_locate(env, lfsck->li_bottom, fid);
2525         if (IS_ERR(obj))
2526                 GOTO(out, rc = PTR_ERR(obj));
2527
2528         lfsck->li_obj_oit = obj;
2529         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
2530         if (rc != 0) {
2531                 if (rc == -ENOTSUPP)
2532                         GOTO(add, rc = 0);
2533
2534                 GOTO(out, rc);
2535         }
2536
2537         rc = lfsck_bookmark_setup(env, lfsck);
2538         if (rc != 0)
2539                 GOTO(out, rc);
2540
2541         if (master) {
2542                 rc = lfsck_fid_init(lfsck);
2543                 if (rc < 0)
2544                         GOTO(out, rc);
2545
2546                 rc = lfsck_namespace_setup(env, lfsck);
2547                 if (rc < 0)
2548                         GOTO(out, rc);
2549         }
2550
2551         rc = lfsck_layout_setup(env, lfsck);
2552         if (rc < 0)
2553                 GOTO(out, rc);
2554
2555         /* XXX: more LFSCK components initialization to be added here. */
2556
2557 add:
2558         rc = lfsck_instance_add(lfsck);
2559         if (rc == 0)
2560                 rc = lfsck_add_target_from_orphan(env, lfsck);
2561 out:
2562         if (root != NULL && !IS_ERR(root))
2563                 lu_object_put(env, &root->do_lu);
2564         if (rc != 0)
2565                 lfsck_instance_cleanup(env, lfsck);
2566         return rc;
2567 }
2568 EXPORT_SYMBOL(lfsck_register);
2569
2570 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
2571 {
2572         struct lfsck_instance *lfsck;
2573
2574         lfsck = lfsck_instance_find(key, false, true);
2575         if (lfsck != NULL)
2576                 lfsck_instance_put(env, lfsck);
2577 }
2578 EXPORT_SYMBOL(lfsck_degister);
2579
2580 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
2581                      struct dt_device *tgt, struct obd_export *exp,
2582                      __u32 index, bool for_ost)
2583 {
2584         struct lfsck_instance   *lfsck;
2585         struct lfsck_tgt_desc   *ltd;
2586         int                      rc;
2587         ENTRY;
2588
2589         OBD_ALLOC_PTR(ltd);
2590         if (ltd == NULL)
2591                 RETURN(-ENOMEM);
2592
2593         ltd->ltd_tgt = tgt;
2594         ltd->ltd_key = key;
2595         ltd->ltd_exp = exp;
2596         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
2597         INIT_LIST_HEAD(&ltd->ltd_layout_list);
2598         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
2599         atomic_set(&ltd->ltd_ref, 1);
2600         ltd->ltd_index = index;
2601
2602         spin_lock(&lfsck_instance_lock);
2603         lfsck = __lfsck_instance_find(key, true, false);
2604         if (lfsck == NULL) {
2605                 if (for_ost)
2606                         list_add_tail(&ltd->ltd_orphan_list,
2607                                       &lfsck_ost_orphan_list);
2608                 else
2609                         list_add_tail(&ltd->ltd_orphan_list,
2610                                       &lfsck_mdt_orphan_list);
2611                 spin_unlock(&lfsck_instance_lock);
2612
2613                 RETURN(0);
2614         }
2615         spin_unlock(&lfsck_instance_lock);
2616
2617         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
2618         if (rc != 0)
2619                 lfsck_tgt_put(ltd);
2620
2621         lfsck_instance_put(env, lfsck);
2622
2623         RETURN(rc);
2624 }
2625 EXPORT_SYMBOL(lfsck_add_target);
2626
2627 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
2628                       struct dt_device *tgt, __u32 index, bool for_ost)
2629 {
2630         struct lfsck_instance   *lfsck;
2631         struct lfsck_tgt_descs  *ltds;
2632         struct lfsck_tgt_desc   *ltd    = NULL;
2633         struct list_head        *head;
2634
2635         if (for_ost)
2636                 head = &lfsck_ost_orphan_list;
2637         else
2638                 head = &lfsck_mdt_orphan_list;
2639
2640         spin_lock(&lfsck_instance_lock);
2641         list_for_each_entry(ltd, head, ltd_orphan_list) {
2642                 if (ltd->ltd_tgt == tgt) {
2643                         list_del_init(&ltd->ltd_orphan_list);
2644                         spin_unlock(&lfsck_instance_lock);
2645                         lfsck_tgt_put(ltd);
2646
2647                         return;
2648                 }
2649         }
2650
2651         lfsck = __lfsck_instance_find(key, true, false);
2652         spin_unlock(&lfsck_instance_lock);
2653         if (unlikely(lfsck == NULL))
2654                 return;
2655
2656         if (for_ost)
2657                 ltds = &lfsck->li_ost_descs;
2658         else
2659                 ltds = &lfsck->li_mdt_descs;
2660
2661         down_write(&ltds->ltd_rw_sem);
2662         LASSERT(ltds->ltd_tgts_bitmap != NULL);
2663
2664         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
2665                 goto unlock;
2666
2667         ltd = LTD_TGT(ltds, index);
2668         if (unlikely(ltd == NULL))
2669                 goto unlock;
2670
2671         LASSERT(ltds->ltd_tgtnr > 0);
2672
2673         ltds->ltd_tgtnr--;
2674         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
2675         LTD_TGT(ltds, index) = NULL;
2676
2677 unlock:
2678         if (ltd == NULL) {
2679                 if (for_ost)
2680                         head = &lfsck->li_ost_descs.ltd_orphan;
2681                 else
2682                         head = &lfsck->li_ost_descs.ltd_orphan;
2683
2684                 list_for_each_entry(ltd, head, ltd_orphan_list) {
2685                         if (ltd->ltd_tgt == tgt) {
2686                                 list_del_init(&ltd->ltd_orphan_list);
2687                                 break;
2688                         }
2689                 }
2690         }
2691
2692         up_write(&ltds->ltd_rw_sem);
2693         if (ltd != NULL) {
2694                 spin_lock(&ltds->ltd_lock);
2695                 ltd->ltd_dead = 1;
2696                 spin_unlock(&ltds->ltd_lock);
2697                 lfsck_stop_notify(env, lfsck, ltds, ltd, LT_LAYOUT);
2698                 lfsck_tgt_put(ltd);
2699         }
2700
2701         lfsck_instance_put(env, lfsck);
2702 }
2703 EXPORT_SYMBOL(lfsck_del_target);
2704
2705 static int __init lfsck_init(void)
2706 {
2707         int rc;
2708
2709         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
2710         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
2711         lfsck_key_init_generic(&lfsck_thread_key, NULL);
2712         rc = lu_context_key_register(&lfsck_thread_key);
2713         if (rc == 0) {
2714                 tgt_register_lfsck_in_notify(lfsck_in_notify);
2715                 tgt_register_lfsck_query(lfsck_query);
2716         }
2717
2718         return rc;
2719 }
2720
2721 static void __exit lfsck_exit(void)
2722 {
2723         struct lfsck_tgt_desc *ltd;
2724         struct lfsck_tgt_desc *next;
2725
2726         LASSERT(cfs_list_empty(&lfsck_instance_list));
2727
2728         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
2729                                  ltd_orphan_list) {
2730                 list_del_init(&ltd->ltd_orphan_list);
2731                 lfsck_tgt_put(ltd);
2732         }
2733
2734         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
2735                                  ltd_orphan_list) {
2736                 list_del_init(&ltd->ltd_orphan_list);
2737                 lfsck_tgt_put(ltd);
2738         }
2739
2740         lu_context_key_degister(&lfsck_thread_key);
2741 }
2742
/*
 * Module metadata (author, description, license) followed by the module
 * declaration itself.  cfs_module() is given the module name, the Lustre
 * version string and the lfsck_init()/lfsck_exit() routines defined above.
 * NOTE(review): presumably it installs them as the module init and exit
 * entry points - confirm against the libcfs definition of cfs_module().
 */
2743 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
2744 MODULE_DESCRIPTION("LFSCK");
2745 MODULE_LICENSE("GPL");
2746
2747 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);