Whamcloud - gitweb
LU-3950 lfsck: control LFSCK on all devices via single command
[fs/lustre-release.git] / lustre / lfsck / lfsck_layout.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_layout.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <linux/bitops.h>
37
38 #include <lustre/lustre_idl.h>
39 #include <lu_object.h>
40 #include <dt_object.h>
41 #include <lustre_linkea.h>
42 #include <lustre_fid.h>
43 #include <lustre_lib.h>
44 #include <lustre_net.h>
45 #include <lustre/lustre_user.h>
46 #include <md_object.h>
47 #include <obd_class.h>
48
49 #include "lfsck_internal.h"
50
/* Magic number of the lfsck_layout trace file: written into ll_magic by
 * lfsck_layout_init() and verified by lfsck_layout_load(). */
51 #define LFSCK_LAYOUT_MAGIC              0xB173AE14
52
/* Name string of the layout LFSCK component.
 * NOTE(review): presumably used as the name of the on-disk trace file;
 * the user is not visible in this part of the file - confirm. */
53 static const char lfsck_layout_name[] = "lfsck_layout";
54
/* LAST_ID tracking state for a single sequence.
 *
 * Entries are linked on lfsck_layout_slave_data::llsd_seq_list and kept
 * sorted by ascending lls_seq (see lfsck_layout_seq_insert()). */
55 struct lfsck_layout_seq {
           /* link into lfsck_layout_slave_data::llsd_seq_list. */
56         struct list_head         lls_list;
           /* the sequence this entry describes; sort and lookup key. */
57         __u64                    lls_seq;
           /* LAST_ID value in CPU byte order: read from the LAST_ID object
            * by lfsck_layout_lastid_load() and written back (as
            * little-endian) by lfsck_layout_lastid_store(). */
58         __u64                    lls_lastid;
           /* lfsck_layout_lastid_reload() treats an on-disk LAST_ID below
            * this value as crashed and replaces lls_lastid with it.
            * NOTE(review): presumably the largest object ID found by the
            * scan so far - confirm against the code that updates it. */
59         __u64                    lls_lastid_known;
           /* the LAST_ID object, set by lfsck_layout_lastid_load(). */
60         struct dt_object        *lls_lastid_obj;
           /* set when lls_lastid has to be written back to disk; cleared
            * after a successful (or dry-run) lfsck_layout_lastid_store(). */
61         unsigned int             lls_dirty:1;
62 };
63
/* Reference-counted descriptor for one master target, identified by
 * llst_index, that is tracked on a slave's llsd_master_list. */
64 struct lfsck_layout_slave_target {
65         /* link into lfsck_layout_slave_data::llsd_master_list. */
66         struct list_head        llst_list;
           /* initialized to 0 by lfsck_layout_llst_add().
            * NOTE(review): presumably compared against
            * lfsck_layout_slave_data::llsd_touch_gen - confirm. */
67         __u64                   llst_gen;
           /* reference count; starts at 1 for the list reference and the
            * descriptor is freed by lfsck_layout_llst_put() at zero. */
68         atomic_t                llst_ref;
           /* target index; lfsck_layout_llst_add() refuses duplicates. */
69         __u32                   llst_index;
70 };
71
/* Private data of the layout LFSCK component on the slave side
 * (reached through lfsck_component::lc_data). */
72 struct lfsck_layout_slave_data {
73         /* list of lfsck_layout_seq entries, sorted by ascending lls_seq. */
74         struct list_head         llsd_seq_list;
75
76         /* list of the masters involved in layout verification
            * (lfsck_layout_slave_target entries). */
77         struct list_head         llsd_master_list;
           /* protects llsd_master_list. */
78         spinlock_t               llsd_lock;
           /* NOTE(review): presumably a generation counter matched against
            * lfsck_layout_slave_target::llst_gen - not used in the visible
            * code, confirm. */
79         __u64                    llsd_touch_gen;
80 };
81
/* Reference-counted wrapper around a dt_object; it is used as the parent
 * of lfsck_layout_req entries (see lfsck_layout_req::llr_parent). */
82 struct lfsck_layout_object {
           /* the wrapped object; released with lfsck_object_put() when the
            * last reference is dropped by lfsck_layout_object_put(). */
83         struct dt_object        *llo_obj;
           /* NOTE(review): presumably a cached copy of the object's
            * attributes - the code filling it is not visible here. */
84         struct lu_attr           llo_attr;
           /* reference count, dropped by lfsck_layout_object_put(). */
85         atomic_t                 llo_ref;
           /* NOTE(review): presumably the layout generation - confirm. */
86         __u16                    llo_gen;
87 };
88
/* One layout verification request pairing a parent object with one child
 * object. Released by lfsck_layout_req_fini(), which drops the references
 * on both llr_child and llr_parent. */
89 struct lfsck_layout_req {
           /* NOTE(review): presumably links into
            * lfsck_layout_master_data::llmd_req_list - confirm. */
90         struct list_head                 llr_list;
           /* parent object; one llo_ref reference is held by the request. */
91         struct lfsck_layout_object      *llr_parent;
           /* child object; one lu_object reference is held by the request. */
92         struct dt_object                *llr_child;
           /* NOTE(review): presumably the index of the OST holding
            * llr_child - confirm. */
93         __u32                            llr_ost_idx;
94         __u32                            llr_lov_idx; /* offset in LOV EA */
95 };
96
/* Private data of the layout LFSCK component on the master side
 * (reached through lfsck_component::lc_data). */
97 struct lfsck_layout_master_data {
           /* protects llmd_req_list (see lfsck_layout_req_empty()). */
98         spinlock_t              llmd_lock;
           /* NOTE(review): presumably the queue of pending
            * lfsck_layout_req entries - confirm. */
99         struct list_head        llmd_req_list;
100
101         /* list for the ost targets involved in layout verification. */
102         struct list_head        llmd_ost_list;
103
104         /* list for the ost targets in phase1 scanning. */
105         struct list_head        llmd_ost_phase1_list;
106
107         /* list for the ost targets in phase2 scanning. */
108         struct list_head        llmd_ost_phase2_list;
109
110         /* list for the mdt targets involved in layout verification. */
111         struct list_head        llmd_mdt_list;
112
113         /* list for the mdt targets in phase1 scanning. */
114         struct list_head        llmd_mdt_phase1_list;
115
116         /* list for the mdt targets in phase2 scanning. */
117         struct list_head        llmd_mdt_phase2_list;
118
            /* NOTE(review): the fields below are not used in the visible
             * part of the file; presumably they hold the state of the
             * assistant thread and its hand-shake with the main LFSCK
             * engine - confirm against their users. */
119         struct ptlrpc_thread    llmd_thread;
120         atomic_t                llmd_rpcs_in_flight;
121         __u32                   llmd_touch_gen;
122         int                     llmd_prefetched;
123         int                     llmd_assistant_status;
124         int                     llmd_post_result;
125         unsigned int            llmd_to_post:1,
126                                 llmd_to_double_scan:1,
127                                 llmd_in_double_scan:1,
128                                 llmd_exit:1;
129 };
130
/* Argument bundle: an export, the LFSCK component and one slave target.
 * NOTE(review): presumably passed to the interpret callback of async RPCs
 * sent by the slave - the users are not visible in this part of the file,
 * confirm. */
