Whamcloud - gitweb
85d39287f132aae9c450a9499402c84d245c280f
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 /* define lfsck thread key */
46 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
47
48 static void lfsck_key_fini(const struct lu_context *ctx,
49                            struct lu_context_key *key, void *data)
50 {
51         struct lfsck_thread_info *info = data;
52
53         lu_buf_free(&info->lti_linkea_buf);
54         lu_buf_free(&info->lti_big_buf);
55         OBD_FREE_PTR(info);
56 }
57
58 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
59 LU_KEY_INIT_GENERIC(lfsck);
60
/*
 * File-wide registries, all protected by lfsck_instance_lock:
 *
 * - lfsck_instance_list: every registered LFSCK instance (li_link).
 * - lfsck_{ost,mdt}_orphan_list: target descriptors (ltd_orphan_list) parked
 *   until lfsck_add_target_from_orphan() claims those whose ltd_key matches
 *   the instance's li_bottom.
 *
 * All three heads are statically initialised so that a list walk is always
 * safe, even before any runtime initialisation has been performed.
 */
static CFS_LIST_HEAD(lfsck_instance_list);
static CFS_LIST_HEAD(lfsck_ost_orphan_list);
static CFS_LIST_HEAD(lfsck_mdt_orphan_list);
static DEFINE_SPINLOCK(lfsck_instance_lock);
65
66 static const char *lfsck_status_names[] = {
67         [LS_INIT]               = "init",
68         [LS_SCANNING_PHASE1]    = "scanning-phase1",
69         [LS_SCANNING_PHASE2]    = "scanning-phase2",
70         [LS_COMPLETED]          = "completed",
71         [LS_FAILED]             = "failed",
72         [LS_STOPPED]            = "stopped",
73         [LS_PAUSED]             = "paused",
74         [LS_CRASHED]            = "crashed",
75         [LS_PARTIAL]            = "partial",
76         [LS_CO_FAILED]          = "co-failed",
77         [LS_CO_STOPPED]         = "co-stopped",
78         [LS_CO_PAUSED]          = "co-paused"
79 };
80
81 const char *lfsck_flags_names[] = {
82         "scanned-once",
83         "inconsistent",
84         "upgrade",
85         "incomplete",
86         "crashed_lastid",
87         NULL
88 };
89
90 const char *lfsck_param_names[] = {
91         NULL,
92         "failout",
93         "dryrun",
94         "all_targets",
95         "broadcast",
96         "orphan",
97         "create_ostobj",
98         NULL
99 };
100
101 const char *lfsck_status2names(enum lfsck_status status)
102 {
103         if (unlikely(status < 0 || status >= LS_MAX))
104                 return "unknown";
105
106         return lfsck_status_names[status];
107 }
108
109 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
110 {
111         spin_lock_init(&ltds->ltd_lock);
112         init_rwsem(&ltds->ltd_rw_sem);
113         INIT_LIST_HEAD(&ltds->ltd_orphan);
114         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
115         if (ltds->ltd_tgts_bitmap == NULL)
116                 return -ENOMEM;
117
118         return 0;
119 }
120
121 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
122 {
123         struct lfsck_tgt_desc   *ltd;
124         struct lfsck_tgt_desc   *next;
125         int                      idx;
126
127         down_write(&ltds->ltd_rw_sem);
128
129         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
130                                  ltd_orphan_list) {
131                 list_del_init(&ltd->ltd_orphan_list);
132                 lfsck_tgt_put(ltd);
133         }
134
135         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
136                 up_write(&ltds->ltd_rw_sem);
137
138                 return;
139         }
140
141         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
142                 ltd = LTD_TGT(ltds, idx);
143                 if (likely(ltd != NULL)) {
144                         LASSERT(list_empty(&ltd->ltd_layout_list));
145                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
146
147                         ltds->ltd_tgtnr--;
148                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
149                         LTD_TGT(ltds, idx) = NULL;
150                         lfsck_tgt_put(ltd);
151                 }
152         }
153
154         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
155                  ltds->ltd_tgtnr);
156
157         for (idx = 0; idx < TGT_PTRS; idx++) {
158                 if (ltds->ltd_tgts_idx[idx] != NULL) {
159                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
160                         ltds->ltd_tgts_idx[idx] = NULL;
161                 }
162         }
163
164         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
165         ltds->ltd_tgts_bitmap = NULL;
166         up_write(&ltds->ltd_rw_sem);
167 }
168
169 static int __lfsck_add_target(const struct lu_env *env,
170                               struct lfsck_instance *lfsck,
171                               struct lfsck_tgt_desc *ltd,
172                               bool for_ost, bool locked)
173 {
174         struct lfsck_tgt_descs *ltds;
175         __u32                   index = ltd->ltd_index;
176         int                     rc    = 0;
177         ENTRY;
178
179         if (for_ost)
180                 ltds = &lfsck->li_ost_descs;
181         else
182                 ltds = &lfsck->li_mdt_descs;
183
184         if (!locked)
185                 down_write(&ltds->ltd_rw_sem);
186
187         LASSERT(ltds->ltd_tgts_bitmap != NULL);
188
189         if (index >= ltds->ltd_tgts_bitmap->size) {
190                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
191                                     (__u32)BITS_PER_LONG);
192                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
193                 cfs_bitmap_t *new_bitmap;
194
195                 while (newsize < index + 1)
196                         newsize <<= 1;
197
198                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
199                 if (new_bitmap == NULL)
200                         GOTO(unlock, rc = -ENOMEM);
201
202                 if (ltds->ltd_tgtnr > 0)
203                         cfs_bitmap_copy(new_bitmap, old_bitmap);
204                 ltds->ltd_tgts_bitmap = new_bitmap;
205                 CFS_FREE_BITMAP(old_bitmap);
206         }
207
208         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
209                 CERROR("%s: the device %s (%u) is registered already\n",
210                        lfsck_lfsck2name(lfsck),
211                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
212                 GOTO(unlock, rc = -EEXIST);
213         }
214
215         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
216                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
217                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
218                         GOTO(unlock, rc = -ENOMEM);
219         }
220
221         LTD_TGT(ltds, index) = ltd;
222         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
223         ltds->ltd_tgtnr++;
224
225         GOTO(unlock, rc = 0);
226
227 unlock:
228         if (!locked)
229                 up_write(&ltds->ltd_rw_sem);
230
231         return rc;
232 }
233
234 static int lfsck_add_target_from_orphan(const struct lu_env *env,
235                                         struct lfsck_instance *lfsck)
236 {
237         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
238         struct lfsck_tgt_desc   *ltd;
239         struct lfsck_tgt_desc   *next;
240         struct list_head        *head    = &lfsck_ost_orphan_list;
241         int                      rc;
242         bool                     for_ost = true;
243
244 again:
245         spin_lock(&lfsck_instance_lock);
246         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
247                 if (ltd->ltd_key == lfsck->li_bottom) {
248                         list_del_init(&ltd->ltd_orphan_list);
249                         list_add_tail(&ltd->ltd_orphan_list,
250                                       &ltds->ltd_orphan);
251                 }
252         }
253         spin_unlock(&lfsck_instance_lock);
254
255         down_write(&ltds->ltd_rw_sem);
256         while (!list_empty(&ltds->ltd_orphan)) {
257                 ltd = list_entry(ltds->ltd_orphan.next,
258                                  struct lfsck_tgt_desc,
259                                  ltd_orphan_list);
260                 list_del_init(&ltd->ltd_orphan_list);
261                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
262                 /* Do not hold the semaphore for too long time. */
263                 up_write(&ltds->ltd_rw_sem);
264                 if (rc != 0)
265                         return rc;
266
267                 down_write(&ltds->ltd_rw_sem);
268         }
269         up_write(&ltds->ltd_rw_sem);
270
271         if (for_ost) {
272                 ltds = &lfsck->li_mdt_descs;
273                 head = &lfsck_mdt_orphan_list;
274                 for_ost = false;
275                 goto again;
276         }
277
278         return 0;
279 }
280
281 static inline struct lfsck_component *
282 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type, cfs_list_t *list)
283 {
284         struct lfsck_component *com;
285
286         cfs_list_for_each_entry(com, list, lc_link) {
287                 if (com->lc_type == type)
288                         return com;
289         }
290         return NULL;
291 }
292
293 struct lfsck_component *
294 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
295 {
296         struct lfsck_component *com;
297
298         spin_lock(&lfsck->li_lock);
299         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
300         if (com != NULL)
301                 goto unlock;
302
303         com = __lfsck_component_find(lfsck, type,
304                                      &lfsck->li_list_double_scan);
305         if (com != NULL)
306                 goto unlock;
307
308         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
309
310 unlock:
311         if (com != NULL)
312                 lfsck_component_get(com);
313         spin_unlock(&lfsck->li_lock);
314         return com;
315 }
316
317 void lfsck_component_cleanup(const struct lu_env *env,
318                              struct lfsck_component *com)
319 {
320         if (!cfs_list_empty(&com->lc_link))
321                 cfs_list_del_init(&com->lc_link);
322         if (!cfs_list_empty(&com->lc_link_dir))
323                 cfs_list_del_init(&com->lc_link_dir);
324
325         lfsck_component_put(env, com);
326 }
327
328 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
329                     struct lu_fid *fid, bool locked)
330 {
331         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
332         int                      rc = 0;
333         ENTRY;
334
335         if (!locked)
336                 mutex_lock(&lfsck->li_mutex);
337
338         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
339         if (rc >= 0) {
340                 bk->lb_last_fid = *fid;
341                 /* We do not care about whether the subsequent sub-operations
342                  * failed or not. The worst case is that one FID is lost that
343                  * is not a big issue for the LFSCK since it is relative rare
344                  * for LFSCK create. */
345                 rc = lfsck_bookmark_store(env, lfsck);
346         }
347
348         if (!locked)
349                 mutex_unlock(&lfsck->li_mutex);
350
351         RETURN(rc);
352 }
353
354 static const char dot[] = ".";
355 static const char dotdot[] = "..";
356
357 static int lfsck_create_lpf_local(const struct lu_env *env,
358                                   struct lfsck_instance *lfsck,
359                                   struct dt_object *parent,
360                                   struct dt_object *child,
361                                   struct lu_attr *la,
362                                   struct dt_object_format *dof,
363                                   const char *name)
364 {
365         struct dt_device        *dev    = lfsck->li_bottom;
366         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
367         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
368         const struct lu_fid     *cfid   = lu_object_fid(&child->do_lu);
369         struct thandle          *th     = NULL;
370         loff_t                   pos    = 0;
371         int                      len    = sizeof(struct lfsck_bookmark);
372         int                      rc     = 0;
373         ENTRY;
374
375         th = dt_trans_create(env, dev);
376         if (IS_ERR(th))
377                 RETURN(PTR_ERR(th));
378
379         /* 1a. create child */
380         rc = dt_declare_create(env, child, la, NULL, dof, th);
381         if (rc != 0)
382                 GOTO(stop, rc);
383
384         /* 2a. increase child nlink */
385         rc = dt_declare_ref_add(env, child, th);
386         if (rc != 0)
387                 GOTO(stop, rc);
388
389         /* 3a. insert name into parent dir */
390         rc = dt_declare_insert(env, parent, (const struct dt_rec *)cfid,
391                                (const struct dt_key *)name, th);
392         if (rc != 0)
393                 GOTO(stop, rc);
394
395         /* 4a. increase parent nlink */
396         rc = dt_declare_ref_add(env, parent, th);
397         if (rc != 0)
398                 GOTO(stop, rc);
399
400         /* 5a. update bookmark */
401         rc = dt_declare_record_write(env, bk_obj,
402                                      lfsck_buf_get(env, bk, len), 0, th);
403         if (rc != 0)
404                 GOTO(stop, rc);
405
406         rc = dt_trans_start_local(env, dev, th);
407         if (rc != 0)
408                 GOTO(stop, rc);
409
410         dt_write_lock(env, child, 0);
411         /* 1b.1 create child */
412         rc = dt_create(env, child, la, NULL, dof, th);
413         if (rc != 0)
414                 GOTO(unlock, rc);
415
416         if (unlikely(!dt_try_as_dir(env, child)))
417                 GOTO(unlock, rc = -ENOTDIR);
418
419         /* 1b.2 insert dot into child dir */
420         rc = dt_insert(env, child, (const struct dt_rec *)cfid,
421                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
422         if (rc != 0)
423                 GOTO(unlock, rc);
424
425         /* 1b.3 insert dotdot into child dir */
426         rc = dt_insert(env, child, (const struct dt_rec *)&LU_LPF_FID,
427                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
428         if (rc != 0)
429                 GOTO(unlock, rc);
430
431         /* 2b. increase child nlink */
432         rc = dt_ref_add(env, child, th);
433         dt_write_unlock(env, child);
434         if (rc != 0)
435                 GOTO(stop, rc);
436
437         /* 3b. insert name into parent dir */
438         rc = dt_insert(env, parent, (const struct dt_rec *)cfid,
439                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
440         if (rc != 0)
441                 GOTO(stop, rc);
442
443         dt_write_lock(env, parent, 0);
444         /* 4b. increase parent nlink */
445         rc = dt_ref_add(env, parent, th);
446         dt_write_unlock(env, parent);
447         if (rc != 0)
448                 GOTO(stop, rc);
449
450         bk->lb_lpf_fid = *cfid;
451         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
452
453         /* 5b. update bookmark */
454         rc = dt_record_write(env, bk_obj,
455                              lfsck_buf_get(env, bk, len), &pos, th);
456
457         GOTO(stop, rc);
458
459 unlock:
460         dt_write_unlock(env, child);
461
462 stop:
463         dt_trans_stop(env, dev, th);
464
465         return rc;
466 }
467
468 static int lfsck_create_lpf_remote(const struct lu_env *env,
469                                    struct lfsck_instance *lfsck,
470                                    struct dt_object *parent,
471                                    struct dt_object *child,
472                                    struct lu_attr *la,
473                                    struct dt_object_format *dof,
474                                    const char *name)
475 {
476         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
477         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
478         const struct lu_fid     *cfid   = lu_object_fid(&child->do_lu);
479         struct thandle          *th     = NULL;
480         struct dt_device        *dev;
481         loff_t                   pos    = 0;
482         int                      len    = sizeof(struct lfsck_bookmark);
483         int                      rc     = 0;
484         ENTRY;
485
486         /* Create .lustre/lost+found/MDTxxxx. */
487
488         /* XXX: Currently, cross-MDT create operation needs to create the child
489          *      object firstly, then insert name into the parent directory. For
490          *      this case, the child object resides on current MDT (local), but
491          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
492          *      easy to contain all the sub-modifications orderly within single
493          *      transaction.
494          *
495          *      To avoid more inconsistency, we split the create operation into
496          *      two transactions:
497          *
498          *      1) create the child locally.
499          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
500          *         remotely and update the lfsck_bookmark::lb_lpf_fid locally.
501          *
502          *      If 1) done but 2) failed, then the worst case is that we lose
503          *      one object locally, which is not a big issue. (can be repaird
504          *      by LFSCK phase III) */
505
506         /* Transaction I: */
507
508         dev = lfsck->li_bottom;
509         th = dt_trans_create(env, dev);
510         if (IS_ERR(th))
511                 RETURN(PTR_ERR(th));
512
513         /* 1a. create child locally. */
514         rc = dt_declare_create(env, child, la, NULL, dof, th);
515         if (rc != 0)
516                 GOTO(stop, rc);
517
518         /* 2a. increase child nlink locally. */
519         rc = dt_declare_ref_add(env, child, th);
520         if (rc != 0)
521                 GOTO(stop, rc);
522
523         rc = dt_trans_start_local(env, dev, th);
524         if (rc != 0)
525                 GOTO(stop, rc);
526
527         dt_write_lock(env, child, 0);
528         /* 1b. create child locally. */
529         rc = dt_create(env, child, la, NULL, dof, th);
530         if (rc != 0)
531                 GOTO(unlock, rc);
532
533         if (unlikely(!dt_try_as_dir(env, child)))
534                 GOTO(unlock, rc = -ENOTDIR);
535
536         /* 2b.1 insert dot into child dir locally. */
537         rc = dt_insert(env, child, (const struct dt_rec *)cfid,
538                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
539         if (rc != 0)
540                 GOTO(unlock, rc);
541
542         /* 2b.2 insert dotdot into child dir locally. */
543         rc = dt_insert(env, child, (const struct dt_rec *)&LU_LPF_FID,
544                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
545         if (rc != 0)
546                 GOTO(unlock, rc);
547
548         /* 2b.3 increase child nlink locally. */
549         rc = dt_ref_add(env, child, th);
550         dt_write_unlock(env, child);
551         dt_trans_stop(env, dev, th);
552         if (rc != 0)
553                 RETURN(rc);
554
555         /* Transaction II: */
556
557         dev = lfsck->li_next;
558         th = dt_trans_create(env, dev);
559         if (IS_ERR(th))
560                 RETURN(PTR_ERR(th));
561
562         /* 3a. insert name into parent dir remotely. */
563         rc = dt_declare_insert(env, parent, (const struct dt_rec *)cfid,
564                                (const struct dt_key *)name, th);
565         if (rc != 0)
566                 GOTO(stop, rc);
567
568         /* 4a. increase parent nlink remotely. */
569         rc = dt_declare_ref_add(env, parent, th);
570         if (rc != 0)
571                 GOTO(stop, rc);
572
573         /* 5a. decrease child nlink for dotdot locally if former remote
574          *     update failed. */
575         rc = dt_declare_ref_del(env, child, th);
576         if (rc != 0)
577                 GOTO(stop, rc);
578
579         /* 6a. decrease child nlink for dot locally if former remote
580          *     update failed. */
581         rc = dt_declare_ref_del(env, child, th);
582         if (rc != 0)
583                 GOTO(stop, rc);
584
585         /* 7a. destroy child locally if former remote update failed. */
586         rc = dt_declare_destroy(env, child, th);
587         if (rc != 0)
588                 GOTO(stop, rc);
589
590         /* 8a. update bookmark locally. */
591         rc = dt_declare_record_write(env, bk_obj,
592                                      lfsck_buf_get(env, bk, len), 0, th);
593         if (rc != 0)
594                 GOTO(stop, rc);
595
596         rc = dt_trans_start(env, dev, th);
597         if (rc != 0)
598                 GOTO(stop, rc);
599
600         /* 3b. insert name into parent dir remotely. */
601         rc = dt_insert(env, parent, (const struct dt_rec *)cfid,
602                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
603         if (rc == 0) {
604                 dt_write_lock(env, parent, 0);
605                 /* 4b. increase parent nlink remotely. */
606                 rc = dt_ref_add(env, parent, th);
607                 dt_write_unlock(env, parent);
608         }
609         if (rc != 0) {
610                 /* 5b. decrease child nlink for dotdot locally. */
611                 dt_ref_del(env, child, th);
612                 /* 6b. decrease child nlink for dot locally. */
613                 dt_ref_del(env, child, th);
614                 /* 7b. destroy child locally. */
615                 dt_destroy(env, child, th);
616                 GOTO(stop, rc);
617         }
618
619         bk->lb_lpf_fid = *cfid;
620         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
621
622         /* 8b. update bookmark locally. */
623         rc = dt_record_write(env, bk_obj,
624                              lfsck_buf_get(env, bk, len), &pos, th);
625
626         GOTO(stop, rc);
627
628 unlock:
629         dt_write_unlock(env, child);
630 stop:
631         dt_trans_stop(env, dev, th);
632
633         return rc;
634 }
635
636 /* Do NOT create .lustre/lost+found/MDTxxxx when register the lfsck instance,
637  * because the MDT0 maybe not reaady for sequence allocation yet. We do that
638  * only when it is required, such as orphan OST-objects repairing. */
639 int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
640 {
641         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
642         struct lfsck_thread_info *info  = lfsck_env_info(env);
643         struct lu_fid            *cfid  = &info->lti_fid2;
644         struct lu_attr           *la    = &info->lti_la;
645         struct dt_object_format  *dof   = &info->lti_dof;
646         struct dt_object         *parent = NULL;
647         struct dt_object         *child = NULL;
648         char                      name[8];
649         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
650         int                       rc    = 0;
651         ENTRY;
652
653         LASSERT(lfsck->li_master);
654
655         sprintf(name, "MDT%04x", node);
656         if (node == 0) {
657                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
658                                                   &LU_LPF_FID);
659         } else {
660                 struct lfsck_tgt_desc *ltd;
661
662                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
663                 if (unlikely(ltd == NULL))
664                         RETURN(-ENXIO);
665
666                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
667                                                   &LU_LPF_FID);
668                 lfsck_tgt_put(ltd);
669         }
670         if (IS_ERR(parent))
671                 RETURN(PTR_ERR(parent));
672
673         if (unlikely(!dt_try_as_dir(env, parent)))
674                 GOTO(out, rc = -ENOTDIR);
675
676         mutex_lock(&lfsck->li_mutex);
677         if (lfsck->li_lpf_obj != NULL)
678                 GOTO(unlock, rc = 0);
679
680         if (fid_is_zero(&bk->lb_lpf_fid)) {
681                 /* There is corner case that: in former LFSCK scanning we have
682                  * created the .lustre/lost+found/MDTxxxx but failed to update
683                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
684                  * it from MDT0 firstly. */
685                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
686                                (const struct dt_key *)name, BYPASS_CAPA);
687                 if (rc != 0 && rc != -ENOENT)
688                         GOTO(unlock, rc);
689
690                 if (rc == 0) {
691                         bk->lb_lpf_fid = *cfid;
692                         rc = lfsck_bookmark_store(env, lfsck);
693                 } else {
694                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
695                 }
696                 if (rc != 0)
697                         GOTO(unlock, rc);
698         } else {
699                 *cfid = bk->lb_lpf_fid;
700         }
701
702         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
703         if (IS_ERR(child))
704                 GOTO(unlock, rc = PTR_ERR(child));
705
706         if (dt_object_exists(child) != 0) {
707                 if (unlikely(!dt_try_as_dir(env, child)))
708                         GOTO(unlock, rc = -ENOTDIR);
709
710                 lfsck->li_lpf_obj = child;
711                 GOTO(unlock, rc = 0);
712         }
713
714         memset(la, 0, sizeof(*la));
715         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
716         la->la_mode = S_IFDIR | S_IRWXU;
717         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
718                        LA_UID | LA_GID;
719         memset(dof, 0, sizeof(*dof));
720         dof->dof_type = dt_mode_to_dft(S_IFDIR);
721
722         if (node == 0)
723                 rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
724                                             dof, name);
725         else
726                 rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
727                                              dof, name);
728         if (rc == 0)
729                 lfsck->li_lpf_obj = child;
730
731         GOTO(unlock, rc);
732
733 unlock:
734         mutex_unlock(&lfsck->li_mutex);
735         if (rc != 0 && child != NULL && !IS_ERR(child))
736                 lu_object_put(env, &child->do_lu);
737 out:
738         if (parent != NULL && !IS_ERR(parent))
739                 lu_object_put(env, &parent->do_lu);
740
741         return rc;
742 }
743
744 static int lfsck_fid_init(struct lfsck_instance *lfsck)
745 {
746         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
747         struct seq_server_site  *ss;
748         char                    *prefix;
749         int                      rc     = 0;
750         ENTRY;
751
752         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
753         if (unlikely(ss == NULL))
754                 RETURN(-ENXIO);
755
756         OBD_ALLOC_PTR(lfsck->li_seq);
757         if (lfsck->li_seq == NULL)
758                 RETURN(-ENOMEM);
759
760         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
761         if (prefix == NULL)
762                 GOTO(out, rc = -ENOMEM);
763
764         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
765         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
766                              ss->ss_server_seq);
767         OBD_FREE(prefix, MAX_OBD_NAME + 7);
768         if (rc != 0)
769                 GOTO(out, rc);
770
771         if (fid_is_sane(&bk->lb_last_fid))
772                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
773
774         RETURN(0);
775
776 out:
777         OBD_FREE_PTR(lfsck->li_seq);
778         lfsck->li_seq = NULL;
779
780         return rc;
781 }
782
783 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
784 {
785         if (lfsck->li_seq != NULL) {
786                 seq_client_fini(lfsck->li_seq);
787                 OBD_FREE_PTR(lfsck->li_seq);
788                 lfsck->li_seq = NULL;
789         }
790 }
791
792 void lfsck_instance_cleanup(const struct lu_env *env,
793                             struct lfsck_instance *lfsck)
794 {
795         struct ptlrpc_thread    *thread = &lfsck->li_thread;
796         struct lfsck_component  *com;
797         ENTRY;
798
799         LASSERT(list_empty(&lfsck->li_link));
800         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
801
802         if (lfsck->li_obj_oit != NULL) {
803                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
804                 lfsck->li_obj_oit = NULL;
805         }
806
807         LASSERT(lfsck->li_obj_dir == NULL);
808
809         while (!cfs_list_empty(&lfsck->li_list_scan)) {
810                 com = cfs_list_entry(lfsck->li_list_scan.next,
811                                      struct lfsck_component,
812                                      lc_link);
813                 lfsck_component_cleanup(env, com);
814         }
815
816         LASSERT(cfs_list_empty(&lfsck->li_list_dir));
817
818         while (!cfs_list_empty(&lfsck->li_list_double_scan)) {
819                 com = cfs_list_entry(lfsck->li_list_double_scan.next,
820                                      struct lfsck_component,
821                                      lc_link);
822                 lfsck_component_cleanup(env, com);
823         }
824
825         while (!cfs_list_empty(&lfsck->li_list_idle)) {
826                 com = cfs_list_entry(lfsck->li_list_idle.next,
827                                      struct lfsck_component,
828                                      lc_link);
829                 lfsck_component_cleanup(env, com);
830         }
831
832         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
833         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
834
835         if (lfsck->li_bookmark_obj != NULL) {
836                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
837                 lfsck->li_bookmark_obj = NULL;
838         }
839
840         if (lfsck->li_lpf_obj != NULL) {
841                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
842                 lfsck->li_lpf_obj = NULL;
843         }
844
845         if (lfsck->li_los != NULL) {
846                 local_oid_storage_fini(env, lfsck->li_los);
847                 lfsck->li_los = NULL;
848         }
849
850         lfsck_fid_fini(lfsck);
851
852         OBD_FREE_PTR(lfsck);
853 }
854
855 static inline struct lfsck_instance *
856 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
857 {
858         struct lfsck_instance *lfsck;
859
860         cfs_list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
861                 if (lfsck->li_bottom == key) {
862                         if (ref)
863                                 lfsck_instance_get(lfsck);
864                         if (unlink)
865                                 list_del_init(&lfsck->li_link);
866
867                         return lfsck;
868                 }
869         }
870
871         return NULL;
872 }
873
874 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
875                                            bool unlink)
876 {
877         struct lfsck_instance *lfsck;
878
879         spin_lock(&lfsck_instance_lock);
880         lfsck = __lfsck_instance_find(key, ref, unlink);
881         spin_unlock(&lfsck_instance_lock);
882
883         return lfsck;
884 }
885
886 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
887 {
888         struct lfsck_instance *tmp;
889
890         spin_lock(&lfsck_instance_lock);
891         cfs_list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
892                 if (lfsck->li_bottom == tmp->li_bottom) {
893                         spin_unlock(&lfsck_instance_lock);
894                         return -EEXIST;
895                 }
896         }
897
898         cfs_list_add_tail(&lfsck->li_link, &lfsck_instance_list);
899         spin_unlock(&lfsck_instance_lock);
900         return 0;
901 }
902
/**
 * Format "<prefix>: name1,name2,...\n" for the bits set in \a bits.
 *
 * Bit i is printed as names[i]; set bits whose names[] slot is NULL are
 * silently skipped. If no bit is set the output is "<prefix>:\n".
 *
 * On success *buf is advanced past the text that was written and *len is
 * reduced by the same amount. On failure, *buf and *len reflect only the
 * pieces that were completely written before the failure.
 *
 * \param[in,out] buf   cursor into the output buffer
 * \param[in,out] len   bytes remaining at *buf
 * \param[in] bits      bitmask to decode
 * \param[in] names     names indexed by bit number; the caller must supply
 *                      a slot for every bit that may be set in \a bits
 * \param[in] prefix    label printed before the names
 *
 * \retval              number of bytes written (excluding the NUL)
 * \retval -ENOSPC      the output did not fit in the remaining space
 */
