Whamcloud - gitweb
c8e26f062dd4399f2783cb26a2daeb6605d27e98
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 /* define lfsck thread key */
46 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
47
48 static void lfsck_key_fini(const struct lu_context *ctx,
49                            struct lu_context_key *key, void *data)
50 {
51         struct lfsck_thread_info *info = data;
52
53         lu_buf_free(&info->lti_linkea_buf);
54         lu_buf_free(&info->lti_big_buf);
55         OBD_FREE_PTR(info);
56 }
57
58 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
59 LU_KEY_INIT_GENERIC(lfsck);
60
/* Every registered LFSCK instance, linked through lfsck_instance::li_link. */
static CFS_LIST_HEAD(lfsck_instance_list);
/* Global lists of OST/MDT target descriptors that are not attached to an
 * instance yet; lfsck_add_target_from_orphan() moves the entries whose
 * ltd_key matches an instance's li_bottom over to that instance.
 * NOTE(review): these two heads have no static initializer here, so they
 * presumably get INIT_LIST_HEAD() elsewhere before first use - confirm. */
static struct list_head lfsck_ost_orphan_list;
static struct list_head lfsck_mdt_orphan_list;
/* Taken around walks of lfsck_instance_list and of the global orphan lists. */
static DEFINE_SPINLOCK(lfsck_instance_lock);
65
/* Printable name for each enum lfsck_status value, indexed by the enum
 * value itself. Looked up through lfsck_status2names(). */
static const char *lfsck_status_names[] = {
        [LS_INIT]               = "init",
        [LS_SCANNING_PHASE1]    = "scanning-phase1",
        [LS_SCANNING_PHASE2]    = "scanning-phase2",
        [LS_COMPLETED]          = "completed",
        [LS_FAILED]             = "failed",
        [LS_STOPPED]            = "stopped",
        [LS_PAUSED]             = "paused",
        [LS_CRASHED]            = "crashed",
        [LS_PARTIAL]            = "partial",
        [LS_CO_FAILED]          = "co-failed",
        [LS_CO_STOPPED]         = "co-stopped",
        [LS_CO_PAUSED]          = "co-paused"
};
80
/* NULL-terminated table of printable LFSCK flag names.
 * NOTE(review): presumably entry i names flag bit i when the table is handed
 * to lfsck_bits_dump() - confirm against that function and the flag enum. */
const char *lfsck_flags_names[] = {
        "scanned-once",
        "inconsistent",
        "upgrade",
        "incomplete",
        "crashed_lastid",
        NULL
};
89
/* NULL-terminated table of printable LFSCK parameter names. The first slot
 * is deliberately NULL, i.e. the lowest position has no printable name.
 * NOTE(review): presumably entry i names parameter bit i when the table is
 * handed to lfsck_bits_dump() - confirm against the parameter flag enum. */
const char *lfsck_param_names[] = {
        NULL,
        "failout",
        "dryrun",
        "all_targets",
        NULL
};
97
98 const char *lfsck_status2names(enum lfsck_status status)
99 {
100         if (unlikely(status < 0 || status >= LS_MAX))
101                 return "unknown";
102
103         return lfsck_status_names[status];
104 }
105
106 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
107 {
108         spin_lock_init(&ltds->ltd_lock);
109         init_rwsem(&ltds->ltd_rw_sem);
110         INIT_LIST_HEAD(&ltds->ltd_orphan);
111         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
112         if (ltds->ltd_tgts_bitmap == NULL)
113                 return -ENOMEM;
114
115         return 0;
116 }
117
118 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
119 {
120         struct lfsck_tgt_desc   *ltd;
121         struct lfsck_tgt_desc   *next;
122         int                      idx;
123
124         down_write(&ltds->ltd_rw_sem);
125
126         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
127                                  ltd_orphan_list) {
128                 list_del_init(&ltd->ltd_orphan_list);
129                 lfsck_tgt_put(ltd);
130         }
131
132         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
133                 up_write(&ltds->ltd_rw_sem);
134
135                 return;
136         }
137
138         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
139                 ltd = LTD_TGT(ltds, idx);
140                 if (likely(ltd != NULL)) {
141                         LASSERT(list_empty(&ltd->ltd_layout_list));
142                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
143
144                         ltds->ltd_tgtnr--;
145                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
146                         LTD_TGT(ltds, idx) = NULL;
147                         lfsck_tgt_put(ltd);
148                 }
149         }
150
151         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
152                  ltds->ltd_tgtnr);
153
154         for (idx = 0; idx < TGT_PTRS; idx++) {
155                 if (ltds->ltd_tgts_idx[idx] != NULL) {
156                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
157                         ltds->ltd_tgts_idx[idx] = NULL;
158                 }
159         }
160
161         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
162         ltds->ltd_tgts_bitmap = NULL;
163         up_write(&ltds->ltd_rw_sem);
164 }
165
166 static int __lfsck_add_target(const struct lu_env *env,
167                               struct lfsck_instance *lfsck,
168                               struct lfsck_tgt_desc *ltd,
169                               bool for_ost, bool locked)
170 {
171         struct lfsck_tgt_descs *ltds;
172         __u32                   index = ltd->ltd_index;
173         int                     rc    = 0;
174         ENTRY;
175
176         if (for_ost)
177                 ltds = &lfsck->li_ost_descs;
178         else
179                 ltds = &lfsck->li_mdt_descs;
180
181         if (!locked)
182                 down_write(&ltds->ltd_rw_sem);
183
184         LASSERT(ltds->ltd_tgts_bitmap != NULL);
185
186         if (index >= ltds->ltd_tgts_bitmap->size) {
187                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
188                                     (__u32)BITS_PER_LONG);
189                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
190                 cfs_bitmap_t *new_bitmap;
191
192                 while (newsize < index + 1)
193                         newsize <<= 1;
194
195                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
196                 if (new_bitmap == NULL)
197                         GOTO(unlock, rc = -ENOMEM);
198
199                 if (ltds->ltd_tgtnr > 0)
200                         cfs_bitmap_copy(new_bitmap, old_bitmap);
201                 ltds->ltd_tgts_bitmap = new_bitmap;
202                 CFS_FREE_BITMAP(old_bitmap);
203         }
204
205         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
206                 CERROR("%s: the device %s (%u) is registered already\n",
207                        lfsck_lfsck2name(lfsck),
208                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
209                 GOTO(unlock, rc = -EEXIST);
210         }
211
212         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
213                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
214                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
215                         GOTO(unlock, rc = -ENOMEM);
216         }
217
218         LTD_TGT(ltds, index) = ltd;
219         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
220         ltds->ltd_tgtnr++;
221
222         GOTO(unlock, rc = 0);
223
224 unlock:
225         if (!locked)
226                 up_write(&ltds->ltd_rw_sem);
227
228         return rc;
229 }
230
231 static int lfsck_add_target_from_orphan(const struct lu_env *env,
232                                         struct lfsck_instance *lfsck)
233 {
234         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
235         struct lfsck_tgt_desc   *ltd;
236         struct lfsck_tgt_desc   *next;
237         struct list_head        *head    = &lfsck_ost_orphan_list;
238         int                      rc;
239         bool                     for_ost = true;
240
241 again:
242         spin_lock(&lfsck_instance_lock);
243         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
244                 if (ltd->ltd_key == lfsck->li_bottom) {
245                         list_del_init(&ltd->ltd_orphan_list);
246                         list_add_tail(&ltd->ltd_orphan_list,
247                                       &ltds->ltd_orphan);
248                 }
249         }
250         spin_unlock(&lfsck_instance_lock);
251
252         down_write(&ltds->ltd_rw_sem);
253         while (!list_empty(&ltds->ltd_orphan)) {
254                 ltd = list_entry(ltds->ltd_orphan.next,
255                                  struct lfsck_tgt_desc,
256                                  ltd_orphan_list);
257                 list_del_init(&ltd->ltd_orphan_list);
258                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
259                 /* Do not hold the semaphore for too long time. */
260                 up_write(&ltds->ltd_rw_sem);
261                 if (rc != 0)
262                         return rc;
263
264                 down_write(&ltds->ltd_rw_sem);
265         }
266         up_write(&ltds->ltd_rw_sem);
267
268         if (for_ost) {
269                 ltds = &lfsck->li_mdt_descs;
270                 head = &lfsck_mdt_orphan_list;
271                 for_ost = false;
272                 goto again;
273         }
274
275         return 0;
276 }
277
278 static inline struct lfsck_component *
279 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type, cfs_list_t *list)
280 {
281         struct lfsck_component *com;
282
283         cfs_list_for_each_entry(com, list, lc_link) {
284                 if (com->lc_type == type)
285                         return com;
286         }
287         return NULL;
288 }
289
290 struct lfsck_component *
291 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
292 {
293         struct lfsck_component *com;
294
295         spin_lock(&lfsck->li_lock);
296         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
297         if (com != NULL)
298                 goto unlock;
299
300         com = __lfsck_component_find(lfsck, type,
301                                      &lfsck->li_list_double_scan);
302         if (com != NULL)
303                 goto unlock;
304
305         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
306
307 unlock:
308         if (com != NULL)
309                 lfsck_component_get(com);
310         spin_unlock(&lfsck->li_lock);
311         return com;
312 }
313
314 void lfsck_component_cleanup(const struct lu_env *env,
315                              struct lfsck_component *com)
316 {
317         if (!cfs_list_empty(&com->lc_link))
318                 cfs_list_del_init(&com->lc_link);
319         if (!cfs_list_empty(&com->lc_link_dir))
320                 cfs_list_del_init(&com->lc_link_dir);
321
322         lfsck_component_put(env, com);
323 }
324
325 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
326                     struct lu_fid *fid, bool locked)
327 {
328         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
329         int                      rc = 0;
330         ENTRY;
331
332         if (!locked)
333                 mutex_lock(&lfsck->li_mutex);
334
335         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
336         if (rc >= 0) {
337                 bk->lb_last_fid = *fid;
338                 /* We do not care about whether the subsequent sub-operations
339                  * failed or not. The worst case is that one FID is lost that
340                  * is not a big issue for the LFSCK since it is relative rare
341                  * for LFSCK create. */
342                 rc = lfsck_bookmark_store(env, lfsck);
343         }
344
345         if (!locked)
346                 mutex_unlock(&lfsck->li_mutex);
347
348         RETURN(rc);
349 }
350
/* Names of the "self" and "parent" entries that are inserted into a newly
 * created lost+found sub-directory. */