131 struct lfsck_layout_slave_async_args {
132         struct obd_export                *llsaa_exp;
133         struct lfsck_component           *llsaa_com;
134         struct lfsck_layout_slave_target *llsaa_llst;
135 };
136
137 static inline void
138 lfsck_layout_llst_put(struct lfsck_layout_slave_target *llst)
139 {
140         if (atomic_dec_and_test(&llst->llst_ref)) {
141                 LASSERT(list_empty(&llst->llst_list));
142
143                 OBD_FREE_PTR(llst);
144         }
145 }
146
147 static inline int
148 lfsck_layout_llst_add(struct lfsck_layout_slave_data *llsd, __u32 index)
149 {
150         struct lfsck_layout_slave_target *llst;
151         struct lfsck_layout_slave_target *tmp;
152         int                               rc   = 0;
153
154         OBD_ALLOC_PTR(llst);
155         if (llst == NULL)
156                 return -ENOMEM;
157
158         INIT_LIST_HEAD(&llst->llst_list);
159         llst->llst_gen = 0;
160         llst->llst_index = index;
161         atomic_set(&llst->llst_ref, 1);
162
163         spin_lock(&llsd->llsd_lock);
164         list_for_each_entry(tmp, &llsd->llsd_master_list, llst_list) {
165                 if (tmp->llst_index == index) {
166                         rc = -EALREADY;
167                         break;
168                 }
169         }
170         if (rc == 0)
171                 list_add_tail(&llst->llst_list, &llsd->llsd_master_list);
172         spin_unlock(&llsd->llsd_lock);
173
174         if (rc != 0)
175                 OBD_FREE_PTR(llst);
176
177         return rc;
178 }
179
180 static inline void
181 lfsck_layout_llst_del(struct lfsck_layout_slave_data *llsd,
182                       struct lfsck_layout_slave_target *llst)
183 {
184         bool del = false;
185
186         spin_lock(&llsd->llsd_lock);
187         if (!list_empty(&llst->llst_list)) {
188                 list_del_init(&llst->llst_list);
189                 del = true;
190         }
191         spin_unlock(&llsd->llsd_lock);
192
193         if (del)
194                 lfsck_layout_llst_put(llst);
195 }
196
197 static inline struct lfsck_layout_slave_target *
198 lfsck_layout_llst_find_and_del(struct lfsck_layout_slave_data *llsd,
199                                __u32 index)
200 {
201         struct lfsck_layout_slave_target *llst;
202
203         spin_lock(&llsd->llsd_lock);
204         list_for_each_entry(llst, &llsd->llsd_master_list, llst_list) {
205                 if (llst->llst_index == index) {
206                         list_del_init(&llst->llst_list);
207                         spin_unlock(&llsd->llsd_lock);
208
209                         return llst;
210                 }
211         }
212         spin_unlock(&llsd->llsd_lock);
213
214         return NULL;
215 }
216
217 static inline void lfsck_layout_object_put(const struct lu_env *env,
218                                            struct lfsck_layout_object *llo)
219 {
220         if (atomic_dec_and_test(&llo->llo_ref)) {
221                 lfsck_object_put(env, llo->llo_obj);
222                 OBD_FREE_PTR(llo);
223         }
224 }
225
226 static inline void lfsck_layout_req_fini(const struct lu_env *env,
227                                          struct lfsck_layout_req *llr)
228 {
229         lu_object_put(env, &llr->llr_child->do_lu);
230         lfsck_layout_object_put(env, llr->llr_parent);
231         OBD_FREE_PTR(llr);
232 }
233
234 static inline bool lfsck_layout_req_empty(struct lfsck_layout_master_data *llmd)
235 {
236         bool empty = false;
237
238         spin_lock(&llmd->llmd_lock);
239         if (list_empty(&llmd->llmd_req_list))
240                 empty = true;
241         spin_unlock(&llmd->llmd_lock);
242
243         return empty;
244 }
245
246 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
247                                    const struct lfsck_layout *src)
248 {
249         int i;
250
251         des->ll_magic = le32_to_cpu(src->ll_magic);
252         des->ll_status = le32_to_cpu(src->ll_status);
253         des->ll_flags = le32_to_cpu(src->ll_flags);
254         des->ll_success_count = le32_to_cpu(src->ll_success_count);
255         des->ll_run_time_phase1 = le32_to_cpu(src->ll_run_time_phase1);
256         des->ll_run_time_phase2 = le32_to_cpu(src->ll_run_time_phase2);
257         des->ll_time_last_complete = le64_to_cpu(src->ll_time_last_complete);
258         des->ll_time_latest_start = le64_to_cpu(src->ll_time_latest_start);
259         des->ll_time_last_checkpoint =
260                                 le64_to_cpu(src->ll_time_last_checkpoint);
261         des->ll_pos_latest_start = le64_to_cpu(src->ll_pos_latest_start);
262         des->ll_pos_last_checkpoint = le64_to_cpu(src->ll_pos_last_checkpoint);
263         des->ll_pos_first_inconsistent =
264                         le64_to_cpu(src->ll_pos_first_inconsistent);
265         des->ll_objs_checked_phase1 = le64_to_cpu(src->ll_objs_checked_phase1);
266         des->ll_objs_failed_phase1 = le64_to_cpu(src->ll_objs_failed_phase1);
267         des->ll_objs_checked_phase2 = le64_to_cpu(src->ll_objs_checked_phase2);
268         des->ll_objs_failed_phase2 = le64_to_cpu(src->ll_objs_failed_phase2);
269         for (i = 0; i < LLIT_MAX; i++)
270                 des->ll_objs_repaired[i] =
271                                 le64_to_cpu(src->ll_objs_repaired[i]);
272         des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
273 }
274
275 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
276                                    const struct lfsck_layout *src)
277 {
278         int i;
279
280         des->ll_magic = cpu_to_le32(src->ll_magic);
281         des->ll_status = cpu_to_le32(src->ll_status);
282         des->ll_flags = cpu_to_le32(src->ll_flags);
283         des->ll_success_count = cpu_to_le32(src->ll_success_count);
284         des->ll_run_time_phase1 = cpu_to_le32(src->ll_run_time_phase1);
285         des->ll_run_time_phase2 = cpu_to_le32(src->ll_run_time_phase2);
286         des->ll_time_last_complete = cpu_to_le64(src->ll_time_last_complete);
287         des->ll_time_latest_start = cpu_to_le64(src->ll_time_latest_start);
288         des->ll_time_last_checkpoint =
289                                 cpu_to_le64(src->ll_time_last_checkpoint);
290         des->ll_pos_latest_start = cpu_to_le64(src->ll_pos_latest_start);
291         des->ll_pos_last_checkpoint = cpu_to_le64(src->ll_pos_last_checkpoint);
292         des->ll_pos_first_inconsistent =
293                         cpu_to_le64(src->ll_pos_first_inconsistent);
294         des->ll_objs_checked_phase1 = cpu_to_le64(src->ll_objs_checked_phase1);
295         des->ll_objs_failed_phase1 = cpu_to_le64(src->ll_objs_failed_phase1);
296         des->ll_objs_checked_phase2 = cpu_to_le64(src->ll_objs_checked_phase2);
297         des->ll_objs_failed_phase2 = cpu_to_le64(src->ll_objs_failed_phase2);
298         for (i = 0; i < LLIT_MAX; i++)
299                 des->ll_objs_repaired[i] =
300                                 cpu_to_le64(src->ll_objs_repaired[i]);
301         des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
302 }
303
304 /**
305  * \retval +ve: the lfsck_layout is broken, the caller should reset it.
306  * \retval 0: succeed.
307  * \retval -ve: failed cases.
308  */
309 static int lfsck_layout_load(const struct lu_env *env,
310                              struct lfsck_component *com)
311 {
312         struct lfsck_layout             *lo     = com->lc_file_ram;
313         const struct dt_body_operations *dbo    = com->lc_obj->do_body_ops;
314         ssize_t                          size   = com->lc_file_size;
315         loff_t                           pos    = 0;
316         int                              rc;
317
318         rc = dbo->dbo_read(env, com->lc_obj,
319                            lfsck_buf_get(env, com->lc_file_disk, size), &pos,
320                            BYPASS_CAPA);
321         if (rc == 0) {
322                 return -ENOENT;
323         } else if (rc < 0) {
324                 CWARN("%s: failed to load lfsck_layout: rc = %d\n",
325                       lfsck_lfsck2name(com->lc_lfsck), rc);
326                 return rc;
327         } else if (rc != size) {
328                 CWARN("%s: crashed lfsck_layout, to be reset: rc = %d\n",
329                       lfsck_lfsck2name(com->lc_lfsck), rc);
330                 return 1;
331         }
332
333         lfsck_layout_le_to_cpu(lo, com->lc_file_disk);
334         if (lo->ll_magic != LFSCK_LAYOUT_MAGIC) {
335                 CWARN("%s: invalid lfsck_layout magic %#x != %#x, "
336                       "to be reset\n", lfsck_lfsck2name(com->lc_lfsck),
337                       lo->ll_magic, LFSCK_LAYOUT_MAGIC);
338                 return 1;
339         }
340
341         return 0;
342 }
343
344 static int lfsck_layout_store(const struct lu_env *env,
345                               struct lfsck_component *com)
346 {
347         struct dt_object         *obj           = com->lc_obj;
348         struct lfsck_instance    *lfsck         = com->lc_lfsck;
349         struct lfsck_layout      *lo            = com->lc_file_disk;
350         struct thandle           *handle;
351         ssize_t                   size          = com->lc_file_size;
352         loff_t                    pos           = 0;
353         int                       rc;
354         ENTRY;
355
356         lfsck_layout_cpu_to_le(lo, com->lc_file_ram);
357         handle = dt_trans_create(env, lfsck->li_bottom);
358         if (IS_ERR(handle)) {
359                 rc = PTR_ERR(handle);
360                 CERROR("%s: fail to create trans for storing lfsck_layout: "
361                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
362                 RETURN(rc);
363         }
364
365         rc = dt_declare_record_write(env, obj, size, pos, handle);
366         if (rc != 0) {
367                 CERROR("%s: fail to declare trans for storing lfsck_layout(1): "
368                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
369                 GOTO(out, rc);
370         }
371
372         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
373         if (rc != 0) {
374                 CERROR("%s: fail to start trans for storing lfsck_layout: "
375                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
376                 GOTO(out, rc);
377         }
378
379         rc = dt_record_write(env, obj, lfsck_buf_get(env, lo, size), &pos,
380                              handle);
381         if (rc != 0)
382                 CERROR("%s: fail to store lfsck_layout(1): size = %d, "
383                        "rc = %d\n", lfsck_lfsck2name(lfsck), (int)size, rc);
384
385         GOTO(out, rc);
386
387 out:
388         dt_trans_stop(env, lfsck->li_bottom, handle);
389
390         return rc;
391 }
392
393 static int lfsck_layout_init(const struct lu_env *env,
394                              struct lfsck_component *com)
395 {
396         struct lfsck_layout *lo = com->lc_file_ram;
397         int rc;
398
399         memset(lo, 0, com->lc_file_size);
400         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
401         lo->ll_status = LS_INIT;
402         down_write(&com->lc_sem);
403         rc = lfsck_layout_store(env, com);
404         up_write(&com->lc_sem);
405
406         return rc;
407 }
408
409 static int fid_is_for_ostobj(const struct lu_env *env, struct dt_device *dt,
410                              struct dt_object *obj, const struct lu_fid *fid)
411 {
412         struct seq_server_site  *ss     = lu_site2seq(dt->dd_lu_dev.ld_site);
413         struct lu_seq_range      range  = { 0 };
414         struct lustre_mdt_attrs *lma;
415         int                      rc;
416
417         fld_range_set_any(&range);
418         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), &range);
419         if (rc == 0) {
420                 if (fld_range_is_ost(&range))
421                         return 1;
422
423                 return 0;
424         }
425
426         lma = &lfsck_env_info(env)->lti_lma;
427         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, lma, sizeof(*lma)),
428                           XATTR_NAME_LMA, BYPASS_CAPA);
429         if (rc == sizeof(*lma)) {
430                 lustre_lma_swab(lma);
431
432                 /* Generally, the low layer OSD create handler or OI scrub
433                  * will set the LMAC_FID_ON_OST for all external visible
434                  * OST-objects. But to make the otable-based iteration to
435                  * be independent from OI scrub in spite of it got failure
436                  * or not, we check the LMAC_FID_ON_OST here to guarantee
437                  * that the LFSCK will not repair something by wrong. */
438                 return lma->lma_compat & LMAC_FID_ON_OST ? 1 : 0;
439         }
440
441         rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID, BYPASS_CAPA);
442
443         return rc > 0;
444 }
445
446 static struct lfsck_layout_seq *
447 lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
448 {
449         struct lfsck_layout_seq *lls;
450
451         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
452                 if (lls->lls_seq == seq)
453                         return lls;
454
455                 if (lls->lls_seq > seq)
456                         return NULL;
457         }
458
459         return NULL;
460 }
461
462 static void
463 lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
464                         struct lfsck_layout_seq *lls)
465 {
466         struct lfsck_layout_seq *tmp;
467         struct list_head        *pos = &llsd->llsd_seq_list;
468
469         list_for_each_entry(tmp, &llsd->llsd_seq_list, lls_list) {
470                 if (lls->lls_seq < tmp->lls_seq) {
471                         pos = &tmp->lls_list;
472                         break;
473                 }
474         }
475         list_add_tail(&lls->lls_list, pos);
476 }
477
478 static int
479 lfsck_layout_lastid_create(const struct lu_env *env,
480                            struct lfsck_instance *lfsck,
481                            struct dt_object *obj)
482 {
483         struct lfsck_thread_info *info   = lfsck_env_info(env);
484         struct lu_attr           *la     = &info->lti_la;
485         struct dt_object_format  *dof    = &info->lti_dof;
486         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
487         struct dt_device         *dt     = lfsck->li_bottom;
488         struct thandle           *th;
489         __u64                     lastid = 0;
490         loff_t                    pos    = 0;
491         int                       rc;
492         ENTRY;
493
494         CDEBUG(D_LFSCK, "To create LAST_ID for <seq> "LPX64"\n",
495                fid_seq(lfsck_dto2fid(obj)));
496
497         if (bk->lb_param & LPF_DRYRUN)
498                 return 0;
499
500         memset(la, 0, sizeof(*la));
501         la->la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
502         la->la_valid = LA_MODE | LA_UID | LA_GID;
503         dof->dof_type = dt_mode_to_dft(S_IFREG);
504
505         th = dt_trans_create(env, dt);
506         if (IS_ERR(th))
507                 RETURN(rc = PTR_ERR(th));
508
509         rc = dt_declare_create(env, obj, la, NULL, dof, th);
510         if (rc != 0)
511                 GOTO(stop, rc);
512
513         rc = dt_declare_record_write(env, obj, sizeof(lastid), pos, th);
514         if (rc != 0)
515                 GOTO(stop, rc);
516
517         rc = dt_trans_start_local(env, dt, th);
518         if (rc != 0)
519                 GOTO(stop, rc);
520
521         dt_write_lock(env, obj, 0);
522         if (likely(!dt_object_exists(obj))) {
523                 rc = dt_create(env, obj, la, NULL, dof, th);
524                 if (rc == 0)
525                         rc = dt_record_write(env, obj,
526                                 lfsck_buf_get(env, &lastid, sizeof(lastid)),
527                                 &pos, th);
528         }
529         dt_write_unlock(env, obj);
530
531         GOTO(stop, rc);
532
533 stop:
534         dt_trans_stop(env, dt, th);
535
536         return rc;
537 }
538
539 static int
540 lfsck_layout_lastid_reload(const struct lu_env *env,
541                            struct lfsck_component *com,
542                            struct lfsck_layout_seq *lls)
543 {
544         __u64   lastid;
545         loff_t  pos     = 0;
546         int     rc;
547
548         dt_read_lock(env, lls->lls_lastid_obj, 0);
549         rc = dt_record_read(env, lls->lls_lastid_obj,
550                             lfsck_buf_get(env, &lastid, sizeof(lastid)), &pos);
551         dt_read_unlock(env, lls->lls_lastid_obj);
552         if (unlikely(rc != 0))
553                 return rc;
554
555         lastid = le64_to_cpu(lastid);
556         if (lastid < lls->lls_lastid_known) {
557                 struct lfsck_instance   *lfsck  = com->lc_lfsck;
558                 struct lfsck_layout     *lo     = com->lc_file_ram;
559
560                 lls->lls_lastid = lls->lls_lastid_known;
561                 lls->lls_dirty = 1;
562                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
563                         LASSERT(lfsck->li_out_notify != NULL);
564
565                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
566                                              LE_LASTID_REBUILDING);
567                         lo->ll_flags |= LF_CRASHED_LASTID;
568                 }
569         } else if (lastid >= lls->lls_lastid) {
570                 lls->lls_lastid = lastid;
571                 lls->lls_dirty = 0;
572         }
573
574         return 0;
575 }
576
577 static int
578 lfsck_layout_lastid_store(const struct lu_env *env,
579                           struct lfsck_component *com)
580 {
581         struct lfsck_instance           *lfsck  = com->lc_lfsck;
582         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
583         struct dt_device                *dt     = lfsck->li_bottom;
584         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
585         struct lfsck_layout_seq         *lls;
586         struct thandle                  *th;
587         __u64                            lastid;
588         int                              rc     = 0;
589         int                              rc1    = 0;
590
591         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
592                 loff_t pos = 0;
593
594                 /* XXX: Add the code back if we really found related
595                  *      inconsistent cases in the future. */
596 #if 0
597                 if (!lls->lls_dirty) {
598                         /* In OFD, before the pre-creation, the LAST_ID
599                          * file will be updated firstly, which may hide
600                          * some potential crashed cases. For example:
601                          *
602                          * The old obj1's ID is higher than old LAST_ID
603                          * but lower than the new LAST_ID, but the LFSCK
604                          * have not touch the obj1 until the OFD updated
605                          * the LAST_ID. So the LFSCK does not regard it
606                          * as crashed case. But when OFD does not create
607                          * successfully, it will set the LAST_ID as the
608                          * real created objects' ID, then LFSCK needs to
609                          * found related inconsistency. */
610                         rc = lfsck_layout_lastid_reload(env, com, lls);
611                         if (likely(!lls->lls_dirty))
612                                 continue;
613                 }
614 #endif
615
616                 CDEBUG(D_LFSCK, "To sync the LAST_ID for <seq> "LPX64
617                        " as <oid> "LPU64"\n", lls->lls_seq, lls->lls_lastid);
618
619                 if (bk->lb_param & LPF_DRYRUN) {
620                         lls->lls_dirty = 0;
621                         continue;
622                 }
623
624                 th = dt_trans_create(env, dt);
625                 if (IS_ERR(th)) {
626                         rc1 = PTR_ERR(th);
627                         CERROR("%s: (1) failed to store "LPX64": rc = %d\n",
628                                lfsck_lfsck2name(com->lc_lfsck),
629                                lls->lls_seq, rc1);
630                         continue;
631                 }
632
633                 rc = dt_declare_record_write(env, lls->lls_lastid_obj,
634                                              sizeof(lastid), pos, th);
635                 if (rc != 0)
636                         goto stop;
637
638                 rc = dt_trans_start_local(env, dt, th);
639                 if (rc != 0)
640                         goto stop;
641
642                 lastid = cpu_to_le64(lls->lls_lastid);
643                 dt_write_lock(env, lls->lls_lastid_obj, 0);
644                 rc = dt_record_write(env, lls->lls_lastid_obj,
645                                      lfsck_buf_get(env, &lastid,
646                                      sizeof(lastid)), &pos, th);
647                 dt_write_unlock(env, lls->lls_lastid_obj);
648                 if (rc == 0)
649                         lls->lls_dirty = 0;
650
651 stop:
652                 dt_trans_stop(env, dt, th);
653                 if (rc != 0) {
654                         rc1 = rc;
655                         CERROR("%s: (2) failed to store "LPX64": rc = %d\n",
656                                lfsck_lfsck2name(com->lc_lfsck),
657                                lls->lls_seq, rc1);
658                 }
659         }
660
661         return rc1;
662 }
663
664 static int
665 lfsck_layout_lastid_load(const struct lu_env *env,
666                          struct lfsck_component *com,
667                          struct lfsck_layout_seq *lls)
668 {
669         struct lfsck_instance   *lfsck  = com->lc_lfsck;
670         struct lfsck_layout     *lo     = com->lc_file_ram;
671         struct lu_fid           *fid    = &lfsck_env_info(env)->lti_fid;
672         struct dt_object        *obj;
673         loff_t                   pos    = 0;
674         int                      rc;
675         ENTRY;
676
677         lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck->li_bottom));
678         obj = dt_locate(env, lfsck->li_bottom, fid);
679         if (IS_ERR(obj))
680                 RETURN(PTR_ERR(obj));
681
682         /* LAST_ID crashed, to be rebuilt */
683         if (!dt_object_exists(obj)) {
684                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
685                         LASSERT(lfsck->li_out_notify != NULL);
686
687                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
688                                              LE_LASTID_REBUILDING);
689                         lo->ll_flags |= LF_CRASHED_LASTID;
690
691                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
692                             cfs_fail_val > 0) {
693                                 struct l_wait_info lwi = LWI_TIMEOUT(
694                                                 cfs_time_seconds(cfs_fail_val),
695                                                 NULL, NULL);
696
697                                 up_write(&com->lc_sem);
698                                 l_wait_event(lfsck->li_thread.t_ctl_waitq,
699                                              !thread_is_running(&lfsck->li_thread),
700                                              &lwi);
701                                 down_write(&com->lc_sem);
702                         }
703                 }
704
705                 rc = lfsck_layout_lastid_create(env, lfsck, obj);
706         } else {
707                 dt_read_lock(env, obj, 0);
708                 rc = dt_read(env, obj,
709                         lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
710                         &pos);
711                 dt_read_unlock(env, obj);
712                 if (rc != 0 && rc != sizeof(__u64))
713                         GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
714
715                 if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
716                         LASSERT(lfsck->li_out_notify != NULL);
717
718                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
719                                              LE_LASTID_REBUILDING);
720                         lo->ll_flags |= LF_CRASHED_LASTID;
721                 }
722
723                 lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
724                 rc = 0;
725         }
726
727         GOTO(out, rc);
728
729 out:
730         if (rc != 0)
731                 lfsck_object_put(env, obj);
732         else
733                 lls->lls_lastid_obj = obj;
734
735         return rc;
736 }
737
738 static int lfsck_layout_master_async_interpret(const struct lu_env *env,
739                                                struct ptlrpc_request *req,
740                                                void *args, int rc)
741 {
742         struct lfsck_async_interpret_args *laia = args;
743         struct lfsck_component            *com  = laia->laia_com;
744         struct lfsck_layout_master_data   *llmd = com->lc_data;
745         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
746         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
747         struct lfsck_request              *lr   = laia->laia_lr;
748
749         switch (lr->lr_event) {
750         case LE_START:
751                 if (rc != 0) {
752                         struct lfsck_layout *lo = com->lc_file_ram;
753
754                         lo->ll_flags |= LF_INCOMPLETE;
755                         lfsck_tgt_put(ltd);
756                         break;
757                 }
758
759                 spin_lock(&ltds->ltd_lock);
760                 if (ltd->ltd_dead || ltd->ltd_layout_done) {
761                         spin_unlock(&ltds->ltd_lock);
762                         lfsck_tgt_put(ltd);
763                         break;
764                 }
765
766                 if (lr->lr_flags & LEF_TO_OST) {
767                         if (list_empty(&ltd->ltd_layout_list))
768                                 list_add_tail(&ltd->ltd_layout_list,
769                                               &llmd->llmd_ost_list);
770                         if (list_empty(&ltd->ltd_layout_phase_list))
771                                 list_add_tail(&ltd->ltd_layout_phase_list,
772                                               &llmd->llmd_ost_phase1_list);
773                 } else {
774                         if (list_empty(&ltd->ltd_layout_list))
775                                 list_add_tail(&ltd->ltd_layout_list,
776                                               &llmd->llmd_mdt_list);
777                         if (list_empty(&ltd->ltd_layout_phase_list))
778                                 list_add_tail(&ltd->ltd_layout_phase_list,
779                                               &llmd->llmd_mdt_phase1_list);
780                 }
781                 spin_unlock(&ltds->ltd_lock);
782                 lfsck_tgt_put(ltd);
783                 break;
784         case LE_STOP:
785         case LE_PHASE1_DONE:
786         case LE_PHASE2_DONE:
787                 if (rc != 0)
788                         CERROR("%s: fail to notify %s %x for layout: "
789                                "event = %d, rc = %d\n",
790                                lfsck_lfsck2name(com->lc_lfsck),
791                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
792                                ltd->ltd_index, lr->lr_event, rc);
793                 break;
794         case LE_QUERY: {
795                 struct lfsck_reply *reply;
796
797                 if (rc != 0) {
798                         spin_lock(&ltds->ltd_lock);
799                         list_del_init(&ltd->ltd_layout_phase_list);
800                         list_del_init(&ltd->ltd_layout_list);
801                         spin_unlock(&ltds->ltd_lock);
802                         lfsck_tgt_put(ltd);
803                         break;
804                 }
805
806                 reply = req_capsule_server_get(&req->rq_pill,
807                                                &RMF_LFSCK_REPLY);
808                 if (reply == NULL) {
809                         rc = -EPROTO;
810                         CERROR("%s: invalid return value: rc = %d\n",
811                                lfsck_lfsck2name(com->lc_lfsck), rc);
812                         spin_lock(&ltds->ltd_lock);
813                         list_del_init(&ltd->ltd_layout_phase_list);
814                         list_del_init(&ltd->ltd_layout_list);
815                         spin_unlock(&ltds->ltd_lock);
816                         lfsck_tgt_put(ltd);
817                         break;
818                 }
819
820                 switch (reply->lr_status) {
821                 case LS_SCANNING_PHASE1:
822                         break;
823                 case LS_SCANNING_PHASE2:
824                         spin_lock(&ltds->ltd_lock);
825                         list_del_init(&ltd->ltd_layout_phase_list);
826                         if (ltd->ltd_dead || ltd->ltd_layout_done) {
827                                 spin_unlock(&ltds->ltd_lock);
828                                 break;
829                         }
830
831                         if (lr->lr_flags & LEF_TO_OST)
832                                 list_add_tail(&ltd->ltd_layout_phase_list,
833                                               &llmd->llmd_ost_phase2_list);
834                         else
835                                 list_add_tail(&ltd->ltd_layout_phase_list,
836                                               &llmd->llmd_mdt_phase2_list);
837                         spin_unlock(&ltds->ltd_lock);
838                         break;
839                 default:
840                         spin_lock(&ltds->ltd_lock);
841                         list_del_init(&ltd->ltd_layout_phase_list);
842                         list_del_init(&ltd->ltd_layout_list);
843                         spin_unlock(&ltds->ltd_lock);
844                         break;
845                 }
846                 lfsck_tgt_put(ltd);
847                 break;
848         }
849         default:
850                 CERROR("%s: unexpected event: rc = %d\n",
851                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
852                 break;
853         }
854
855         lfsck_component_put(env, com);
856
857         return 0;
858 }
859
860 static int lfsck_layout_master_query_others(const struct lu_env *env,
861                                             struct lfsck_component *com)
862 {
863         struct lfsck_thread_info          *info  = lfsck_env_info(env);
864         struct lfsck_request              *lr    = &info->lti_lr;
865         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
866         struct lfsck_instance             *lfsck = com->lc_lfsck;
867         struct lfsck_layout_master_data   *llmd  = com->lc_data;
868         struct ptlrpc_request_set         *set;
869         struct lfsck_tgt_descs            *ltds;
870         struct lfsck_tgt_desc             *ltd;
871         struct list_head                  *head;
872         __u32                              cnt   = 0;
873         int                                rc    = 0;
874         int                                rc1   = 0;
875         ENTRY;
876
877         set = ptlrpc_prep_set();
878         if (set == NULL)
879                 RETURN(-ENOMEM);
880
881         llmd->llmd_touch_gen++;
882         memset(lr, 0, sizeof(*lr));
883         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
884         lr->lr_event = LE_QUERY;
885         lr->lr_active = LT_LAYOUT;
886         laia->laia_com = com;
887         laia->laia_lr = lr;
888
889         if (!list_empty(&llmd->llmd_mdt_phase1_list)) {
890                 ltds = &lfsck->li_mdt_descs;
891                 lr->lr_flags = 0;
892                 head = &llmd->llmd_mdt_phase1_list;
893         } else {
894
895 again:
896                 ltds = &lfsck->li_ost_descs;
897                 lr->lr_flags = LEF_TO_OST;
898                 head = &llmd->llmd_ost_phase1_list;
899         }
900
901         laia->laia_ltds = ltds;
902         spin_lock(&ltds->ltd_lock);
903         while (!list_empty(head)) {
904                 ltd = list_entry(head->next,
905                                  struct lfsck_tgt_desc,
906                                  ltd_layout_phase_list);
907                 if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
908                         break;
909
910                 ltd->ltd_layout_gen = llmd->llmd_touch_gen;
911                 list_del(&ltd->ltd_layout_phase_list);
912                 list_add_tail(&ltd->ltd_layout_phase_list, head);
913                 atomic_inc(&ltd->ltd_ref);
914                 laia->laia_ltd = ltd;
915                 spin_unlock(&ltds->ltd_lock);
916                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
917                                          lfsck_layout_master_async_interpret,
918                                          laia, LFSCK_QUERY);
919                 if (rc != 0) {
920                         CERROR("%s: fail to query %s %x for layout: rc = %d\n",
921                                lfsck_lfsck2name(lfsck),
922                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
923                                ltd->ltd_index, rc);
924                         lfsck_tgt_put(ltd);
925                         rc1 = rc;
926                 } else {
927                         cnt++;
928                 }
929                 spin_lock(&ltds->ltd_lock);
930         }
931         spin_unlock(&ltds->ltd_lock);
932
933         if (cnt > 0) {
934                 rc = ptlrpc_set_wait(set);
935                 if (rc < 0) {
936                         ptlrpc_set_destroy(set);
937                         RETURN(rc);
938                 }
939                 cnt = 0;
940         }
941
942         if (!(lr->lr_flags & LEF_TO_OST) &&
943             list_empty(&llmd->llmd_mdt_phase1_list))
944                 goto again;
945
946         ptlrpc_set_destroy(set);
947
948         RETURN(rc1 != 0 ? rc1 : rc);
949 }
950
951 static inline bool
952 lfsck_layout_master_to_orphan(struct lfsck_layout_master_data *llmd)
953 {
954         return list_empty(&llmd->llmd_mdt_phase1_list) &&
955                (!list_empty(&llmd->llmd_ost_phase2_list) ||
956                 list_empty(&llmd->llmd_ost_phase1_list));
957 }
958
/*
 * Send the layout LFSCK event carried by \a lr to the related targets and
 * keep the master's per-target lists in step with it.
 *
 * What is done depends on lr->lr_event:
 * - LE_START: notify every OST set in the OST target bitmap (LEF_TO_OST);
 *   a send failure marks the in-RAM layout file LF_INCOMPLETE. If
 *   LPF_ALL_MDT is set in \a flags, the MDTs are then either notified the
 *   same way (LPF_BROADCAST set) or only linked onto the local MDT lists
 *   without any RPC.
 * - LE_STOP / LE_PHASE2_DONE: targets are unlinked from the master's lists
 *   as they are notified. With LPF_ALL_MDT the MDTs are handled before the
 *   OSTs: notified when LPF_BROADCAST is set, otherwise only unlinked
 *   locally. LE_STOP additionally sets LEF_FORCE_STOP under LPF_BROADCAST.
 * - LE_PHASE1_DONE: notify each MDT on the MDT phase1 list once.
 *
 * lfsck_layout_master_async_interpret() is given as the reply interpreter
 * for every request.
 *
 * \param[in] env       execution environment
 * \param[in] com       the layout LFSCK component
 * \param[in] lr        request template; lr_active and lr_flags are
 *                      overwritten here
 * \param[in] flags     LPF_* flags; LPF_ALL_MDT and LPF_BROADCAST are used
 *
 * \retval              0 for success
 * \retval              -ENOMEM if the request set cannot be allocated
 * \retval              -EINVAL for an unexpected event
 * \retval              -ENODEV if LE_START succeeded but no OST is linked
 *                      on llmd_ost_list afterwards
 * \retval              otherwise the value left by the last
 *                      lfsck_async_request()/ptlrpc_set_wait() call
 */