int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
                    const char *prefix)
{
        /* Work on an unsigned copy so that bit 31 can be tested and
         * cleared without shifting into the sign bit. */
        unsigned int    mask    = (unsigned int)bits;
        unsigned int    flag;
        int             save    = *len;
        int             rc;
        int             i;
        bool            newline = (mask == 0);

        if (*len <= 0)
                return -ENOSPC;

        /* snprintf() returns the length the full output would have had,
         * so rc >= *len means the text was truncated; advancing the
         * cursor by rc in that case would run past the buffer. */
        rc = snprintf(*buf, *len, "%s:%c", prefix, newline ? '\n' : ' ');
        if (rc <= 0 || rc >= *len)
                return -ENOSPC;

        *buf += rc;
        *len -= rc;
        for (i = 0; mask != 0; i++) {
                /* Computed here rather than in the loop increment so the
                 * shift is never evaluated for i beyond the last set bit. */
                flag = 1U << i;
                if (!(mask & flag))
                        continue;

                mask &= ~flag;
                if (names[i] == NULL)
                        continue;

                /* The last named bit ends the line instead of adding ','. */
                if (mask == 0)
                        newline = true;

                rc = snprintf(*buf, *len, "%s%c", names[i],
                              newline ? '\n' : ',');
                if (rc <= 0 || rc >= *len)
                        return -ENOSPC;

                *buf += rc;
                *len -= rc;
        }

        /* The highest set bit had no name, so the line is still open. */
        if (!newline) {
                rc = snprintf(*buf, *len, "\n");
                if (rc <= 0 || rc >= *len)
                        return -ENOSPC;

                *buf += rc;
                *len -= rc;
        }

        return save - *len;
}
947
948 int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix)
949 {
950         int rc;
951
952         if (time != 0)
953                 rc = snprintf(*buf, *len, "%s: "LPU64" seconds\n", prefix,
954                               cfs_time_current_sec() - time);
955         else
956                 rc = snprintf(*buf, *len, "%s: N/A\n", prefix);
957         if (rc <= 0)
958                 return -ENOSPC;
959
960         *buf += rc;
961         *len -= rc;
962         return rc;
963 }
964
965 int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
966                    const char *prefix)
967 {
968         int rc;
969
970         if (fid_is_zero(&pos->lp_dir_parent)) {
971                 if (pos->lp_oit_cookie == 0)
972                         rc = snprintf(*buf, *len, "%s: N/A, N/A, N/A\n",
973                                       prefix);
974                 else
975                         rc = snprintf(*buf, *len, "%s: "LPU64", N/A, N/A\n",
976                                       prefix, pos->lp_oit_cookie);
977         } else {
978                 rc = snprintf(*buf, *len, "%s: "LPU64", "DFID", "LPU64"\n",
979                               prefix, pos->lp_oit_cookie,
980                               PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
981         }
982         if (rc <= 0)
983                 return -ENOSPC;
984
985         *buf += rc;
986         *len -= rc;
987         return rc;
988 }
989
990 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
991                     struct lfsck_position *pos, bool init)
992 {
993         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
994
995         if (unlikely(lfsck->li_di_oit == NULL)) {
996                 memset(pos, 0, sizeof(*pos));
997                 return;
998         }
999
1000         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1001         if (!lfsck->li_current_oit_processed && !init)
1002                 pos->lp_oit_cookie--;
1003
1004         LASSERT(pos->lp_oit_cookie > 0);
1005
1006         if (lfsck->li_di_dir != NULL) {
1007                 struct dt_object *dto = lfsck->li_obj_dir;
1008
1009                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1010                                                         lfsck->li_di_dir);
1011
1012                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1013                         fid_zero(&pos->lp_dir_parent);
1014                         pos->lp_dir_cookie = 0;
1015                 } else {
1016                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1017                 }
1018         } else {
1019                 fid_zero(&pos->lp_dir_parent);
1020                 pos->lp_dir_cookie = 0;
1021         }
1022 }
1023
1024 static void __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1025 {
1026         lfsck->li_bookmark_ram.lb_speed_limit = limit;
1027         if (limit != LFSCK_SPEED_NO_LIMIT) {
1028                 if (limit > HZ) {
1029                         lfsck->li_sleep_rate = limit / HZ;
1030                         lfsck->li_sleep_jif = 1;
1031                 } else {
1032                         lfsck->li_sleep_rate = 1;
1033                         lfsck->li_sleep_jif = HZ / limit;
1034                 }
1035         } else {
1036                 lfsck->li_sleep_jif = 0;
1037                 lfsck->li_sleep_rate = 0;
1038         }
1039 }
1040
1041 void lfsck_control_speed(struct lfsck_instance *lfsck)
1042 {
1043         struct ptlrpc_thread *thread = &lfsck->li_thread;
1044         struct l_wait_info    lwi;
1045
1046         if (lfsck->li_sleep_jif > 0 &&
1047             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1048                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1049                                        LWI_ON_SIGNAL_NOOP, NULL);
1050
1051                 l_wait_event(thread->t_ctl_waitq,
1052                              !thread_is_running(thread),
1053                              &lwi);
1054                 lfsck->li_new_scanned = 0;
1055         }
1056 }
1057
1058 void lfsck_control_speed_by_self(struct lfsck_component *com)
1059 {
1060         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1061         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1062         struct l_wait_info       lwi;
1063
1064         if (lfsck->li_sleep_jif > 0 &&
1065             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1066                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1067                                        LWI_ON_SIGNAL_NOOP, NULL);
1068
1069                 l_wait_event(thread->t_ctl_waitq,
1070                              !thread_is_running(thread),
1071                              &lwi);
1072                 com->lc_new_scanned = 0;
1073         }
1074 }
1075
1076 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
1077                             struct lu_fid *fid)
1078 {
1079         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
1080                      !dt_try_as_dir(env, obj)))
1081                 return -ENOTDIR;
1082
1083         return dt_lookup(env, obj, (struct dt_rec *)fid,
1084                          (const struct dt_key *)"..", BYPASS_CAPA);
1085 }
1086
1087 static int lfsck_needs_scan_dir(const struct lu_env *env,
1088                                 struct lfsck_instance *lfsck,
1089                                 struct dt_object *obj)
1090 {
1091         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
1092         int            depth = 0;
1093         int            rc;
1094
1095         if (!lfsck->li_master || !S_ISDIR(lfsck_object_type(obj)) ||
1096             cfs_list_empty(&lfsck->li_list_dir))
1097                RETURN(0);
1098
1099         while (1) {
1100                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
1101                  *      which is the agent directory to manage the objects
1102                  *      which name entries reside on remote MDTs. Related
1103                  *      consistency verification will be processed in LFSCK
1104                  *      phase III. */
1105                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
1106                         if (depth > 0)
1107                                 lfsck_object_put(env, obj);
1108                         return 1;
1109                 }
1110
1111                 /* .lustre doesn't contain "real" user objects, no need lfsck */
1112                 if (fid_is_dot_lustre(lfsck_dto2fid(obj))) {
1113                         if (depth > 0)
1114                                 lfsck_object_put(env, obj);
1115                         return 0;
1116                 }
1117
1118                 dt_read_lock(env, obj, MOR_TGT_CHILD);
1119                 if (unlikely(lfsck_is_dead_obj(obj))) {
1120                         dt_read_unlock(env, obj);
1121                         if (depth > 0)
1122                                 lfsck_object_put(env, obj);
1123                         return 0;
1124                 }
1125
1126                 rc = dt_xattr_get(env, obj,
1127                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
1128                                   BYPASS_CAPA);
1129                 dt_read_unlock(env, obj);
1130                 if (rc >= 0) {
1131                         if (depth > 0)
1132                                 lfsck_object_put(env, obj);
1133                         return 1;
1134                 }
1135
1136                 if (rc < 0 && rc != -ENODATA) {
1137                         if (depth > 0)
1138                                 lfsck_object_put(env, obj);
1139                         return rc;
1140                 }
1141
1142                 rc = lfsck_parent_fid(env, obj, fid);
1143                 if (depth > 0)
1144                         lfsck_object_put(env, obj);
1145                 if (rc != 0)
1146                         return rc;
1147
1148                 if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
1149                         return 0;
1150
1151                 obj = lfsck_object_find(env, lfsck, fid);
1152                 if (obj == NULL)
1153                         return 0;
1154                 else if (IS_ERR(obj))
1155                         return PTR_ERR(obj);
1156
1157                 if (!dt_object_exists(obj)) {
1158                         lfsck_object_put(env, obj);
1159                         return 0;
1160                 }
1161
1162                 /* Currently, only client visible directory can be remote. */
1163                 if (dt_object_remote(obj)) {
1164                         lfsck_object_put(env, obj);
1165                         return 1;
1166                 }
1167
1168                 depth++;
1169         }
1170         return 0;
1171 }
1172
1173 struct lfsck_thread_args *lfsck_thread_args_init(struct lfsck_instance *lfsck,
1174                                                  struct lfsck_component *com,
1175                                                  struct lfsck_start_param *lsp)
1176 {
1177         struct lfsck_thread_args *lta;
1178         int                       rc;
1179
1180         OBD_ALLOC_PTR(lta);
1181         if (lta == NULL)
1182                 return ERR_PTR(-ENOMEM);
1183
1184         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1185         if (rc != 0) {
1186                 OBD_FREE_PTR(lta);
1187                 return ERR_PTR(rc);
1188         }
1189
1190         lta->lta_lfsck = lfsck_instance_get(lfsck);
1191         if (com != NULL)
1192                 lta->lta_com = lfsck_component_get(com);
1193
1194         lta->lta_lsp = lsp;
1195
1196         return lta;
1197 }
1198
1199 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1200 {
1201         if (lta->lta_com != NULL)
1202                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1203         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1204         lu_env_fini(&lta->lta_env);
1205         OBD_FREE_PTR(lta);
1206 }
1207
1208 /* LFSCK wrap functions */
1209
1210 void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
1211                 bool new_checked)
1212 {
1213         struct lfsck_component *com;
1214
1215         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1216                 com->lc_ops->lfsck_fail(env, com, new_checked);
1217         }
1218 }
1219
1220 int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck)
1221 {
1222         struct lfsck_component *com;
1223         int                     rc  = 0;
1224         int                     rc1 = 0;
1225
1226         if (likely(cfs_time_beforeq(cfs_time_current(),
1227                                     lfsck->li_time_next_checkpoint)))
1228                 return 0;
1229
1230         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
1231         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1232                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
1233                 if (rc != 0)
1234                         rc1 = rc;
1235         }
1236
1237         lfsck->li_time_last_checkpoint = cfs_time_current();
1238         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1239                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1240         return rc1 != 0 ? rc1 : rc;
1241 }
1242
1243 int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck,
1244                struct lfsck_start_param *lsp)
1245 {
1246         struct dt_object       *obj     = NULL;
1247         struct lfsck_component *com;
1248         struct lfsck_component *next;
1249         struct lfsck_position  *pos     = NULL;
1250         const struct dt_it_ops *iops    =
1251                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
1252         struct dt_it           *di;
1253         int                     rc;
1254         ENTRY;
1255
1256         LASSERT(lfsck->li_obj_dir == NULL);
1257         LASSERT(lfsck->li_di_dir == NULL);
1258
1259         lfsck->li_current_oit_processed = 0;
1260         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1261                 com->lc_new_checked = 0;
1262                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1263                         com->lc_journal = 0;
1264
1265                 rc = com->lc_ops->lfsck_prep(env, com, lsp);
1266                 if (rc != 0)
1267                         GOTO(out, rc);
1268
1269                 if ((pos == NULL) ||
1270                     (!lfsck_pos_is_zero(&com->lc_pos_start) &&
1271                      lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
1272                         pos = &com->lc_pos_start;
1273         }
1274
1275         /* Init otable-based iterator. */
1276         if (pos == NULL) {
1277                 rc = iops->load(env, lfsck->li_di_oit, 0);
1278                 if (rc > 0) {
1279                         lfsck->li_oit_over = 1;
1280                         rc = 0;
1281                 }
1282
1283                 GOTO(out, rc);
1284         }
1285
1286         rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
1287         if (rc < 0)
1288                 GOTO(out, rc);
1289         else if (rc > 0)
1290                 lfsck->li_oit_over = 1;
1291
1292         if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
1293                 GOTO(out, rc = 0);
1294
1295         /* Find the directory for namespace-based traverse. */
1296         obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
1297         if (obj == NULL)
1298                 GOTO(out, rc = 0);
1299         else if (IS_ERR(obj))
1300                 RETURN(PTR_ERR(obj));
1301
1302         /* XXX: Currently, skip remote object, the consistency for
1303          *      remote object will be processed in LFSCK phase III. */
1304         if (!dt_object_exists(obj) || dt_object_remote(obj) ||
1305             unlikely(!S_ISDIR(lfsck_object_type(obj))))
1306                 GOTO(out, rc = 0);
1307
1308         if (unlikely(!dt_try_as_dir(env, obj)))
1309                 GOTO(out, rc = -ENOTDIR);
1310
1311         /* Init the namespace-based directory traverse. */
1312         iops = &obj->do_index_ops->dio_it;
1313         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
1314         if (IS_ERR(di))
1315                 GOTO(out, rc = PTR_ERR(di));
1316
1317         LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
1318
1319         rc = iops->load(env, di, pos->lp_dir_cookie);
1320         if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
1321                 rc = iops->next(env, di);
1322         else if (rc > 0)
1323                 rc = 0;
1324
1325         if (rc != 0) {
1326                 iops->put(env, di);
1327                 iops->fini(env, di);
1328                 GOTO(out, rc);
1329         }
1330
1331         lfsck->li_obj_dir = lfsck_object_get(obj);
1332         lfsck->li_cookie_dir = iops->store(env, di);
1333         spin_lock(&lfsck->li_lock);
1334         lfsck->li_di_dir = di;
1335         spin_unlock(&lfsck->li_lock);
1336
1337         GOTO(out, rc = 0);
1338
1339 out:
1340         if (obj != NULL)
1341                 lfsck_object_put(env, obj);
1342
1343         if (rc < 0) {
1344                 cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
1345                                              lc_link)
1346                         com->lc_ops->lfsck_post(env, com, rc, true);
1347
1348                 return rc;
1349         }
1350
1351         rc = 0;
1352         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
1353         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1354                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
1355                 if (rc != 0)
1356                         break;
1357         }
1358
1359         lfsck->li_time_last_checkpoint = cfs_time_current();
1360         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1361                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1362         return rc;
1363 }
1364
1365 int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
1366                    struct dt_object *obj)
1367 {
1368         struct lfsck_component *com;
1369         const struct dt_it_ops *iops;
1370         struct dt_it           *di;
1371         int                     rc;
1372         ENTRY;
1373
1374         LASSERT(lfsck->li_obj_dir == NULL);
1375
1376         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1377                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
1378                 if (rc != 0)
1379                         RETURN(rc);
1380         }
1381
1382         rc = lfsck_needs_scan_dir(env, lfsck, obj);
1383         if (rc <= 0)
1384                 GOTO(out, rc);
1385
1386         if (unlikely(!dt_try_as_dir(env, obj)))
1387                 GOTO(out, rc = -ENOTDIR);
1388
1389         iops = &obj->do_index_ops->dio_it;
1390         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
1391         if (IS_ERR(di))
1392                 GOTO(out, rc = PTR_ERR(di));
1393
1394         rc = iops->load(env, di, 0);
1395         if (rc == 0)
1396                 rc = iops->next(env, di);
1397         else if (rc > 0)
1398                 rc = 0;
1399
1400         if (rc != 0) {
1401                 iops->put(env, di);
1402                 iops->fini(env, di);
1403                 GOTO(out, rc);
1404         }
1405
1406         lfsck->li_obj_dir = lfsck_object_get(obj);
1407         lfsck->li_cookie_dir = iops->store(env, di);
1408         spin_lock(&lfsck->li_lock);
1409         lfsck->li_di_dir = di;
1410         spin_unlock(&lfsck->li_lock);
1411
1412         GOTO(out, rc = 0);
1413
1414 out:
1415         if (rc < 0)
1416                 lfsck_fail(env, lfsck, false);
1417         return (rc > 0 ? 0 : rc);
1418 }
1419
1420 int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
1421                    struct dt_object *obj, struct lu_dirent *ent)
1422 {
1423         struct lfsck_component *com;
1424         int                     rc;
1425
1426         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1427                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
1428                 if (rc != 0)
1429                         return rc;
1430         }
1431         return 0;
1432 }
1433
1434 int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
1435                int result)
1436 {
1437         struct lfsck_component *com;
1438         struct lfsck_component *next;
1439         int                     rc  = 0;
1440         int                     rc1 = 0;
1441
1442         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
1443         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1444                 rc = com->lc_ops->lfsck_post(env, com, result, false);
1445                 if (rc != 0)
1446                         rc1 = rc;
1447         }
1448
1449         lfsck->li_time_last_checkpoint = cfs_time_current();
1450         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1451                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1452
1453         /* Ignore some component post failure to make other can go ahead. */
1454         return result;
1455 }
1456
1457 static void lfsck_interpret(const struct lu_env *env,
1458                             struct lfsck_instance *lfsck,
1459                             struct ptlrpc_request *req, void *args, int result)
1460 {
1461         struct lfsck_async_interpret_args *laia = args;
1462         struct lfsck_component            *com;
1463
1464         LASSERT(laia->laia_com == NULL);
1465         LASSERT(laia->laia_shared);
1466
1467         spin_lock(&lfsck->li_lock);
1468         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1469                 if (com->lc_ops->lfsck_interpret != NULL) {
1470                         laia->laia_com = com;
1471                         com->lc_ops->lfsck_interpret(env, req, laia, result);
1472                 }
1473         }
1474
1475         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1476                 if (com->lc_ops->lfsck_interpret != NULL) {
1477                         laia->laia_com = com;
1478                         com->lc_ops->lfsck_interpret(env, req, laia, result);
1479                 }
1480         }
1481         spin_unlock(&lfsck->li_lock);
1482 }
1483
1484 int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck)
1485 {
1486         struct lfsck_component *com;
1487         struct lfsck_component *next;
1488         struct l_wait_info      lwi = { 0 };
1489         int                     rc  = 0;
1490         int                     rc1 = 0;
1491
1492         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1493                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1494                         com->lc_journal = 0;
1495
1496                 rc = com->lc_ops->lfsck_double_scan(env, com);
1497                 if (rc != 0)
1498                         rc1 = rc;
1499         }
1500
1501         l_wait_event(lfsck->li_thread.t_ctl_waitq,
1502                      atomic_read(&lfsck->li_double_scan_count) == 0,
1503                      &lwi);
1504
1505         if (lfsck->li_status != LS_PAUSED &&
1506             lfsck->li_status != LS_CO_PAUSED) {
1507                 list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1508                                          lc_link) {
1509                         spin_lock(&lfsck->li_lock);
1510                         list_del_init(&com->lc_link);
1511                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1512                         spin_unlock(&lfsck->li_lock);
1513                 }
1514         }
1515
1516         return rc1 != 0 ? rc1 : rc;
1517 }
1518
1519 static int lfsck_stop_notify(const struct lu_env *env,
1520                              struct lfsck_instance *lfsck,
1521                              struct lfsck_tgt_descs *ltds,
1522                              struct lfsck_tgt_desc *ltd, __u16 type)
1523 {
1524         struct ptlrpc_request_set *set;
1525         struct lfsck_component    *com;
1526         int                        rc  = 0;
1527         ENTRY;
1528
1529         spin_lock(&lfsck->li_lock);
1530         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
1531         if (com == NULL)
1532                 com = __lfsck_component_find(lfsck, type,
1533                                              &lfsck->li_list_double_scan);
1534         if (com != NULL)
1535                 lfsck_component_get(com);
1536         spin_lock(&lfsck->li_lock);
1537
1538         if (com != NULL) {
1539                 if (com->lc_ops->lfsck_stop_notify != NULL) {
1540                         set = ptlrpc_prep_set();
1541                         if (set == NULL) {
1542                                 lfsck_component_put(env, com);
1543
1544                                 RETURN(-ENOMEM);
1545                         }
1546
1547                         rc = com->lc_ops->lfsck_stop_notify(env, com, ltds,
1548                                                             ltd, set);
1549                         if (rc == 0)
1550                                 rc = ptlrpc_set_wait(set);
1551
1552                         ptlrpc_set_destroy(set);
1553                 }
1554
1555                 lfsck_component_put(env, com);
1556         }
1557
1558         RETURN(rc);
1559 }
1560
1561 void lfsck_quit(const struct lu_env *env, struct lfsck_instance *lfsck)
1562 {
1563         struct lfsck_component *com;
1564         struct lfsck_component *next;
1565
1566         list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
1567                                  lc_link) {
1568                 if (com->lc_ops->lfsck_quit != NULL)
1569                         com->lc_ops->lfsck_quit(env, com);
1570
1571                 spin_lock(&lfsck->li_lock);
1572                 list_del_init(&com->lc_link);
1573                 list_del_init(&com->lc_link_dir);
1574                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1575                 spin_unlock(&lfsck->li_lock);
1576         }
1577
1578         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1579                                  lc_link) {
1580                 if (com->lc_ops->lfsck_quit != NULL)
1581                         com->lc_ops->lfsck_quit(env, com);
1582
1583                 spin_lock(&lfsck->li_lock);
1584                 list_del_init(&com->lc_link);
1585                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1586                 spin_unlock(&lfsck->li_lock);
1587         }
1588 }
1589
1590 static int lfsck_async_interpret(const struct lu_env *env,
1591                                  struct ptlrpc_request *req,
1592                                  void *args, int rc)
1593 {
1594         struct lfsck_async_interpret_args *laia = args;
1595         struct lfsck_instance             *lfsck;
1596
1597         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
1598                               li_mdt_descs);
1599         lfsck_interpret(env, lfsck, req, laia, rc);
1600         lfsck_tgt_put(laia->laia_ltd);
1601         if (rc != 0 && laia->laia_result != -EALREADY)
1602                 laia->laia_result = rc;
1603
1604         return 0;
1605 }
1606
1607 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
1608                         struct lfsck_request *lr,
1609                         struct ptlrpc_request_set *set,
1610                         ptlrpc_interpterer_t interpreter,
1611                         void *args, int request)
1612 {
1613         struct lfsck_async_interpret_args *laia;
1614         struct ptlrpc_request             *req;
1615         struct lfsck_request              *tmp;
1616         struct req_format                 *format;
1617         int                                rc;
1618
1619         switch (request) {
1620         case LFSCK_NOTIFY:
1621                 format = &RQF_LFSCK_NOTIFY;
1622                 break;
1623         case LFSCK_QUERY:
1624                 format = &RQF_LFSCK_QUERY;
1625                 break;
1626         default:
1627                 CERROR("%s: unknown async request: opc = %d\n",
1628                        exp->exp_obd->obd_name, request);
1629                 return -EINVAL;
1630         }
1631
1632         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
1633         if (req == NULL)
1634                 return -ENOMEM;
1635
1636         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
1637         if (rc != 0) {
1638                 ptlrpc_request_free(req);
1639
1640                 return rc;
1641         }
1642
1643         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1644         *tmp = *lr;
1645         ptlrpc_request_set_replen(req);
1646
1647         laia = ptlrpc_req_async_args(req);
1648         *laia = *(struct lfsck_async_interpret_args *)args;
1649         if (laia->laia_com != NULL)
1650                 lfsck_component_get(laia->laia_com);
1651         req->rq_interpret_reply = interpreter;
1652         ptlrpc_set_add_req(set, req);
1653
1654         return 0;
1655 }
1656
1657 /* external interfaces */
1658
1659 int lfsck_get_speed(struct dt_device *key, void *buf, int len)
1660 {
1661         struct lu_env           env;
1662         struct lfsck_instance  *lfsck;
1663         int                     rc;
1664         ENTRY;
1665
1666         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1667         if (rc != 0)
1668                 RETURN(rc);
1669
1670         lfsck = lfsck_instance_find(key, true, false);
1671         if (likely(lfsck != NULL)) {
1672                 rc = snprintf(buf, len, "%u\n",
1673                               lfsck->li_bookmark_ram.lb_speed_limit);
1674                 lfsck_instance_put(&env, lfsck);
1675         } else {
1676                 rc = -ENXIO;
1677         }
1678
1679         lu_env_fini(&env);
1680
1681         RETURN(rc);
1682 }
1683 EXPORT_SYMBOL(lfsck_get_speed);
1684
1685 int lfsck_set_speed(struct dt_device *key, int val)
1686 {
1687         struct lu_env           env;
1688         struct lfsck_instance  *lfsck;
1689         int                     rc;
1690         ENTRY;
1691
1692         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1693         if (rc != 0)
1694                 RETURN(rc);
1695
1696         lfsck = lfsck_instance_find(key, true, false);
1697         if (likely(lfsck != NULL)) {
1698                 mutex_lock(&lfsck->li_mutex);
1699                 __lfsck_set_speed(lfsck, val);
1700                 rc = lfsck_bookmark_store(&env, lfsck);
1701                 mutex_unlock(&lfsck->li_mutex);
1702                 lfsck_instance_put(&env, lfsck);
1703         } else {
1704                 rc = -ENXIO;
1705         }
1706
1707         lu_env_fini(&env);
1708
1709         RETURN(rc);
1710 }
1711 EXPORT_SYMBOL(lfsck_set_speed);
1712
1713 int lfsck_get_windows(struct dt_device *key, void *buf, int len)
1714 {
1715         struct lu_env           env;
1716         struct lfsck_instance  *lfsck;
1717         int                     rc;
1718         ENTRY;
1719
1720         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1721         if (rc != 0)
1722                 RETURN(rc);
1723
1724         lfsck = lfsck_instance_find(key, true, false);
1725         if (likely(lfsck != NULL)) {
1726                 rc = snprintf(buf, len, "%u\n",
1727                               lfsck->li_bookmark_ram.lb_async_windows);
1728                 lfsck_instance_put(&env, lfsck);
1729         } else {
1730                 rc = -ENXIO;
1731         }
1732
1733         lu_env_fini(&env);
1734
1735         RETURN(rc);
1736 }
1737 EXPORT_SYMBOL(lfsck_get_windows);
1738
1739 int lfsck_set_windows(struct dt_device *key, int val)
1740 {
1741         struct lu_env           env;
1742         struct lfsck_instance  *lfsck;
1743         int                     rc;
1744         ENTRY;
1745
1746         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1747         if (rc != 0)
1748                 RETURN(rc);
1749
1750         lfsck = lfsck_instance_find(key, true, false);
1751         if (likely(lfsck != NULL)) {
1752                 if (val > LFSCK_ASYNC_WIN_MAX) {
1753                         CERROR("%s: Too large async windows size, which "
1754                                "may cause memory issues. The valid range "
1755                                "is [0 - %u]. If you do not want to restrict "
1756                                "the windows size for async requests pipeline, "
1757                                "just set it as 0.\n",
1758                                lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
1759                         rc = -EINVAL;
1760                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
1761                         mutex_lock(&lfsck->li_mutex);
1762                         lfsck->li_bookmark_ram.lb_async_windows = val;
1763                         rc = lfsck_bookmark_store(&env, lfsck);
1764                         mutex_unlock(&lfsck->li_mutex);
1765                 }
1766                 lfsck_instance_put(&env, lfsck);
1767         } else {
1768                 rc = -ENXIO;
1769         }
1770
1771         lu_env_fini(&env);
1772
1773         RETURN(rc);
1774 }
1775 EXPORT_SYMBOL(lfsck_set_windows);
1776
1777 int lfsck_dump(struct dt_device *key, void *buf, int len, enum lfsck_type type)
1778 {
1779         struct lu_env           env;
1780         struct lfsck_instance  *lfsck;
1781         struct lfsck_component *com;
1782         int                     rc;
1783         ENTRY;
1784
1785         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1786         if (rc != 0)
1787                 RETURN(rc);
1788
1789         lfsck = lfsck_instance_find(key, true, false);
1790         if (likely(lfsck != NULL)) {
1791                 com = lfsck_component_find(lfsck, type);
1792                 if (likely(com != NULL)) {
1793                         rc = com->lc_ops->lfsck_dump(&env, com, buf, len);
1794                         lfsck_component_put(&env, com);
1795                 } else {
1796                         rc = -ENOTSUPP;
1797                 }
1798
1799                 lfsck_instance_put(&env, lfsck);
1800         } else {
1801                 rc = -ENXIO;
1802         }
1803
1804         lu_env_fini(&env);
1805
1806         RETURN(rc);
1807 }
1808 EXPORT_SYMBOL(lfsck_dump);
1809
1810 static int lfsck_stop_all(const struct lu_env *env,
1811                           struct lfsck_instance *lfsck,
1812                           struct lfsck_stop *stop)
1813 {
1814         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1815         struct lfsck_request              *lr     = &info->lti_lr;
1816         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1817         struct ptlrpc_request_set         *set;
1818         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1819         struct lfsck_tgt_desc             *ltd;
1820         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1821         __u32                              idx;
1822         int                                rc     = 0;
1823         int                                rc1    = 0;
1824         ENTRY;
1825
1826         LASSERT(stop->ls_flags & LPF_BROADCAST);
1827
1828         set = ptlrpc_prep_set();
1829         if (unlikely(set == NULL)) {
1830                 CERROR("%s: cannot allocate memory for stop LFSCK on "
1831                        "all targets\n", lfsck_lfsck2name(lfsck));
1832
1833                 RETURN(-ENOMEM);
1834         }
1835
1836         memset(lr, 0, sizeof(*lr));
1837         lr->lr_event = LE_STOP;
1838         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1839         lr->lr_status = stop->ls_status;
1840         lr->lr_version = bk->lb_version;
1841         lr->lr_active = LFSCK_TYPES_ALL;
1842         lr->lr_param = stop->ls_flags;
1843
1844         laia->laia_com = NULL;
1845         laia->laia_ltds = ltds;
1846         laia->laia_lr = lr;
1847         laia->laia_result = 0;
1848         laia->laia_shared = 1;
1849
1850         down_read(&ltds->ltd_rw_sem);
1851         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1852                 ltd = lfsck_tgt_get(ltds, idx);
1853                 LASSERT(ltd != NULL);
1854
1855                 laia->laia_ltd = ltd;
1856                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1857                                          lfsck_async_interpret, laia,
1858                                          LFSCK_NOTIFY);
1859                 if (rc != 0) {
1860                         lfsck_interpret(env, lfsck, NULL, laia, rc);
1861                         lfsck_tgt_put(ltd);
1862                         CWARN("%s: cannot notify MDT %x for LFSCK stop: "
1863                               "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
1864                         rc1 = rc;
1865                 }
1866         }
1867         up_read(&ltds->ltd_rw_sem);
1868
1869         rc = ptlrpc_set_wait(set);
1870         ptlrpc_set_destroy(set);
1871
1872         if (rc == 0)
1873                 rc = laia->laia_result;
1874
1875         if (rc == -EALREADY)
1876                 rc = 0;
1877
1878         if (rc != 0)
1879                 CWARN("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
1880                       lfsck_lfsck2name(lfsck), rc);
1881
1882         RETURN(rc != 0 ? rc : rc1);
1883 }
1884
1885 static int lfsck_start_all(const struct lu_env *env,
1886                            struct lfsck_instance *lfsck,
1887                            struct lfsck_start *start)
1888 {
1889         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1890         struct lfsck_request              *lr     = &info->lti_lr;
1891         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1892         struct ptlrpc_request_set         *set;
1893         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1894         struct lfsck_tgt_desc             *ltd;
1895         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1896         __u32                              idx;
1897         int                                rc     = 0;
1898         ENTRY;
1899
1900         LASSERT(start->ls_flags & LPF_BROADCAST);
1901
1902         set = ptlrpc_prep_set();
1903         if (unlikely(set == NULL)) {
1904                 if (bk->lb_param & LPF_FAILOUT) {
1905                         CERROR("%s: cannot allocate memory for start LFSCK on "
1906                                "all targets, failout.\n",
1907                                lfsck_lfsck2name(lfsck));
1908
1909                         RETURN(-ENOMEM);
1910                 } else {
1911                         CWARN("%s: cannot allocate memory for start LFSCK on "
1912                               "all targets, partly scan.\n",
1913                               lfsck_lfsck2name(lfsck));
1914
1915                         RETURN(0);
1916                 }
1917         }
1918
1919         memset(lr, 0, sizeof(*lr));
1920         lr->lr_event = LE_START;
1921         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1922         lr->lr_speed = bk->lb_speed_limit;
1923         lr->lr_version = bk->lb_version;
1924         lr->lr_active = start->ls_active;
1925         lr->lr_param = start->ls_flags;
1926         lr->lr_async_windows = bk->lb_async_windows;
1927         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
1928                        LSV_ASYNC_WINDOWS;
1929
1930         laia->laia_com = NULL;
1931         laia->laia_ltds = ltds;
1932         laia->laia_lr = lr;
1933         laia->laia_result = 0;
1934         laia->laia_shared = 1;
1935
1936         down_read(&ltds->ltd_rw_sem);
1937         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1938                 ltd = lfsck_tgt_get(ltds, idx);
1939                 LASSERT(ltd != NULL);
1940
1941                 laia->laia_ltd = ltd;
1942                 ltd->ltd_layout_done = 0;
1943                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1944                                          lfsck_async_interpret, laia,
1945                                          LFSCK_NOTIFY);
1946                 if (rc != 0) {
1947                         lfsck_interpret(env, lfsck, NULL, laia, rc);
1948                         lfsck_tgt_put(ltd);
1949                         if (bk->lb_param & LPF_FAILOUT) {
1950                                 CERROR("%s: cannot notify MDT %x for LFSCK "
1951                                        "start, failout: rc = %d\n",
1952                                        lfsck_lfsck2name(lfsck), idx, rc);
1953                                 break;
1954                         } else {
1955                                 CWARN("%s: cannot notify MDT %x for LFSCK "
1956                                       "start, partly scan: rc = %d\n",
1957                                       lfsck_lfsck2name(lfsck), idx, rc);
1958                                 rc = 0;
1959                         }
1960                 }
1961         }
1962         up_read(&ltds->ltd_rw_sem);
1963
1964         if (rc != 0) {
1965                 ptlrpc_set_destroy(set);
1966
1967                 RETURN(rc);
1968         }
1969
1970         rc = ptlrpc_set_wait(set);
1971         ptlrpc_set_destroy(set);
1972
1973         if (rc == 0)
1974                 rc = laia->laia_result;
1975
1976         if (rc != 0) {
1977                 if (bk->lb_param & LPF_FAILOUT) {
1978                         struct lfsck_stop *stop = &info->lti_stop;
1979
1980                         CERROR("%s: cannot start LFSCK on some MDTs, "
1981                                "stop all: rc = %d\n",
1982                                lfsck_lfsck2name(lfsck), rc);
1983                         if (rc != -EALREADY) {
1984                                 stop->ls_status = LS_FAILED;
1985                                 stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
1986                                 lfsck_stop_all(env, lfsck, stop);
1987                         }
1988                 } else {
1989                         CWARN("%s: cannot start LFSCK on some MDTs, "
1990                               "partly scan: rc = %d\n",
1991                               lfsck_lfsck2name(lfsck), rc);
1992                         rc = 0;
1993                 }
1994         }
1995
1996         RETURN(rc);
1997 }
1998
1999 int lfsck_start(const struct lu_env *env, struct dt_device *key,
2000                 struct lfsck_start_param *lsp)
2001 {
2002         struct lfsck_start              *start  = lsp->lsp_start;
2003         struct lfsck_instance           *lfsck;
2004         struct lfsck_bookmark           *bk;
2005         struct ptlrpc_thread            *thread;
2006         struct lfsck_component          *com;
2007         struct l_wait_info               lwi    = { 0 };
2008         struct lfsck_thread_args        *lta;
2009         bool                             dirty  = false;
2010         long                             rc     = 0;
2011         __u16                            valid  = 0;
2012         __u16                            flags  = 0;
2013         __u16                            type   = 1;
2014         ENTRY;
2015
2016         lfsck = lfsck_instance_find(key, true, false);
2017         if (unlikely(lfsck == NULL))
2018                 RETURN(-ENXIO);
2019
2020         /* System is not ready, try again later. */
2021         if (unlikely(lfsck->li_namespace == NULL))
2022                 GOTO(put, rc = -EAGAIN);
2023
2024         /* start == NULL means auto trigger paused LFSCK. */
2025         if ((start == NULL) &&
2026             (cfs_list_empty(&lfsck->li_list_scan) ||
2027              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2028                 GOTO(put, rc = 0);
2029
2030         bk = &lfsck->li_bookmark_ram;
2031         thread = &lfsck->li_thread;
2032         mutex_lock(&lfsck->li_mutex);
2033         spin_lock(&lfsck->li_lock);
2034         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2035                 rc = -EALREADY;
2036                 while (start->ls_active != 0) {
2037                         if (!(type & start->ls_active)) {
2038                                 type <<= 1;
2039                                 continue;
2040                         }
2041
2042                         com = __lfsck_component_find(lfsck, type,
2043                                                      &lfsck->li_list_scan);
2044                         if (com == NULL)
2045                                 com = __lfsck_component_find(lfsck, type,
2046                                                 &lfsck->li_list_double_scan);
2047                         if (com == NULL) {
2048                                 rc = -EOPNOTSUPP;
2049                                 break;
2050                         }
2051
2052                         if (com->lc_ops->lfsck_join != NULL) {
2053                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2054                                 if (rc != 0 && rc != -EALREADY)
2055                                         break;
2056                         }
2057                         start->ls_active &= ~type;
2058                         type <<= 1;
2059                 }
2060                 spin_unlock(&lfsck->li_lock);
2061                 GOTO(out, rc);
2062         }
2063         spin_unlock(&lfsck->li_lock);
2064
2065         lfsck->li_status = 0;
2066         lfsck->li_oit_over = 0;
2067         lfsck->li_start_unplug = 0;
2068         lfsck->li_drop_dryrun = 0;
2069         lfsck->li_new_scanned = 0;
2070
2071         /* For auto trigger. */
2072         if (start == NULL)
2073                 goto trigger;
2074
2075         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2076                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2077                        lfsck_lfsck2name(lfsck));
2078
2079                 GOTO(out, rc = -EPERM);
2080         }
2081
2082         start->ls_version = bk->lb_version;
2083         if (start->ls_valid & LSV_SPEED_LIMIT) {
2084                 __lfsck_set_speed(lfsck, start->ls_speed_limit);
2085                 dirty = true;
2086         }
2087
2088         if (start->ls_valid & LSV_ASYNC_WINDOWS &&
2089             bk->lb_async_windows != start->ls_async_windows) {
2090                 bk->lb_async_windows = start->ls_async_windows;
2091                 dirty = true;
2092         }
2093
2094         if (start->ls_valid & LSV_ERROR_HANDLE) {
2095                 valid |= DOIV_ERROR_HANDLE;
2096                 if (start->ls_flags & LPF_FAILOUT)
2097                         flags |= DOIF_FAILOUT;
2098
2099                 if ((start->ls_flags & LPF_FAILOUT) &&
2100                     !(bk->lb_param & LPF_FAILOUT)) {
2101                         bk->lb_param |= LPF_FAILOUT;
2102                         dirty = true;
2103                 } else if (!(start->ls_flags & LPF_FAILOUT) &&
2104                            (bk->lb_param & LPF_FAILOUT)) {
2105                         bk->lb_param &= ~LPF_FAILOUT;
2106                         dirty = true;
2107                 }
2108         }
2109
2110         if (start->ls_valid & LSV_DRYRUN) {
2111                 valid |= DOIV_DRYRUN;
2112                 if (start->ls_flags & LPF_DRYRUN)
2113                         flags |= DOIF_DRYRUN;
2114
2115                 if ((start->ls_flags & LPF_DRYRUN) &&
2116                     !(bk->lb_param & LPF_DRYRUN)) {
2117                         bk->lb_param |= LPF_DRYRUN;
2118                         dirty = true;
2119                 } else if (!(start->ls_flags & LPF_DRYRUN) &&
2120                            (bk->lb_param & LPF_DRYRUN)) {
2121                         bk->lb_param &= ~LPF_DRYRUN;
2122                         lfsck->li_drop_dryrun = 1;
2123                         dirty = true;
2124                 }
2125         }
2126
2127         if (bk->lb_param & LPF_ALL_TGT &&
2128             !(start->ls_flags & LPF_ALL_TGT)) {
2129                 bk->lb_param &= ~LPF_ALL_TGT;
2130                 dirty = true;
2131         } else if (!(bk->lb_param & LPF_ALL_TGT) &&
2132                    start->ls_flags & LPF_ALL_TGT) {
2133                 bk->lb_param |= LPF_ALL_TGT;
2134                 dirty = true;
2135         }
2136
2137         if (bk->lb_param & LPF_ORPHAN &&
2138             !(start->ls_flags & LPF_ORPHAN)) {
2139                 bk->lb_param &= ~LPF_ORPHAN;
2140                 dirty = true;
2141         } else if (!(bk->lb_param & LPF_ORPHAN) &&
2142                    start->ls_flags & LPF_ORPHAN) {
2143                 bk->lb_param |= LPF_ORPHAN;
2144                 dirty = true;
2145         }
2146
2147         if (start->ls_valid & LSV_CREATE_OSTOBJ) {
2148                 if (bk->lb_param & LPF_CREATE_OSTOBJ &&
2149                     !(start->ls_flags & LPF_CREATE_OSTOBJ)) {
2150                         bk->lb_param &= ~LPF_CREATE_OSTOBJ;
2151                         dirty = true;
2152                 } else if (!(bk->lb_param & LPF_CREATE_OSTOBJ) &&
2153                            start->ls_flags & LPF_CREATE_OSTOBJ) {
2154                         bk->lb_param |= LPF_CREATE_OSTOBJ;
2155                         dirty = true;
2156                 }
2157         }
2158
2159         if (dirty) {
2160                 rc = lfsck_bookmark_store(env, lfsck);
2161                 if (rc != 0)
2162                         GOTO(out, rc);
2163         }
2164
2165         if (start->ls_flags & LPF_RESET)
2166                 flags |= DOIF_RESET;
2167
2168         if (start->ls_active != 0) {
2169                 struct lfsck_component *next;
2170
2171                 if (start->ls_active == LFSCK_TYPES_ALL)
2172                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2173
2174                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2175                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2176                         GOTO(out, rc = -ENOTSUPP);
2177                 }
2178
2179                 cfs_list_for_each_entry_safe(com, next,
2180                                              &lfsck->li_list_scan, lc_link) {
2181                         if (!(com->lc_type & start->ls_active)) {
2182                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2183                                                              false);
2184                                 if (rc != 0)
2185                                         GOTO(out, rc);
2186                         }
2187                 }
2188
2189                 while (start->ls_active != 0) {
2190                         if (type & start->ls_active) {
2191                                 com = __lfsck_component_find(lfsck, type,
2192                                                         &lfsck->li_list_idle);
2193                                 if (com != NULL) {
2194                                         /* The component status will be updated
2195                                          * when its prep() is called later by
2196                                          * the LFSCK main engine. */
2197                                         cfs_list_del_init(&com->lc_link);
2198                                         cfs_list_add_tail(&com->lc_link,
2199                                                           &lfsck->li_list_scan);
2200                                 }
2201                                 start->ls_active &= ~type;
2202                         }
2203                         type <<= 1;
2204                 }
2205         }
2206
2207         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2208                 start->ls_active |= com->lc_type;
2209                 if (flags & DOIF_RESET) {
2210                         rc = com->lc_ops->lfsck_reset(env, com, false);
2211                         if (rc != 0)
2212                                 GOTO(out, rc);
2213                 }
2214         }
2215
2216 trigger:
2217         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
2218         if (bk->lb_param & LPF_DRYRUN) {
2219                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
2220                 valid |= DOIV_DRYRUN;
2221                 flags |= DOIF_DRYRUN;
2222         }
2223
2224         if (bk->lb_param & LPF_FAILOUT) {
2225                 valid |= DOIV_ERROR_HANDLE;
2226                 flags |= DOIF_FAILOUT;
2227         }
2228
2229         if (!cfs_list_empty(&lfsck->li_list_scan))
2230                 flags |= DOIF_OUTUSED;
2231
2232         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2233         thread_set_flags(thread, 0);
2234         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
2235         if (IS_ERR(lta))
2236                 GOTO(out, rc = PTR_ERR(lta));
2237
2238         rc = PTR_ERR(kthread_run(lfsck_master_engine, lta, "lfsck"));
2239         if (IS_ERR_VALUE(rc)) {
2240                 CERROR("%s: cannot start LFSCK thread: rc = %ld\n",
2241                        lfsck_lfsck2name(lfsck), rc);
2242                 lfsck_thread_args_fini(lta);
2243
2244                 GOTO(out, rc);
2245         }
2246
2247         l_wait_event(thread->t_ctl_waitq,
2248                      thread_is_running(thread) ||
2249                      thread_is_stopped(thread),
2250                      &lwi);
2251         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
2252                 lfsck->li_start_unplug = 1;
2253                 wake_up_all(&thread->t_ctl_waitq);
2254
2255                 GOTO(out, rc = 0);
2256         }
2257
2258         /* release lfsck::li_mutex to avoid deadlock. */
2259         mutex_unlock(&lfsck->li_mutex);
2260         rc = lfsck_start_all(env, lfsck, start);
2261         if (rc != 0) {
2262                 spin_lock(&lfsck->li_lock);
2263                 if (thread_is_stopped(thread)) {
2264                         spin_unlock(&lfsck->li_lock);
2265                 } else {
2266                         lfsck->li_status = LS_FAILED;
2267                         lfsck->li_flags = 0;
2268                         thread_set_flags(thread, SVC_STOPPING);
2269                         spin_unlock(&lfsck->li_lock);
2270
2271                         lfsck->li_start_unplug = 1;
2272                         wake_up_all(&thread->t_ctl_waitq);
2273                         l_wait_event(thread->t_ctl_waitq,
2274                                      thread_is_stopped(thread),
2275                                      &lwi);
2276                 }
2277         } else {
2278                 lfsck->li_start_unplug = 1;
2279                 wake_up_all(&thread->t_ctl_waitq);
2280         }
2281
2282         GOTO(put, rc);
2283
2284 out:
2285         mutex_unlock(&lfsck->li_mutex);
2286
2287 put:
2288         lfsck_instance_put(env, lfsck);
2289
2290         return rc < 0 ? rc : 0;
2291 }
2292 EXPORT_SYMBOL(lfsck_start);
2293
2294 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2295                struct lfsck_stop *stop)
2296 {
2297         struct lfsck_instance   *lfsck;
2298         struct ptlrpc_thread    *thread;
2299         struct l_wait_info       lwi    = { 0 };
2300         int                      rc     = 0;
2301         int                      rc1    = 0;
2302         ENTRY;
2303
2304         lfsck = lfsck_instance_find(key, true, false);
2305         if (unlikely(lfsck == NULL))
2306                 RETURN(-ENXIO);
2307
2308         thread = &lfsck->li_thread;
2309         /* release lfsck::li_mutex to avoid deadlock. */
2310         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2311                 if (!lfsck->li_master) {
2312                         CERROR("%s: only allow to specify '-A' via MDS\n",
2313                                lfsck_lfsck2name(lfsck));
2314
2315                         GOTO(out, rc = -EPERM);
2316                 }
2317
2318                 rc1 = lfsck_stop_all(env, lfsck, stop);
2319         }
2320
2321         mutex_lock(&lfsck->li_mutex);
2322         spin_lock(&lfsck->li_lock);
2323         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2324                 spin_unlock(&lfsck->li_lock);
2325                 GOTO(out, rc = -EALREADY);
2326         }
2327
2328         if (stop != NULL) {
2329                 lfsck->li_status = stop->ls_status;
2330                 lfsck->li_flags = stop->ls_flags;
2331         } else {
2332                 lfsck->li_status = LS_STOPPED;
2333                 lfsck->li_flags = 0;
2334         }
2335
2336         thread_set_flags(thread, SVC_STOPPING);
2337         spin_unlock(&lfsck->li_lock);
2338
2339         wake_up_all(&thread->t_ctl_waitq);
2340         l_wait_event(thread->t_ctl_waitq,
2341                      thread_is_stopped(thread),
2342                      &lwi);
2343
2344         GOTO(out, rc = 0);
2345
2346 out:
2347         mutex_unlock(&lfsck->li_mutex);
2348         lfsck_instance_put(env, lfsck);
2349
2350         return rc != 0 ? rc : rc1;
2351 }
2352 EXPORT_SYMBOL(lfsck_stop);
2353
2354 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2355                     struct lfsck_request *lr)
2356 {
2357         int rc = -EOPNOTSUPP;
2358         ENTRY;
2359
2360         switch (lr->lr_event) {
2361         case LE_START: {
2362                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
2363                 struct lfsck_start_param  lsp;
2364
2365                 memset(start, 0, sizeof(*start));
2366                 start->ls_valid = lr->lr_valid;
2367                 start->ls_speed_limit = lr->lr_speed;
2368                 start->ls_version = lr->lr_version;
2369                 start->ls_active = lr->lr_active;
2370                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2371                 start->ls_async_windows = lr->lr_async_windows;
2372
2373                 lsp.lsp_start = start;
2374                 lsp.lsp_index = lr->lr_index;
2375                 lsp.lsp_index_valid = 1;
2376                 rc = lfsck_start(env, key, &lsp);
2377                 break;
2378         }
2379         case LE_STOP: {
2380                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2381
2382                 memset(stop, 0, sizeof(*stop));
2383                 stop->ls_status = lr->lr_status;
2384                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2385                 rc = lfsck_stop(env, key, stop);
2386                 break;
2387         }
2388         case LE_PHASE1_DONE:
2389         case LE_PHASE2_DONE:
2390         case LE_FID_ACCESSED:
2391         case LE_PEER_EXIT:
2392         case LE_CONDITIONAL_DESTROY:
2393         case LE_PAIRS_VERIFY: {
2394                 struct lfsck_instance  *lfsck;
2395                 struct lfsck_component *com;
2396
2397                 lfsck = lfsck_instance_find(key, true, false);
2398                 if (unlikely(lfsck == NULL))
2399                         RETURN(-ENXIO);
2400
2401                 com = lfsck_component_find(lfsck, lr->lr_active);
2402                 if (likely(com != NULL)) {
2403                         rc = com->lc_ops->lfsck_in_notify(env, com, lr);
2404                         lfsck_component_put(env, com);
2405                 }
2406
2407                 lfsck_instance_put(env, lfsck);
2408                 break;
2409         }
2410         default:
2411                 break;
2412         }
2413
2414         RETURN(rc);
2415 }
2416 EXPORT_SYMBOL(lfsck_in_notify);
2417
2418 int lfsck_query(const struct lu_env *env, struct dt_device *key,
2419                 struct lfsck_request *lr)
2420 {
2421         struct lfsck_instance  *lfsck;
2422         struct lfsck_component *com;
2423         int                     rc;
2424         ENTRY;
2425
2426         lfsck = lfsck_instance_find(key, true, false);
2427         if (unlikely(lfsck == NULL))
2428                 RETURN(-ENXIO);
2429
2430         com = lfsck_component_find(lfsck, lr->lr_active);
2431         if (likely(com != NULL)) {
2432                 rc = com->lc_ops->lfsck_query(env, com);
2433                 lfsck_component_put(env, com);
2434         } else {
2435                 rc = -ENOTSUPP;
2436         }
2437
2438         lfsck_instance_put(env, lfsck);
2439
2440         RETURN(rc);
2441 }
2442 EXPORT_SYMBOL(lfsck_query);
2443
2444 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
2445                              struct ldlm_namespace *ns)
2446 {
2447         struct lfsck_instance  *lfsck;
2448         int                     rc      = -ENXIO;
2449
2450         lfsck = lfsck_instance_find(key, true, false);
2451         if (likely(lfsck != NULL)) {
2452                 lfsck->li_namespace = ns;
2453                 lfsck_instance_put(env, lfsck);
2454                 rc = 0;
2455         }
2456
2457         return rc;
2458 }
2459 EXPORT_SYMBOL(lfsck_register_namespace);
2460
2461 int lfsck_register(const struct lu_env *env, struct dt_device *key,
2462                    struct dt_device *next, struct obd_device *obd,
2463                    lfsck_out_notify notify, void *notify_data, bool master)
2464 {
2465         struct lfsck_instance   *lfsck;
2466         struct dt_object        *root  = NULL;
2467         struct dt_object        *obj;
2468         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
2469         int                      rc;
2470         ENTRY;
2471
2472         lfsck = lfsck_instance_find(key, false, false);
2473         if (unlikely(lfsck != NULL))
2474                 RETURN(-EEXIST);
2475
2476         OBD_ALLOC_PTR(lfsck);
2477         if (lfsck == NULL)
2478                 RETURN(-ENOMEM);
2479
2480         mutex_init(&lfsck->li_mutex);
2481         spin_lock_init(&lfsck->li_lock);
2482         CFS_INIT_LIST_HEAD(&lfsck->li_link);
2483         CFS_INIT_LIST_HEAD(&lfsck->li_list_scan);
2484         CFS_INIT_LIST_HEAD(&lfsck->li_list_dir);
2485         CFS_INIT_LIST_HEAD(&lfsck->li_list_double_scan);
2486         CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
2487         atomic_set(&lfsck->li_ref, 1);
2488         atomic_set(&lfsck->li_double_scan_count, 0);
2489         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
2490         lfsck->li_out_notify = notify;
2491         lfsck->li_out_notify_data = notify_data;
2492         lfsck->li_next = next;
2493         lfsck->li_bottom = key;
2494         lfsck->li_obd = obd;
2495
2496         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
2497         if (rc != 0)
2498                 GOTO(out, rc);
2499
2500         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
2501         if (rc != 0)
2502                 GOTO(out, rc);
2503
2504         fid->f_seq = FID_SEQ_LOCAL_NAME;
2505         fid->f_oid = 1;
2506         fid->f_ver = 0;
2507         rc = local_oid_storage_init(env, lfsck->li_bottom, fid, &lfsck->li_los);
2508         if (rc != 0)
2509                 GOTO(out, rc);
2510
2511         rc = dt_root_get(env, key, fid);
2512         if (rc != 0)
2513                 GOTO(out, rc);
2514
2515         root = dt_locate(env, lfsck->li_bottom, fid);
2516         if (IS_ERR(root))
2517                 GOTO(out, rc = PTR_ERR(root));
2518
2519         if (unlikely(!dt_try_as_dir(env, root)))
2520                 GOTO(out, rc = -ENOTDIR);
2521
2522         lfsck->li_local_root_fid = *fid;
2523         if (master) {
2524                 lfsck->li_master = 1;
2525                 if (lfsck_dev_idx(lfsck->li_bottom) == 0) {
2526                         rc = dt_lookup(env, root,
2527                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
2528                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
2529                         if (rc != 0)
2530                                 GOTO(out, rc);
2531                 }
2532         }
2533
2534         fid->f_seq = FID_SEQ_LOCAL_FILE;
2535         fid->f_oid = OTABLE_IT_OID;
2536         fid->f_ver = 0;
2537         obj = dt_locate(env, lfsck->li_bottom, fid);
2538         if (IS_ERR(obj))
2539                 GOTO(out, rc = PTR_ERR(obj));
2540
2541         lfsck->li_obj_oit = obj;
2542         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
2543         if (rc != 0) {
2544                 if (rc == -ENOTSUPP)
2545                         GOTO(add, rc = 0);
2546
2547                 GOTO(out, rc);
2548         }
2549
2550         rc = lfsck_bookmark_setup(env, lfsck);
2551         if (rc != 0)
2552                 GOTO(out, rc);
2553
2554         if (master) {
2555                 rc = lfsck_fid_init(lfsck);
2556                 if (rc < 0)
2557                         GOTO(out, rc);
2558
2559                 rc = lfsck_namespace_setup(env, lfsck);
2560                 if (rc < 0)
2561                         GOTO(out, rc);
2562         }
2563
2564         rc = lfsck_layout_setup(env, lfsck);
2565         if (rc < 0)
2566                 GOTO(out, rc);
2567
2568         /* XXX: more LFSCK components initialization to be added here. */
2569
2570 add:
2571         rc = lfsck_instance_add(lfsck);
2572         if (rc == 0)
2573                 rc = lfsck_add_target_from_orphan(env, lfsck);
2574 out:
2575         if (root != NULL && !IS_ERR(root))
2576                 lu_object_put(env, &root->do_lu);
2577         if (rc != 0)
2578                 lfsck_instance_cleanup(env, lfsck);
2579         return rc;
2580 }
2581 EXPORT_SYMBOL(lfsck_register);
2582
2583 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
2584 {
2585         struct lfsck_instance *lfsck;
2586
2587         lfsck = lfsck_instance_find(key, false, true);
2588         if (lfsck != NULL)
2589                 lfsck_instance_put(env, lfsck);
2590 }
2591 EXPORT_SYMBOL(lfsck_degister);
2592
2593 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
2594                      struct dt_device *tgt, struct obd_export *exp,
2595                      __u32 index, bool for_ost)
2596 {
2597         struct lfsck_instance   *lfsck;
2598         struct lfsck_tgt_desc   *ltd;
2599         int                      rc;
2600         ENTRY;
2601
2602         OBD_ALLOC_PTR(ltd);
2603         if (ltd == NULL)
2604                 RETURN(-ENOMEM);
2605
2606         ltd->ltd_tgt = tgt;
2607         ltd->ltd_key = key;
2608         ltd->ltd_exp = exp;
2609         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
2610         INIT_LIST_HEAD(&ltd->ltd_layout_list);
2611         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
2612         atomic_set(&ltd->ltd_ref, 1);
2613         ltd->ltd_index = index;
2614
2615         spin_lock(&lfsck_instance_lock);
2616         lfsck = __lfsck_instance_find(key, true, false);
2617         if (lfsck == NULL) {
2618                 if (for_ost)
2619                         list_add_tail(&ltd->ltd_orphan_list,
2620                                       &lfsck_ost_orphan_list);
2621                 else
2622                         list_add_tail(&ltd->ltd_orphan_list,
2623                                       &lfsck_mdt_orphan_list);
2624                 spin_unlock(&lfsck_instance_lock);
2625
2626                 RETURN(0);
2627         }
2628         spin_unlock(&lfsck_instance_lock);
2629
2630         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
2631         if (rc != 0)
2632                 lfsck_tgt_put(ltd);
2633
2634         lfsck_instance_put(env, lfsck);
2635
2636         RETURN(rc);
2637 }
2638 EXPORT_SYMBOL(lfsck_add_target);
2639
2640 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
2641                       struct dt_device *tgt, __u32 index, bool for_ost)
2642 {
2643         struct lfsck_instance   *lfsck;
2644         struct lfsck_tgt_descs  *ltds;
2645         struct lfsck_tgt_desc   *ltd    = NULL;
2646         struct list_head        *head;
2647
2648         if (for_ost)
2649                 head = &lfsck_ost_orphan_list;
2650         else
2651                 head = &lfsck_mdt_orphan_list;
2652
2653         spin_lock(&lfsck_instance_lock);
2654         list_for_each_entry(ltd, head, ltd_orphan_list) {
2655                 if (ltd->ltd_tgt == tgt) {
2656                         list_del_init(&ltd->ltd_orphan_list);
2657                         spin_unlock(&lfsck_instance_lock);
2658                         lfsck_tgt_put(ltd);
2659
2660                         return;
2661                 }
2662         }
2663
2664         lfsck = __lfsck_instance_find(key, true, false);
2665         spin_unlock(&lfsck_instance_lock);
2666         if (unlikely(lfsck == NULL))
2667                 return;
2668
2669         if (for_ost)
2670                 ltds = &lfsck->li_ost_descs;
2671         else
2672                 ltds = &lfsck->li_mdt_descs;
2673
2674         down_write(&ltds->ltd_rw_sem);
2675         LASSERT(ltds->ltd_tgts_bitmap != NULL);
2676
2677         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
2678                 goto unlock;
2679
2680         ltd = LTD_TGT(ltds, index);
2681         if (unlikely(ltd == NULL))
2682                 goto unlock;
2683
2684         LASSERT(ltds->ltd_tgtnr > 0);
2685
2686         ltds->ltd_tgtnr--;
2687         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
2688         LTD_TGT(ltds, index) = NULL;
2689
2690 unlock:
2691         if (ltd == NULL) {
2692                 if (for_ost)
2693                         head = &lfsck->li_ost_descs.ltd_orphan;
2694                 else
2695                         head = &lfsck->li_ost_descs.ltd_orphan;
2696
2697                 list_for_each_entry(ltd, head, ltd_orphan_list) {
2698                         if (ltd->ltd_tgt == tgt) {
2699                                 list_del_init(&ltd->ltd_orphan_list);
2700                                 break;
2701                         }
2702                 }
2703         }
2704
2705         up_write(&ltds->ltd_rw_sem);
2706         if (ltd != NULL) {
2707                 spin_lock(&ltds->ltd_lock);
2708                 ltd->ltd_dead = 1;
2709                 spin_unlock(&ltds->ltd_lock);
2710                 lfsck_stop_notify(env, lfsck, ltds, ltd, LT_LAYOUT);
2711                 lfsck_tgt_put(ltd);
2712         }
2713
2714         lfsck_instance_put(env, lfsck);
2715 }
2716 EXPORT_SYMBOL(lfsck_del_target);
2717
2718 static int __init lfsck_init(void)
2719 {
2720         int rc;
2721
2722         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
2723         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
2724         lfsck_key_init_generic(&lfsck_thread_key, NULL);
2725         rc = lu_context_key_register(&lfsck_thread_key);
2726         if (rc == 0) {
2727                 tgt_register_lfsck_in_notify(lfsck_in_notify);
2728                 tgt_register_lfsck_query(lfsck_query);
2729         }
2730
2731         return rc;
2732 }
2733
2734 static void __exit lfsck_exit(void)
2735 {
2736         struct lfsck_tgt_desc *ltd;
2737         struct lfsck_tgt_desc *next;
2738
2739         LASSERT(cfs_list_empty(&lfsck_instance_list));
2740
2741         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
2742                                  ltd_orphan_list) {
2743                 list_del_init(&ltd->ltd_orphan_list);
2744                 lfsck_tgt_put(ltd);
2745         }
2746
2747         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
2748                                  ltd_orphan_list) {
2749                 list_del_init(&ltd->ltd_orphan_list);
2750                 lfsck_tgt_put(ltd);
2751         }
2752
2753         lu_context_key_degister(&lfsck_thread_key);
2754 }
2755
2756 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
2757 MODULE_DESCRIPTION("LFSCK");
2758 MODULE_LICENSE("GPL");
2759
2760 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);