static const char dot[] = ".";
static const char dotdot[] = "..";
353
354 static int lfsck_create_lpf_local(const struct lu_env *env,
355                                   struct lfsck_instance *lfsck,
356                                   struct dt_object *parent,
357                                   struct dt_object *child,
358                                   struct lu_attr *la,
359                                   struct dt_object_format *dof,
360                                   const char *name)
361 {
362         struct dt_device        *dev    = lfsck->li_bottom;
363         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
364         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
365         const struct lu_fid     *cfid   = lu_object_fid(&child->do_lu);
366         struct thandle          *th     = NULL;
367         loff_t                   pos    = 0;
368         int                      len    = sizeof(struct lfsck_bookmark);
369         int                      rc     = 0;
370         ENTRY;
371
372         th = dt_trans_create(env, dev);
373         if (IS_ERR(th))
374                 RETURN(PTR_ERR(th));
375
376         /* 1a. create child */
377         rc = dt_declare_create(env, child, la, NULL, dof, th);
378         if (rc != 0)
379                 GOTO(stop, rc);
380
381         /* 2a. increase child nlink */
382         rc = dt_declare_ref_add(env, child, th);
383         if (rc != 0)
384                 GOTO(stop, rc);
385
386         /* 3a. insert name into parent dir */
387         rc = dt_declare_insert(env, parent, (const struct dt_rec *)cfid,
388                                (const struct dt_key *)name, th);
389         if (rc != 0)
390                 GOTO(stop, rc);
391
392         /* 4a. increase parent nlink */
393         rc = dt_declare_ref_add(env, parent, th);
394         if (rc != 0)
395                 GOTO(stop, rc);
396
397         /* 5a. update bookmark */
398         rc = dt_declare_record_write(env, bk_obj,
399                                      lfsck_buf_get(env, bk, len), 0, th);
400         if (rc != 0)
401                 GOTO(stop, rc);
402
403         rc = dt_trans_start_local(env, dev, th);
404         if (rc != 0)
405                 GOTO(stop, rc);
406
407         dt_write_lock(env, child, 0);
408         /* 1b.1 create child */
409         rc = dt_create(env, child, la, NULL, dof, th);
410         if (rc != 0)
411                 GOTO(unlock, rc);
412
413         if (unlikely(!dt_try_as_dir(env, child)))
414                 GOTO(unlock, rc = -ENOTDIR);
415
416         /* 1b.2 insert dot into child dir */
417         rc = dt_insert(env, child, (const struct dt_rec *)cfid,
418                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
419         if (rc != 0)
420                 GOTO(unlock, rc);
421
422         /* 1b.3 insert dotdot into child dir */
423         rc = dt_insert(env, child, (const struct dt_rec *)&LU_LPF_FID,
424                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
425         if (rc != 0)
426                 GOTO(unlock, rc);
427
428         /* 2b. increase child nlink */
429         rc = dt_ref_add(env, child, th);
430         dt_write_unlock(env, child);
431         if (rc != 0)
432                 GOTO(stop, rc);
433
434         /* 3b. insert name into parent dir */
435         rc = dt_insert(env, parent, (const struct dt_rec *)cfid,
436                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
437         if (rc != 0)
438                 GOTO(stop, rc);
439
440         dt_write_lock(env, parent, 0);
441         /* 4b. increase parent nlink */
442         rc = dt_ref_add(env, parent, th);
443         dt_write_unlock(env, parent);
444         if (rc != 0)
445                 GOTO(stop, rc);
446
447         bk->lb_lpf_fid = *cfid;
448         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
449
450         /* 5b. update bookmark */
451         rc = dt_record_write(env, bk_obj,
452                              lfsck_buf_get(env, bk, len), &pos, th);
453
454         GOTO(stop, rc);
455
456 unlock:
457         dt_write_unlock(env, child);
458
459 stop:
460         dt_trans_stop(env, dev, th);
461
462         return rc;
463 }
464
465 static int lfsck_create_lpf_remote(const struct lu_env *env,
466                                    struct lfsck_instance *lfsck,
467                                    struct dt_object *parent,
468                                    struct dt_object *child,
469                                    struct lu_attr *la,
470                                    struct dt_object_format *dof,
471                                    const char *name)
472 {
473         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
474         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
475         const struct lu_fid     *cfid   = lu_object_fid(&child->do_lu);
476         struct thandle          *th     = NULL;
477         struct dt_device        *dev;
478         loff_t                   pos    = 0;
479         int                      len    = sizeof(struct lfsck_bookmark);
480         int                      rc     = 0;
481         ENTRY;
482
483         /* Create .lustre/lost+found/MDTxxxx. */
484
485         /* XXX: Currently, cross-MDT create operation needs to create the child
486          *      object firstly, then insert name into the parent directory. For
487          *      this case, the child object resides on current MDT (local), but
488          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
489          *      easy to contain all the sub-modifications orderly within single
490          *      transaction.
491          *
492          *      To avoid more inconsistency, we split the create operation into
493          *      two transactions:
494          *
495          *      1) create the child locally.
496          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
497          *         remotely and update the lfsck_bookmark::lb_lpf_fid locally.
498          *
499          *      If 1) done but 2) failed, then the worst case is that we lose
500          *      one object locally, which is not a big issue. (can be repaird
501          *      by LFSCK phase III) */
502
503         /* Transaction I: */
504
505         dev = lfsck->li_bottom;
506         th = dt_trans_create(env, dev);
507         if (IS_ERR(th))
508                 RETURN(PTR_ERR(th));
509
510         /* 1a. create child locally. */
511         rc = dt_declare_create(env, child, la, NULL, dof, th);
512         if (rc != 0)
513                 GOTO(stop, rc);
514
515         /* 2a. increase child nlink locally. */
516         rc = dt_declare_ref_add(env, child, th);
517         if (rc != 0)
518                 GOTO(stop, rc);
519
520         rc = dt_trans_start_local(env, dev, th);
521         if (rc != 0)
522                 GOTO(stop, rc);
523
524         dt_write_lock(env, child, 0);
525         /* 1b. create child locally. */
526         rc = dt_create(env, child, la, NULL, dof, th);
527         if (rc != 0)
528                 GOTO(unlock, rc);
529
530         if (unlikely(!dt_try_as_dir(env, child)))
531                 GOTO(unlock, rc = -ENOTDIR);
532
533         /* 2b.1 insert dot into child dir locally. */
534         rc = dt_insert(env, child, (const struct dt_rec *)cfid,
535                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
536         if (rc != 0)
537                 GOTO(unlock, rc);
538
539         /* 2b.2 insert dotdot into child dir locally. */
540         rc = dt_insert(env, child, (const struct dt_rec *)&LU_LPF_FID,
541                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
542         if (rc != 0)
543                 GOTO(unlock, rc);
544
545         /* 2b.3 increase child nlink locally. */
546         rc = dt_ref_add(env, child, th);
547         dt_write_unlock(env, child);
548         dt_trans_stop(env, dev, th);
549         if (rc != 0)
550                 RETURN(rc);
551
552         /* Transaction II: */
553
554         dev = lfsck->li_next;
555         th = dt_trans_create(env, dev);
556         if (IS_ERR(th))
557                 RETURN(PTR_ERR(th));
558
559         /* 3a. insert name into parent dir remotely. */
560         rc = dt_declare_insert(env, parent, (const struct dt_rec *)cfid,
561                                (const struct dt_key *)name, th);
562         if (rc != 0)
563                 GOTO(stop, rc);
564
565         /* 4a. increase parent nlink remotely. */
566         rc = dt_declare_ref_add(env, parent, th);
567         if (rc != 0)
568                 GOTO(stop, rc);
569
570         /* 5a. decrease child nlink for dotdot locally if former remote
571          *     update failed. */
572         rc = dt_declare_ref_del(env, child, th);
573         if (rc != 0)
574                 GOTO(stop, rc);
575
576         /* 6a. decrease child nlink for dot locally if former remote
577          *     update failed. */
578         rc = dt_declare_ref_del(env, child, th);
579         if (rc != 0)
580                 GOTO(stop, rc);
581
582         /* 7a. destroy child locally if former remote update failed. */
583         rc = dt_declare_destroy(env, child, th);
584         if (rc != 0)
585                 GOTO(stop, rc);
586
587         /* 8a. update bookmark locally. */
588         rc = dt_declare_record_write(env, bk_obj,
589                                      lfsck_buf_get(env, bk, len), 0, th);
590         if (rc != 0)
591                 GOTO(stop, rc);
592
593         rc = dt_trans_start(env, dev, th);
594         if (rc != 0)
595                 GOTO(stop, rc);
596
597         /* 3b. insert name into parent dir remotely. */
598         rc = dt_insert(env, parent, (const struct dt_rec *)cfid,
599                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
600         if (rc == 0) {
601                 dt_write_lock(env, parent, 0);
602                 /* 4b. increase parent nlink remotely. */
603                 rc = dt_ref_add(env, parent, th);
604                 dt_write_unlock(env, parent);
605         }
606         if (rc != 0) {
607                 /* 5b. decrease child nlink for dotdot locally. */
608                 dt_ref_del(env, child, th);
609                 /* 6b. decrease child nlink for dot locally. */
610                 dt_ref_del(env, child, th);
611                 /* 7b. destroy child locally. */
612                 dt_destroy(env, child, th);
613                 GOTO(stop, rc);
614         }
615
616         bk->lb_lpf_fid = *cfid;
617         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
618
619         /* 8b. update bookmark locally. */
620         rc = dt_record_write(env, bk_obj,
621                              lfsck_buf_get(env, bk, len), &pos, th);
622
623         GOTO(stop, rc);
624
625 unlock:
626         dt_write_unlock(env, child);
627 stop:
628         dt_trans_stop(env, dev, th);
629
630         return rc;
631 }
632
633 /* Do NOT create .lustre/lost+found/MDTxxxx when register the lfsck instance,
634  * because the MDT0 maybe not reaady for sequence allocation yet. We do that
635  * only when it is required, such as orphan OST-objects repairing. */
636 int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
637 {
638         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
639         struct lfsck_thread_info *info  = lfsck_env_info(env);
640         struct lu_fid            *cfid  = &info->lti_fid2;
641         struct lu_attr           *la    = &info->lti_la;
642         struct dt_object_format  *dof   = &info->lti_dof;
643         struct dt_object         *parent = NULL;
644         struct dt_object         *child = NULL;
645         char                      name[8];
646         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
647         int                       rc    = 0;
648         ENTRY;
649
650         LASSERT(lfsck->li_master);
651
652         sprintf(name, "MDT%04x", node);
653         if (node == 0) {
654                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
655                                                   &LU_LPF_FID);
656         } else {
657                 struct lfsck_tgt_desc *ltd;
658
659                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
660                 if (unlikely(ltd == NULL))
661                         RETURN(-ENXIO);
662
663                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
664                                                   &LU_LPF_FID);
665                 lfsck_tgt_put(ltd);
666         }
667         if (IS_ERR(parent))
668                 RETURN(PTR_ERR(parent));
669
670         if (unlikely(!dt_try_as_dir(env, parent)))
671                 GOTO(out, rc = -ENOTDIR);
672
673         mutex_lock(&lfsck->li_mutex);
674         if (lfsck->li_lpf_obj != NULL)
675                 GOTO(unlock, rc = 0);
676
677         if (fid_is_zero(&bk->lb_lpf_fid)) {
678                 /* There is corner case that: in former LFSCK scanning we have
679                  * created the .lustre/lost+found/MDTxxxx but failed to update
680                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
681                  * it from MDT0 firstly. */
682                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
683                                (const struct dt_key *)name, BYPASS_CAPA);
684                 if (rc != 0 && rc != -ENOENT)
685                         GOTO(unlock, rc);
686
687                 if (rc == 0) {
688                         bk->lb_lpf_fid = *cfid;
689                         rc = lfsck_bookmark_store(env, lfsck);
690                 } else {
691                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
692                 }
693                 if (rc != 0)
694                         GOTO(unlock, rc);
695         } else {
696                 *cfid = bk->lb_lpf_fid;
697         }
698
699         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
700         if (IS_ERR(child))
701                 GOTO(unlock, rc = PTR_ERR(child));
702
703         if (dt_object_exists(child) != 0) {
704                 if (unlikely(!dt_try_as_dir(env, child)))
705                         GOTO(unlock, rc = -ENOTDIR);
706
707                 lfsck->li_lpf_obj = child;
708                 GOTO(unlock, rc = 0);
709         }
710
711         memset(la, 0, sizeof(*la));
712         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
713         la->la_mode = S_IFDIR | S_IRWXU;
714         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
715                        LA_UID | LA_GID;
716         memset(dof, 0, sizeof(*dof));
717         dof->dof_type = dt_mode_to_dft(S_IFDIR);
718
719         if (node == 0)
720                 rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
721                                             dof, name);
722         else
723                 rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
724                                              dof, name);
725         if (rc == 0)
726                 lfsck->li_lpf_obj = child;
727
728         GOTO(unlock, rc);
729
730 unlock:
731         mutex_unlock(&lfsck->li_mutex);
732         if (rc != 0 && child != NULL && !IS_ERR(child))
733                 lu_object_put(env, &child->do_lu);
734 out:
735         if (parent != NULL && !IS_ERR(parent))
736                 lu_object_put(env, &parent->do_lu);
737
738         return rc;
739 }
740
741 static int lfsck_fid_init(struct lfsck_instance *lfsck)
742 {
743         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
744         struct seq_server_site  *ss;
745         char                    *prefix;
746         int                      rc     = 0;
747         ENTRY;
748
749         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
750         if (unlikely(ss == NULL))
751                 RETURN(-ENXIO);
752
753         OBD_ALLOC_PTR(lfsck->li_seq);
754         if (lfsck->li_seq == NULL)
755                 RETURN(-ENOMEM);
756
757         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
758         if (prefix == NULL)
759                 GOTO(out, rc = -ENOMEM);
760
761         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
762         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
763                              ss->ss_server_seq);
764         OBD_FREE(prefix, MAX_OBD_NAME + 7);
765         if (rc != 0)
766                 GOTO(out, rc);
767
768         if (fid_is_sane(&bk->lb_last_fid))
769                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
770
771         RETURN(0);
772
773 out:
774         OBD_FREE_PTR(lfsck->li_seq);
775         lfsck->li_seq = NULL;
776
777         return rc;
778 }
779
780 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
781 {
782         if (lfsck->li_seq != NULL) {
783                 seq_client_fini(lfsck->li_seq);
784                 OBD_FREE_PTR(lfsck->li_seq);
785                 lfsck->li_seq = NULL;
786         }
787 }
788
789 void lfsck_instance_cleanup(const struct lu_env *env,
790                             struct lfsck_instance *lfsck)
791 {
792         struct ptlrpc_thread    *thread = &lfsck->li_thread;
793         struct lfsck_component  *com;
794         ENTRY;
795
796         LASSERT(list_empty(&lfsck->li_link));
797         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
798
799         if (lfsck->li_obj_oit != NULL) {
800                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
801                 lfsck->li_obj_oit = NULL;
802         }
803
804         LASSERT(lfsck->li_obj_dir == NULL);
805
806         while (!cfs_list_empty(&lfsck->li_list_scan)) {
807                 com = cfs_list_entry(lfsck->li_list_scan.next,
808                                      struct lfsck_component,
809                                      lc_link);
810                 lfsck_component_cleanup(env, com);
811         }
812
813         LASSERT(cfs_list_empty(&lfsck->li_list_dir));
814
815         while (!cfs_list_empty(&lfsck->li_list_double_scan)) {
816                 com = cfs_list_entry(lfsck->li_list_double_scan.next,
817                                      struct lfsck_component,
818                                      lc_link);
819                 lfsck_component_cleanup(env, com);
820         }
821
822         while (!cfs_list_empty(&lfsck->li_list_idle)) {
823                 com = cfs_list_entry(lfsck->li_list_idle.next,
824                                      struct lfsck_component,
825                                      lc_link);
826                 lfsck_component_cleanup(env, com);
827         }
828
829         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
830         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
831
832         if (lfsck->li_bookmark_obj != NULL) {
833                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
834                 lfsck->li_bookmark_obj = NULL;
835         }
836
837         if (lfsck->li_lpf_obj != NULL) {
838                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
839                 lfsck->li_lpf_obj = NULL;
840         }
841
842         if (lfsck->li_los != NULL) {
843                 local_oid_storage_fini(env, lfsck->li_los);
844                 lfsck->li_los = NULL;
845         }
846
847         lfsck_fid_fini(lfsck);
848
849         OBD_FREE_PTR(lfsck);
850 }
851
852 static inline struct lfsck_instance *
853 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
854 {
855         struct lfsck_instance *lfsck;
856
857         cfs_list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
858                 if (lfsck->li_bottom == key) {
859                         if (ref)
860                                 lfsck_instance_get(lfsck);
861                         if (unlink)
862                                 list_del_init(&lfsck->li_link);
863
864                         return lfsck;
865                 }
866         }
867
868         return NULL;
869 }
870
871 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
872                                            bool unlink)
873 {
874         struct lfsck_instance *lfsck;
875
876         spin_lock(&lfsck_instance_lock);
877         lfsck = __lfsck_instance_find(key, ref, unlink);
878         spin_unlock(&lfsck_instance_lock);
879
880         return lfsck;
881 }
882
883 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
884 {
885         struct lfsck_instance *tmp;
886
887         spin_lock(&lfsck_instance_lock);
888         cfs_list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
889                 if (lfsck->li_bottom == tmp->li_bottom) {
890                         spin_unlock(&lfsck_instance_lock);
891                         return -EEXIST;
892                 }
893         }
894
895         cfs_list_add_tail(&lfsck->li_link, &lfsck_instance_list);
896         spin_unlock(&lfsck_instance_lock);
897         return 0;
898 }
899
/**
 * Format a set of flag bits as one line of comma-separated names.
 *
 * Writes "<prefix>: <name>,<name>\n" (or "<prefix>:\n" when \a bits is zero)
 * at *buf, then advances *buf and decreases *len by the number of bytes
 * produced. Set bits whose names[] entry is NULL are skipped silently; the
 * line is newline-terminated even if the highest set bit has no name.
 *
 * \param[in,out] buf   output cursor
 * \param[in,out] len   space left at *buf
 * \param[in] bits      flags to be printed
 * \param[in] names     names[i] is the name of bit i (may be NULL)
 * \param[in] prefix    label printed in front of the list
 *
 * \retval              number of bytes appended
 * \retval -ENOSPC      the output does not fit in the remaining space
 */