959 static int lfsck_layout_master_notify_others(const struct lu_env *env,
960                                              struct lfsck_component *com,
961                                              struct lfsck_request *lr,
962                                              __u32 flags)
963 {
964         struct lfsck_thread_info          *info  = lfsck_env_info(env);
965         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
966         struct lfsck_instance             *lfsck = com->lc_lfsck;
967         struct lfsck_layout_master_data   *llmd  = com->lc_data;
968         struct lfsck_layout               *lo    = com->lc_file_ram;
969         struct ptlrpc_request_set         *set;
970         struct lfsck_tgt_descs            *ltds;
971         struct lfsck_tgt_desc             *ltd;
972         struct lfsck_tgt_desc             *next;
973         struct list_head                  *head;
974         __u32                              idx;
            /* Number of requests queued into the set and not yet waited for. */
975         __u32                              cnt   = 0;
976         int                                rc    = 0;
977         ENTRY;
978
979         set = ptlrpc_prep_set();
980         if (set == NULL)
981                 RETURN(-ENOMEM);
982
983         lr->lr_active = LT_LAYOUT;
984         laia->laia_com = com;
985         laia->laia_lr = lr;
986         lr->lr_flags = 0;
987         switch (lr->lr_event) {
988         case LE_START:
989                 /* Notify the OSTs first, then the other MDTs if needed. */
990                 lr->lr_flags |= LEF_TO_OST;
991                 ltds = &lfsck->li_ost_descs;
992
            /* lable1 is entered once for the OSTs and, when broadcasting, a
             * second time for the MDTs. */
993 lable1:
994                 laia->laia_ltds = ltds;
995                 down_read(&ltds->ltd_rw_sem);
996                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
997                         ltd = lfsck_tgt_get(ltds, idx);
998                         LASSERT(ltd != NULL);
999
1000                         laia->laia_ltd = ltd;
1001                         ltd->ltd_layout_done = 0;
1002                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1003                                         lfsck_layout_master_async_interpret,
1004                                         laia, LFSCK_NOTIFY);
1005                         if (rc != 0) {
1006                                 CERROR("%s: fail to notify %s %x for layout "
1007                                        "start: rc = %d\n",
1008                                        lfsck_lfsck2name(lfsck),
1009                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1010                                        "MDT", idx, rc);
                                     /* The request was not queued: release the
                                      * target here (presumably pairing with
                                      * lfsck_tgt_get() above) and remember that
                                      * this run cannot cover all targets. */
1011                                 lfsck_tgt_put(ltd);
1012                                 lo->ll_flags |= LF_INCOMPLETE;
1013                         } else {
1014                                 cnt++;
1015                         }
1016                 }
1017                 up_read(&ltds->ltd_rw_sem);
1018
1019                 /* Sync up */
1020                 if (cnt > 0) {
1021                         rc = ptlrpc_set_wait(set);
1022                         if (rc < 0) {
1023                                 ptlrpc_set_destroy(set);
1024                                 RETURN(rc);
1025                         }
1026                         cnt = 0;
1027                 }
1028
1029                 if (!(flags & LPF_ALL_MDT))
1030                         break;
1031
1032                 ltds = &lfsck->li_mdt_descs;
1033                 /* The sponsor broadcasts the request to other MDTs. */
1034                 if (flags & LPF_BROADCAST) {
                             /* Clear LPF_ALL_MDT so that the second pass
                              * leaves at the check above. */
1035                         flags &= ~LPF_ALL_MDT;
1036                         lr->lr_flags &= ~LEF_TO_OST;
1037                         goto lable1;
1038                 }
1039
1040                 /* Non-sponsors link the other MDT targets locally. */
1041                 spin_lock(&ltds->ltd_lock);
1042                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1043                         ltd = LTD_TGT(ltds, idx);
1044                         LASSERT(ltd != NULL);
1045
                             /* Already linked, nothing to do. */
1046                         if (!list_empty(&ltd->ltd_layout_list))
1047                                 continue;
1048
1049                         list_add_tail(&ltd->ltd_layout_list,
1050                                       &llmd->llmd_mdt_list);
1051                         list_add_tail(&ltd->ltd_layout_phase_list,
1052                                       &llmd->llmd_mdt_phase1_list);
1053                 }
1054                 spin_unlock(&ltds->ltd_lock);
1055
1056                 break;
1057         case LE_STOP:
1058                 if (flags & LPF_BROADCAST)
1059                         lr->lr_flags |= LEF_FORCE_STOP;
                     /* fallthrough */
1060         case LE_PHASE2_DONE:
1061                 /* Notify other MDTs if needed, then the OSTs. */
1062                 if (flags & LPF_ALL_MDT) {
1063                         /* The sponsor broadcasts the request to other MDTs. */
1064                         if (flags & LPF_BROADCAST) {
1065                                 lr->lr_flags &= ~LEF_TO_OST;
1066                                 head = &llmd->llmd_mdt_list;
1067                                 ltds = &lfsck->li_mdt_descs;
1068                                 goto lable3;
1069                         }
1070
1071                         /* Non-sponsors unlink the other MDT targets locally. */
1072                         ltds = &lfsck->li_mdt_descs;
1073                         spin_lock(&ltds->ltd_lock);
1074                         list_for_each_entry_safe(ltd, next,
1075                                                  &llmd->llmd_mdt_list,
1076                                                  ltd_layout_list) {
1077                                 list_del_init(&ltd->ltd_layout_phase_list);
1078                                 list_del_init(&ltd->ltd_layout_list);
1079                         }
1080                         spin_unlock(&ltds->ltd_lock);
1081                 }
1082
             /* lable2: select the OST list as the one to be drained. */
1083 lable2:
1084                 lr->lr_flags |= LEF_TO_OST;
1085                 head = &llmd->llmd_ost_list;
1086                 ltds = &lfsck->li_ost_descs;
1087
             /* lable3: drain the selected list, unlinking each target before
              * the request for it is sent. */
1088 lable3:
1089                 laia->laia_ltds = ltds;
1090                 spin_lock(&ltds->ltd_lock);
1091                 while (!list_empty(head)) {
1092                         ltd = list_entry(head->next, struct lfsck_tgt_desc,
1093                                          ltd_layout_list);
1094                         if (!list_empty(&ltd->ltd_layout_phase_list))
1095                                 list_del_init(&ltd->ltd_layout_phase_list);
1096                         list_del_init(&ltd->ltd_layout_list);
1097                         laia->laia_ltd = ltd;
                             /* NOTE(review): unlike the LE_START path, no
                              * target reference is taken before ltd_lock is
                              * dropped here - confirm the target cannot go
                              * away while the request is in flight. */
1098                         spin_unlock(&ltds->ltd_lock);
1099                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1100                                         lfsck_layout_master_async_interpret,
1101                                         laia, LFSCK_NOTIFY);
1102                         if (rc != 0)
1103                                 CERROR("%s: fail to notify %s %x for layout "
1104                                        "stop/phase2: rc = %d\n",
1105                                        lfsck_lfsck2name(lfsck),
1106                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1107                                        "MDT", ltd->ltd_index, rc);
1108                         else
1109                                 cnt++;
1110                         spin_lock(&ltds->ltd_lock);
1111                 }
1112                 spin_unlock(&ltds->ltd_lock);
1113
                     /* Without LPF_BROADCAST we are done; the queued requests
                      * are waited for after the switch. */
1114                 if (!(flags & LPF_BROADCAST))
1115                         break;
1116
1117                 /* Sync up */
1118                 if (cnt > 0) {
1119                         rc = ptlrpc_set_wait(set);
1120                         if (rc < 0) {
1121                                 ptlrpc_set_destroy(set);
1122                                 RETURN(rc);
1123                         }
1124                         cnt = 0;
1125                 }
1126
                     /* Run the OST pass next; LPF_BROADCAST is cleared so
                      * that the pass ends at the check above. */
1127                 flags &= ~LPF_BROADCAST;
1128                 goto lable2;
1129         case LE_PHASE1_DONE:
                     /* A new generation marks the targets visited by this
                      * walk. */
1130                 llmd->llmd_touch_gen++;
1131                 lr->lr_flags &= ~LEF_TO_OST;
1132                 ltds = &lfsck->li_mdt_descs;
1133                 laia->laia_ltds = ltds;
1134                 spin_lock(&ltds->ltd_lock);
1135                 while (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1136                         ltd = list_entry(llmd->llmd_mdt_phase1_list.next,
1137                                          struct lfsck_tgt_desc,
1138                                          ltd_layout_phase_list);
                             /* The head was already visited: every entry has
                              * been handled once. */
1139                         if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1140                                 break;
1141
                             /* Tag the target and rotate it to the tail; it
                              * stays on the phase1 list. */
1142                         ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1143                         list_del_init(&ltd->ltd_layout_phase_list);
1144                         list_add_tail(&ltd->ltd_layout_phase_list,
1145                                       &llmd->llmd_mdt_phase1_list);
1146                         laia->laia_ltd = ltd;
                             /* NOTE(review): no target reference is taken
                              * before dropping ltd_lock here either. */
1147                         spin_unlock(&ltds->ltd_lock);
1148                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1149                                         lfsck_layout_master_async_interpret,
1150                                         laia, LFSCK_NOTIFY);
1151                         if (rc != 0)
1152                                 CERROR("%s: fail to notify MDT %x for layout "
1153                                        "phase1 done: rc = %d\n",
1154                                        lfsck_lfsck2name(lfsck),
1155                                        ltd->ltd_index, rc);
1156                         else
1157                                 cnt++;
1158                         spin_lock(&ltds->ltd_lock);
1159                 }
1160                 spin_unlock(&ltds->ltd_lock);
1161                 break;
1162         default:
                     /* NOTE(review): the value printed as "rc" below is the
                      * event number, not an error code. */
1163                 CERROR("%s: unexpected LFSCK event: rc = %d\n",
1164                        lfsck_lfsck2name(lfsck), lr->lr_event);
1165                 rc = -EINVAL;
1166                 break;
1167         }
1168
             /* Wait for whatever is still queued, then release the set. */
1169         if (cnt > 0)
1170                 rc = ptlrpc_set_wait(set);
1171         ptlrpc_set_destroy(set);
1172
             /* A start that left no OST linked on llmd_ost_list is reported as
              * -ENODEV. */
1173         if (rc == 0 && lr->lr_event == LE_START &&
1174             list_empty(&llmd->llmd_ost_list))
1175                 rc = -ENODEV;
1176
1177         RETURN(rc);
1178 }
1179
1180 static int lfsck_layout_double_scan_result(const struct lu_env *env,
1181                                            struct lfsck_component *com,
1182                                            int rc)
1183 {
1184         struct lfsck_instance   *lfsck = com->lc_lfsck;
1185         struct lfsck_layout     *lo    = com->lc_file_ram;
1186         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1187
1188         down_write(&com->lc_sem);
1189
1190         lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1191                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1192         lo->ll_time_last_checkpoint = cfs_time_current_sec();
1193         lo->ll_objs_checked_phase2 += com->lc_new_checked;
1194
1195         if (rc > 0) {
1196                 com->lc_journal = 0;
1197                 if (lo->ll_flags & LF_INCOMPLETE)
1198                         lo->ll_status = LS_PARTIAL;
1199                 else
1200                         lo->ll_status = LS_COMPLETED;
1201                 if (!(bk->lb_param & LPF_DRYRUN))
1202                         lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1203                 lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
1204                 lo->ll_success_count++;
1205         } else if (rc == 0) {
1206                 lo->ll_status = lfsck->li_status;
1207                 if (lo->ll_status == 0)
1208                         lo->ll_status = LS_STOPPED;
1209         } else {
1210                 lo->ll_status = LS_FAILED;
1211         }
1212
1213         if (lo->ll_status != LS_PAUSED) {
1214                 spin_lock(&lfsck->li_lock);
1215                 list_del_init(&com->lc_link);
1216                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1217                 spin_unlock(&lfsck->li_lock);
1218         }
1219
1220         rc = lfsck_layout_store(env, com);
1221
1222         up_write(&com->lc_sem);
1223
1224         return rc;
1225 }
1226
/*
 * Placeholder for the per-target orphan handling of the layout LFSCK.
 *
 * The current body does no work and always reports success.
 *
 * \param[in] env       execution environment (unused for now)
 * \param[in] com       the layout LFSCK component (unused for now)
 * \param[in] ltd       the target to be handled (unused for now)
 *
 * \retval              0 always
 */
static int lfsck_layout_scan_orphan(const struct lu_env *env,
                                    struct lfsck_component *com,
                                    struct lfsck_tgt_desc *ltd)
{
        int rc = 0;

        /* XXX: To be extended in other patch. */

        return rc;
}
1235
/*
 * Main function of the layout LFSCK assistant.
 *
 * The function first notifies the other targets with LE_START, then marks
 * the assistant thread SVC_RUNNING and serves the master engine:
 * - it consumes the requests queued on llmd_req_list;
 * - on llmd_to_post it notifies the others with LE_PHASE1_DONE;
 * - on llmd_to_double_scan it repeatedly queries the other targets and
 *   handles the OSTs that show up on the OST phase2 list.
 * On the way out it notifies the others with LE_PHASE2_DONE or LE_STOP,
 * stores the double scan result unless llmd_exit is set, publishes its status
 * in llmd_assistant_status and marks the thread SVC_STOPPED.
 *
 * \param[in] args      struct lfsck_thread_args for this assistant; released
 *                      via lfsck_thread_args_fini() before returning
 *
 * \retval              the final value of the local \a rc
 */
1236 static int lfsck_layout_assistant(void *args)
1237 {
1238         struct lfsck_thread_args        *lta     = args;
1239         struct lu_env                   *env     = &lta->lta_env;
1240         struct lfsck_component          *com     = lta->lta_com;
1241         struct lfsck_instance           *lfsck   = lta->lta_lfsck;
1242         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
1243         struct lfsck_position           *pos     = &com->lc_pos_start;
1244         struct lfsck_thread_info        *info    = lfsck_env_info(env);
1245         struct lfsck_request            *lr      = &info->lti_lr;
1246         struct lfsck_layout_master_data *llmd    = com->lc_data;
             /* mthread: the master engine thread; athread: this assistant. */
1247         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
1248         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
1249         struct lfsck_layout_req         *llr;
1250         struct l_wait_info               lwi     = { 0 };
1251         int                              rc      = 0;
1252         int                              rc1     = 0;
1253         __u32                            flags;
1254         ENTRY;
1255
             /* Use the flags of the start request when there is one, the
              * bookmark parameters otherwise. */
1256         if (lta->lta_lsp->lsp_start != NULL)
1257                 flags  = lta->lta_lsp->lsp_start->ls_flags;
1258         else
1259                 flags = bk->lb_param;
             /* Build the LE_START notification from the bookmark settings. */
1260         memset(lr, 0, sizeof(*lr));
1261         lr->lr_event = LE_START;
1262         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1263         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
1264                        LSV_ASYNC_WINDOWS;
1265         lr->lr_speed = bk->lb_speed_limit;
1266         lr->lr_version = bk->lb_version;
1267         lr->lr_param = bk->lb_param;
1268         lr->lr_async_windows = bk->lb_async_windows;
             /* A start position cookie of at most 1 adds LPF_RESET to the
              * parameters sent to the others. */
1269         if (pos->lp_oit_cookie <= 1)
1270                 lr->lr_param |= LPF_RESET;
1271
1272         rc = lfsck_layout_master_notify_others(env, com, lr, flags);
1273         if (rc != 0) {
1274                 CERROR("%s: fail to notify others for layout start: rc = %d\n",
1275                        lfsck_lfsck2name(lfsck), rc);
1276                 GOTO(fini, rc);
1277         }
1278
             /* Announce that the assistant is running and wake up whoever
              * waits on the master's wait queue. */
1279         spin_lock(&llmd->llmd_lock);
1280         thread_set_flags(athread, SVC_RUNNING);
1281         spin_unlock(&llmd->llmd_lock);
1282         wake_up_all(&mthread->t_ctl_waitq);
1283
1284         while (1) {
                     /* Consume the queued requests one by one. */
1285                 while (!list_empty(&llmd->llmd_req_list)) {
1286                         bool wakeup = false;
1287
                             /* Wait until the async window (if limited) has
                              * room, or we are asked to exit. */
1288                         l_wait_event(athread->t_ctl_waitq,
1289                                      bk->lb_async_windows == 0 ||
1290                                      atomic_read(&llmd->llmd_rpcs_in_flight) <
1291                                                 bk->lb_async_windows ||
1292                                      llmd->llmd_exit,
1293                                      &lwi);
1294
1295                         if (unlikely(llmd->llmd_exit))
1296                                 GOTO(cleanup1, rc = llmd->llmd_post_result);
1297
1298                         /* XXX: To be extended in other patch.
1299                          *
1300                          * Compare the OST side attribute with local attribute,
1301                          * and fix it if found inconsistency. */
1302
1303                         spin_lock(&llmd->llmd_lock);
1304                         llr = list_entry(llmd->llmd_req_list.next,
1305                                          struct lfsck_layout_req,
1306                                          llr_list);
1307                         list_del_init(&llr->llr_list);
                             /* The prefetch count had reached the window
                              * size: wake up the master after dequeuing. */
1308                         if (bk->lb_async_windows != 0 &&
1309                             llmd->llmd_prefetched >= bk->lb_async_windows)
1310                                 wakeup = true;
1311
1312                         llmd->llmd_prefetched--;
1313                         spin_unlock(&llmd->llmd_lock);
1314                         if (wakeup)
1315                                 wake_up_all(&mthread->t_ctl_waitq);
1316
1317                         lfsck_layout_req_fini(env, llr);
1318                 }
1319
1320                 /* Wakeup the master engine if it is waiting in checkpoint. */
1321                 if (atomic_read(&llmd->llmd_rpcs_in_flight) == 0)
1322                         wake_up_all(&mthread->t_ctl_waitq);
1323
                     /* Sleep until there is a new request or a state change
                      * requested by the master. */
1324                 l_wait_event(athread->t_ctl_waitq,
1325                              !lfsck_layout_req_empty(llmd) ||
1326                              llmd->llmd_exit ||
1327                              llmd->llmd_to_post ||
1328                              llmd->llmd_to_double_scan,
1329                              &lwi);
1330
1331                 if (unlikely(llmd->llmd_exit))
1332                         GOTO(cleanup1, rc = llmd->llmd_post_result);
1333
                     /* Pending requests are served before post/double scan. */
1334                 if (!list_empty(&llmd->llmd_req_list))
1335                         continue;
1336
1337                 if (llmd->llmd_to_post) {
1338                         llmd->llmd_to_post = 0;
1339                         LASSERT(llmd->llmd_post_result > 0);
1340
1341                         memset(lr, 0, sizeof(*lr));
1342                         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1343                         lr->lr_event = LE_PHASE1_DONE;
1344                         lr->lr_status = llmd->llmd_post_result;
1345                         rc = lfsck_layout_master_notify_others(env, com, lr, 0);
1346                         if (rc != 0)
1347                                 CERROR("%s: failed to notify others "
1348                                        "for layout post: rc = %d\n",
1349                                        lfsck_lfsck2name(lfsck), rc);
1350
1351                         /* Wakeup the master engine to go ahead. */
1352                         wake_up_all(&mthread->t_ctl_waitq);
1353                 }
1354
1355                 if (llmd->llmd_to_double_scan) {
1356                         llmd->llmd_to_double_scan = 0;
                             /* This count is dropped again at "fini" as long
                              * as llmd_in_double_scan is still set. */
1357                         atomic_inc(&lfsck->li_double_scan_count);
1358                         llmd->llmd_in_double_scan = 1;
1359                         wake_up_all(&mthread->t_ctl_waitq);
1360
1361                         while (llmd->llmd_in_double_scan) {
1362                                 struct lfsck_tgt_descs  *ltds =
1363                                                         &lfsck->li_ost_descs;
1364                                 struct lfsck_tgt_desc   *ltd;
1365
1366                                 rc = lfsck_layout_master_query_others(env, com);
                                     /* The readiness check comes before the
                                      * error check, so a query failure does
                                      * not block the orphan handling. */
1367                                 if (lfsck_layout_master_to_orphan(llmd))
1368                                         goto orphan;
1369
1370                                 if (rc < 0)
1371                                         GOTO(cleanup2, rc);
1372
1373                                 /* Pull LFSCK status on related targets once
1374                                  * per 30 seconds if we are not notified. */
                                     /* NOTE(review): lwi keeps this timeout
                                      * for any later wait in this function. */
1375                                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(30),
1376                                                            cfs_time_seconds(1),
1377                                                            NULL, NULL);
1378                                 rc = l_wait_event(athread->t_ctl_waitq,
1379                                         lfsck_layout_master_to_orphan(llmd) ||
1380                                         llmd->llmd_exit ||
1381                                         !thread_is_running(mthread),
1382                                         &lwi);
1383
1384                                 if (unlikely(llmd->llmd_exit ||
1385                                              !thread_is_running(mthread)))
1386                                         GOTO(cleanup2, rc = 0);
1387
                                     /* Timed out: query the targets again. */
1388                                 if (rc == -ETIMEDOUT)
1389                                         continue;
1390
1391                                 if (rc < 0)
1392                                         GOTO(cleanup2, rc);
1393
1394 orphan:
                                     /* Handle every OST on the phase2 list;
                                      * the lock is dropped around each scan. */
1395                                 spin_lock(&ltds->ltd_lock);
1396                                 while (!list_empty(
1397                                                 &llmd->llmd_ost_phase2_list)) {
1398                                         ltd = list_entry(
1399                                               llmd->llmd_ost_phase2_list.next,
1400                                               struct lfsck_tgt_desc,
1401                                               ltd_layout_phase_list);
1402                                         list_del_init(
1403                                                 &ltd->ltd_layout_phase_list);
1404                                         spin_unlock(&ltds->ltd_lock);
1405
1406                                         rc = lfsck_layout_scan_orphan(env, com,
1407                                                                       ltd);
                                             /* A scan failure ends the run
                                              * only under LPF_FAILOUT. */
1408                                         if (rc != 0 &&
1409                                             bk->lb_param & LPF_FAILOUT)
1410                                                 GOTO(cleanup2, rc);
1411
1412                                         if (unlikely(llmd->llmd_exit ||
1413                                                 !thread_is_running(mthread)))
1414                                                 GOTO(cleanup2, rc = 0);
1415
1416                                         spin_lock(&ltds->ltd_lock);
1417                                 }
1418
                                     /* No OST left in phase1: the double scan
                                      * is complete (rc = 1). */
1419                                 if (list_empty(&llmd->llmd_ost_phase1_list)) {
1420                                         spin_unlock(&ltds->ltd_lock);
1421                                         GOTO(cleanup2, rc = 1);
1422                                 }
1423                                 spin_unlock(&ltds->ltd_lock);
1424                         }
1425                 }
1426         }
1427
1428 cleanup1:
1429         /* Cleanup the unfinished requests. */
1430         spin_lock(&llmd->llmd_lock);
1431         while (!list_empty(&llmd->llmd_req_list)) {
1432                 llr = list_entry(llmd->llmd_req_list.next,
1433                                  struct lfsck_layout_req,
1434                                  llr_list);
1435                 list_del_init(&llr->llr_list);
1436                 llmd->llmd_prefetched--;
1437                 spin_unlock(&llmd->llmd_lock);
1438                 lfsck_layout_req_fini(env, llr);
1439                 spin_lock(&llmd->llmd_lock);
1440         }
1441         spin_unlock(&llmd->llmd_lock);
1442
1443         LASSERTF(llmd->llmd_prefetched == 0, "unmatched prefeteched objs %d\n",
1444                  llmd->llmd_prefetched);
1445
             /* Wait until no RPC is in flight any more. */