int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
                    const char *prefix)
{
        /* Work on unsigned copies: shifting a signed 1 into (or beyond) the
         * sign bit is undefined behavior. */
        unsigned int    rest    = bits;
        unsigned int    flag;
        bool            newline = (rest == 0);
        int             save    = *len;
        int             rc;
        int             i;

        /* snprintf() returns the length the output would have had, so
         * "rc >= *len" means the output was truncated. Advancing by such
         * an rc would move the cursor beyond the buffer and make *len
         * negative. */
        rc = snprintf(*buf, *len, "%s:%c", prefix, rest != 0 ? ' ' : '\n');
        if (rc <= 0 || rc >= *len)
                return -ENOSPC;

        *buf += rc;
        *len -= rc;
        for (i = 0, flag = 1; rest != 0; i++, flag <<= 1) {
                if (!(rest & flag))
                        continue;

                rest &= ~flag;
                if (names[i] == NULL)
                        continue;

                newline = (rest == 0);
                rc = snprintf(*buf, *len, "%s%c", names[i],
                              newline ? '\n' : ',');
                if (rc <= 0 || rc >= *len)
                        return -ENOSPC;

                *buf += rc;
                *len -= rc;
        }

        /* The last set bit(s) had no name: the line is still open. */
        if (!newline) {
                rc = snprintf(*buf, *len, "\n");
                if (rc <= 0 || rc >= *len)
                        return -ENOSPC;

                *buf += rc;
                *len -= rc;
        }

        return save - *len;
}
930
931 int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix)
932 {
933         int rc;
934
935         if (time != 0)
936                 rc = snprintf(*buf, *len, "%s: "LPU64" seconds\n", prefix,
937                               cfs_time_current_sec() - time);
938         else
939                 rc = snprintf(*buf, *len, "%s: N/A\n", prefix);
940         if (rc <= 0)
941                 return -ENOSPC;
942
943         *buf += rc;
944         *len -= rc;
945         return rc;
946 }
947
948 int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
949                    const char *prefix)
950 {
951         int rc;
952
953         if (fid_is_zero(&pos->lp_dir_parent)) {
954                 if (pos->lp_oit_cookie == 0)
955                         rc = snprintf(*buf, *len, "%s: N/A, N/A, N/A\n",
956                                       prefix);
957                 else
958                         rc = snprintf(*buf, *len, "%s: "LPU64", N/A, N/A\n",
959                                       prefix, pos->lp_oit_cookie);
960         } else {
961                 rc = snprintf(*buf, *len, "%s: "LPU64", "DFID", "LPU64"\n",
962                               prefix, pos->lp_oit_cookie,
963                               PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
964         }
965         if (rc <= 0)
966                 return -ENOSPC;
967
968         *buf += rc;
969         *len -= rc;
970         return rc;
971 }
972
973 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
974                     struct lfsck_position *pos, bool init)
975 {
976         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
977
978         if (unlikely(lfsck->li_di_oit == NULL)) {
979                 memset(pos, 0, sizeof(*pos));
980                 return;
981         }
982
983         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
984         if (!lfsck->li_current_oit_processed && !init)
985                 pos->lp_oit_cookie--;
986
987         LASSERT(pos->lp_oit_cookie > 0);
988
989         if (lfsck->li_di_dir != NULL) {
990                 struct dt_object *dto = lfsck->li_obj_dir;
991
992                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
993                                                         lfsck->li_di_dir);
994
995                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
996                         fid_zero(&pos->lp_dir_parent);
997                         pos->lp_dir_cookie = 0;
998                 } else {
999                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1000                 }
1001         } else {
1002                 fid_zero(&pos->lp_dir_parent);
1003                 pos->lp_dir_cookie = 0;
1004         }
1005 }
1006
1007 static void __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1008 {
1009         lfsck->li_bookmark_ram.lb_speed_limit = limit;
1010         if (limit != LFSCK_SPEED_NO_LIMIT) {
1011                 if (limit > HZ) {
1012                         lfsck->li_sleep_rate = limit / HZ;
1013                         lfsck->li_sleep_jif = 1;
1014                 } else {
1015                         lfsck->li_sleep_rate = 1;
1016                         lfsck->li_sleep_jif = HZ / limit;
1017                 }
1018         } else {
1019                 lfsck->li_sleep_jif = 0;
1020                 lfsck->li_sleep_rate = 0;
1021         }
1022 }
1023
1024 void lfsck_control_speed(struct lfsck_instance *lfsck)
1025 {
1026         struct ptlrpc_thread *thread = &lfsck->li_thread;
1027         struct l_wait_info    lwi;
1028
1029         if (lfsck->li_sleep_jif > 0 &&
1030             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1031                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1032                                        LWI_ON_SIGNAL_NOOP, NULL);
1033
1034                 l_wait_event(thread->t_ctl_waitq,
1035                              !thread_is_running(thread),
1036                              &lwi);
1037                 lfsck->li_new_scanned = 0;
1038         }
1039 }
1040
1041 void lfsck_control_speed_by_self(struct lfsck_component *com)
1042 {
1043         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1044         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1045         struct l_wait_info       lwi;
1046
1047         if (lfsck->li_sleep_jif > 0 &&
1048             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1049                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1050                                        LWI_ON_SIGNAL_NOOP, NULL);
1051
1052                 l_wait_event(thread->t_ctl_waitq,
1053                              !thread_is_running(thread),
1054                              &lwi);
1055                 com->lc_new_scanned = 0;
1056         }
1057 }
1058
1059 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
1060                             struct lu_fid *fid)
1061 {
1062         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
1063                      !dt_try_as_dir(env, obj)))
1064                 return -ENOTDIR;
1065
1066         return dt_lookup(env, obj, (struct dt_rec *)fid,
1067                          (const struct dt_key *)"..", BYPASS_CAPA);
1068 }
1069
1070 static int lfsck_needs_scan_dir(const struct lu_env *env,
1071                                 struct lfsck_instance *lfsck,
1072                                 struct dt_object *obj)
1073 {
1074         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
1075         int            depth = 0;
1076         int            rc;
1077
1078         if (!lfsck->li_master || !S_ISDIR(lfsck_object_type(obj)) ||
1079             cfs_list_empty(&lfsck->li_list_dir))
1080                RETURN(0);
1081
1082         while (1) {
1083                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
1084                  *      which is the agent directory to manage the objects
1085                  *      which name entries reside on remote MDTs. Related
1086                  *      consistency verification will be processed in LFSCK
1087                  *      phase III. */
1088                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
1089                         if (depth > 0)
1090                                 lfsck_object_put(env, obj);
1091                         return 1;
1092                 }
1093
1094                 /* .lustre doesn't contain "real" user objects, no need lfsck */
1095                 if (fid_is_dot_lustre(lfsck_dto2fid(obj))) {
1096                         if (depth > 0)
1097                                 lfsck_object_put(env, obj);
1098                         return 0;
1099                 }
1100
1101                 dt_read_lock(env, obj, MOR_TGT_CHILD);
1102                 if (unlikely(lfsck_is_dead_obj(obj))) {
1103                         dt_read_unlock(env, obj);
1104                         if (depth > 0)
1105                                 lfsck_object_put(env, obj);
1106                         return 0;
1107                 }
1108
1109                 rc = dt_xattr_get(env, obj,
1110                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
1111                                   BYPASS_CAPA);
1112                 dt_read_unlock(env, obj);
1113                 if (rc >= 0) {
1114                         if (depth > 0)
1115                                 lfsck_object_put(env, obj);
1116                         return 1;
1117                 }
1118
1119                 if (rc < 0 && rc != -ENODATA) {
1120                         if (depth > 0)
1121                                 lfsck_object_put(env, obj);
1122                         return rc;
1123                 }
1124
1125                 rc = lfsck_parent_fid(env, obj, fid);
1126                 if (depth > 0)
1127                         lfsck_object_put(env, obj);
1128                 if (rc != 0)
1129                         return rc;
1130
1131                 if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
1132                         return 0;
1133
1134                 obj = lfsck_object_find(env, lfsck, fid);
1135                 if (obj == NULL)
1136                         return 0;
1137                 else if (IS_ERR(obj))
1138                         return PTR_ERR(obj);
1139
1140                 if (!dt_object_exists(obj)) {
1141                         lfsck_object_put(env, obj);
1142                         return 0;
1143                 }
1144
1145                 /* Currently, only client visible directory can be remote. */
1146                 if (dt_object_remote(obj)) {
1147                         lfsck_object_put(env, obj);
1148                         return 1;
1149                 }
1150
1151                 depth++;
1152         }
1153         return 0;
1154 }
1155
1156 struct lfsck_thread_args *lfsck_thread_args_init(struct lfsck_instance *lfsck,
1157                                                  struct lfsck_component *com,
1158                                                  struct lfsck_start_param *lsp)
1159 {
1160         struct lfsck_thread_args *lta;
1161         int                       rc;
1162
1163         OBD_ALLOC_PTR(lta);
1164         if (lta == NULL)
1165                 return ERR_PTR(-ENOMEM);
1166
1167         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1168         if (rc != 0) {
1169                 OBD_FREE_PTR(lta);
1170                 return ERR_PTR(rc);
1171         }
1172
1173         lta->lta_lfsck = lfsck_instance_get(lfsck);
1174         if (com != NULL)
1175                 lta->lta_com = lfsck_component_get(com);
1176
1177         lta->lta_lsp = lsp;
1178
1179         return lta;
1180 }
1181
1182 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1183 {
1184         if (lta->lta_com != NULL)
1185                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1186         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1187         lu_env_fini(&lta->lta_env);
1188         OBD_FREE_PTR(lta);
1189 }
1190
1191 /* LFSCK wrap functions */
1192
1193 void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
1194                 bool new_checked)
1195 {
1196         struct lfsck_component *com;
1197
1198         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1199                 com->lc_ops->lfsck_fail(env, com, new_checked);
1200         }
1201 }
1202
1203 int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck)
1204 {
1205         struct lfsck_component *com;
1206         int                     rc  = 0;
1207         int                     rc1 = 0;
1208
1209         if (likely(cfs_time_beforeq(cfs_time_current(),
1210                                     lfsck->li_time_next_checkpoint)))
1211                 return 0;
1212
1213         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
1214         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1215                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
1216                 if (rc != 0)
1217                         rc1 = rc;
1218         }
1219
1220         lfsck->li_time_last_checkpoint = cfs_time_current();
1221         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1222                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1223         return rc1 != 0 ? rc1 : rc;
1224 }
1225
1226 int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck,
1227                struct lfsck_start_param *lsp)
1228 {
1229         struct dt_object       *obj     = NULL;
1230         struct lfsck_component *com;
1231         struct lfsck_component *next;
1232         struct lfsck_position  *pos     = NULL;
1233         const struct dt_it_ops *iops    =
1234                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
1235         struct dt_it           *di;
1236         int                     rc;
1237         ENTRY;
1238
1239         LASSERT(lfsck->li_obj_dir == NULL);
1240         LASSERT(lfsck->li_di_dir == NULL);
1241
1242         lfsck->li_current_oit_processed = 0;
1243         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1244                 com->lc_new_checked = 0;
1245                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1246                         com->lc_journal = 0;
1247
1248                 rc = com->lc_ops->lfsck_prep(env, com, lsp);
1249                 if (rc != 0)
1250                         GOTO(out, rc);
1251
1252                 if ((pos == NULL) ||
1253                     (!lfsck_pos_is_zero(&com->lc_pos_start) &&
1254                      lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
1255                         pos = &com->lc_pos_start;
1256         }
1257
1258         /* Init otable-based iterator. */
1259         if (pos == NULL) {
1260                 rc = iops->load(env, lfsck->li_di_oit, 0);
1261                 if (rc > 0) {
1262                         lfsck->li_oit_over = 1;
1263                         rc = 0;
1264                 }
1265
1266                 GOTO(out, rc);
1267         }
1268
1269         rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
1270         if (rc < 0)
1271                 GOTO(out, rc);
1272         else if (rc > 0)
1273                 lfsck->li_oit_over = 1;
1274
1275         if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
1276                 GOTO(out, rc = 0);
1277
1278         /* Find the directory for namespace-based traverse. */
1279         obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
1280         if (obj == NULL)
1281                 GOTO(out, rc = 0);
1282         else if (IS_ERR(obj))
1283                 RETURN(PTR_ERR(obj));
1284
1285         /* XXX: Currently, skip remote object, the consistency for
1286          *      remote object will be processed in LFSCK phase III. */
1287         if (!dt_object_exists(obj) || dt_object_remote(obj) ||
1288             unlikely(!S_ISDIR(lfsck_object_type(obj))))
1289                 GOTO(out, rc = 0);
1290
1291         if (unlikely(!dt_try_as_dir(env, obj)))
1292                 GOTO(out, rc = -ENOTDIR);
1293
1294         /* Init the namespace-based directory traverse. */
1295         iops = &obj->do_index_ops->dio_it;
1296         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
1297         if (IS_ERR(di))
1298                 GOTO(out, rc = PTR_ERR(di));
1299
1300         LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
1301
1302         rc = iops->load(env, di, pos->lp_dir_cookie);
1303         if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
1304                 rc = iops->next(env, di);
1305         else if (rc > 0)
1306                 rc = 0;
1307
1308         if (rc != 0) {
1309                 iops->put(env, di);
1310                 iops->fini(env, di);
1311                 GOTO(out, rc);
1312         }
1313
1314         lfsck->li_obj_dir = lfsck_object_get(obj);
1315         lfsck->li_cookie_dir = iops->store(env, di);
1316         spin_lock(&lfsck->li_lock);
1317         lfsck->li_di_dir = di;
1318         spin_unlock(&lfsck->li_lock);
1319
1320         GOTO(out, rc = 0);
1321
1322 out:
1323         if (obj != NULL)
1324                 lfsck_object_put(env, obj);
1325
1326         if (rc < 0) {
1327                 cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
1328                                              lc_link)
1329                         com->lc_ops->lfsck_post(env, com, rc, true);
1330
1331                 return rc;
1332         }
1333
1334         rc = 0;
1335         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
1336         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1337                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
1338                 if (rc != 0)
1339                         break;
1340         }
1341
1342         lfsck->li_time_last_checkpoint = cfs_time_current();
1343         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1344                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1345         return rc;
1346 }
1347
1348 int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
1349                    struct dt_object *obj)
1350 {
1351         struct lfsck_component *com;
1352         const struct dt_it_ops *iops;
1353         struct dt_it           *di;
1354         int                     rc;
1355         ENTRY;
1356
1357         LASSERT(lfsck->li_obj_dir == NULL);
1358
1359         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1360                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
1361                 if (rc != 0)
1362                         RETURN(rc);
1363         }
1364
1365         rc = lfsck_needs_scan_dir(env, lfsck, obj);
1366         if (rc <= 0)
1367                 GOTO(out, rc);
1368
1369         if (unlikely(!dt_try_as_dir(env, obj)))
1370                 GOTO(out, rc = -ENOTDIR);
1371
1372         iops = &obj->do_index_ops->dio_it;
1373         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
1374         if (IS_ERR(di))
1375                 GOTO(out, rc = PTR_ERR(di));
1376
1377         rc = iops->load(env, di, 0);
1378         if (rc == 0)
1379                 rc = iops->next(env, di);
1380         else if (rc > 0)
1381                 rc = 0;
1382
1383         if (rc != 0) {
1384                 iops->put(env, di);
1385                 iops->fini(env, di);
1386                 GOTO(out, rc);
1387         }
1388
1389         lfsck->li_obj_dir = lfsck_object_get(obj);
1390         lfsck->li_cookie_dir = iops->store(env, di);
1391         spin_lock(&lfsck->li_lock);
1392         lfsck->li_di_dir = di;
1393         spin_unlock(&lfsck->li_lock);
1394
1395         GOTO(out, rc = 0);
1396
1397 out:
1398         if (rc < 0)
1399                 lfsck_fail(env, lfsck, false);
1400         return (rc > 0 ? 0 : rc);
1401 }
1402
1403 int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
1404                    struct dt_object *obj, struct lu_dirent *ent)
1405 {
1406         struct lfsck_component *com;
1407         int                     rc;
1408
1409         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1410                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
1411                 if (rc != 0)
1412                         return rc;
1413         }
1414         return 0;
1415 }
1416
1417 int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
1418                int result)
1419 {
1420         struct lfsck_component *com;
1421         struct lfsck_component *next;
1422         int                     rc  = 0;
1423         int                     rc1 = 0;
1424
1425         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
1426         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1427                 rc = com->lc_ops->lfsck_post(env, com, result, false);
1428                 if (rc != 0)
1429                         rc1 = rc;
1430         }
1431
1432         lfsck->li_time_last_checkpoint = cfs_time_current();
1433         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1434                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1435
1436         /* Ignore some component post failure to make other can go ahead. */
1437         return result;
1438 }
1439
1440 static void lfsck_interpret(const struct lu_env *env,
1441                             struct lfsck_instance *lfsck,
1442                             struct ptlrpc_request *req, void *args, int result)
1443 {
1444         struct lfsck_async_interpret_args *laia = args;
1445         struct lfsck_component            *com;
1446
1447         LASSERT(laia->laia_com == NULL);
1448         LASSERT(laia->laia_shared);
1449
1450         spin_lock(&lfsck->li_lock);
1451         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1452                 if (com->lc_ops->lfsck_interpret != NULL) {
1453                         laia->laia_com = com;
1454                         com->lc_ops->lfsck_interpret(env, req, laia, result);
1455                 }
1456         }
1457
1458         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1459                 if (com->lc_ops->lfsck_interpret != NULL) {
1460                         laia->laia_com = com;
1461                         com->lc_ops->lfsck_interpret(env, req, laia, result);
1462                 }
1463         }
1464         spin_unlock(&lfsck->li_lock);
1465 }
1466
1467 int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck)
1468 {
1469         struct lfsck_component *com;
1470         struct lfsck_component *next;
1471         struct l_wait_info      lwi = { 0 };
1472         int                     rc  = 0;
1473         int                     rc1 = 0;
1474
1475         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1476                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1477                         com->lc_journal = 0;
1478
1479                 rc = com->lc_ops->lfsck_double_scan(env, com);
1480                 if (rc != 0)
1481                         rc1 = rc;
1482         }
1483
1484         l_wait_event(lfsck->li_thread.t_ctl_waitq,
1485                      atomic_read(&lfsck->li_double_scan_count) == 0,
1486                      &lwi);
1487
1488         if (lfsck->li_status != LS_PAUSED &&
1489             lfsck->li_status != LS_CO_PAUSED) {
1490                 list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1491                                          lc_link) {
1492                         spin_lock(&lfsck->li_lock);
1493                         list_del_init(&com->lc_link);
1494                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1495                         spin_unlock(&lfsck->li_lock);
1496                 }
1497         }
1498
1499         return rc1 != 0 ? rc1 : rc;
1500 }
1501
1502 static int lfsck_stop_notify(const struct lu_env *env,
1503                              struct lfsck_instance *lfsck,
1504                              struct lfsck_tgt_descs *ltds,
1505                              struct lfsck_tgt_desc *ltd, __u16 type)
1506 {
1507         struct ptlrpc_request_set *set;
1508         struct lfsck_component    *com;
1509         int                        rc  = 0;
1510         ENTRY;
1511
1512         spin_lock(&lfsck->li_lock);
1513         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
1514         if (com == NULL)
1515                 com = __lfsck_component_find(lfsck, type,
1516                                              &lfsck->li_list_double_scan);
1517         if (com != NULL)
1518                 lfsck_component_get(com);
1519         spin_lock(&lfsck->li_lock);
1520
1521         if (com != NULL) {
1522                 if (com->lc_ops->lfsck_stop_notify != NULL) {
1523                         set = ptlrpc_prep_set();
1524                         if (set == NULL) {
1525                                 lfsck_component_put(env, com);
1526
1527                                 RETURN(-ENOMEM);
1528                         }
1529
1530                         rc = com->lc_ops->lfsck_stop_notify(env, com, ltds,
1531                                                             ltd, set);
1532                         if (rc == 0)
1533                                 rc = ptlrpc_set_wait(set);
1534
1535                         ptlrpc_set_destroy(set);
1536                 }
1537
1538                 lfsck_component_put(env, com);
1539         }
1540
1541         RETURN(rc);
1542 }
1543
1544 void lfsck_quit(const struct lu_env *env, struct lfsck_instance *lfsck)
1545 {
1546         struct lfsck_component *com;
1547         struct lfsck_component *next;
1548
1549         list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
1550                                  lc_link) {
1551                 if (com->lc_ops->lfsck_quit != NULL)
1552                         com->lc_ops->lfsck_quit(env, com);
1553
1554                 spin_lock(&lfsck->li_lock);
1555                 list_del_init(&com->lc_link);
1556                 list_del_init(&com->lc_link_dir);
1557                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1558                 spin_unlock(&lfsck->li_lock);
1559         }
1560
1561         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1562                                  lc_link) {
1563                 if (com->lc_ops->lfsck_quit != NULL)
1564                         com->lc_ops->lfsck_quit(env, com);
1565
1566                 spin_lock(&lfsck->li_lock);
1567                 list_del_init(&com->lc_link);
1568                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1569                 spin_unlock(&lfsck->li_lock);
1570         }
1571 }
1572
1573 static int lfsck_async_interpret(const struct lu_env *env,
1574                                  struct ptlrpc_request *req,
1575                                  void *args, int rc)
1576 {
1577         struct lfsck_async_interpret_args *laia = args;
1578         struct lfsck_instance             *lfsck;
1579
1580         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
1581                               li_mdt_descs);
1582         lfsck_interpret(env, lfsck, req, laia, rc);
1583         lfsck_tgt_put(laia->laia_ltd);
1584         if (rc != 0 && laia->laia_result != -EALREADY)
1585                 laia->laia_result = rc;
1586
1587         return 0;
1588 }
1589
1590 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
1591                         struct lfsck_request *lr,
1592                         struct ptlrpc_request_set *set,
1593                         ptlrpc_interpterer_t interpreter,
1594                         void *args, int request)
1595 {
1596         struct lfsck_async_interpret_args *laia;
1597         struct ptlrpc_request             *req;
1598         struct lfsck_request              *tmp;
1599         struct req_format                 *format;
1600         int                                rc;
1601
1602         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
1603                 return -EOPNOTSUPP;
1604
1605         switch (request) {
1606         case LFSCK_NOTIFY:
1607                 format = &RQF_LFSCK_NOTIFY;
1608                 break;
1609         case LFSCK_QUERY:
1610                 format = &RQF_LFSCK_QUERY;
1611                 break;
1612         default:
1613                 CERROR("%s: unknown async request: opc = %d\n",
1614                        exp->exp_obd->obd_name, request);
1615                 return -EINVAL;
1616         }
1617
1618         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
1619         if (req == NULL)
1620                 return -ENOMEM;
1621
1622         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
1623         if (rc != 0) {
1624                 ptlrpc_request_free(req);
1625
1626                 return rc;
1627         }
1628
1629         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1630         *tmp = *lr;
1631         ptlrpc_request_set_replen(req);
1632
1633         laia = ptlrpc_req_async_args(req);
1634         *laia = *(struct lfsck_async_interpret_args *)args;
1635         if (laia->laia_com != NULL)
1636                 lfsck_component_get(laia->laia_com);
1637         req->rq_interpret_reply = interpreter;
1638         ptlrpc_set_add_req(set, req);
1639
1640         return 0;
1641 }
1642
1643 /* external interfaces */
1644
1645 int lfsck_get_speed(struct dt_device *key, void *buf, int len)
1646 {
1647         struct lu_env           env;
1648         struct lfsck_instance  *lfsck;
1649         int                     rc;
1650         ENTRY;
1651
1652         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1653         if (rc != 0)
1654                 RETURN(rc);
1655
1656         lfsck = lfsck_instance_find(key, true, false);
1657         if (likely(lfsck != NULL)) {
1658                 rc = snprintf(buf, len, "%u\n",
1659                               lfsck->li_bookmark_ram.lb_speed_limit);
1660                 lfsck_instance_put(&env, lfsck);
1661         } else {
1662                 rc = -ENXIO;
1663         }
1664
1665         lu_env_fini(&env);
1666
1667         RETURN(rc);
1668 }
1669 EXPORT_SYMBOL(lfsck_get_speed);
1670
1671 int lfsck_set_speed(struct dt_device *key, int val)
1672 {
1673         struct lu_env           env;
1674         struct lfsck_instance  *lfsck;
1675         int                     rc;
1676         ENTRY;
1677
1678         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1679         if (rc != 0)
1680                 RETURN(rc);
1681
1682         lfsck = lfsck_instance_find(key, true, false);
1683         if (likely(lfsck != NULL)) {
1684                 mutex_lock(&lfsck->li_mutex);
1685                 __lfsck_set_speed(lfsck, val);
1686                 rc = lfsck_bookmark_store(&env, lfsck);
1687                 mutex_unlock(&lfsck->li_mutex);
1688                 lfsck_instance_put(&env, lfsck);
1689         } else {
1690                 rc = -ENXIO;
1691         }
1692
1693         lu_env_fini(&env);
1694
1695         RETURN(rc);
1696 }
1697 EXPORT_SYMBOL(lfsck_set_speed);
1698
1699 int lfsck_get_windows(struct dt_device *key, void *buf, int len)
1700 {
1701         struct lu_env           env;
1702         struct lfsck_instance  *lfsck;
1703         int                     rc;
1704         ENTRY;
1705
1706         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1707         if (rc != 0)
1708                 RETURN(rc);
1709
1710         lfsck = lfsck_instance_find(key, true, false);
1711         if (likely(lfsck != NULL)) {
1712                 rc = snprintf(buf, len, "%u\n",
1713                               lfsck->li_bookmark_ram.lb_async_windows);
1714                 lfsck_instance_put(&env, lfsck);
1715         } else {
1716                 rc = -ENXIO;
1717         }
1718
1719         lu_env_fini(&env);
1720
1721         RETURN(rc);
1722 }
1723 EXPORT_SYMBOL(lfsck_get_windows);
1724
1725 int lfsck_set_windows(struct dt_device *key, int val)
1726 {
1727         struct lu_env           env;
1728         struct lfsck_instance  *lfsck;
1729         int                     rc;
1730         ENTRY;
1731
1732         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1733         if (rc != 0)
1734                 RETURN(rc);
1735
1736         lfsck = lfsck_instance_find(key, true, false);
1737         if (likely(lfsck != NULL)) {
1738                 if (val > LFSCK_ASYNC_WIN_MAX) {
1739                         CERROR("%s: Too large async windows size, which "
1740                                "may cause memory issues. The valid range "
1741                                "is [0 - %u]. If you do not want to restrict "
1742                                "the windows size for async requests pipeline, "
1743                                "just set it as 0.\n",
1744                                lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
1745                         rc = -EINVAL;
1746                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
1747                         mutex_lock(&lfsck->li_mutex);
1748                         lfsck->li_bookmark_ram.lb_async_windows = val;
1749                         rc = lfsck_bookmark_store(&env, lfsck);
1750                         mutex_unlock(&lfsck->li_mutex);
1751                 }
1752                 lfsck_instance_put(&env, lfsck);
1753         } else {
1754                 rc = -ENXIO;
1755         }
1756
1757         lu_env_fini(&env);
1758
1759         RETURN(rc);
1760 }
1761 EXPORT_SYMBOL(lfsck_set_windows);
1762
1763 int lfsck_dump(struct dt_device *key, void *buf, int len, enum lfsck_type type)
1764 {
1765         struct lu_env           env;
1766         struct lfsck_instance  *lfsck;
1767         struct lfsck_component *com;
1768         int                     rc;
1769         ENTRY;
1770
1771         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1772         if (rc != 0)
1773                 RETURN(rc);
1774
1775         lfsck = lfsck_instance_find(key, true, false);
1776         if (likely(lfsck != NULL)) {
1777                 com = lfsck_component_find(lfsck, type);
1778                 if (likely(com != NULL)) {
1779                         rc = com->lc_ops->lfsck_dump(&env, com, buf, len);
1780                         lfsck_component_put(&env, com);
1781                 } else {
1782                         rc = -ENOTSUPP;
1783                 }
1784
1785                 lfsck_instance_put(&env, lfsck);
1786         } else {
1787                 rc = -ENXIO;
1788         }
1789
1790         lu_env_fini(&env);
1791
1792         RETURN(rc);
1793 }
1794 EXPORT_SYMBOL(lfsck_dump);
1795
1796 static int lfsck_stop_all(const struct lu_env *env,
1797                           struct lfsck_instance *lfsck,
1798                           struct lfsck_stop *stop)
1799 {
1800         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1801         struct lfsck_request              *lr     = &info->lti_lr;
1802         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1803         struct ptlrpc_request_set         *set;
1804         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1805         struct lfsck_tgt_desc             *ltd;
1806         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1807         __u32                              idx;
1808         int                                rc     = 0;
1809         int                                rc1    = 0;
1810         ENTRY;
1811
1812         LASSERT(stop->ls_flags & LPF_BROADCAST);
1813
1814         set = ptlrpc_prep_set();
1815         if (unlikely(set == NULL)) {
1816                 CERROR("%s: cannot allocate memory for stop LFSCK on "
1817                        "all targets\n", lfsck_lfsck2name(lfsck));
1818
1819                 RETURN(-ENOMEM);
1820         }
1821
1822         memset(lr, 0, sizeof(*lr));
1823         lr->lr_event = LE_STOP;
1824         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1825         lr->lr_status = stop->ls_status;
1826         lr->lr_version = bk->lb_version;
1827         lr->lr_active = LFSCK_TYPES_ALL;
1828         lr->lr_param = stop->ls_flags;
1829
1830         laia->laia_com = NULL;
1831         laia->laia_ltds = ltds;
1832         laia->laia_lr = lr;
1833         laia->laia_result = 0;
1834         laia->laia_shared = 1;
1835
1836         down_read(&ltds->ltd_rw_sem);
1837         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1838                 ltd = lfsck_tgt_get(ltds, idx);
1839                 LASSERT(ltd != NULL);
1840
1841                 laia->laia_ltd = ltd;
1842                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1843                                          lfsck_async_interpret, laia,
1844                                          LFSCK_NOTIFY);
1845                 if (rc != 0) {
1846                         lfsck_interpret(env, lfsck, NULL, laia, rc);
1847                         lfsck_tgt_put(ltd);
1848                         CWARN("%s: cannot notify MDT %x for LFSCK stop: "
1849                               "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
1850                         rc1 = rc;
1851                 }
1852         }
1853         up_read(&ltds->ltd_rw_sem);
1854
1855         rc = ptlrpc_set_wait(set);
1856         ptlrpc_set_destroy(set);
1857
1858         if (rc == 0)
1859                 rc = laia->laia_result;
1860
1861         if (rc == -EALREADY)
1862                 rc = 0;
1863
1864         if (rc != 0)
1865                 CWARN("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
1866                       lfsck_lfsck2name(lfsck), rc);
1867
1868         RETURN(rc != 0 ? rc : rc1);
1869 }
1870
1871 static int lfsck_start_all(const struct lu_env *env,
1872                            struct lfsck_instance *lfsck,
1873                            struct lfsck_start *start)
1874 {
1875         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1876         struct lfsck_request              *lr     = &info->lti_lr;
1877         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1878         struct ptlrpc_request_set         *set;
1879         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1880         struct lfsck_tgt_desc             *ltd;
1881         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1882         __u32                              idx;
1883         int                                rc     = 0;
1884         ENTRY;
1885
1886         LASSERT(start->ls_flags & LPF_BROADCAST);
1887
1888         set = ptlrpc_prep_set();
1889         if (unlikely(set == NULL)) {
1890                 if (bk->lb_param & LPF_FAILOUT) {
1891                         CERROR("%s: cannot allocate memory for start LFSCK on "
1892                                "all targets, failout.\n",
1893                                lfsck_lfsck2name(lfsck));
1894
1895                         RETURN(-ENOMEM);
1896                 } else {
1897                         CWARN("%s: cannot allocate memory for start LFSCK on "
1898                               "all targets, partly scan.\n",
1899                               lfsck_lfsck2name(lfsck));
1900
1901                         RETURN(0);
1902                 }
1903         }
1904
1905         memset(lr, 0, sizeof(*lr));
1906         lr->lr_event = LE_START;
1907         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1908         lr->lr_speed = bk->lb_speed_limit;
1909         lr->lr_version = bk->lb_version;
1910         lr->lr_active = start->ls_active;
1911         lr->lr_param = start->ls_flags;
1912         lr->lr_async_windows = bk->lb_async_windows;
1913         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
1914                        LSV_ASYNC_WINDOWS;
1915
1916         laia->laia_com = NULL;
1917         laia->laia_ltds = ltds;
1918         laia->laia_lr = lr;
1919         laia->laia_result = 0;
1920         laia->laia_shared = 1;
1921
1922         down_read(&ltds->ltd_rw_sem);
1923         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1924                 ltd = lfsck_tgt_get(ltds, idx);
1925                 LASSERT(ltd != NULL);
1926
1927                 laia->laia_ltd = ltd;
1928                 ltd->ltd_layout_done = 0;
1929                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1930                                          lfsck_async_interpret, laia,
1931                                          LFSCK_NOTIFY);
1932                 if (rc != 0) {
1933                         lfsck_interpret(env, lfsck, NULL, laia, rc);
1934                         lfsck_tgt_put(ltd);
1935                         if (bk->lb_param & LPF_FAILOUT) {
1936                                 CERROR("%s: cannot notify MDT %x for LFSCK "
1937                                        "start, failout: rc = %d\n",
1938                                        lfsck_lfsck2name(lfsck), idx, rc);
1939                                 break;
1940                         } else {
1941                                 CWARN("%s: cannot notify MDT %x for LFSCK "
1942                                       "start, partly scan: rc = %d\n",
1943                                       lfsck_lfsck2name(lfsck), idx, rc);
1944                                 rc = 0;
1945                         }
1946                 }
1947         }
1948         up_read(&ltds->ltd_rw_sem);
1949
1950         if (rc != 0) {
1951                 ptlrpc_set_destroy(set);
1952
1953                 RETURN(rc);
1954         }
1955
1956         rc = ptlrpc_set_wait(set);
1957         ptlrpc_set_destroy(set);
1958
1959         if (rc == 0)
1960                 rc = laia->laia_result;
1961
1962         if (rc != 0) {
1963                 if (bk->lb_param & LPF_FAILOUT) {
1964                         struct lfsck_stop *stop = &info->lti_stop;
1965
1966                         CERROR("%s: cannot start LFSCK on some MDTs, "
1967                                "stop all: rc = %d\n",
1968                                lfsck_lfsck2name(lfsck), rc);
1969                         if (rc != -EALREADY) {
1970                                 stop->ls_status = LS_FAILED;
1971                                 stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
1972                                 lfsck_stop_all(env, lfsck, stop);
1973                         }
1974                 } else {
1975                         CWARN("%s: cannot start LFSCK on some MDTs, "
1976                               "partly scan: rc = %d\n",
1977                               lfsck_lfsck2name(lfsck), rc);
1978                         rc = 0;
1979                 }
1980         }
1981
1982         RETURN(rc);
1983 }
1984
1985 int lfsck_start(const struct lu_env *env, struct dt_device *key,
1986                 struct lfsck_start_param *lsp)
1987 {
1988         struct lfsck_start              *start  = lsp->lsp_start;
1989         struct lfsck_instance           *lfsck;
1990         struct lfsck_bookmark           *bk;
1991         struct ptlrpc_thread            *thread;
1992         struct lfsck_component          *com;
1993         struct l_wait_info               lwi    = { 0 };
1994         struct lfsck_thread_args        *lta;
1995         bool                             dirty  = false;
1996         long                             rc     = 0;
1997         __u16                            valid  = 0;
1998         __u16                            flags  = 0;
1999         __u16                            type   = 1;
2000         ENTRY;
2001
2002         lfsck = lfsck_instance_find(key, true, false);
2003         if (unlikely(lfsck == NULL))
2004                 RETURN(-ENXIO);
2005
2006         /* System is not ready, try again later. */
2007         if (unlikely(lfsck->li_namespace == NULL))
2008                 GOTO(put, rc = -EAGAIN);
2009
2010         /* start == NULL means auto trigger paused LFSCK. */
2011         if ((start == NULL) &&
2012             (cfs_list_empty(&lfsck->li_list_scan) ||
2013              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2014                 GOTO(put, rc = 0);
2015
2016         bk = &lfsck->li_bookmark_ram;
2017         thread = &lfsck->li_thread;
2018         mutex_lock(&lfsck->li_mutex);
2019         spin_lock(&lfsck->li_lock);
2020         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2021                 rc = -EALREADY;
2022                 while (start->ls_active != 0) {
2023                         if (!(type & start->ls_active)) {
2024                                 type <<= 1;
2025                                 continue;
2026                         }
2027
2028                         com = __lfsck_component_find(lfsck, type,
2029                                                      &lfsck->li_list_scan);
2030                         if (com == NULL)
2031                                 com = __lfsck_component_find(lfsck, type,
2032                                                 &lfsck->li_list_double_scan);
2033                         if (com == NULL) {
2034                                 rc = -EOPNOTSUPP;
2035                                 break;
2036                         }
2037
2038                         if (com->lc_ops->lfsck_join != NULL) {
2039                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2040                                 if (rc != 0 && rc != -EALREADY)
2041                                         break;
2042                         }
2043                         start->ls_active &= ~type;
2044                         type <<= 1;
2045                 }
2046                 spin_unlock(&lfsck->li_lock);
2047                 GOTO(out, rc);
2048         }
2049         spin_unlock(&lfsck->li_lock);
2050
2051         lfsck->li_status = 0;
2052         lfsck->li_oit_over = 0;
2053         lfsck->li_start_unplug = 0;
2054         lfsck->li_drop_dryrun = 0;
2055         lfsck->li_new_scanned = 0;
2056
2057         /* For auto trigger. */
2058         if (start == NULL)
2059                 goto trigger;
2060
2061         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2062                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2063                        lfsck_lfsck2name(lfsck));
2064
2065                 GOTO(out, rc = -EPERM);
2066         }
2067
2068         start->ls_version = bk->lb_version;
2069         if (start->ls_valid & LSV_SPEED_LIMIT) {
2070                 __lfsck_set_speed(lfsck, start->ls_speed_limit);
2071                 dirty = true;
2072         }
2073
2074         if (start->ls_valid & LSV_ASYNC_WINDOWS &&
2075             bk->lb_async_windows != start->ls_async_windows) {
2076                 bk->lb_async_windows = start->ls_async_windows;
2077                 dirty = true;
2078         }
2079
2080         if (start->ls_valid & LSV_ERROR_HANDLE) {
2081                 valid |= DOIV_ERROR_HANDLE;
2082                 if (start->ls_flags & LPF_FAILOUT)
2083                         flags |= DOIF_FAILOUT;
2084
2085                 if ((start->ls_flags & LPF_FAILOUT) &&
2086                     !(bk->lb_param & LPF_FAILOUT)) {
2087                         bk->lb_param |= LPF_FAILOUT;
2088                         dirty = true;
2089                 } else if (!(start->ls_flags & LPF_FAILOUT) &&
2090                            (bk->lb_param & LPF_FAILOUT)) {
2091                         bk->lb_param &= ~LPF_FAILOUT;
2092                         dirty = true;
2093                 }
2094         }
2095
2096         if (start->ls_valid & LSV_DRYRUN) {
2097                 valid |= DOIV_DRYRUN;
2098                 if (start->ls_flags & LPF_DRYRUN)
2099                         flags |= DOIF_DRYRUN;
2100
2101                 if ((start->ls_flags & LPF_DRYRUN) &&
2102                     !(bk->lb_param & LPF_DRYRUN)) {
2103                         bk->lb_param |= LPF_DRYRUN;
2104                         dirty = true;
2105                 } else if (!(start->ls_flags & LPF_DRYRUN) &&
2106                            (bk->lb_param & LPF_DRYRUN)) {
2107                         bk->lb_param &= ~LPF_DRYRUN;
2108                         lfsck->li_drop_dryrun = 1;
2109                         dirty = true;
2110                 }
2111         }
2112
2113         if (bk->lb_param & LPF_ALL_TGT &&
2114             !(start->ls_flags & LPF_ALL_TGT)) {
2115                 bk->lb_param &= ~LPF_ALL_TGT;
2116                 dirty = true;
2117         } else if (!(bk->lb_param & LPF_ALL_TGT) &&
2118                    start->ls_flags & LPF_ALL_TGT) {
2119                 bk->lb_param |= LPF_ALL_TGT;
2120                 dirty = true;
2121         }
2122
2123         if (bk->lb_param & LPF_ORPHAN &&
2124             !(start->ls_flags & LPF_ORPHAN)) {
2125                 bk->lb_param &= ~LPF_ORPHAN;
2126                 dirty = true;
2127         } else if (!(bk->lb_param & LPF_ORPHAN) &&
2128                    start->ls_flags & LPF_ORPHAN) {
2129                 bk->lb_param |= LPF_ORPHAN;
2130                 dirty = true;
2131         }
2132
2133         if (dirty) {
2134                 rc = lfsck_bookmark_store(env, lfsck);
2135                 if (rc != 0)
2136                         GOTO(out, rc);
2137         }
2138
2139         if (start->ls_flags & LPF_RESET)
2140                 flags |= DOIF_RESET;
2141
2142         if (start->ls_active != 0) {
2143                 struct lfsck_component *next;
2144
2145                 if (start->ls_active == LFSCK_TYPES_ALL)
2146                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2147
2148                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2149                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2150                         GOTO(out, rc = -ENOTSUPP);
2151                 }
2152
2153                 cfs_list_for_each_entry_safe(com, next,
2154                                              &lfsck->li_list_scan, lc_link) {
2155                         if (!(com->lc_type & start->ls_active)) {
2156                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2157                                                              false);
2158                                 if (rc != 0)
2159                                         GOTO(out, rc);
2160                         }
2161                 }
2162
2163                 while (start->ls_active != 0) {
2164                         if (type & start->ls_active) {
2165                                 com = __lfsck_component_find(lfsck, type,
2166                                                         &lfsck->li_list_idle);
2167                                 if (com != NULL) {
2168                                         /* The component status will be updated
2169                                          * when its prep() is called later by
2170                                          * the LFSCK main engine. */
2171                                         cfs_list_del_init(&com->lc_link);
2172                                         cfs_list_add_tail(&com->lc_link,
2173                                                           &lfsck->li_list_scan);
2174                                 }
2175                                 start->ls_active &= ~type;
2176                         }
2177                         type <<= 1;
2178                 }
2179         }
2180
2181         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2182                 start->ls_active |= com->lc_type;
2183                 if (flags & DOIF_RESET) {
2184                         rc = com->lc_ops->lfsck_reset(env, com, false);
2185                         if (rc != 0)
2186                                 GOTO(out, rc);
2187                 }
2188         }
2189
2190 trigger:
2191         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
2192         if (bk->lb_param & LPF_DRYRUN) {
2193                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
2194                 valid |= DOIV_DRYRUN;
2195                 flags |= DOIF_DRYRUN;
2196         }
2197
2198         if (bk->lb_param & LPF_FAILOUT) {
2199                 valid |= DOIV_ERROR_HANDLE;
2200                 flags |= DOIF_FAILOUT;
2201         }
2202
2203         if (!cfs_list_empty(&lfsck->li_list_scan))
2204                 flags |= DOIF_OUTUSED;
2205
2206         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2207         thread_set_flags(thread, 0);
2208         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
2209         if (IS_ERR(lta))
2210                 GOTO(out, rc = PTR_ERR(lta));
2211
2212         rc = PTR_ERR(kthread_run(lfsck_master_engine, lta, "lfsck"));
2213         if (IS_ERR_VALUE(rc)) {
2214                 CERROR("%s: cannot start LFSCK thread: rc = %ld\n",
2215                        lfsck_lfsck2name(lfsck), rc);
2216                 lfsck_thread_args_fini(lta);
2217
2218                 GOTO(out, rc);
2219         }
2220
2221         l_wait_event(thread->t_ctl_waitq,
2222                      thread_is_running(thread) ||
2223                      thread_is_stopped(thread),
2224                      &lwi);
2225         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
2226                 lfsck->li_start_unplug = 1;
2227                 wake_up_all(&thread->t_ctl_waitq);
2228
2229                 GOTO(out, rc = 0);
2230         }
2231
2232         /* release lfsck::li_mutex to avoid deadlock. */
2233         mutex_unlock(&lfsck->li_mutex);
2234         rc = lfsck_start_all(env, lfsck, start);
2235         if (rc != 0) {
2236                 spin_lock(&lfsck->li_lock);
2237                 if (thread_is_stopped(thread)) {
2238                         spin_unlock(&lfsck->li_lock);
2239                 } else {
2240                         lfsck->li_status = LS_FAILED;
2241                         lfsck->li_flags = 0;
2242                         thread_set_flags(thread, SVC_STOPPING);
2243                         spin_unlock(&lfsck->li_lock);
2244
2245                         lfsck->li_start_unplug = 1;
2246                         wake_up_all(&thread->t_ctl_waitq);
2247                         l_wait_event(thread->t_ctl_waitq,
2248                                      thread_is_stopped(thread),
2249                                      &lwi);
2250                 }
2251         } else {
2252                 lfsck->li_start_unplug = 1;
2253                 wake_up_all(&thread->t_ctl_waitq);
2254         }
2255
2256         GOTO(put, rc);
2257
2258 out:
2259         mutex_unlock(&lfsck->li_mutex);
2260
2261 put:
2262         lfsck_instance_put(env, lfsck);
2263
2264         return rc < 0 ? rc : 0;
2265 }
2266 EXPORT_SYMBOL(lfsck_start);
2267
2268 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2269                struct lfsck_stop *stop)
2270 {
2271         struct lfsck_instance   *lfsck;
2272         struct ptlrpc_thread    *thread;
2273         struct l_wait_info       lwi    = { 0 };
2274         int                      rc     = 0;
2275         int                      rc1    = 0;
2276         ENTRY;
2277
2278         lfsck = lfsck_instance_find(key, true, false);
2279         if (unlikely(lfsck == NULL))
2280                 RETURN(-ENXIO);
2281
2282         thread = &lfsck->li_thread;
2283         /* release lfsck::li_mutex to avoid deadlock. */
2284         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2285                 if (!lfsck->li_master) {
2286                         CERROR("%s: only allow to specify '-A' via MDS\n",
2287                                lfsck_lfsck2name(lfsck));
2288
2289                         GOTO(out, rc = -EPERM);
2290                 }
2291
2292                 rc1 = lfsck_stop_all(env, lfsck, stop);
2293         }
2294
2295         mutex_lock(&lfsck->li_mutex);
2296         spin_lock(&lfsck->li_lock);
2297         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2298                 spin_unlock(&lfsck->li_lock);
2299                 GOTO(out, rc = -EALREADY);
2300         }
2301
2302         if (stop != NULL) {
2303                 lfsck->li_status = stop->ls_status;
2304                 lfsck->li_flags = stop->ls_flags;
2305         } else {
2306                 lfsck->li_status = LS_STOPPED;
2307                 lfsck->li_flags = 0;
2308         }
2309
2310         thread_set_flags(thread, SVC_STOPPING);
2311         spin_unlock(&lfsck->li_lock);
2312
2313         wake_up_all(&thread->t_ctl_waitq);
2314         l_wait_event(thread->t_ctl_waitq,
2315                      thread_is_stopped(thread),
2316                      &lwi);
2317
2318         GOTO(out, rc = 0);
2319
2320 out:
2321         mutex_unlock(&lfsck->li_mutex);
2322         lfsck_instance_put(env, lfsck);
2323
2324         return rc != 0 ? rc : rc1;
2325 }
2326 EXPORT_SYMBOL(lfsck_stop);
2327
2328 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2329                     struct lfsck_request *lr)
2330 {
2331         int rc = -EOPNOTSUPP;
2332         ENTRY;
2333
2334         switch (lr->lr_event) {
2335         case LE_START: {
2336                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
2337                 struct lfsck_start_param  lsp;
2338
2339                 memset(start, 0, sizeof(*start));
2340                 start->ls_valid = lr->lr_valid;
2341                 start->ls_speed_limit = lr->lr_speed;
2342                 start->ls_version = lr->lr_version;
2343                 start->ls_active = lr->lr_active;
2344                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2345                 start->ls_async_windows = lr->lr_async_windows;
2346
2347                 lsp.lsp_start = start;
2348                 lsp.lsp_index = lr->lr_index;
2349                 lsp.lsp_index_valid = 1;
2350                 rc = lfsck_start(env, key, &lsp);
2351                 break;
2352         }
2353         case LE_STOP: {
2354                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2355
2356                 memset(stop, 0, sizeof(*stop));
2357                 stop->ls_status = lr->lr_status;
2358                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2359                 rc = lfsck_stop(env, key, stop);
2360                 break;
2361         }
2362         case LE_PHASE1_DONE:
2363         case LE_PHASE2_DONE:
2364         case LE_FID_ACCESSED:
2365         case LE_PEER_EXIT:
2366         case LE_CONDITIONAL_DESTROY:
2367         case LE_PAIRS_VERIFY: {
2368                 struct lfsck_instance  *lfsck;
2369                 struct lfsck_component *com;
2370
2371                 lfsck = lfsck_instance_find(key, true, false);
2372                 if (unlikely(lfsck == NULL))
2373                         RETURN(-ENXIO);
2374
2375                 com = lfsck_component_find(lfsck, lr->lr_active);
2376                 if (likely(com != NULL)) {
2377                         rc = com->lc_ops->lfsck_in_notify(env, com, lr);
2378                         lfsck_component_put(env, com);
2379                 }
2380
2381                 lfsck_instance_put(env, lfsck);
2382                 break;
2383         }
2384         default:
2385                 break;
2386         }
2387
2388         RETURN(rc);
2389 }
2390 EXPORT_SYMBOL(lfsck_in_notify);
2391
2392 int lfsck_query(const struct lu_env *env, struct dt_device *key,
2393                 struct lfsck_request *lr)
2394 {
2395         struct lfsck_instance  *lfsck;
2396         struct lfsck_component *com;
2397         int                     rc;
2398         ENTRY;
2399
2400         lfsck = lfsck_instance_find(key, true, false);
2401         if (unlikely(lfsck == NULL))
2402                 RETURN(-ENXIO);
2403
2404         com = lfsck_component_find(lfsck, lr->lr_active);
2405         if (likely(com != NULL)) {
2406                 rc = com->lc_ops->lfsck_query(env, com);
2407                 lfsck_component_put(env, com);
2408         } else {
2409                 rc = -ENOTSUPP;
2410         }
2411
2412         lfsck_instance_put(env, lfsck);
2413
2414         RETURN(rc);
2415 }
2416 EXPORT_SYMBOL(lfsck_query);
2417
2418 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
2419                              struct ldlm_namespace *ns)
2420 {
2421         struct lfsck_instance  *lfsck;
2422         int                     rc      = -ENXIO;
2423
2424         lfsck = lfsck_instance_find(key, true, false);
2425         if (likely(lfsck != NULL)) {
2426                 lfsck->li_namespace = ns;
2427                 lfsck_instance_put(env, lfsck);
2428                 rc = 0;
2429         }
2430
2431         return rc;
2432 }
2433 EXPORT_SYMBOL(lfsck_register_namespace);
2434
2435 int lfsck_register(const struct lu_env *env, struct dt_device *key,
2436                    struct dt_device *next, struct obd_device *obd,
2437                    lfsck_out_notify notify, void *notify_data, bool master)
2438 {
2439         struct lfsck_instance   *lfsck;
2440         struct dt_object        *root  = NULL;
2441         struct dt_object        *obj;
2442         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
2443         int                      rc;
2444         ENTRY;
2445
2446         lfsck = lfsck_instance_find(key, false, false);
2447         if (unlikely(lfsck != NULL))
2448                 RETURN(-EEXIST);
2449
2450         OBD_ALLOC_PTR(lfsck);
2451         if (lfsck == NULL)
2452                 RETURN(-ENOMEM);
2453
2454         mutex_init(&lfsck->li_mutex);
2455         spin_lock_init(&lfsck->li_lock);
2456         CFS_INIT_LIST_HEAD(&lfsck->li_link);
2457         CFS_INIT_LIST_HEAD(&lfsck->li_list_scan);
2458         CFS_INIT_LIST_HEAD(&lfsck->li_list_dir);
2459         CFS_INIT_LIST_HEAD(&lfsck->li_list_double_scan);
2460         CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
2461         atomic_set(&lfsck->li_ref, 1);
2462         atomic_set(&lfsck->li_double_scan_count, 0);
2463         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
2464         lfsck->li_out_notify = notify;
2465         lfsck->li_out_notify_data = notify_data;
2466         lfsck->li_next = next;
2467         lfsck->li_bottom = key;
2468         lfsck->li_obd = obd;
2469
2470         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
2471         if (rc != 0)
2472                 GOTO(out, rc);
2473
2474         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
2475         if (rc != 0)
2476                 GOTO(out, rc);
2477
2478         fid->f_seq = FID_SEQ_LOCAL_NAME;
2479         fid->f_oid = 1;
2480         fid->f_ver = 0;
2481         rc = local_oid_storage_init(env, lfsck->li_bottom, fid, &lfsck->li_los);
2482         if (rc != 0)
2483                 GOTO(out, rc);
2484
2485         rc = dt_root_get(env, key, fid);
2486         if (rc != 0)
2487                 GOTO(out, rc);
2488
2489         root = dt_locate(env, lfsck->li_bottom, fid);
2490         if (IS_ERR(root))
2491                 GOTO(out, rc = PTR_ERR(root));
2492
2493         if (unlikely(!dt_try_as_dir(env, root)))
2494                 GOTO(out, rc = -ENOTDIR);
2495
2496         lfsck->li_local_root_fid = *fid;
2497         if (master) {
2498                 lfsck->li_master = 1;
2499                 if (lfsck_dev_idx(lfsck->li_bottom) == 0) {
2500                         rc = dt_lookup(env, root,
2501                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
2502                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
2503                         if (rc != 0)
2504                                 GOTO(out, rc);
2505                 }
2506         }
2507
2508         fid->f_seq = FID_SEQ_LOCAL_FILE;
2509         fid->f_oid = OTABLE_IT_OID;
2510         fid->f_ver = 0;
2511         obj = dt_locate(env, lfsck->li_bottom, fid);
2512         if (IS_ERR(obj))
2513                 GOTO(out, rc = PTR_ERR(obj));
2514
2515         lfsck->li_obj_oit = obj;
2516         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
2517         if (rc != 0) {
2518                 if (rc == -ENOTSUPP)
2519                         GOTO(add, rc = 0);
2520
2521                 GOTO(out, rc);
2522         }
2523
2524         rc = lfsck_bookmark_setup(env, lfsck);
2525         if (rc != 0)
2526                 GOTO(out, rc);
2527
2528         if (master) {
2529                 rc = lfsck_fid_init(lfsck);
2530                 if (rc < 0)
2531                         GOTO(out, rc);
2532
2533                 rc = lfsck_namespace_setup(env, lfsck);
2534                 if (rc < 0)
2535                         GOTO(out, rc);
2536         }
2537
2538         rc = lfsck_layout_setup(env, lfsck);
2539         if (rc < 0)
2540                 GOTO(out, rc);
2541
2542         /* XXX: more LFSCK components initialization to be added here. */
2543
2544 add:
2545         rc = lfsck_instance_add(lfsck);
2546         if (rc == 0)
2547                 rc = lfsck_add_target_from_orphan(env, lfsck);
2548 out:
2549         if (root != NULL && !IS_ERR(root))
2550                 lu_object_put(env, &root->do_lu);
2551         if (rc != 0)
2552                 lfsck_instance_cleanup(env, lfsck);
2553         return rc;
2554 }
2555 EXPORT_SYMBOL(lfsck_register);
2556
2557 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
2558 {
2559         struct lfsck_instance *lfsck;
2560
2561         lfsck = lfsck_instance_find(key, false, true);
2562         if (lfsck != NULL)
2563                 lfsck_instance_put(env, lfsck);
2564 }
2565 EXPORT_SYMBOL(lfsck_degister);
2566
2567 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
2568                      struct dt_device *tgt, struct obd_export *exp,
2569                      __u32 index, bool for_ost)
2570 {
2571         struct lfsck_instance   *lfsck;
2572         struct lfsck_tgt_desc   *ltd;
2573         int                      rc;
2574         ENTRY;
2575
2576         OBD_ALLOC_PTR(ltd);
2577         if (ltd == NULL)
2578                 RETURN(-ENOMEM);
2579
2580         ltd->ltd_tgt = tgt;
2581         ltd->ltd_key = key;
2582         ltd->ltd_exp = exp;
2583         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
2584         INIT_LIST_HEAD(&ltd->ltd_layout_list);
2585         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
2586         atomic_set(&ltd->ltd_ref, 1);
2587         ltd->ltd_index = index;
2588
2589         spin_lock(&lfsck_instance_lock);
2590         lfsck = __lfsck_instance_find(key, true, false);
2591         if (lfsck == NULL) {
2592                 if (for_ost)
2593                         list_add_tail(&ltd->ltd_orphan_list,
2594                                       &lfsck_ost_orphan_list);
2595                 else
2596                         list_add_tail(&ltd->ltd_orphan_list,
2597                                       &lfsck_mdt_orphan_list);
2598                 spin_unlock(&lfsck_instance_lock);
2599
2600                 RETURN(0);
2601         }
2602         spin_unlock(&lfsck_instance_lock);
2603
2604         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
2605         if (rc != 0)
2606                 lfsck_tgt_put(ltd);
2607
2608         lfsck_instance_put(env, lfsck);
2609
2610         RETURN(rc);
2611 }
2612 EXPORT_SYMBOL(lfsck_add_target);
2613
2614 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
2615                       struct dt_device *tgt, __u32 index, bool for_ost)
2616 {
2617         struct lfsck_instance   *lfsck;
2618         struct lfsck_tgt_descs  *ltds;
2619         struct lfsck_tgt_desc   *ltd    = NULL;
2620         struct list_head        *head;
2621
2622         if (for_ost)
2623                 head = &lfsck_ost_orphan_list;
2624         else
2625                 head = &lfsck_mdt_orphan_list;
2626
2627         spin_lock(&lfsck_instance_lock);
2628         list_for_each_entry(ltd, head, ltd_orphan_list) {
2629                 if (ltd->ltd_tgt == tgt) {
2630                         list_del_init(&ltd->ltd_orphan_list);
2631                         spin_unlock(&lfsck_instance_lock);
2632                         lfsck_tgt_put(ltd);
2633
2634                         return;
2635                 }
2636         }
2637
2638         lfsck = __lfsck_instance_find(key, true, false);
2639         spin_unlock(&lfsck_instance_lock);
2640         if (unlikely(lfsck == NULL))
2641                 return;
2642
2643         if (for_ost)
2644                 ltds = &lfsck->li_ost_descs;
2645         else
2646                 ltds = &lfsck->li_mdt_descs;
2647
2648         down_write(&ltds->ltd_rw_sem);
2649         LASSERT(ltds->ltd_tgts_bitmap != NULL);
2650
2651         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
2652                 goto unlock;
2653
2654         ltd = LTD_TGT(ltds, index);
2655         if (unlikely(ltd == NULL))
2656                 goto unlock;
2657
2658         LASSERT(ltds->ltd_tgtnr > 0);
2659
2660         ltds->ltd_tgtnr--;
2661         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
2662         LTD_TGT(ltds, index) = NULL;
2663
2664 unlock:
2665         if (ltd == NULL) {
2666                 if (for_ost)
2667                         head = &lfsck->li_ost_descs.ltd_orphan;
2668                 else
2669                         head = &lfsck->li_ost_descs.ltd_orphan;
2670
2671                 list_for_each_entry(ltd, head, ltd_orphan_list) {
2672                         if (ltd->ltd_tgt == tgt) {
2673                                 list_del_init(&ltd->ltd_orphan_list);
2674                                 break;
2675                         }
2676                 }
2677         }
2678
2679         up_write(&ltds->ltd_rw_sem);
2680         if (ltd != NULL) {
2681                 spin_lock(&ltds->ltd_lock);
2682                 ltd->ltd_dead = 1;
2683                 spin_unlock(&ltds->ltd_lock);
2684                 lfsck_stop_notify(env, lfsck, ltds, ltd, LT_LAYOUT);
2685                 lfsck_tgt_put(ltd);
2686         }
2687
2688         lfsck_instance_put(env, lfsck);
2689 }
2690 EXPORT_SYMBOL(lfsck_del_target);
2691
2692 static int __init lfsck_init(void)
2693 {
2694         int rc;
2695
2696         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
2697         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
2698         lfsck_key_init_generic(&lfsck_thread_key, NULL);
2699         rc = lu_context_key_register(&lfsck_thread_key);
2700         if (rc == 0) {
2701                 tgt_register_lfsck_in_notify(lfsck_in_notify);
2702                 tgt_register_lfsck_query(lfsck_query);
2703         }
2704
2705         return rc;
2706 }
2707
2708 static void __exit lfsck_exit(void)
2709 {
2710         struct lfsck_tgt_desc *ltd;
2711         struct lfsck_tgt_desc *next;
2712
2713         LASSERT(cfs_list_empty(&lfsck_instance_list));
2714
2715         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
2716                                  ltd_orphan_list) {
2717                 list_del_init(&ltd->ltd_orphan_list);
2718                 lfsck_tgt_put(ltd);
2719         }
2720
2721         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
2722                                  ltd_orphan_list) {
2723                 list_del_init(&ltd->ltd_orphan_list);
2724                 lfsck_tgt_put(ltd);
2725         }
2726
2727         lu_context_key_degister(&lfsck_thread_key);
2728 }
2729
/* Module metadata: author, short description and license strings. */
2730 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
2731 MODULE_DESCRIPTION("LFSCK");
2732 MODULE_LICENSE("GPL");
2733
/*
 * Declare the "lfsck" module with its version string, naming lfsck_init()
 * and lfsck_exit() as its two routines.
 * NOTE(review): presumably cfs_module() wires these up as the module
 * load/unload hooks -- confirm against the libcfs definition.
 */
2734 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);