1446         l_wait_event(athread->t_ctl_waitq,
1447                      atomic_read(&llmd->llmd_rpcs_in_flight) == 0,
1448                      &lwi);
1449
1450 cleanup2:
             /* Tell the others how this run ended: rc > 0 is reported as
              * LE_PHASE2_DONE, anything else as LE_STOP with a status derived
              * from rc and the instance status. */
1451         memset(lr, 0, sizeof(*lr));
1452         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1453         if (rc > 0) {
1454                 lr->lr_event = LE_PHASE2_DONE;
1455                 flags = 0;
1456                 lr->lr_status = rc;
1457         } else if (rc == 0) {
1458                 lr->lr_event = LE_STOP;
1459                 if (lfsck->li_status == LS_PAUSED ||
1460                     lfsck->li_status == LS_CO_PAUSED) {
1461                         flags = 0;
1462                         lr->lr_status = LS_CO_PAUSED;
1463                 } else if (lfsck->li_status == LS_STOPPED ||
1464                          lfsck->li_status == LS_CO_STOPPED) {
1465                         flags = lfsck->li_flags;
1466                         if (flags & LPF_BROADCAST)
1467                                 lr->lr_status = LS_STOPPED;
1468                         else
1469                                 lr->lr_status = LS_CO_STOPPED;
1470                 } else {
                             /* rc == 0 is only expected with a paused or
                              * stopped instance status. */
1471                         LBUG();
1472                 }
1473         } else {
1474                 lr->lr_event = LE_STOP;
1475                 flags = 0;
1476                 lr->lr_status = LS_CO_FAILED;
1477         }
1478
1479         rc1 = lfsck_layout_master_notify_others(env, com, lr, flags);
1480         if (rc1 != 0) {
1481                 CERROR("%s: failed to notify others for layout quit: rc = %d\n",
1482                        lfsck_lfsck2name(lfsck), rc1);
1483                 rc = rc1;
1484         }
1485
1486         /* Under force exit case, some requests may be just freed without
1487          * verification, those objects should be re-handled when next run.
1488          * So not update the on-disk tracing file under such case. */
1489         if (!llmd->llmd_exit)
1490                 rc1 = lfsck_layout_double_scan_result(env, com, rc);
1491
1492 fini:
1493         if (llmd->llmd_in_double_scan)
1494                 atomic_dec(&lfsck->li_double_scan_count);
1495
             /* Publish the final status and mark the assistant stopped under
              * llmd_lock, waking up the master's wait queue. */
1496         spin_lock(&llmd->llmd_lock);
1497         llmd->llmd_assistant_status = (rc1 != 0 ? rc1 : rc);
1498         thread_set_flags(athread, SVC_STOPPED);
1499         wake_up_all(&mthread->t_ctl_waitq);
1500         spin_unlock(&llmd->llmd_lock);
1501         lfsck_thread_args_fini(lta);
1502
             /* NOTE(review): the value returned is rc, while the status
              * published above prefers rc1 - confirm this is intended. */
1503         return rc;
1504 }
1505
1506 static int
1507 lfsck_layout_slave_async_interpret(const struct lu_env *env,
1508                                    struct ptlrpc_request *req,
1509                                    void *args, int rc)
1510 {
1511         struct lfsck_layout_slave_async_args *llsaa = args;
1512         struct obd_export                    *exp   = llsaa->llsaa_exp;
1513         struct lfsck_component               *com   = llsaa->llsaa_com;
1514         struct lfsck_layout_slave_target     *llst  = llsaa->llsaa_llst;
1515         struct lfsck_layout_slave_data       *llsd  = com->lc_data;
1516         bool                                  done  = false;
1517
1518         if (rc != 0) {
1519                 /* It is quite probably caused by target crash,
1520                  * to make the LFSCK can go ahead, assume that
1521                  * the target finished the LFSCK prcoessing. */
1522                 done = true;
1523         } else {
1524                 struct lfsck_reply *lr;
1525
1526                 lr = req_capsule_server_get(&req->rq_pill, &RMF_LFSCK_REPLY);
1527                 if (lr->lr_status != LS_SCANNING_PHASE1 &&
1528                     lr->lr_status != LS_SCANNING_PHASE2)
1529                         done = true;
1530         }
1531         if (done)
1532                 lfsck_layout_llst_del(llsd, llst);
1533         lfsck_layout_llst_put(llst);
1534         lfsck_component_put(env, com);
1535         class_export_put(exp);
1536
1537         return 0;
1538 }
1539
1540 static int lfsck_layout_async_query(const struct lu_env *env,
1541                                     struct lfsck_component *com,
1542                                     struct obd_export *exp,
1543                                     struct lfsck_layout_slave_target *llst,
1544                                     struct lfsck_request *lr,
1545                                     struct ptlrpc_request_set *set)
1546 {
1547         struct lfsck_layout_slave_async_args *llsaa;
1548         struct ptlrpc_request                *req;
1549         struct lfsck_request                 *tmp;
1550         int                                   rc;
1551         ENTRY;
1552
1553         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_QUERY);
1554         if (req == NULL)
1555                 RETURN(-ENOMEM);
1556
1557         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_QUERY);
1558         if (rc != 0) {
1559                 ptlrpc_request_free(req);
1560                 RETURN(rc);
1561         }
1562
1563         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1564         *tmp = *lr;
1565         ptlrpc_request_set_replen(req);
1566
1567         llsaa = ptlrpc_req_async_args(req);
1568         llsaa->llsaa_exp = exp;
1569         llsaa->llsaa_com = lfsck_component_get(com);
1570         llsaa->llsaa_llst = llst;
1571         req->rq_interpret_reply = lfsck_layout_slave_async_interpret;
1572         ptlrpc_set_add_req(set, req);
1573
1574         RETURN(0);
1575 }
1576
1577 static int lfsck_layout_async_notify(const struct lu_env *env,
1578                                      struct obd_export *exp,
1579                                      struct lfsck_request *lr,
1580                                      struct ptlrpc_request_set *set)
1581 {
1582         struct ptlrpc_request   *req;
1583         struct lfsck_request    *tmp;
1584         int                      rc;
1585         ENTRY;
1586
1587         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
1588         if (req == NULL)
1589                 RETURN(-ENOMEM);
1590
1591         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
1592         if (rc != 0) {
1593                 ptlrpc_request_free(req);
1594                 RETURN(rc);
1595         }
1596
1597         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1598         *tmp = *lr;
1599         ptlrpc_request_set_replen(req);
1600         ptlrpc_set_add_req(set, req);
1601
1602         RETURN(0);
1603 }
1604
1605 static int
1606 lfsck_layout_slave_query_master(const struct lu_env *env,
1607                                 struct lfsck_component *com)
1608 {
1609         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
1610         struct lfsck_instance            *lfsck = com->lc_lfsck;
1611         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
1612         struct lfsck_layout_slave_target *llst;
1613         struct obd_export                *exp;
1614         struct ptlrpc_request_set        *set;
1615         int                               cnt   = 0;
1616         int                               rc    = 0;
1617         int                               rc1   = 0;
1618         ENTRY;
1619
1620         set = ptlrpc_prep_set();
1621         if (set == NULL)
1622                 RETURN(-ENOMEM);
1623
1624         memset(lr, 0, sizeof(*lr));
1625         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1626         lr->lr_event = LE_QUERY;
1627         lr->lr_active = LT_LAYOUT;
1628
1629         llsd->llsd_touch_gen++;
1630         spin_lock(&llsd->llsd_lock);
1631         while (!list_empty(&llsd->llsd_master_list)) {
1632                 llst = list_entry(llsd->llsd_master_list.next,
1633                                   struct lfsck_layout_slave_target,
1634                                   llst_list);
1635                 if (llst->llst_gen == llsd->llsd_touch_gen)
1636                         break;
1637
1638                 llst->llst_gen = llsd->llsd_touch_gen;
1639                 list_del(&llst->llst_list);
1640                 list_add_tail(&llst->llst_list,
1641                               &llsd->llsd_master_list);
1642                 atomic_inc(&llst->llst_ref);
1643                 spin_unlock(&llsd->llsd_lock);
1644
1645                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
1646                                                llst->llst_index);
1647                 if (exp == NULL) {
1648                         lfsck_layout_llst_del(llsd, llst);
1649                         lfsck_layout_llst_put(llst);
1650                         spin_lock(&llsd->llsd_lock);
1651                         continue;
1652                 }
1653
1654                 rc = lfsck_layout_async_query(env, com, exp, llst, lr, set);
1655                 if (rc != 0) {
1656                         CERROR("%s: slave fail to query %s for layout: "
1657                                "rc = %d\n", lfsck_lfsck2name(lfsck),
1658                                exp->exp_obd->obd_name, rc);
1659                         rc1 = rc;
1660                         lfsck_layout_llst_put(llst);
1661                         class_export_put(exp);
1662                 } else {
1663                         cnt++;
1664                 }
1665                 spin_lock(&llsd->llsd_lock);
1666         }
1667         spin_unlock(&llsd->llsd_lock);
1668
1669         if (cnt > 0)
1670                 rc = ptlrpc_set_wait(set);
1671         ptlrpc_set_destroy(set);
1672
1673         RETURN(rc1 != 0 ? rc1 : rc);
1674 }
1675
1676 static void
1677 lfsck_layout_slave_notify_master(const struct lu_env *env,
1678                                  struct lfsck_component *com,
1679                                  enum lfsck_events event, int result)
1680 {
1681         struct lfsck_instance            *lfsck = com->lc_lfsck;
1682         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
1683         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
1684         struct lfsck_layout_slave_target *llst;
1685         struct obd_export                *exp;
1686         struct ptlrpc_request_set        *set;
1687         int                               cnt   = 0;
1688         int                               rc;
1689         ENTRY;
1690
1691         set = ptlrpc_prep_set();
1692         if (set == NULL)
1693                 RETURN_EXIT;
1694
1695         memset(lr, 0, sizeof(*lr));
1696         lr->lr_event = event;
1697         lr->lr_flags = LEF_FROM_OST;
1698         lr->lr_status = result;
1699         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1700         lr->lr_active = LT_LAYOUT;
1701         llsd->llsd_touch_gen++;
1702         spin_lock(&llsd->llsd_lock);
1703         while (!list_empty(&llsd->llsd_master_list)) {
1704                 llst = list_entry(llsd->llsd_master_list.next,
1705                                   struct lfsck_layout_slave_target,
1706                                   llst_list);
1707                 if (llst->llst_gen == llsd->llsd_touch_gen)
1708                         break;
1709
1710                 llst->llst_gen = llsd->llsd_touch_gen;
1711                 list_del(&llst->llst_list);
1712                 list_add_tail(&llst->llst_list,
1713                               &llsd->llsd_master_list);
1714                 atomic_inc(&llst->llst_ref);
1715                 spin_unlock(&llsd->llsd_lock);
1716
1717                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
1718                                                llst->llst_index);
1719                 if (exp == NULL) {
1720                         lfsck_layout_llst_del(llsd, llst);
1721                         lfsck_layout_llst_put(llst);
1722                         spin_lock(&llsd->llsd_lock);
1723                         continue;
1724                 }
1725
1726                 rc = lfsck_layout_async_notify(env, exp, lr, set);
1727                 if (rc != 0)
1728                         CERROR("%s: slave fail to notify %s for layout: "
1729                                "rc = %d\n", lfsck_lfsck2name(lfsck),
1730                                exp->exp_obd->obd_name, rc);
1731                 else
1732                         cnt++;
1733                 lfsck_layout_llst_put(llst);
1734                 class_export_put(exp);
1735                 spin_lock(&llsd->llsd_lock);
1736         }
1737         spin_unlock(&llsd->llsd_lock);
1738
1739         if (cnt > 0)
1740                 rc = ptlrpc_set_wait(set);
1741
1742         ptlrpc_set_destroy(set);
1743
1744         RETURN_EXIT;
1745 }
1746
1747 /* layout APIs */
1748
1749 static int lfsck_layout_reset(const struct lu_env *env,
1750                               struct lfsck_component *com, bool init)
1751 {
1752         struct lfsck_layout     *lo    = com->lc_file_ram;
1753         int                      rc;
1754
1755         down_write(&com->lc_sem);
1756         if (init) {
1757                 memset(lo, 0, com->lc_file_size);
1758         } else {
1759                 __u32 count = lo->ll_success_count;
1760                 __u64 last_time = lo->ll_time_last_complete;
1761
1762                 memset(lo, 0, com->lc_file_size);
1763                 lo->ll_success_count = count;
1764                 lo->ll_time_last_complete = last_time;
1765         }
1766
1767         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
1768         lo->ll_status = LS_INIT;
1769
1770         rc = lfsck_layout_store(env, com);
1771         up_write(&com->lc_sem);
1772
1773         return rc;
1774 }
1775
1776 static void lfsck_layout_fail(const struct lu_env *env,
1777                               struct lfsck_component *com, bool new_checked)
1778 {
1779         struct lfsck_layout *lo = com->lc_file_ram;
1780
1781         down_write(&com->lc_sem);
1782         if (new_checked)
1783                 com->lc_new_checked++;
1784         lo->ll_objs_failed_phase1++;
1785         if (lo->ll_pos_first_inconsistent == 0) {
1786                 struct lfsck_instance *lfsck = com->lc_lfsck;
1787
1788                 lo->ll_pos_first_inconsistent =
1789                         lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
1790                                                         lfsck->li_di_oit);
1791         }
1792         up_write(&com->lc_sem);
1793 }
1794
1795 static int lfsck_layout_master_checkpoint(const struct lu_env *env,
1796                                           struct lfsck_component *com, bool init)
1797 {
1798         struct lfsck_instance           *lfsck   = com->lc_lfsck;
1799         struct lfsck_layout             *lo      = com->lc_file_ram;
1800         struct lfsck_layout_master_data *llmd    = com->lc_data;
1801         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
1802         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
1803         struct l_wait_info               lwi     = { 0 };
1804         int                              rc;
1805
1806         if (com->lc_new_checked == 0 && !init)
1807                 return 0;
1808
1809         l_wait_event(mthread->t_ctl_waitq,
1810                      (list_empty(&llmd->llmd_req_list) &&
1811                       atomic_read(&llmd->llmd_rpcs_in_flight) == 0) ||
1812                      !thread_is_running(mthread) ||
1813                      thread_is_stopped(athread),
1814                      &lwi);
1815
1816         if (!thread_is_running(mthread) || thread_is_stopped(athread))
1817                 return 0;
1818
1819         down_write(&com->lc_sem);
1820         if (init) {
1821                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
1822         } else {
1823                 lo->ll_pos_last_checkpoint =
1824                                         lfsck->li_pos_current.lp_oit_cookie;
1825                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1826                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1827                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
1828                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
1829                 com->lc_new_checked = 0;
1830         }
1831
1832         rc = lfsck_layout_store(env, com);
1833         up_write(&com->lc_sem);
1834
1835         return rc;
1836 }
1837
1838 static int lfsck_layout_slave_checkpoint(const struct lu_env *env,
1839                                          struct lfsck_component *com, bool init)
1840 {
1841         struct lfsck_instance   *lfsck = com->lc_lfsck;
1842         struct lfsck_layout     *lo    = com->lc_file_ram;
1843         int                      rc;
1844
1845         if (com->lc_new_checked == 0 && !init)
1846                 return 0;
1847
1848         down_write(&com->lc_sem);
1849
1850         if (init) {
1851                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
1852         } else {
1853                 lo->ll_pos_last_checkpoint =
1854                                         lfsck->li_pos_current.lp_oit_cookie;
1855                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1856                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1857                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
1858                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
1859                 com->lc_new_checked = 0;
1860         }
1861
1862         rc = lfsck_layout_store(env, com);
1863
1864         up_write(&com->lc_sem);
1865
1866         return rc;
1867 }
1868
1869 static int lfsck_layout_prep(const struct lu_env *env,
1870                              struct lfsck_component *com)
1871 {
1872         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1873         struct lfsck_layout     *lo     = com->lc_file_ram;
1874         struct lfsck_position   *pos    = &com->lc_pos_start;
1875
1876         fid_zero(&pos->lp_dir_parent);
1877         pos->lp_dir_cookie = 0;
1878         if (lo->ll_status == LS_COMPLETED ||
1879             lo->ll_status == LS_PARTIAL) {
1880                 int rc;
1881
1882                 rc = lfsck_layout_reset(env, com, false);
1883                 if (rc != 0)
1884                         return rc;
1885         }
1886
1887         down_write(&com->lc_sem);
1888
1889         lo->ll_time_latest_start = cfs_time_current_sec();
1890
1891         spin_lock(&lfsck->li_lock);
1892         if (lo->ll_flags & LF_SCANNED_ONCE) {
1893                 if (!lfsck->li_drop_dryrun ||
1894                     lo->ll_pos_first_inconsistent == 0) {
1895                         lo->ll_status = LS_SCANNING_PHASE2;
1896                         list_del_init(&com->lc_link);
1897                         list_add_tail(&com->lc_link,
1898                                       &lfsck->li_list_double_scan);
1899                         pos->lp_oit_cookie = 0;
1900                 } else {
1901                         int i;
1902
1903                         lo->ll_status = LS_SCANNING_PHASE1;
1904                         lo->ll_run_time_phase1 = 0;
1905                         lo->ll_run_time_phase2 = 0;
1906                         lo->ll_objs_checked_phase1 = 0;
1907                         lo->ll_objs_checked_phase2 = 0;
1908                         lo->ll_objs_failed_phase1 = 0;
1909                         lo->ll_objs_failed_phase2 = 0;
1910                         for (i = 0; i < LLIT_MAX; i++)
1911                                 lo->ll_objs_repaired[i] = 0;
1912
1913                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
1914                 }
1915         } else {
1916                 lo->ll_status = LS_SCANNING_PHASE1;
1917                 if (!lfsck->li_drop_dryrun ||
1918                     lo->ll_pos_first_inconsistent == 0)
1919                         pos->lp_oit_cookie = lo->ll_pos_last_checkpoint + 1;
1920                 else
1921                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
1922         }
1923         spin_unlock(&lfsck->li_lock);
1924
1925         up_write(&com->lc_sem);
1926
1927         return 0;
1928 }
1929
1930 static int lfsck_layout_slave_prep(const struct lu_env *env,
1931                                    struct lfsck_component *com,
1932                                    struct lfsck_start_param *lsp)
1933 {
1934         struct lfsck_layout             *lo     = com->lc_file_ram;
1935         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
1936         int                              rc;
1937
1938         /* XXX: For a new scanning, generate OST-objects
1939          *      bitmap for orphan detection. */
1940
1941         rc = lfsck_layout_prep(env, com);
1942         if (rc != 0 || lo->ll_status != LS_SCANNING_PHASE1 ||
1943             !lsp->lsp_index_valid)
1944                 return rc;
1945
1946         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
1947
1948         return rc;
1949 }
1950
1951 static int lfsck_layout_master_prep(const struct lu_env *env,
1952                                     struct lfsck_component *com,
1953                                     struct lfsck_start_param *lsp)
1954 {
1955         struct lfsck_instance           *lfsck   = com->lc_lfsck;
1956         struct lfsck_layout_master_data *llmd    = com->lc_data;
1957         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
1958         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
1959         struct lfsck_thread_args        *lta;
1960         long                             rc;
1961         ENTRY;
1962
1963         rc = lfsck_layout_prep(env, com);
1964         if (rc != 0)
1965                 RETURN(rc);
1966
1967         llmd->llmd_assistant_status = 0;
1968         llmd->llmd_post_result = 0;
1969         llmd->llmd_to_post = 0;
1970         llmd->llmd_to_double_scan = 0;
1971         llmd->llmd_in_double_scan = 0;
1972         llmd->llmd_exit = 0;
1973         thread_set_flags(athread, 0);
1974
1975         lta = lfsck_thread_args_init(lfsck, com, lsp);
1976         if (IS_ERR(lta))
1977                 RETURN(PTR_ERR(lta));
1978
1979         rc = PTR_ERR(kthread_run(lfsck_layout_assistant, lta, "lfsck_layout"));
1980         if (IS_ERR_VALUE(rc)) {
1981                 CERROR("%s: Cannot start LFSCK layout assistant thread: "
1982                        "rc = %ld\n", lfsck_lfsck2name(lfsck), rc);
1983                 lfsck_thread_args_fini(lta);
1984         } else {
1985                 struct l_wait_info lwi = { 0 };
1986
1987                 l_wait_event(mthread->t_ctl_waitq,
1988                              thread_is_running(athread) ||
1989                              thread_is_stopped(athread),
1990                              &lwi);
1991                 if (unlikely(!thread_is_running(athread)))
1992                         rc = llmd->llmd_assistant_status;
1993                 else
1994                         rc = 0;
1995         }
1996
1997         RETURN(rc);
1998 }
1999
/*
 * Master side handler for each object found by the OIT scanning.
 * Currently a placeholder that does nothing and returns 0.
 */
static int lfsck_layout_master_exec_oit(const struct lu_env *env,
                                        struct lfsck_component *com,
                                        struct dt_object *obj)
{
        /* XXX: To be implemented in other patches.
         *
         * The plan, with everything done asynchronously as a pipeline:
         *
         * 1) Read the layout EA of the given object locally.
         * 2) For each stripe, pre-fetch the attribute of the OST-object
         *    and generate a lfsck_layout_req structure on the list
         *    ::llmd_req_list.
         * 3) For each request on ::llmd_req_list, the
         *    lfsck_layout_assistant thread compares the OST side
         *    attribute with the local one, and repairs it if they are
         *    inconsistent. */

        return 0;
}
2018
2019 static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
2020                                        struct lfsck_component *com,
2021                                        struct dt_object *obj)
2022 {
2023         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2024         struct lfsck_layout             *lo     = com->lc_file_ram;
2025         const struct lu_fid             *fid    = lfsck_dto2fid(obj);
2026         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
2027         struct lfsck_layout_seq         *lls;
2028         __u64                            seq;
2029         __u64                            oid;
2030         int                              rc;
2031         ENTRY;
2032
2033         /* XXX: Update OST-objects bitmap for orphan detection. */
2034
2035         LASSERT(llsd != NULL);
2036
2037         down_write(&com->lc_sem);
2038         if (fid_is_idif(fid))
2039                 seq = 0;
2040         else if (!fid_is_norm(fid) ||
2041                  !fid_is_for_ostobj(env, lfsck->li_next, obj, fid))
2042                 GOTO(unlock, rc = 0);
2043         else
2044                 seq = fid_seq(fid);
2045         com->lc_new_checked++;
2046
2047         lls = lfsck_layout_seq_lookup(llsd, seq);
2048         if (lls == NULL) {
2049                 OBD_ALLOC_PTR(lls);
2050                 if (unlikely(lls == NULL))
2051                         GOTO(unlock, rc = -ENOMEM);
2052
2053                 INIT_LIST_HEAD(&lls->lls_list);
2054                 lls->lls_seq = seq;
2055                 rc = lfsck_layout_lastid_load(env, com, lls);
2056                 if (rc != 0) {
2057                         lo->ll_objs_failed_phase1++;
2058                         OBD_FREE_PTR(lls);
2059                         GOTO(unlock, rc);
2060                 }
2061
2062                 lfsck_layout_seq_insert(llsd, lls);
2063         }
2064
2065         if (unlikely(fid_is_last_id(fid)))
2066                 GOTO(unlock, rc = 0);
2067
2068         oid = fid_oid(fid);
2069         if (oid > lls->lls_lastid_known)
2070                 lls->lls_lastid_known = oid;
2071
2072         if (oid > lls->lls_lastid) {
2073                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
2074                         /* OFD may create new objects during LFSCK scanning. */
2075                         rc = lfsck_layout_lastid_reload(env, com, lls);
2076                         if (unlikely(rc != 0))
2077                                 CWARN("%s: failed to reload LAST_ID for "LPX64
2078                                       ": rc = %d\n",
2079                                       lfsck_lfsck2name(com->lc_lfsck),
2080                                       lls->lls_seq, rc);
2081                         if (oid <= lls->lls_lastid)
2082                                 GOTO(unlock, rc = 0);
2083
2084                         LASSERT(lfsck->li_out_notify != NULL);
2085
2086                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
2087                                              LE_LASTID_REBUILDING);
2088                         lo->ll_flags |= LF_CRASHED_LASTID;
2089                 }
2090
2091                 lls->lls_lastid = oid;
2092                 lls->lls_dirty = 1;
2093         }
2094
2095         GOTO(unlock, rc = 0);
2096
2097 unlock:
2098         up_write(&com->lc_sem);
2099
2100         return rc;
2101 }
2102
/*
 * Handler for each directory entry found by the directory scanning.
 * It does nothing for the layout LFSCK and always returns 0.
 */
static int lfsck_layout_exec_dir(const struct lu_env *env,
                                 struct lfsck_component *com,
                                 struct dt_object *obj,
                                 struct lu_dirent *ent)
{
        /* Nothing to do for a directory entry. */
        return 0;
}
2110
2111 static int lfsck_layout_master_post(const struct lu_env *env,
2112                                     struct lfsck_component *com,
2113                                     int result, bool init)
2114 {
2115         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2116         struct lfsck_layout             *lo      = com->lc_file_ram;
2117         struct lfsck_layout_master_data *llmd    = com->lc_data;
2118         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2119         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
2120         struct l_wait_info               lwi     = { 0 };
2121         int                              rc;
2122         ENTRY;
2123
2124
2125         llmd->llmd_post_result = result;
2126         llmd->llmd_to_post = 1;
2127         if (llmd->llmd_post_result <= 0)
2128                 llmd->llmd_exit = 1;
2129
2130         wake_up_all(&athread->t_ctl_waitq);
2131         l_wait_event(mthread->t_ctl_waitq,
2132                      (result > 0 && list_empty(&llmd->llmd_req_list) &&
2133                       atomic_read(&llmd->llmd_rpcs_in_flight) == 0) ||
2134                      thread_is_stopped(athread),
2135                      &lwi);
2136
2137         if (llmd->llmd_assistant_status < 0)
2138                 result = llmd->llmd_assistant_status;
2139
2140         down_write(&com->lc_sem);
2141         spin_lock(&lfsck->li_lock);
2142         /* When LFSCK failed, there may be some prefetched objects those are
2143          * not been processed yet, we do not know the exactly position, then
2144          * just restart from last check-point next time. */
2145         if (!init && !llmd->llmd_exit)
2146                 lo->ll_pos_last_checkpoint =
2147                                         lfsck->li_pos_current.lp_oit_cookie;
2148
2149         if (result > 0) {
2150                 lo->ll_status = LS_SCANNING_PHASE2;
2151                 lo->ll_flags |= LF_SCANNED_ONCE;
2152                 lo->ll_flags &= ~LF_UPGRADE;
2153                 list_del_init(&com->lc_link);
2154                 list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
2155         } else if (result == 0) {
2156                 lo->ll_status = lfsck->li_status;
2157                 if (lo->ll_status == 0)
2158                         lo->ll_status = LS_STOPPED;
2159                 if (lo->ll_status != LS_PAUSED) {
2160                         list_del_init(&com->lc_link);
2161                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
2162                 }
2163         } else {
2164                 lo->ll_status = LS_FAILED;
2165                 list_del_init(&com->lc_link);
2166                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
2167         }
2168         spin_unlock(&lfsck->li_lock);
2169
2170         if (!init) {
2171                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
2172                                 HALF_SEC - lfsck->li_time_last_checkpoint);
2173                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
2174                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
2175                 com->lc_new_checked = 0;
2176         }
2177
2178         rc = lfsck_layout_store(env, com);
2179         up_write(&com->lc_sem);
2180
2181         RETURN(rc);
2182 }
2183
2184 static int lfsck_layout_slave_post(const struct lu_env *env,
2185                                    struct lfsck_component *com,
2186                                    int result, bool init)
2187 {
2188         struct lfsck_instance   *lfsck = com->lc_lfsck;
2189         struct lfsck_layout     *lo    = com->lc_file_ram;
2190         int                      rc;
2191         bool                     done  = false;
2192
2193         rc = lfsck_layout_lastid_store(env, com);
2194         if (rc != 0)
2195                 result = rc;
2196
2197         LASSERT(lfsck->li_out_notify != NULL);
2198
2199         down_write(&com->lc_sem);
2200
2201         spin_lock(&lfsck->li_lock);
2202         if (!init)
2203                 lo->ll_pos_last_checkpoint =
2204                                         lfsck->li_pos_current.lp_oit_cookie;
2205         if (result > 0) {
2206                 lo->ll_status = LS_SCANNING_PHASE2;
2207                 lo->ll_flags |= LF_SCANNED_ONCE;
2208                 if (lo->ll_flags & LF_CRASHED_LASTID) {
2209                         done = true;
2210                         lo->ll_flags &= ~LF_CRASHED_LASTID;
2211                 }
2212                 lo->ll_flags &= ~LF_UPGRADE;
2213                 list_del_init(&com->lc_link);
2214                 list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
2215         } else if (result == 0) {
2216                 lo->ll_status = lfsck->li_status;
2217                 if (lo->ll_status == 0)
2218                         lo->ll_status = LS_STOPPED;
2219                 if (lo->ll_status != LS_PAUSED) {
2220                         list_del_init(&com->lc_link);
2221                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
2222                 }
2223         } else {
2224                 lo->ll_status = LS_FAILED;
2225                 list_del_init(&com->lc_link);
2226                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
2227         }
2228         spin_unlock(&lfsck->li_lock);
2229
2230         if (done)
2231                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
2232                                      LE_LASTID_REBUILT);
2233
2234         if (!init) {
2235                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
2236                                 HALF_SEC - lfsck->li_time_last_checkpoint);
2237                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
2238                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
2239                 com->lc_new_checked = 0;
2240         }
2241
2242         rc = lfsck_layout_store(env, com);
2243
2244         up_write(&com->lc_sem);
2245
2246         lfsck_layout_slave_notify_master(env, com, LE_PHASE1_DONE, result);
2247
2248         return rc;
2249 }
2250
2251 static int lfsck_layout_dump(const struct lu_env *env,
2252                              struct lfsck_component *com, char *buf, int len)
2253 {
2254         struct lfsck_instance   *lfsck = com->lc_lfsck;
2255         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
2256         struct lfsck_layout     *lo    = com->lc_file_ram;
2257         int                      save  = len;
2258         int                      ret   = -ENOSPC;
2259         int                      rc;
2260
2261         down_read(&com->lc_sem);
2262         rc = snprintf(buf, len,
2263                       "name: lfsck_layout\n"
2264                       "magic: %#x\n"
2265                       "version: %d\n"
2266                       "status: %s\n",
2267                       lo->ll_magic,
2268                       bk->lb_version,
2269                       lfsck_status2names(lo->ll_status));
2270         if (rc <= 0)
2271                 goto out;
2272
2273         buf += rc;
2274         len -= rc;
2275         rc = lfsck_bits_dump(&buf, &len, lo->ll_flags, lfsck_flags_names,
2276                              "flags");
2277         if (rc < 0)
2278                 goto out;
2279
2280         rc = lfsck_bits_dump(&buf, &len, bk->lb_param, lfsck_param_names,
2281                              "param");
2282         if (rc < 0)
2283                 goto out;
2284
2285         rc = lfsck_time_dump(&buf, &len, lo->ll_time_last_complete,
2286                              "time_since_last_completed");
2287         if (rc < 0)
2288                 goto out;
2289
2290         rc = lfsck_time_dump(&buf, &len, lo->ll_time_latest_start,
2291                              "time_since_latest_start");
2292         if (rc < 0)
2293                 goto out;
2294
2295         rc = lfsck_time_dump(&buf, &len, lo->ll_time_last_checkpoint,
2296                              "time_since_last_checkpoint");
2297         if (rc < 0)
2298                 goto out;
2299
2300         rc = snprintf(buf, len,
2301                       "latest_start_position: "LPU64"\n"
2302                       "last_checkpoint_position: "LPU64"\n"
2303                       "first_failure_position: "LPU64"\n",
2304                       lo->ll_pos_latest_start,
2305                       lo->ll_pos_last_checkpoint,
2306                       lo->ll_pos_first_inconsistent);
2307         if (rc <= 0)
2308                 goto out;
2309
2310         buf += rc;
2311         len -= rc;
2312
2313         rc = snprintf(buf, len,
2314                       "success_count: %u\n"
2315                       "repaired_dangling: "LPU64"\n"
2316                       "repaired_unmatched_pair: "LPU64"\n"
2317                       "repaired_multiple_referenced: "LPU64"\n"
2318                       "repaired_orphan: "LPU64"\n"
2319                       "repaired_inconsistent_owner: "LPU64"\n"
2320                       "repaired_others: "LPU64"\n"
2321                       "skipped: "LPU64"\n"
2322                       "failed_phase1: "LPU64"\n"
2323                       "failed_phase2: "LPU64"\n",
2324                       lo->ll_success_count,
2325                       lo->ll_objs_repaired[LLIT_DANGLING - 1],
2326                       lo->ll_objs_repaired[LLIT_UNMATCHED_PAIR - 1],
2327                       lo->ll_objs_repaired[LLIT_MULTIPLE_REFERENCED - 1],
2328                       lo->ll_objs_repaired[LLIT_ORPHAN - 1],
2329                       lo->ll_objs_repaired[LLIT_INCONSISTENT_OWNER - 1],
2330                       lo->ll_objs_repaired[LLIT_OTHERS - 1],
2331                       lo->ll_objs_skipped,
2332                       lo->ll_objs_failed_phase1,
2333                       lo->ll_objs_failed_phase2);
2334         if (rc <= 0)
2335                 goto out;
2336
2337         buf += rc;
2338         len -= rc;
2339
2340         if (lo->ll_status == LS_SCANNING_PHASE1) {
2341                 __u64 pos;
2342                 const struct dt_it_ops *iops;
2343                 cfs_duration_t duration = cfs_time_current() -
2344                                           lfsck->li_time_last_checkpoint;
2345                 __u64 checked = lo->ll_objs_checked_phase1 + com->lc_new_checked;
2346                 __u64 speed = checked;
2347                 __u64 new_checked = com->lc_new_checked * HZ;
2348                 __u32 rtime = lo->ll_run_time_phase1 +
2349                               cfs_duration_sec(duration + HALF_SEC);
2350
2351                 if (duration != 0)
2352                         do_div(new_checked, duration);
2353                 if (rtime != 0)
2354                         do_div(speed, rtime);
2355                 rc = snprintf(buf, len,
2356                               "checked_phase1: "LPU64"\n"
2357                               "checked_phase2: "LPU64"\n"
2358                               "run_time_phase1: %u seconds\n"
2359                               "run_time_phase2: %u seconds\n"
2360                               "average_speed_phase1: "LPU64" items/sec\n"
2361                               "average_speed_phase2: N/A\n"
2362                               "real-time_speed_phase1: "LPU64" items/sec\n"
2363                               "real-time_speed_phase2: N/A\n",
2364                               checked,
2365                               lo->ll_objs_checked_phase2,
2366                               rtime,
2367                               lo->ll_run_time_phase2,
2368                               speed,
2369                               new_checked);
2370                 if (rc <= 0)
2371                         goto out;
2372
2373                 buf += rc;
2374                 len -= rc;
2375
2376                 LASSERT(lfsck->li_di_oit != NULL);
2377
2378                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
2379
2380                 /* The low layer otable-based iteration position may NOT
2381                  * exactly match the layout-based directory traversal
2382                  * cookie. Generally, it is not a serious issue. But the
2383                  * caller should NOT make assumption on that. */
2384                 pos = iops->store(env, lfsck->li_di_oit);
2385                 if (!lfsck->li_current_oit_processed)
2386                         pos--;
2387                 rc = snprintf(buf, len, "current_position: "LPU64"\n", pos);
2388                 if (rc <= 0)
2389                         goto out;
2390
2391                 buf += rc;
2392                 len -= rc;
2393         } else {
2394                 /* XXX: LS_SCANNING_PHASE2 will be handled in the future. */
2395                 __u64 speed1 = lo->ll_objs_checked_phase1;
2396                 __u64 speed2 = lo->ll_objs_checked_phase2;
2397
2398                 if (lo->ll_run_time_phase1 != 0)
2399                         do_div(speed1, lo->ll_run_time_phase1);
2400                 if (lo->ll_run_time_phase2 != 0)
2401                         do_div(speed2, lo->ll_run_time_phase2);
2402                 rc = snprintf(buf, len,
2403                               "checked_phase1: "LPU64"\n"
2404                               "checked_phase2: "LPU64"\n"
2405                               "run_time_phase1: %u seconds\n"
2406                               "run_time_phase2: %u seconds\n"
2407                               "average_speed_phase1: "LPU64" items/sec\n"
2408                               "average_speed_phase2: "LPU64" objs/sec\n"
2409                               "real-time_speed_phase1: N/A\n"
2410                               "real-time_speed_phase2: N/A\n"
2411                               "current_position: N/A\n",
2412                               lo->ll_objs_checked_phase1,
2413                               lo->ll_objs_checked_phase2,
2414                               lo->ll_run_time_phase1,
2415                               lo->ll_run_time_phase2,
2416                               speed1,
2417                               speed2);
2418                 if (rc <= 0)
2419                         goto out;
2420
2421                 buf += rc;
2422                 len -= rc;
2423         }
2424         ret = save - len;
2425
2426 out:
2427         up_read(&com->lc_sem);
2428
2429         return ret;
2430 }
2431
2432 static int lfsck_layout_master_double_scan(const struct lu_env *env,
2433                                            struct lfsck_component *com)
2434 {
2435         struct lfsck_layout_master_data *llmd    = com->lc_data;
2436         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2437         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
2438         struct lfsck_layout             *lo      = com->lc_file_ram;
2439         struct l_wait_info               lwi     = { 0 };
2440
2441         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2))
2442                 return 0;
2443
2444         llmd->llmd_to_double_scan = 1;
2445         wake_up_all(&athread->t_ctl_waitq);
2446         l_wait_event(mthread->t_ctl_waitq,
2447                      llmd->llmd_in_double_scan ||
2448                      thread_is_stopped(athread),
2449                      &lwi);
2450         if (llmd->llmd_assistant_status < 0)
2451                 return llmd->llmd_assistant_status;
2452
2453         return 0;
2454 }
2455
2456 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
2457                                           struct lfsck_component *com)
2458 {
2459         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2460         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
2461         struct lfsck_layout             *lo     = com->lc_file_ram;
2462         struct ptlrpc_thread            *thread = &lfsck->li_thread;
2463         int                              rc;
2464         ENTRY;
2465
2466         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2))
2467                 RETURN(0);
2468
2469         atomic_inc(&lfsck->li_double_scan_count);
2470
2471         com->lc_new_checked = 0;
2472         com->lc_new_scanned = 0;
2473         com->lc_time_last_checkpoint = cfs_time_current();
2474         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
2475                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
2476
2477         while (1) {
2478                 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(30),
2479                                                      NULL, NULL);
2480
2481                 rc = lfsck_layout_slave_query_master(env, com);
2482                 if (list_empty(&llsd->llsd_master_list)) {
2483                         if (unlikely(!thread_is_running(thread)))
2484                                 rc = 0;
2485                         else
2486                                 rc = 1;
2487
2488                         GOTO(done, rc);
2489                 }
2490
2491                 if (rc < 0)
2492                         GOTO(done, rc);
2493
2494                 rc = l_wait_event(thread->t_ctl_waitq,
2495                                   !thread_is_running(thread) ||
2496                                   list_empty(&llsd->llsd_master_list),
2497                                   &lwi);
2498                 if (unlikely(!thread_is_running(thread)))
2499                         GOTO(done, rc = 0);
2500
2501                 if (rc == -ETIMEDOUT)
2502                         continue;
2503
2504                 GOTO(done, rc = (rc < 0 ? rc : 1));
2505         }
2506
2507 done:
2508         rc = lfsck_layout_double_scan_result(env, com, rc);
2509
2510         if (atomic_dec_and_test(&lfsck->li_double_scan_count))
2511                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
2512
2513         return rc;
2514 }
2515
2516 static void lfsck_layout_master_data_release(const struct lu_env *env,
2517                                              struct lfsck_component *com)
2518 {
2519         struct lfsck_layout_master_data *llmd   = com->lc_data;
2520         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2521         struct lfsck_tgt_descs          *ltds;
2522         struct lfsck_tgt_desc           *ltd;
2523         struct lfsck_tgt_desc           *next;
2524
2525         LASSERT(llmd != NULL);
2526         LASSERT(thread_is_init(&llmd->llmd_thread) ||
2527                 thread_is_stopped(&llmd->llmd_thread));
2528         LASSERT(list_empty(&llmd->llmd_req_list));
2529         LASSERT(atomic_read(&llmd->llmd_rpcs_in_flight) == 0);
2530
2531         com->lc_data = NULL;
2532
2533         ltds = &lfsck->li_ost_descs;
2534         spin_lock(&ltds->ltd_lock);
2535         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase1_list,
2536                                  ltd_layout_phase_list) {
2537                 list_del_init(&ltd->ltd_layout_phase_list);
2538         }
2539         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase2_list,
2540                                  ltd_layout_phase_list) {
2541                 list_del_init(&ltd->ltd_layout_phase_list);
2542         }
2543         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_list,
2544                                  ltd_layout_list) {
2545                 list_del_init(&ltd->ltd_layout_list);
2546         }
2547         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase1_list,
2548                                  ltd_layout_phase_list) {
2549                 list_del_init(&ltd->ltd_layout_phase_list);
2550         }
2551         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase2_list,
2552                                  ltd_layout_phase_list) {
2553                 list_del_init(&ltd->ltd_layout_phase_list);
2554         }
2555         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_list,
2556                                  ltd_layout_list) {
2557                 list_del_init(&ltd->ltd_layout_list);
2558         }
2559         spin_unlock(&ltds->ltd_lock);
2560
2561         OBD_FREE_PTR(llmd);
2562 }
2563
2564 static void lfsck_layout_slave_data_release(const struct lu_env *env,
2565                                             struct lfsck_component *com)
2566 {
2567         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
2568         struct lfsck_layout_seq          *lls;
2569         struct lfsck_layout_seq          *next;
2570         struct lfsck_layout_slave_target *llst;
2571         struct lfsck_layout_slave_target *tmp;
2572
2573         LASSERT(llsd != NULL);
2574
2575         com->lc_data = NULL;
2576
2577         list_for_each_entry_safe(lls, next, &llsd->llsd_seq_list,
2578                                      lls_list) {
2579                 list_del_init(&lls->lls_list);
2580                 lfsck_object_put(env, lls->lls_lastid_obj);
2581                 OBD_FREE_PTR(lls);
2582         }
2583
2584         list_for_each_entry_safe(llst, tmp, &llsd->llsd_master_list,
2585                                  llst_list) {
2586                 list_del_init(&llst->llst_list);
2587                 OBD_FREE_PTR(llst);
2588         }
2589
2590         OBD_FREE_PTR(llsd);
2591 }
2592
2593 static void lfsck_layout_master_quit(const struct lu_env *env,
2594                                      struct lfsck_component *com)
2595 {
2596         struct lfsck_layout_master_data *llmd    = com->lc_data;
2597         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2598         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
2599         struct l_wait_info               lwi     = { 0 };
2600
2601         llmd->llmd_exit = 1;
2602         wake_up_all(&athread->t_ctl_waitq);
2603         l_wait_event(mthread->t_ctl_waitq,
2604                      thread_is_init(athread) ||
2605                      thread_is_stopped(athread),
2606                      &lwi);
2607 }
2608
2609 static int lfsck_layout_master_in_notify(const struct lu_env *env,
2610                                          struct lfsck_component *com,
2611                                          struct lfsck_request *lr)
2612 {
2613         struct lfsck_instance           *lfsck = com->lc_lfsck;
2614         struct lfsck_layout             *lo    = com->lc_file_ram;
2615         struct lfsck_layout_master_data *llmd  = com->lc_data;
2616         struct lfsck_tgt_descs          *ltds;
2617         struct lfsck_tgt_desc           *ltd;
2618         ENTRY;
2619
2620         if (lr->lr_event != LE_PHASE1_DONE &&
2621             lr->lr_event != LE_PHASE2_DONE &&
2622             lr->lr_event != LE_STOP)
2623                 RETURN(-EINVAL);
2624
2625         if (lr->lr_flags & LEF_FROM_OST)
2626                 ltds = &lfsck->li_ost_descs;
2627         else
2628                 ltds = &lfsck->li_mdt_descs;
2629         spin_lock(&ltds->ltd_lock);
2630         ltd = LTD_TGT(ltds, lr->lr_index);
2631         if (ltd == NULL) {
2632                 spin_unlock(&ltds->ltd_lock);
2633
2634                 RETURN(-ENODEV);
2635         }
2636
2637         list_del_init(&ltd->ltd_layout_phase_list);
2638         switch (lr->lr_event) {
2639         case LE_PHASE1_DONE:
2640                 if (lr->lr_status <= 0) {
2641                         ltd->ltd_layout_done = 1;
2642                         list_del_init(&ltd->ltd_layout_list);
2643                         lo->ll_flags |= LF_INCOMPLETE;
2644                         break;
2645                 }
2646
2647                 if (lr->lr_flags & LEF_FROM_OST) {
2648                         if (list_empty(&ltd->ltd_layout_list))
2649                                 list_add_tail(&ltd->ltd_layout_list,
2650                                               &llmd->llmd_ost_list);
2651                         list_add_tail(&ltd->ltd_layout_phase_list,
2652                                       &llmd->llmd_ost_phase2_list);
2653                 } else {
2654                         if (list_empty(&ltd->ltd_layout_list))
2655                                 list_add_tail(&ltd->ltd_layout_list,
2656                                               &llmd->llmd_mdt_list);
2657                         list_add_tail(&ltd->ltd_layout_phase_list,
2658                                       &llmd->llmd_mdt_phase2_list);
2659                 }
2660                 break;
2661         case LE_PHASE2_DONE:
2662                 ltd->ltd_layout_done = 1;
2663                 list_del_init(&ltd->ltd_layout_list);
2664                 break;
2665         case LE_STOP:
2666                 ltd->ltd_layout_done = 1;
2667                 list_del_init(&ltd->ltd_layout_list);
2668                 if (!(lr->lr_flags & LEF_FORCE_STOP))
2669                         lo->ll_flags |= LF_INCOMPLETE;
2670                 break;
2671         default:
2672                 break;
2673         }
2674         spin_unlock(&ltds->ltd_lock);
2675
2676         if (lr->lr_flags & LEF_FORCE_STOP) {
2677                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2678
2679                 memset(stop, 0, sizeof(*stop));
2680                 stop->ls_status = lr->lr_status;
2681                 stop->ls_flags = lr->lr_param;
2682                 lfsck_stop(env, lfsck->li_bottom, stop);
2683         } else if (lfsck_layout_master_to_orphan(llmd)) {
2684                 wake_up_all(&llmd->llmd_thread.t_ctl_waitq);
2685         }
2686
2687         RETURN(0);
2688 }
2689
2690 static int lfsck_layout_slave_in_notify(const struct lu_env *env,
2691                                         struct lfsck_component *com,
2692                                         struct lfsck_request *lr)
2693 {
2694         struct lfsck_instance            *lfsck = com->lc_lfsck;
2695         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
2696         struct lfsck_layout_slave_target *llst;
2697         ENTRY;
2698
2699         if (lr->lr_event != LE_PHASE2_DONE &&
2700             lr->lr_event != LE_STOP)
2701                 RETURN(-EINVAL);
2702
2703         llst = lfsck_layout_llst_find_and_del(llsd, lr->lr_index);
2704         if (llst == NULL)
2705                 RETURN(-ENODEV);
2706
2707         lfsck_layout_llst_put(llst);
2708         if (list_empty(&llsd->llsd_master_list)) {
2709                 switch (lr->lr_event) {
2710                 case LE_PHASE2_DONE:
2711                         wake_up_all(&lfsck->li_thread.t_ctl_waitq);
2712                         break;
2713                 case LE_STOP: {
2714                         struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2715
2716                         memset(stop, 0, sizeof(*stop));
2717                         stop->ls_status = lr->lr_status;
2718                         stop->ls_flags = lr->lr_param;
2719                         lfsck_stop(env, lfsck->li_bottom, stop);
2720                         break;
2721                 }
2722                 default:
2723                         break;
2724                 }
2725         }
2726
2727         RETURN(0);
2728 }
2729
2730 static int lfsck_layout_query(const struct lu_env *env,
2731                               struct lfsck_component *com)
2732 {
2733         struct lfsck_layout *lo = com->lc_file_ram;
2734
2735         return lo->ll_status;
2736 }
2737
2738 static int lfsck_layout_master_stop_notify(const struct lu_env *env,
2739                                            struct lfsck_component *com,
2740                                            struct lfsck_tgt_descs *ltds,
2741                                            struct lfsck_tgt_desc *ltd,
2742                                            struct ptlrpc_request_set *set)
2743 {
2744         struct lfsck_thread_info          *info  = lfsck_env_info(env);
2745         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
2746         struct lfsck_request              *lr    = &info->lti_lr;
2747         struct lfsck_instance             *lfsck = com->lc_lfsck;
2748         int                                rc;
2749
2750         LASSERT(list_empty(&ltd->ltd_layout_list));
2751         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
2752
2753         memset(lr, 0, sizeof(*lr));
2754         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2755         lr->lr_event = LE_STOP;
2756         lr->lr_active = LT_LAYOUT;
2757         if (ltds == &lfsck->li_ost_descs) {
2758                 lr->lr_flags = LEF_TO_OST;
2759         } else {
2760                 if (ltd->ltd_index == lfsck_dev_idx(lfsck->li_bottom))
2761                         return 0;
2762
2763                 lr->lr_flags = 0;
2764         }
2765         lr->lr_status = LS_CO_STOPPED;
2766
2767         laia->laia_com = com;
2768         laia->laia_ltds = ltds;
2769         laia->laia_ltd = ltd;
2770         laia->laia_lr = lr;
2771
2772         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2773                                  lfsck_layout_master_async_interpret,
2774                                  laia, LFSCK_NOTIFY);
2775         if (rc != 0)
2776                 CERROR("%s: Fail to notify %s %x for co-stop: rc = %d\n",
2777                        lfsck_lfsck2name(lfsck),
2778                        (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2779                        ltd->ltd_index, rc);
2780
2781         return rc;
2782 }
2783
2784 /* with lfsck::li_lock held */
2785 static int lfsck_layout_slave_join(const struct lu_env *env,
2786                                    struct lfsck_component *com,
2787                                    struct lfsck_start_param *lsp)
2788 {
2789         struct lfsck_instance            *lfsck = com->lc_lfsck;
2790         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
2791         struct lfsck_layout_slave_target *llst;
2792         struct lfsck_start               *start = lsp->lsp_start;
2793         int                               rc    = 0;
2794         ENTRY;
2795
2796         if (!lsp->lsp_index_valid || start == NULL ||
2797             !(start->ls_flags & LPF_ALL_MDT))
2798                 RETURN(-EALREADY);
2799
2800         spin_unlock(&lfsck->li_lock);
2801         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
2802         spin_lock(&lfsck->li_lock);
2803         if (rc == 0 && !thread_is_running(&lfsck->li_thread)) {
2804                 spin_unlock(&lfsck->li_lock);
2805                 llst = lfsck_layout_llst_find_and_del(llsd, lsp->lsp_index);
2806                 if (llst != NULL)
2807                         lfsck_layout_llst_put(llst);
2808                 spin_lock(&lfsck->li_lock);
2809                 rc = -EAGAIN;
2810         }
2811
2812         RETURN(rc);
2813 }
2814
2815 static struct lfsck_operations lfsck_layout_master_ops = {
2816         .lfsck_reset            = lfsck_layout_reset,
2817         .lfsck_fail             = lfsck_layout_fail,
2818         .lfsck_checkpoint       = lfsck_layout_master_checkpoint,
2819         .lfsck_prep             = lfsck_layout_master_prep,
2820         .lfsck_exec_oit         = lfsck_layout_master_exec_oit,
2821         .lfsck_exec_dir         = lfsck_layout_exec_dir,
2822         .lfsck_post             = lfsck_layout_master_post,
2823         .lfsck_dump             = lfsck_layout_dump,
2824         .lfsck_double_scan      = lfsck_layout_master_double_scan,
2825         .lfsck_data_release     = lfsck_layout_master_data_release,
2826         .lfsck_quit             = lfsck_layout_master_quit,
2827         .lfsck_in_notify        = lfsck_layout_master_in_notify,
2828         .lfsck_query            = lfsck_layout_query,
2829         .lfsck_stop_notify      = lfsck_layout_master_stop_notify,
2830 };
2831
2832 static struct lfsck_operations lfsck_layout_slave_ops = {
2833         .lfsck_reset            = lfsck_layout_reset,
2834         .lfsck_fail             = lfsck_layout_fail,
2835         .lfsck_checkpoint       = lfsck_layout_slave_checkpoint,
2836         .lfsck_prep             = lfsck_layout_slave_prep,
2837         .lfsck_exec_oit         = lfsck_layout_slave_exec_oit,
2838         .lfsck_exec_dir         = lfsck_layout_exec_dir,
2839         .lfsck_post             = lfsck_layout_slave_post,
2840         .lfsck_dump             = lfsck_layout_dump,
2841         .lfsck_double_scan      = lfsck_layout_slave_double_scan,
2842         .lfsck_data_release     = lfsck_layout_slave_data_release,
2843         .lfsck_in_notify        = lfsck_layout_slave_in_notify,
2844         .lfsck_query            = lfsck_layout_query,
2845         .lfsck_join             = lfsck_layout_slave_join,
2846 };
2847
2848 int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
2849 {
2850         struct lfsck_component  *com;
2851         struct lfsck_layout     *lo;
2852         struct dt_object        *root = NULL;
2853         struct dt_object        *obj;
2854         int                      rc;
2855         ENTRY;
2856
2857         OBD_ALLOC_PTR(com);
2858         if (com == NULL)
2859                 RETURN(-ENOMEM);
2860
2861         INIT_LIST_HEAD(&com->lc_link);
2862         INIT_LIST_HEAD(&com->lc_link_dir);
2863         init_rwsem(&com->lc_sem);
2864         atomic_set(&com->lc_ref, 1);
2865         com->lc_lfsck = lfsck;
2866         com->lc_type = LT_LAYOUT;
2867         if (lfsck->li_master) {
2868                 struct lfsck_layout_master_data *llmd;
2869
2870                 com->lc_ops = &lfsck_layout_master_ops;
2871                 OBD_ALLOC_PTR(llmd);
2872                 if (llmd == NULL)
2873                         GOTO(out, rc = -ENOMEM);
2874
2875                 INIT_LIST_HEAD(&llmd->llmd_req_list);
2876                 spin_lock_init(&llmd->llmd_lock);
2877                 INIT_LIST_HEAD(&llmd->llmd_ost_list);
2878                 INIT_LIST_HEAD(&llmd->llmd_ost_phase1_list);
2879                 INIT_LIST_HEAD(&llmd->llmd_ost_phase2_list);
2880                 INIT_LIST_HEAD(&llmd->llmd_mdt_list);
2881                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase1_list);
2882                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase2_list);
2883                 init_waitqueue_head(&llmd->llmd_thread.t_ctl_waitq);
2884                 atomic_set(&llmd->llmd_rpcs_in_flight, 0);
2885                 com->lc_data = llmd;
2886         } else {
2887                 struct lfsck_layout_slave_data *llsd;
2888
2889                 com->lc_ops = &lfsck_layout_slave_ops;
2890                 OBD_ALLOC_PTR(llsd);
2891                 if (llsd == NULL)
2892                         GOTO(out, rc = -ENOMEM);
2893
2894                 INIT_LIST_HEAD(&llsd->llsd_seq_list);
2895                 INIT_LIST_HEAD(&llsd->llsd_master_list);
2896                 spin_lock_init(&llsd->llsd_lock);
2897                 com->lc_data = llsd;
2898         }
2899         com->lc_file_size = sizeof(*lo);
2900         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
2901         if (com->lc_file_ram == NULL)
2902                 GOTO(out, rc = -ENOMEM);
2903
2904         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
2905         if (com->lc_file_disk == NULL)
2906                 GOTO(out, rc = -ENOMEM);
2907
2908         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
2909         if (IS_ERR(root))
2910                 GOTO(out, rc = PTR_ERR(root));
2911
2912         if (unlikely(!dt_try_as_dir(env, root)))
2913                 GOTO(out, rc = -ENOTDIR);
2914
2915         obj = local_file_find_or_create(env, lfsck->li_los, root,
2916                                         lfsck_layout_name,
2917                                         S_IFREG | S_IRUGO | S_IWUSR);
2918         if (IS_ERR(obj))
2919                 GOTO(out, rc = PTR_ERR(obj));
2920
2921         com->lc_obj = obj;
2922         rc = lfsck_layout_load(env, com);
2923         if (rc > 0)
2924                 rc = lfsck_layout_reset(env, com, true);
2925         else if (rc == -ENOENT)
2926                 rc = lfsck_layout_init(env, com);
2927
2928         if (rc != 0)
2929                 GOTO(out, rc);
2930
2931         lo = com->lc_file_ram;
2932         switch (lo->ll_status) {
2933         case LS_INIT:
2934         case LS_COMPLETED:
2935         case LS_FAILED:
2936         case LS_STOPPED:
2937         case LS_PARTIAL:
2938                 spin_lock(&lfsck->li_lock);
2939                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
2940                 spin_unlock(&lfsck->li_lock);
2941                 break;
2942         default:
2943                 CERROR("%s: unknown lfsck_layout status: rc = %u\n",
2944                        lfsck_lfsck2name(lfsck), lo->ll_status);
2945                 /* fall through */
2946         case LS_SCANNING_PHASE1:
2947         case LS_SCANNING_PHASE2:
2948                 /* No need to store the status to disk right now.
2949                  * If the system crashed before the status stored,
2950                  * it will be loaded back when next time. */
2951                 lo->ll_status = LS_CRASHED;
2952                 lo->ll_flags |= LF_INCOMPLETE;
2953                 /* fall through */
2954         case LS_PAUSED:
2955         case LS_CRASHED:
2956         case LS_CO_FAILED:
2957         case LS_CO_STOPPED:
2958         case LS_CO_PAUSED:
2959                 spin_lock(&lfsck->li_lock);
2960                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
2961                 spin_unlock(&lfsck->li_lock);
2962                 break;
2963         }
2964
2965         if (lo->ll_flags & LF_CRASHED_LASTID) {
2966                 LASSERT(lfsck->li_out_notify != NULL);
2967
2968                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
2969                                      LE_LASTID_REBUILDING);
2970         }
2971
2972         GOTO(out, rc = 0);
2973
2974 out:
2975         if (root != NULL && !IS_ERR(root))
2976                 lu_object_put(env, &root->do_lu);
2977
2978         if (rc != 0)
2979                 lfsck_component_cleanup(env, com);
2980
2981         return rc;
2982 }