Whamcloud - gitweb
a26a2fafb1e2097f1caf014c746dff67edb614cf
[fs/lustre-release.git] / lustre / lfsck / lfsck_layout.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_layout.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <linux/bitops.h>
37
38 #include <lustre/lustre_idl.h>
39 #include <lu_object.h>
40 #include <dt_object.h>
41 #include <lustre_linkea.h>
42 #include <lustre_fid.h>
43 #include <lustre_lib.h>
44 #include <lustre_net.h>
45 #include <lustre/lustre_user.h>
46 #include <md_object.h>
47 #include <obd_class.h>
48
49 #include "lfsck_internal.h"
50
/* Magic value expected in lfsck_layout::ll_magic. lfsck_layout_init() stamps
 * it into a fresh record, and lfsck_layout_load() reports the record as
 * broken ("to be reset") when the loaded value differs. */
51 #define LFSCK_LAYOUT_MAGIC              0xB173AE14
52
/* Name string of the layout LFSCK component (its users are not visible in
 * this part of the file). */
53 static const char lfsck_layout_name[] = "lfsck_layout";
54
55 struct lfsck_layout_seq {
56         struct list_head         lls_list;
57         __u64                    lls_seq;
58         __u64                    lls_lastid;
59         __u64                    lls_lastid_known;
60         struct dt_object        *lls_lastid_obj;
61         unsigned int             lls_dirty:1;
62 };
63
64 struct lfsck_layout_slave_data {
65         /* list for lfsck_layout_seq */
66         struct list_head         llsd_seq_list;
67 };
68
69 struct lfsck_layout_object {
70         struct dt_object        *llo_obj;
71         struct lu_attr           llo_attr;
72         atomic_t                 llo_ref;
73         __u16                    llo_gen;
74 };
75
76 struct lfsck_layout_req {
77         struct list_head                 llr_list;
78         struct lfsck_layout_object      *llr_parent;
79         struct dt_object                *llr_child;
80         __u32                            llr_ost_idx;
81         __u32                            llr_lov_idx; /* offset in LOV EA */
82 };
83
84 struct lfsck_layout_master_data {
85         spinlock_t              llmd_lock;
86         struct list_head        llmd_req_list;
87
88         /* list for the ost targets involve layout verification. */
89         struct list_head        llmd_ost_list;
90
91         /* list for the ost targets in phase1 scanning. */
92         struct list_head        llmd_ost_phase1_list;
93
94         /* list for the ost targets in phase1 scanning. */
95         struct list_head        llmd_ost_phase2_list;
96
97         struct ptlrpc_thread    llmd_thread;
98         atomic_t                llmd_rpcs_in_flight;
99         __u32                   llmd_touch_gen;
100         int                     llmd_prefetched;
101         int                     llmd_assistant_status;
102         int                     llmd_post_result;
103         unsigned int            llmd_to_post:1,
104                                 llmd_to_double_scan:1,
105                                 llmd_in_double_scan:1,
106                                 llmd_exit:1;
107 };
108
109 static inline void lfsck_layout_object_put(const struct lu_env *env,
110                                            struct lfsck_layout_object *llo)
111 {
112         if (atomic_dec_and_test(&llo->llo_ref)) {
113                 lfsck_object_put(env, llo->llo_obj);
114                 OBD_FREE_PTR(llo);
115         }
116 }
117
118 static inline void lfsck_layout_req_fini(const struct lu_env *env,
119                                          struct lfsck_layout_req *llr)
120 {
121         lu_object_put(env, &llr->llr_child->do_lu);
122         lfsck_layout_object_put(env, llr->llr_parent);
123         OBD_FREE_PTR(llr);
124 }
125
126 static inline bool lfsck_layout_req_empty(struct lfsck_layout_master_data *llmd)
127 {
128         bool empty = false;
129
130         spin_lock(&llmd->llmd_lock);
131         if (list_empty(&llmd->llmd_req_list))
132                 empty = true;
133         spin_unlock(&llmd->llmd_lock);
134
135         return empty;
136 }
137
138 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
139                                    const struct lfsck_layout *src)
140 {
141         int i;
142
143         des->ll_magic = le32_to_cpu(src->ll_magic);
144         des->ll_status = le32_to_cpu(src->ll_status);
145         des->ll_flags = le32_to_cpu(src->ll_flags);
146         des->ll_success_count = le32_to_cpu(src->ll_success_count);
147         des->ll_run_time_phase1 = le32_to_cpu(src->ll_run_time_phase1);
148         des->ll_run_time_phase2 = le32_to_cpu(src->ll_run_time_phase2);
149         des->ll_time_last_complete = le64_to_cpu(src->ll_time_last_complete);
150         des->ll_time_latest_start = le64_to_cpu(src->ll_time_latest_start);
151         des->ll_time_last_checkpoint =
152                                 le64_to_cpu(src->ll_time_last_checkpoint);
153         des->ll_pos_latest_start = le64_to_cpu(src->ll_pos_latest_start);
154         des->ll_pos_last_checkpoint = le64_to_cpu(src->ll_pos_last_checkpoint);
155         des->ll_pos_first_inconsistent =
156                         le64_to_cpu(src->ll_pos_first_inconsistent);
157         des->ll_objs_checked_phase1 = le64_to_cpu(src->ll_objs_checked_phase1);
158         des->ll_objs_failed_phase1 = le64_to_cpu(src->ll_objs_failed_phase1);
159         des->ll_objs_checked_phase2 = le64_to_cpu(src->ll_objs_checked_phase2);
160         des->ll_objs_failed_phase2 = le64_to_cpu(src->ll_objs_failed_phase2);
161         for (i = 0; i < LLIT_MAX; i++)
162                 des->ll_objs_repaired[i] =
163                                 le64_to_cpu(src->ll_objs_repaired[i]);
164         des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
165 }
166
167 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
168                                    const struct lfsck_layout *src)
169 {
170         int i;
171
172         des->ll_magic = cpu_to_le32(src->ll_magic);
173         des->ll_status = cpu_to_le32(src->ll_status);
174         des->ll_flags = cpu_to_le32(src->ll_flags);
175         des->ll_success_count = cpu_to_le32(src->ll_success_count);
176         des->ll_run_time_phase1 = cpu_to_le32(src->ll_run_time_phase1);
177         des->ll_run_time_phase2 = cpu_to_le32(src->ll_run_time_phase2);
178         des->ll_time_last_complete = cpu_to_le64(src->ll_time_last_complete);
179         des->ll_time_latest_start = cpu_to_le64(src->ll_time_latest_start);
180         des->ll_time_last_checkpoint =
181                                 cpu_to_le64(src->ll_time_last_checkpoint);
182         des->ll_pos_latest_start = cpu_to_le64(src->ll_pos_latest_start);
183         des->ll_pos_last_checkpoint = cpu_to_le64(src->ll_pos_last_checkpoint);
184         des->ll_pos_first_inconsistent =
185                         cpu_to_le64(src->ll_pos_first_inconsistent);
186         des->ll_objs_checked_phase1 = cpu_to_le64(src->ll_objs_checked_phase1);
187         des->ll_objs_failed_phase1 = cpu_to_le64(src->ll_objs_failed_phase1);
188         des->ll_objs_checked_phase2 = cpu_to_le64(src->ll_objs_checked_phase2);
189         des->ll_objs_failed_phase2 = cpu_to_le64(src->ll_objs_failed_phase2);
190         for (i = 0; i < LLIT_MAX; i++)
191                 des->ll_objs_repaired[i] =
192                                 cpu_to_le64(src->ll_objs_repaired[i]);
193         des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
194 }
195
196 /**
197  * \retval +ve: the lfsck_layout is broken, the caller should reset it.
198  * \retval 0: succeed.
199  * \retval -ve: failed cases.
200  */
201 static int lfsck_layout_load(const struct lu_env *env,
202                              struct lfsck_component *com)
203 {
204         struct lfsck_layout             *lo     = com->lc_file_ram;
205         const struct dt_body_operations *dbo    = com->lc_obj->do_body_ops;
206         ssize_t                          size   = com->lc_file_size;
207         loff_t                           pos    = 0;
208         int                              rc;
209
210         rc = dbo->dbo_read(env, com->lc_obj,
211                            lfsck_buf_get(env, com->lc_file_disk, size), &pos,
212                            BYPASS_CAPA);
213         if (rc == 0) {
214                 return -ENOENT;
215         } else if (rc < 0) {
216                 CWARN("%s: failed to load lfsck_layout: rc = %d\n",
217                       lfsck_lfsck2name(com->lc_lfsck), rc);
218                 return rc;
219         } else if (rc != size) {
220                 CWARN("%s: crashed lfsck_layout, to be reset: rc = %d\n",
221                       lfsck_lfsck2name(com->lc_lfsck), rc);
222                 return 1;
223         }
224
225         lfsck_layout_le_to_cpu(lo, com->lc_file_disk);
226         if (lo->ll_magic != LFSCK_LAYOUT_MAGIC) {
227                 CWARN("%s: invalid lfsck_layout magic %#x != %#x, "
228                       "to be reset\n", lfsck_lfsck2name(com->lc_lfsck),
229                       lo->ll_magic, LFSCK_LAYOUT_MAGIC);
230                 return 1;
231         }
232
233         return 0;
234 }
235
236 static int lfsck_layout_store(const struct lu_env *env,
237                               struct lfsck_component *com)
238 {
239         struct dt_object         *obj           = com->lc_obj;
240         struct lfsck_instance    *lfsck         = com->lc_lfsck;
241         struct lfsck_layout      *lo            = com->lc_file_disk;
242         struct thandle           *handle;
243         ssize_t                   size          = com->lc_file_size;
244         loff_t                    pos           = 0;
245         int                       rc;
246         ENTRY;
247
248         lfsck_layout_cpu_to_le(lo, com->lc_file_ram);
249         handle = dt_trans_create(env, lfsck->li_bottom);
250         if (IS_ERR(handle)) {
251                 rc = PTR_ERR(handle);
252                 CERROR("%s: fail to create trans for storing lfsck_layout: "
253                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
254                 RETURN(rc);
255         }
256
257         rc = dt_declare_record_write(env, obj, size, pos, handle);
258         if (rc != 0) {
259                 CERROR("%s: fail to declare trans for storing lfsck_layout(1): "
260                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
261                 GOTO(out, rc);
262         }
263
264         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
265         if (rc != 0) {
266                 CERROR("%s: fail to start trans for storing lfsck_layout: "
267                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
268                 GOTO(out, rc);
269         }
270
271         rc = dt_record_write(env, obj, lfsck_buf_get(env, lo, size), &pos,
272                              handle);
273         if (rc != 0)
274                 CERROR("%s: fail to store lfsck_layout(1): size = %d, "
275                        "rc = %d\n", lfsck_lfsck2name(lfsck), (int)size, rc);
276
277         GOTO(out, rc);
278
279 out:
280         dt_trans_stop(env, lfsck->li_bottom, handle);
281
282         return rc;
283 }
284
285 static int lfsck_layout_init(const struct lu_env *env,
286                              struct lfsck_component *com)
287 {
288         struct lfsck_layout *lo = com->lc_file_ram;
289         int rc;
290
291         memset(lo, 0, com->lc_file_size);
292         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
293         lo->ll_status = LS_INIT;
294         down_write(&com->lc_sem);
295         rc = lfsck_layout_store(env, com);
296         up_write(&com->lc_sem);
297
298         return rc;
299 }
300
301 static int fid_is_for_ostobj(const struct lu_env *env, struct dt_device *dt,
302                              struct dt_object *obj, const struct lu_fid *fid)
303 {
304         struct seq_server_site  *ss     = lu_site2seq(dt->dd_lu_dev.ld_site);
305         struct lu_seq_range      range  = { 0 };
306         struct lustre_mdt_attrs *lma;
307         int                      rc;
308
309         fld_range_set_any(&range);
310         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), &range);
311         if (rc == 0) {
312                 if (fld_range_is_ost(&range))
313                         return 1;
314
315                 return 0;
316         }
317
318         lma = &lfsck_env_info(env)->lti_lma;
319         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, lma, sizeof(*lma)),
320                           XATTR_NAME_LMA, BYPASS_CAPA);
321         if (rc == sizeof(*lma)) {
322                 lustre_lma_swab(lma);
323
324                 /* Generally, the low layer OSD create handler or OI scrub
325                  * will set the LMAC_FID_ON_OST for all external visible
326                  * OST-objects. But to make the otable-based iteration to
327                  * be independent from OI scrub in spite of it got failure
328                  * or not, we check the LMAC_FID_ON_OST here to guarantee
329                  * that the LFSCK will not repair something by wrong. */
330                 return lma->lma_compat & LMAC_FID_ON_OST ? 1 : 0;
331         }
332
333         rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID, BYPASS_CAPA);
334
335         return rc > 0;
336 }
337
338 static struct lfsck_layout_seq *
339 lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
340 {
341         struct lfsck_layout_seq *lls;
342
343         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
344                 if (lls->lls_seq == seq)
345                         return lls;
346
347                 if (lls->lls_seq > seq)
348                         return NULL;
349         }
350
351         return NULL;
352 }
353
354 static void
355 lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
356                         struct lfsck_layout_seq *lls)
357 {
358         struct lfsck_layout_seq *tmp;
359         struct list_head        *pos = &llsd->llsd_seq_list;
360
361         list_for_each_entry(tmp, &llsd->llsd_seq_list, lls_list) {
362                 if (lls->lls_seq < tmp->lls_seq) {
363                         pos = &tmp->lls_list;
364                         break;
365                 }
366         }
367         list_add_tail(&lls->lls_list, pos);
368 }
369
370 static int
371 lfsck_layout_lastid_create(const struct lu_env *env,
372                            struct lfsck_instance *lfsck,
373                            struct dt_object *obj)
374 {
375         struct lfsck_thread_info *info   = lfsck_env_info(env);
376         struct lu_attr           *la     = &info->lti_la;
377         struct dt_object_format  *dof    = &info->lti_dof;
378         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
379         struct dt_device         *dt     = lfsck->li_bottom;
380         struct thandle           *th;
381         __u64                     lastid = 0;
382         loff_t                    pos    = 0;
383         int                       rc;
384         ENTRY;
385
386         CDEBUG(D_LFSCK, "To create LAST_ID for <seq> "LPX64"\n",
387                fid_seq(lfsck_dto2fid(obj)));
388
389         if (bk->lb_param & LPF_DRYRUN)
390                 return 0;
391
392         memset(la, 0, sizeof(*la));
393         la->la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
394         la->la_valid = LA_MODE | LA_UID | LA_GID;
395         dof->dof_type = dt_mode_to_dft(S_IFREG);
396
397         th = dt_trans_create(env, dt);
398         if (IS_ERR(th))
399                 RETURN(rc = PTR_ERR(th));
400
401         rc = dt_declare_create(env, obj, la, NULL, dof, th);
402         if (rc != 0)
403                 GOTO(stop, rc);
404
405         rc = dt_declare_record_write(env, obj, sizeof(lastid), pos, th);
406         if (rc != 0)
407                 GOTO(stop, rc);
408
409         rc = dt_trans_start_local(env, dt, th);
410         if (rc != 0)
411                 GOTO(stop, rc);
412
413         dt_write_lock(env, obj, 0);
414         if (likely(!dt_object_exists(obj))) {
415                 rc = dt_create(env, obj, la, NULL, dof, th);
416                 if (rc == 0)
417                         rc = dt_record_write(env, obj,
418                                 lfsck_buf_get(env, &lastid, sizeof(lastid)),
419                                 &pos, th);
420         }
421         dt_write_unlock(env, obj);
422
423         GOTO(stop, rc);
424
425 stop:
426         dt_trans_stop(env, dt, th);
427
428         return rc;
429 }
430
431 static int
432 lfsck_layout_lastid_reload(const struct lu_env *env,
433                            struct lfsck_component *com,
434                            struct lfsck_layout_seq *lls)
435 {
436         __u64   lastid;
437         loff_t  pos     = 0;
438         int     rc;
439
440         dt_read_lock(env, lls->lls_lastid_obj, 0);
441         rc = dt_record_read(env, lls->lls_lastid_obj,
442                             lfsck_buf_get(env, &lastid, sizeof(lastid)), &pos);
443         dt_read_unlock(env, lls->lls_lastid_obj);
444         if (unlikely(rc != 0))
445                 return rc;
446
447         lastid = le64_to_cpu(lastid);
448         if (lastid < lls->lls_lastid_known) {
449                 struct lfsck_instance   *lfsck  = com->lc_lfsck;
450                 struct lfsck_layout     *lo     = com->lc_file_ram;
451
452                 lls->lls_lastid = lls->lls_lastid_known;
453                 lls->lls_dirty = 1;
454                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
455                         LASSERT(lfsck->li_out_notify != NULL);
456
457                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
458                                              LE_LASTID_REBUILDING);
459                         lo->ll_flags |= LF_CRASHED_LASTID;
460                 }
461         } else if (lastid >= lls->lls_lastid) {
462                 lls->lls_lastid = lastid;
463                 lls->lls_dirty = 0;
464         }
465
466         return 0;
467 }
468
469 static int
470 lfsck_layout_lastid_store(const struct lu_env *env,
471                           struct lfsck_component *com)
472 {
473         struct lfsck_instance           *lfsck  = com->lc_lfsck;
474         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
475         struct dt_device                *dt     = lfsck->li_bottom;
476         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
477         struct lfsck_layout_seq         *lls;
478         struct thandle                  *th;
479         __u64                            lastid;
480         int                              rc     = 0;
481         int                              rc1    = 0;
482
483         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
484                 loff_t pos = 0;
485
486                 /* XXX: Add the code back if we really found related
487                  *      inconsistent cases in the future. */
488 #if 0
489                 if (!lls->lls_dirty) {
490                         /* In OFD, before the pre-creation, the LAST_ID
491                          * file will be updated firstly, which may hide
492                          * some potential crashed cases. For example:
493                          *
494                          * The old obj1's ID is higher than old LAST_ID
495                          * but lower than the new LAST_ID, but the LFSCK
496                          * have not touch the obj1 until the OFD updated
497                          * the LAST_ID. So the LFSCK does not regard it
498                          * as crashed case. But when OFD does not create
499                          * successfully, it will set the LAST_ID as the
500                          * real created objects' ID, then LFSCK needs to
501                          * found related inconsistency. */
502                         rc = lfsck_layout_lastid_reload(env, com, lls);
503                         if (likely(!lls->lls_dirty))
504                                 continue;
505                 }
506 #endif
507
508                 CDEBUG(D_LFSCK, "To sync the LAST_ID for <seq> "LPX64
509                        " as <oid> "LPU64"\n", lls->lls_seq, lls->lls_lastid);
510
511                 if (bk->lb_param & LPF_DRYRUN) {
512                         lls->lls_dirty = 0;
513                         continue;
514                 }
515
516                 th = dt_trans_create(env, dt);
517                 if (IS_ERR(th)) {
518                         rc1 = PTR_ERR(th);
519                         CERROR("%s: (1) failed to store "LPX64": rc = %d\n",
520                                lfsck_lfsck2name(com->lc_lfsck),
521                                lls->lls_seq, rc1);
522                         continue;
523                 }
524
525                 rc = dt_declare_record_write(env, lls->lls_lastid_obj,
526                                              sizeof(lastid), pos, th);
527                 if (rc != 0)
528                         goto stop;
529
530                 rc = dt_trans_start_local(env, dt, th);
531                 if (rc != 0)
532                         goto stop;
533
534                 lastid = cpu_to_le64(lls->lls_lastid);
535                 dt_write_lock(env, lls->lls_lastid_obj, 0);
536                 rc = dt_record_write(env, lls->lls_lastid_obj,
537                                      lfsck_buf_get(env, &lastid,
538                                      sizeof(lastid)), &pos, th);
539                 dt_write_unlock(env, lls->lls_lastid_obj);
540                 if (rc == 0)
541                         lls->lls_dirty = 0;
542
543 stop:
544                 dt_trans_stop(env, dt, th);
545                 if (rc != 0) {
546                         rc1 = rc;
547                         CERROR("%s: (2) failed to store "LPX64": rc = %d\n",
548                                lfsck_lfsck2name(com->lc_lfsck),
549                                lls->lls_seq, rc1);
550                 }
551         }
552
553         return rc1;
554 }
555
556 static int
557 lfsck_layout_lastid_load(const struct lu_env *env,
558                          struct lfsck_component *com,
559                          struct lfsck_layout_seq *lls)
560 {
561         struct lfsck_instance   *lfsck  = com->lc_lfsck;
562         struct lfsck_layout     *lo     = com->lc_file_ram;
563         struct lu_fid           *fid    = &lfsck_env_info(env)->lti_fid;
564         struct dt_object        *obj;
565         loff_t                   pos    = 0;
566         int                      rc;
567         ENTRY;
568
569         lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck->li_bottom));
570         obj = dt_locate(env, lfsck->li_bottom, fid);
571         if (IS_ERR(obj))
572                 RETURN(PTR_ERR(obj));
573
574         /* LAST_ID crashed, to be rebuilt */
575         if (!dt_object_exists(obj)) {
576                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
577                         LASSERT(lfsck->li_out_notify != NULL);
578
579                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
580                                              LE_LASTID_REBUILDING);
581                         lo->ll_flags |= LF_CRASHED_LASTID;
582
583                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
584                             cfs_fail_val > 0) {
585                                 struct l_wait_info lwi = LWI_TIMEOUT(
586                                                 cfs_time_seconds(cfs_fail_val),
587                                                 NULL, NULL);
588
589                                 up_write(&com->lc_sem);
590                                 l_wait_event(lfsck->li_thread.t_ctl_waitq,
591                                              !thread_is_running(&lfsck->li_thread),
592                                              &lwi);
593                                 down_write(&com->lc_sem);
594                         }
595                 }
596
597                 rc = lfsck_layout_lastid_create(env, lfsck, obj);
598         } else {
599                 dt_read_lock(env, obj, 0);
600                 rc = dt_read(env, obj,
601                         lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
602                         &pos);
603                 dt_read_unlock(env, obj);
604                 if (rc != 0 && rc != sizeof(__u64))
605                         GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
606
607                 if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
608                         LASSERT(lfsck->li_out_notify != NULL);
609
610                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
611                                              LE_LASTID_REBUILDING);
612                         lo->ll_flags |= LF_CRASHED_LASTID;
613                 }
614
615                 lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
616                 rc = 0;
617         }
618
619         GOTO(out, rc);
620
621 out:
622         if (rc != 0)
623                 lfsck_object_put(env, obj);
624         else
625                 lls->lls_lastid_obj = obj;
626
627         return rc;
628 }
629
630 static int lfsck_layout_master_async_interpret(const struct lu_env *env,
631                                                struct ptlrpc_request *req,
632                                                void *args, int rc)
633 {
634         struct lfsck_async_interpret_args *laia = args;
635         struct lfsck_component            *com  = laia->laia_com;
636         struct lfsck_layout_master_data   *llmd = com->lc_data;
637         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
638         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
639         struct lfsck_request              *lr   = laia->laia_lr;
640
641         switch (lr->lr_event) {
642         case LE_START:
643                 if (rc == 0) {
644                         LASSERT(!list_empty(&ltd->ltd_layout_list));
645
646                         spin_lock(&ltds->ltd_lock);
647                         if (!ltd->ltd_dead) {
648                                 list_add_tail(&ltd->ltd_layout_list,
649                                               &llmd->llmd_ost_list);
650                                 list_add_tail(&ltd->ltd_layout_phase_list,
651                                               &llmd->llmd_ost_phase1_list);
652                         }
653                         spin_unlock(&ltds->ltd_lock);
654                 } else {
655                         struct lfsck_layout *lo = com->lc_file_ram;
656
657                         lo->ll_flags |= LF_INCOMPLETE;
658                 }
659                 lfsck_tgt_put(ltd);
660                 break;
661         case LE_STOP:
662         case LE_PHASE2_DONE:
663                 break;
664         case LE_QUERY:
665                 spin_lock(&ltds->ltd_lock);
666                 if (rc == 0 && !ltd->ltd_dead) {
667                         struct lfsck_reply *reply;
668
669                         reply = req_capsule_server_get(&req->rq_pill,
670                                                        &RMF_LFSCK_REPLY);
671                         switch (reply->lr_status) {
672                         case LS_SCANNING_PHASE1:
673                                 break;
674                         case LS_SCANNING_PHASE2:
675                                 list_del(&ltd->ltd_layout_phase_list);
676                                 list_add_tail(&ltd->ltd_layout_phase_list,
677                                               &llmd->llmd_ost_phase2_list);
678                                 break;
679                         default:
680                                 list_del_init(&ltd->ltd_layout_phase_list);
681                                 list_del_init(&ltd->ltd_layout_list);
682                                 break;
683                         }
684                 }
685                 spin_unlock(&ltds->ltd_lock);
686                 lfsck_tgt_put(ltd);
687                 break;
688         default:
689                 CERROR("%s: unexpected event: rc = %d\n",
690                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
691                 break;
692         }
693
694         return 0;
695 }
696
697 static int lfsck_layout_master_query_others(const struct lu_env *env,
698                                             struct lfsck_component *com)
699 {
700         struct lfsck_thread_info          *info  = lfsck_env_info(env);
701         struct lfsck_request              *lr    = &info->lti_lr;
702         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
703         struct lfsck_instance             *lfsck = com->lc_lfsck;
704         struct lfsck_layout_master_data   *llmd  = com->lc_data;
705         struct ptlrpc_request_set         *set;
706         struct lfsck_tgt_descs            *ltds;
707         struct lfsck_tgt_desc             *ltd;
708         __u32                              cnt   = 0;
709         int                                rc    = 0;
710         int                                rc1   = 0;
711         ENTRY;
712
713         set = ptlrpc_prep_set();
714         if (set == NULL)
715                 RETURN(-ENOMEM);
716
717         llmd->llmd_touch_gen++;
718         ltds = &lfsck->li_ost_descs;
719         memset(lr, 0, sizeof(*lr));
720         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
721         lr->lr_event = LE_QUERY;
722         lr->lr_active = LT_LAYOUT;
723
724         laia->laia_com = com;
725         laia->laia_ltds = ltds;
726         laia->laia_lr = lr;
727         spin_lock(&ltds->ltd_lock);
728         while (!list_empty(&llmd->llmd_ost_phase1_list)) {
729                 ltd = list_entry(llmd->llmd_ost_phase1_list.next,
730                                  struct lfsck_tgt_desc,
731                                  ltd_layout_phase_list);
732                 if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
733                         break;
734
735                 ltd->ltd_layout_gen = llmd->llmd_touch_gen;
736                 list_del(&ltd->ltd_layout_phase_list);
737                 list_add_tail(&ltd->ltd_layout_phase_list,
738                               &llmd->llmd_ost_phase1_list);
739                 atomic_inc(&ltd->ltd_ref);
740                 laia->laia_ltd = ltd;
741                 spin_unlock(&ltds->ltd_lock);
742                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
743                                          lfsck_layout_master_async_interpret,
744                                          laia, LFSCK_QUERY);
745                 if (rc != 0) {
746                         CERROR("%s: fail to query OST %x for layout: rc = %d\n",
747                                lfsck_lfsck2name(lfsck), ltd->ltd_index, rc);
748                         lfsck_tgt_put(ltd);
749                         rc1 = rc;
750                 } else {
751                         cnt++;
752                 }
753                 spin_lock(&ltds->ltd_lock);
754         }
755         spin_unlock(&ltds->ltd_lock);
756
757         if (cnt > 0)
758                 rc = ptlrpc_set_wait(set);
759         ptlrpc_set_destroy(set);
760
761         RETURN(rc1 != 0 ? rc1 : rc);
762 }
763
764 static inline bool
765 lfsck_layout_master_to_orphan(struct lfsck_layout_master_data *llmd)
766 {
767         return !list_empty(&llmd->llmd_ost_phase2_list) ||
768                list_empty(&llmd->llmd_ost_phase1_list);
769 }
770
771 static int lfsck_layout_master_notify_others(const struct lu_env *env,
772                                              struct lfsck_component *com,
773                                              struct lfsck_request *lr)
774 {
775         struct lfsck_thread_info          *info  = lfsck_env_info(env);
776         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
777         struct lfsck_instance             *lfsck = com->lc_lfsck;
778         struct lfsck_layout_master_data   *llmd  = com->lc_data;
779         struct lfsck_layout               *lo    = com->lc_file_ram;
780         struct ptlrpc_request_set         *set;
781         struct lfsck_tgt_descs            *ltds;
782         struct lfsck_tgt_desc             *ltd;
783         __u32                              idx;
784         __u32                              cnt   = 0;
785         int                                rc    = 0;
786         ENTRY;
787
788         set = ptlrpc_prep_set();
789         if (set == NULL)
790                 RETURN(-ENOMEM);
791
792         lr->lr_active = LT_LAYOUT;
793         laia->laia_com = com;
794         laia->laia_lr = lr;
795         switch (lr->lr_event) {
796         case LE_START:
797                 ltds = &lfsck->li_ost_descs;
798                 laia->laia_ltds = ltds;
799                 down_read(&ltds->ltd_rw_sem);
800                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
801                         ltd = lfsck_tgt_get(ltds, idx);
802                         LASSERT(ltd != NULL);
803
804                         laia->laia_ltd = ltd;
805                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
806                                         lfsck_layout_master_async_interpret,
807                                         laia, LFSCK_NOTIFY);
808                         if (rc != 0) {
809                                 CERROR("%s: fail to notify OST %x for layout "
810                                        "start: rc = %d\n",
811                                        lfsck_lfsck2name(lfsck), idx, rc);
812                                 lfsck_tgt_put(ltd);
813                                 lo->ll_flags |= LF_INCOMPLETE;
814                         } else {
815                                 cnt++;
816                         }
817                 }
818                 up_read(&ltds->ltd_rw_sem);
819                 break;
820         case LE_STOP:
821         case LE_PHASE2_DONE:
822                 ltds = &lfsck->li_ost_descs;
823                 laia->laia_ltds = ltds;
824                 spin_lock(&ltds->ltd_lock);
825                 while (!list_empty(&llmd->llmd_ost_list)) {
826                         ltd = list_entry(llmd->llmd_ost_list.next,
827                                          struct lfsck_tgt_desc,
828                                          ltd_layout_list);
829                         list_del_init(&ltd->ltd_layout_phase_list);
830                         list_del_init(&ltd->ltd_layout_list);
831                         laia->laia_ltd = ltd;
832                         spin_unlock(&ltds->ltd_lock);
833                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
834                                         lfsck_layout_master_async_interpret,
835                                         laia, LFSCK_NOTIFY);
836                         if (rc != 0)
837                                 CERROR("%s: fail to notify OST %x for layout "
838                                        "stop/done: rc = %d\n",
839                                        lfsck_lfsck2name(lfsck),
840                                        ltd->ltd_index, rc);
841                         else
842                                 cnt++;
843                         spin_lock(&ltds->ltd_lock);
844                 }
845                 spin_unlock(&ltds->ltd_lock);
846                 break;
847         case LE_PHASE1_DONE:
848                 break;
849         default:
850                 CERROR("%s: unexpected LFSCK event: rc = %d\n",
851                        lfsck_lfsck2name(lfsck), lr->lr_event);
852                 rc = -EINVAL;
853                 break;
854         }
855
856         if (cnt > 0)
857                 rc = ptlrpc_set_wait(set);
858         ptlrpc_set_destroy(set);
859
860         if (rc == 0 && lr->lr_event == LE_START &&
861             list_empty(&llmd->llmd_ost_list))
862                 rc = -ENODEV;
863
864         RETURN(rc);
865 }
866
867 static int lfsck_layout_double_scan_result(const struct lu_env *env,
868                                            struct lfsck_component *com,
869                                            int rc)
870 {
871         struct lfsck_instance   *lfsck = com->lc_lfsck;
872         struct lfsck_layout     *lo    = com->lc_file_ram;
873         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
874
875         down_write(&com->lc_sem);
876
877         lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
878                                 HALF_SEC - lfsck->li_time_last_checkpoint);
879         lo->ll_time_last_checkpoint = cfs_time_current_sec();
880         lo->ll_objs_checked_phase2 += com->lc_new_checked;
881
882         if (rc > 0) {
883                 com->lc_journal = 0;
884                 if (lo->ll_flags & LF_INCOMPLETE)
885                         lo->ll_status = LS_PARTIAL;
886                 else
887                         lo->ll_status = LS_COMPLETED;
888                 if (!(bk->lb_param & LPF_DRYRUN))
889                         lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
890                 lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
891                 lo->ll_success_count++;
892         } else if (rc == 0) {
893                 lo->ll_status = lfsck->li_status;
894                 if (lo->ll_status == 0)
895                         lo->ll_status = LS_STOPPED;
896         } else {
897                 lo->ll_status = LS_FAILED;
898         }
899
900         if (lo->ll_status != LS_PAUSED) {
901                 spin_lock(&lfsck->li_lock);
902                 list_del_init(&com->lc_link);
903                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
904                 spin_unlock(&lfsck->li_lock);
905         }
906
907         rc = lfsck_layout_store(env, com);
908
909         up_write(&com->lc_sem);
910
911         return rc;
912 }
913
/*
 * Placeholder for scanning the orphans on the given target; the arguments
 * are ignored for now and 0 is always returned.
 */
static int lfsck_layout_scan_orphan(const struct lu_env *env,
                                    struct lfsck_component *com,
                                    struct lfsck_tgt_desc *ltd)
{
        /* XXX: To be extended in other patch. */
        return 0;
}
922
/*
 * Body of the layout LFSCK assistant thread.
 *
 * \param args  a struct lfsck_thread_args; it is released with
 *              lfsck_thread_args_fini() before the function returns
 *
 * Flow, as visible in this function:
 *  - send LE_START to the others; on failure jump straight to "fini";
 *  - flag the thread SVC_RUNNING and wake the master engine;
 *  - loop: finish the requests queued on ::llmd_req_list, then sleep until
 *    there is a new request or one of ::llmd_exit, ::llmd_to_post,
 *    ::llmd_to_double_scan is set;
 *  - ::llmd_to_post: hand ::llmd_post_result to
 *    lfsck_layout_master_notify_others() as an LE_PHASE1_DONE request;
 *  - ::llmd_to_double_scan: query the others and run
 *    lfsck_layout_scan_orphan() for each target on ::llmd_ost_phase2_list
 *    until ::llmd_ost_phase1_list is empty;
 *  - on the way out send LE_PHASE2_DONE (rc > 0) or LE_STOP (rc <= 0),
 *    store the result unless ::llmd_exit is set, publish
 *    ::llmd_assistant_status and flag the thread SVC_STOPPED.
 *
 * \return rc as left by the steps above; note that ::llmd_assistant_status
 *         prefers rc1 when it is non-zero
 */
923 static int lfsck_layout_assistant(void *args)
924 {
925         struct lfsck_thread_args        *lta     = args;
926         struct lu_env                   *env     = &lta->lta_env;
927         struct lfsck_component          *com     = lta->lta_com;
928         struct lfsck_instance           *lfsck   = lta->lta_lfsck;
929         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
930         struct lfsck_position           *pos     = &com->lc_pos_start;
931         struct lfsck_thread_info        *info    = lfsck_env_info(env);
932         struct lfsck_request            *lr      = &info->lti_lr;
933         struct lfsck_layout_master_data *llmd    = com->lc_data;
934         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
935         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
936         struct lfsck_layout_req         *llr;
937         struct l_wait_info               lwi     = { 0 };
938         int                              rc      = 0;
939         int                              rc1     = 0;
940         ENTRY;
941
            /* Build the LE_START request from the bookmark parameters. */
942         memset(lr, 0, sizeof(*lr));
943         lr->lr_event = LE_START;
944         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
945         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
946                        LSV_ASYNC_WINDOWS;
947         lr->lr_speed = bk->lb_speed_limit;
948         lr->lr_version = bk->lb_version;
949         lr->lr_param = bk->lb_param;
950         lr->lr_async_windows = bk->lb_async_windows;
            /* A start cookie of 0 or 1 adds LPF_RESET to the request. */
951         if (pos->lp_oit_cookie <= 1)
952                 lr->lr_param |= LPF_RESET;
953
954         rc = lfsck_layout_master_notify_others(env, com, lr);
955         if (rc != 0) {
956                 CERROR("%s: fail to notify others for layout start: rc = %d\n",
957                        lfsck_lfsck2name(lfsck), rc);
958                 GOTO(fini, rc);
959         }
960
            /* Announce that the assistant is running and wake whoever waits
             * on the master thread's wait queue. */
961         spin_lock(&llmd->llmd_lock);
962         thread_set_flags(athread, SVC_RUNNING);
963         spin_unlock(&llmd->llmd_lock);
964         wake_up_all(&mthread->t_ctl_waitq);
965
966         while (1) {
                    /* Finish the queued requests one by one. */
967                 while (!list_empty(&llmd->llmd_req_list)) {
968                         bool wakeup = false;
969
                            /* Throttle: lb_async_windows == 0 means no limit,
                             * otherwise wait until fewer RPCs are in flight
                             * than the window allows (or until told to exit). */
970                         l_wait_event(athread->t_ctl_waitq,
971                                      bk->lb_async_windows == 0 ||
972                                      atomic_read(&llmd->llmd_rpcs_in_flight) <
973                                                 bk->lb_async_windows ||
974                                      llmd->llmd_exit,
975                                      &lwi);
976
977                         if (unlikely(llmd->llmd_exit))
978                                 GOTO(cleanup1, rc = llmd->llmd_post_result);
979
980                         /* XXX: To be extended in other patch.
981                          *
982                          * Compare the OST side attribute with local attribute,
983                          * and fix it if found inconsistency. */
984
985                         spin_lock(&llmd->llmd_lock);
986                         llr = list_entry(llmd->llmd_req_list.next,
987                                          struct lfsck_layout_req,
988                                          llr_list);
989                         list_del_init(&llr->llr_list);
                            /* The prefetch count was at or above the window
                             * before this removal, so wake the master thread's
                             * wait queue once the lock is dropped. */
990                         if (bk->lb_async_windows != 0 &&
991                             llmd->llmd_prefetched >= bk->lb_async_windows)
992                                 wakeup = true;
993
994                         llmd->llmd_prefetched--;
995                         spin_unlock(&llmd->llmd_lock);
996                         if (wakeup)
997                                 wake_up_all(&mthread->t_ctl_waitq);
998
999                         lfsck_layout_req_fini(env, llr);
1000                 }
1001
1002                 /* Wakeup the master engine if it is waiting in checkpoint. */
1003                 if (atomic_read(&llmd->llmd_rpcs_in_flight) == 0)
1004                         wake_up_all(&mthread->t_ctl_waitq);
1005
1006                 l_wait_event(athread->t_ctl_waitq,
1007                              !lfsck_layout_req_empty(llmd) ||
1008                              llmd->llmd_exit ||
1009                              llmd->llmd_to_post ||
1010                              llmd->llmd_to_double_scan,
1011                              &lwi);
1012
1013                 if (unlikely(llmd->llmd_exit))
1014                         GOTO(cleanup1, rc = llmd->llmd_post_result);
1015
                     /* New requests take priority over post/double-scan. */
1016                 if (!list_empty(&llmd->llmd_req_list))
1017                         continue;
1018
1019                 if (llmd->llmd_to_post) {
1020                         llmd->llmd_to_post = 0;
1021                         LASSERT(llmd->llmd_post_result > 0);
1022
1023                         memset(lr, 0, sizeof(*lr));
1024                         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1025                         lr->lr_event = LE_PHASE1_DONE;
1026                         lr->lr_status = llmd->llmd_post_result;
1027                         rc = lfsck_layout_master_notify_others(env, com, lr);
1028                         if (rc != 0)
1029                                 CERROR("%s: failed to notify others "
1030                                        "for layout post: rc = %d\n",
1031                                        lfsck_lfsck2name(lfsck), rc);
1032
1033                         /* Wakeup the master engine to go ahead. */
1034                         wake_up_all(&mthread->t_ctl_waitq);
1035                 }
1036
1037                 if (llmd->llmd_to_double_scan) {
1038                         llmd->llmd_to_double_scan = 0;
1039                         atomic_inc(&lfsck->li_double_scan_count);
1040                         llmd->llmd_in_double_scan = 1;
1041                         wake_up_all(&mthread->t_ctl_waitq);
1042
                             /* Nothing in this function clears
                              * llmd_in_double_scan, so the loop below is
                              * only left through the GOTO(cleanup2, ...)
                              * statements. */
1043                         while (llmd->llmd_in_double_scan) {
1044                                 struct lfsck_tgt_descs  *ltds =
1045                                                         &lfsck->li_ost_descs;
1046                                 struct lfsck_tgt_desc   *ltd;
1047
1048                                 rc = lfsck_layout_master_query_others(env, com);
                                     /* Orphan handling is checked before the
                                      * query result is examined. */
1049                                 if (lfsck_layout_master_to_orphan(llmd))
1050                                         goto orphan;
1051
1052                                 if (rc < 0)
1053                                         GOTO(cleanup2, rc);
1054
1055                                 /* Pull LFSCK status on related targets once
1056                                  * per 30 seconds if we are not notified. */
1057                                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(30),
1058                                                            cfs_time_seconds(1),
1059                                                            NULL, NULL);
1060                                 rc = l_wait_event(athread->t_ctl_waitq,
1061                                         lfsck_layout_master_to_orphan(llmd) ||
1062                                         llmd->llmd_exit ||
1063                                         !thread_is_running(mthread),
1064                                         &lwi);
1065
1066                                 if (unlikely(llmd->llmd_exit ||
1067                                              !thread_is_running(mthread)))
1068                                         GOTO(cleanup2, rc = 0);
1069
                                     /* Timed out without notification:
                                      * query the others again. */
1070                                 if (rc == -ETIMEDOUT)
1071                                         continue;
1072
1073                                 if (rc < 0)
1074                                         GOTO(cleanup2, rc);
1075
1076 orphan:
                                     /* Take the targets off the phase2 list
                                      * one at a time; the spinlock is dropped
                                      * while each one is scanned. */
1077                                 spin_lock(&ltds->ltd_lock);
1078                                 while (!list_empty(
1079                                                 &llmd->llmd_ost_phase2_list)) {
1080                                         ltd = list_entry(
1081                                               llmd->llmd_ost_phase2_list.next,
1082                                               struct lfsck_tgt_desc,
1083                                               ltd_layout_phase_list);
1084                                         list_del_init(
1085                                                 &ltd->ltd_layout_phase_list);
1086                                         spin_unlock(&ltds->ltd_lock);
1087
1088                                         rc = lfsck_layout_scan_orphan(env, com,
1089                                                                       ltd);
                                             /* A scan error only aborts when
                                              * LPF_FAILOUT is set. */
1090                                         if (rc != 0 &&
1091                                             bk->lb_param & LPF_FAILOUT)
1092                                                 GOTO(cleanup2, rc);
1093
1094                                         if (unlikely(llmd->llmd_exit ||
1095                                                 !thread_is_running(mthread)))
1096                                                 GOTO(cleanup2, rc = 0);
1097
1098                                         spin_lock(&ltds->ltd_lock);
1099                                 }
1100
                                     /* No target left in phase1: the double
                                      * scan is complete (rc = 1). */
1101                                 if (list_empty(&llmd->llmd_ost_phase1_list)) {
1102                                         spin_unlock(&ltds->ltd_lock);
1103                                         GOTO(cleanup2, rc = 1);
1104                                 }
1105                                 spin_unlock(&ltds->ltd_lock);
1106                         }
1107                 }
1108         }
1109
1110 cleanup1:
1111         /* Cleanup the unfinished requests. */
1112         spin_lock(&llmd->llmd_lock);
1113         while (!list_empty(&llmd->llmd_req_list)) {
1114                 llr = list_entry(llmd->llmd_req_list.next,
1115                                  struct lfsck_layout_req,
1116                                  llr_list);
1117                 list_del_init(&llr->llr_list);
1118                 llmd->llmd_prefetched--;
1119                 spin_unlock(&llmd->llmd_lock);
1120                 lfsck_layout_req_fini(env, llr);
1121                 spin_lock(&llmd->llmd_lock);
1122         }
1123         spin_unlock(&llmd->llmd_lock);
1124
1125         LASSERTF(llmd->llmd_prefetched == 0, "unmatched prefeteched objs %d\n",
1126                  llmd->llmd_prefetched);
1127
             /* Wait until no RPC is in flight any more. */
1128         l_wait_event(athread->t_ctl_waitq,
1129                      atomic_read(&llmd->llmd_rpcs_in_flight) == 0,
1130                      &lwi);
1131
1132 cleanup2:
             /* Translate rc into the final notification:
              * rc > 0 sends LE_PHASE2_DONE carrying rc as the status;
              * rc == 0 sends LE_STOP with LS_CO_PAUSED or LS_CO_STOPPED,
              * chosen from lfsck->li_status (any other status hits LBUG());
              * rc < 0 sends LE_STOP with LS_CO_FAILED. */
1133         memset(lr, 0, sizeof(*lr));
1134         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1135         if (rc > 0) {
1136                 lr->lr_event = LE_PHASE2_DONE;
1137                 lr->lr_status = rc;
1138         } else if (rc == 0) {
1139                 lr->lr_event = LE_STOP;
1140                 if (lfsck->li_status == LS_PAUSED ||
1141                     lfsck->li_status == LS_CO_PAUSED)
1142                         lr->lr_status = LS_CO_PAUSED;
1143                 else if (lfsck->li_status == LS_STOPPED ||
1144                          lfsck->li_status == LS_CO_STOPPED)
1145                         lr->lr_status = LS_CO_STOPPED;
1146                 else
1147                         LBUG();
1148         } else {
1149                 lr->lr_event = LE_STOP;
1150                 lr->lr_status = LS_CO_FAILED;
1151         }
1152
             /* A failed notification replaces rc, so it is also what gets
              * passed to lfsck_layout_double_scan_result() below. */
1153         rc1 = lfsck_layout_master_notify_others(env, com, lr);
1154         if (rc1 != 0) {
1155                 CERROR("%s: failed to notify others for layout quit: rc = %d\n",
1156                        lfsck_lfsck2name(lfsck), rc1);
1157                 rc = rc1;
1158         }
1159
1160         /* Under force exit case, some requests may be just freed without
1161          * verification, those objects should be re-handled when next run.
1162          * So not update the on-disk tracing file under such case. */
1163         if (!llmd->llmd_exit)
1164                 rc1 = lfsck_layout_double_scan_result(env, com, rc);
1165
1166 fini:
1167         if (llmd->llmd_in_double_scan)
1168                 atomic_dec(&lfsck->li_double_scan_count);
1169
             /* Publish the final status and the stopped flag under
              * llmd_lock, then wake the master thread's wait queue. */
1170         spin_lock(&llmd->llmd_lock);
1171         llmd->llmd_assistant_status = (rc1 != 0 ? rc1 : rc);
1172         thread_set_flags(athread, SVC_STOPPED);
1173         wake_up_all(&mthread->t_ctl_waitq);
1174         spin_unlock(&llmd->llmd_lock);
1175         lfsck_thread_args_fini(lta);
1176
1177         return rc;
1178 }
1179
1180 /* layout APIs */
1181
1182 static int lfsck_layout_reset(const struct lu_env *env,
1183                               struct lfsck_component *com, bool init)
1184 {
1185         struct lfsck_layout     *lo    = com->lc_file_ram;
1186         int                      rc;
1187
1188         down_write(&com->lc_sem);
1189         if (init) {
1190                 memset(lo, 0, com->lc_file_size);
1191         } else {
1192                 __u32 count = lo->ll_success_count;
1193                 __u64 last_time = lo->ll_time_last_complete;
1194
1195                 memset(lo, 0, com->lc_file_size);
1196                 lo->ll_success_count = count;
1197                 lo->ll_time_last_complete = last_time;
1198         }
1199
1200         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
1201         lo->ll_status = LS_INIT;
1202
1203         rc = lfsck_layout_store(env, com);
1204         up_write(&com->lc_sem);
1205
1206         return rc;
1207 }
1208
1209 static void lfsck_layout_fail(const struct lu_env *env,
1210                               struct lfsck_component *com, bool new_checked)
1211 {
1212         struct lfsck_layout *lo = com->lc_file_ram;
1213
1214         down_write(&com->lc_sem);
1215         if (new_checked)
1216                 com->lc_new_checked++;
1217         lo->ll_objs_failed_phase1++;
1218         if (lo->ll_pos_first_inconsistent == 0) {
1219                 struct lfsck_instance *lfsck = com->lc_lfsck;
1220
1221                 lo->ll_pos_first_inconsistent =
1222                         lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
1223                                                         lfsck->li_di_oit);
1224         }
1225         up_write(&com->lc_sem);
1226 }
1227
1228 static int lfsck_layout_master_checkpoint(const struct lu_env *env,
1229                                           struct lfsck_component *com, bool init)
1230 {
1231         struct lfsck_instance           *lfsck   = com->lc_lfsck;
1232         struct lfsck_layout             *lo      = com->lc_file_ram;
1233         struct lfsck_layout_master_data *llmd    = com->lc_data;
1234         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
1235         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
1236         struct l_wait_info               lwi     = { 0 };
1237         int                              rc;
1238
1239         if (com->lc_new_checked == 0 && !init)
1240                 return 0;
1241
1242         l_wait_event(mthread->t_ctl_waitq,
1243                      (list_empty(&llmd->llmd_req_list) &&
1244                       atomic_read(&llmd->llmd_rpcs_in_flight) == 0) ||
1245                      !thread_is_running(mthread) ||
1246                      thread_is_stopped(athread),
1247                      &lwi);
1248
1249         if (!thread_is_running(mthread) || thread_is_stopped(athread))
1250                 return 0;
1251
1252         down_write(&com->lc_sem);
1253         if (init) {
1254                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
1255         } else {
1256                 lo->ll_pos_last_checkpoint =
1257                                         lfsck->li_pos_current.lp_oit_cookie;
1258                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1259                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1260                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
1261                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
1262                 com->lc_new_checked = 0;
1263         }
1264
1265         rc = lfsck_layout_store(env, com);
1266         up_write(&com->lc_sem);
1267
1268         return rc;
1269 }
1270
1271 static int lfsck_layout_slave_checkpoint(const struct lu_env *env,
1272                                          struct lfsck_component *com, bool init)
1273 {
1274         struct lfsck_instance   *lfsck = com->lc_lfsck;
1275         struct lfsck_layout     *lo    = com->lc_file_ram;
1276         int                      rc;
1277
1278         if (com->lc_new_checked == 0 && !init)
1279                 return 0;
1280
1281         down_write(&com->lc_sem);
1282
1283         if (init) {
1284                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
1285         } else {
1286                 lo->ll_pos_last_checkpoint =
1287                                         lfsck->li_pos_current.lp_oit_cookie;
1288                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1289                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1290                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
1291                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
1292                 com->lc_new_checked = 0;
1293         }
1294
1295         rc = lfsck_layout_store(env, com);
1296
1297         up_write(&com->lc_sem);
1298
1299         return rc;
1300 }
1301
1302 static int lfsck_layout_slave_prep(const struct lu_env *env,
1303                                    struct lfsck_component *com)
1304 {
1305         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1306         struct lfsck_layout     *lo     = com->lc_file_ram;
1307         struct lfsck_position   *pos    = &com->lc_pos_start;
1308
1309         /* XXX: For a new scanning, generate OST-objects
1310          *      bitmap for orphan detection. */
1311
1312         fid_zero(&pos->lp_dir_parent);
1313         pos->lp_dir_cookie = 0;
1314         if (lo->ll_status == LS_COMPLETED ||
1315             lo->ll_status == LS_PARTIAL) {
1316                 int rc;
1317
1318                 rc = lfsck_layout_reset(env, com, false);
1319                 if (rc != 0)
1320                         return rc;
1321         }
1322
1323         down_write(&com->lc_sem);
1324
1325         lo->ll_time_latest_start = cfs_time_current_sec();
1326
1327         spin_lock(&lfsck->li_lock);
1328         if (lo->ll_flags & LF_SCANNED_ONCE) {
1329                 if (!lfsck->li_drop_dryrun ||
1330                     lo->ll_pos_first_inconsistent == 0) {
1331                         lo->ll_status = LS_SCANNING_PHASE2;
1332                         list_del_init(&com->lc_link);
1333                         list_add_tail(&com->lc_link,
1334                                       &lfsck->li_list_double_scan);
1335                         pos->lp_oit_cookie = 0;
1336                 } else {
1337                         int i;
1338
1339                         lo->ll_status = LS_SCANNING_PHASE1;
1340                         lo->ll_run_time_phase1 = 0;
1341                         lo->ll_run_time_phase2 = 0;
1342                         lo->ll_objs_checked_phase1 = 0;
1343                         lo->ll_objs_checked_phase2 = 0;
1344                         lo->ll_objs_failed_phase1 = 0;
1345                         lo->ll_objs_failed_phase2 = 0;
1346                         for (i = 0; i < LLIT_MAX; i++)
1347                                 lo->ll_objs_repaired[i] = 0;
1348
1349                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
1350                 }
1351         } else {
1352                 lo->ll_status = LS_SCANNING_PHASE1;
1353                 if (!lfsck->li_drop_dryrun ||
1354                     lo->ll_pos_first_inconsistent == 0)
1355                         pos->lp_oit_cookie = lo->ll_pos_last_checkpoint + 1;
1356                 else
1357                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
1358         }
1359         spin_unlock(&lfsck->li_lock);
1360
1361         up_write(&com->lc_sem);
1362
1363         return 0;
1364 }
1365
1366 static int lfsck_layout_master_prep(const struct lu_env *env,
1367                                     struct lfsck_component *com)
1368 {
1369         struct lfsck_instance           *lfsck   = com->lc_lfsck;
1370         struct lfsck_layout_master_data *llmd    = com->lc_data;
1371         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
1372         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
1373         struct lfsck_thread_args        *lta;
1374         long                             rc;
1375         ENTRY;
1376
1377         rc = lfsck_layout_slave_prep(env, com);
1378         if (rc != 0)
1379                 RETURN(rc);
1380
1381         llmd->llmd_assistant_status = 0;
1382         llmd->llmd_post_result = 0;
1383         llmd->llmd_to_post = 0;
1384         llmd->llmd_to_double_scan = 0;
1385         llmd->llmd_in_double_scan = 0;
1386         llmd->llmd_exit = 0;
1387         thread_set_flags(athread, 0);
1388
1389         lta = lfsck_thread_args_init(lfsck, com);
1390         if (IS_ERR(lta))
1391                 RETURN(PTR_ERR(lta));
1392
1393         rc = PTR_ERR(kthread_run(lfsck_layout_assistant, lta, "lfsck_layout"));
1394         if (IS_ERR_VALUE(rc)) {
1395                 CERROR("%s: Cannot start LFSCK layout assistant thread: "
1396                        "rc = %ld\n", lfsck_lfsck2name(lfsck), rc);
1397                 lfsck_thread_args_fini(lta);
1398         } else {
1399                 struct l_wait_info lwi = { 0 };
1400
1401                 l_wait_event(mthread->t_ctl_waitq,
1402                              thread_is_running(athread) ||
1403                              thread_is_stopped(athread),
1404                              &lwi);
1405                 if (unlikely(!thread_is_running(athread)))
1406                         rc = llmd->llmd_assistant_status;
1407                 else
1408                         rc = 0;
1409         }
1410
1411         RETURN(rc);
1412 }
1413
/*
 * Placeholder for the master-side handling of one object from the object
 * table iteration; the arguments are ignored and 0 is always returned.
 */
static int lfsck_layout_master_exec_oit(const struct lu_env *env,
                                        struct lfsck_component *com,
                                        struct dt_object *obj)
{
        /* XXX: To be implemented in other patches.
         *
         * Planned behaviour: read the layout EA of the given object
         * locally; for every stripe, pre-fetch the attribute of the
         * OST-object and queue a struct lfsck_layout_req on
         * ::llmd_req_list.
         *
         * The lfsck_layout_assistant thread then takes each queued request,
         * compares the OST side attribute with the local one and repairs
         * any inconsistency.
         *
         * The whole processing is meant to be asynchronous and pipelined. */
        return 0;
}
1432
1433 static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
1434                                        struct lfsck_component *com,
1435                                        struct dt_object *obj)
1436 {
1437         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1438         struct lfsck_layout             *lo     = com->lc_file_ram;
1439         const struct lu_fid             *fid    = lfsck_dto2fid(obj);
1440         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
1441         struct lfsck_layout_seq         *lls;
1442         __u64                            seq;
1443         __u64                            oid;
1444         int                              rc;
1445         ENTRY;
1446
1447         /* XXX: Update OST-objects bitmap for orphan detection. */
1448
1449         LASSERT(llsd != NULL);
1450
1451         down_write(&com->lc_sem);
1452         if (fid_is_idif(fid))
1453                 seq = 0;
1454         else if (!fid_is_norm(fid) ||
1455                  !fid_is_for_ostobj(env, lfsck->li_next, obj, fid))
1456                 GOTO(unlock, rc = 0);
1457         else
1458                 seq = fid_seq(fid);
1459         com->lc_new_checked++;
1460
1461         lls = lfsck_layout_seq_lookup(llsd, seq);
1462         if (lls == NULL) {
1463                 OBD_ALLOC_PTR(lls);
1464                 if (unlikely(lls == NULL))
1465                         GOTO(unlock, rc = -ENOMEM);
1466
1467                 INIT_LIST_HEAD(&lls->lls_list);
1468                 lls->lls_seq = seq;
1469                 rc = lfsck_layout_lastid_load(env, com, lls);
1470                 if (rc != 0) {
1471                         lo->ll_objs_failed_phase1++;
1472                         OBD_FREE_PTR(lls);
1473                         GOTO(unlock, rc);
1474                 }
1475
1476                 lfsck_layout_seq_insert(llsd, lls);
1477         }
1478
1479         if (unlikely(fid_is_last_id(fid)))
1480                 GOTO(unlock, rc = 0);
1481
1482         oid = fid_oid(fid);
1483         if (oid > lls->lls_lastid_known)
1484                 lls->lls_lastid_known = oid;
1485
1486         if (oid > lls->lls_lastid) {
1487                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1488                         /* OFD may create new objects during LFSCK scanning. */
1489                         rc = lfsck_layout_lastid_reload(env, com, lls);
1490                         if (unlikely(rc != 0))
1491                                 CWARN("%s: failed to reload LAST_ID for "LPX64
1492                                       ": rc = %d\n",
1493                                       lfsck_lfsck2name(com->lc_lfsck),
1494                                       lls->lls_seq, rc);
1495                         if (oid <= lls->lls_lastid)
1496                                 GOTO(unlock, rc = 0);
1497
1498                         LASSERT(lfsck->li_out_notify != NULL);
1499
1500                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1501                                              LE_LASTID_REBUILDING);
1502                         lo->ll_flags |= LF_CRASHED_LASTID;
1503                 }
1504
1505                 lls->lls_lastid = oid;
1506                 lls->lls_dirty = 1;
1507         }
1508
1509         GOTO(unlock, rc = 0);
1510
1511 unlock:
1512         up_write(&com->lc_sem);
1513
1514         return rc;
1515 }
1516
/*
 * Directory-entry hook shared by the layout master and slave operations.
 *
 * It does nothing with the entry and always returns 0.
 */
static int lfsck_layout_exec_dir(const struct lu_env *env,
                                 struct lfsck_component *com,
                                 struct dt_object *obj,
                                 struct lu_dirent *ent)
{
        return 0;
}
1524
1525 static int lfsck_layout_master_post(const struct lu_env *env,
1526                                     struct lfsck_component *com,
1527                                     int result, bool init)
1528 {
1529         struct lfsck_instance           *lfsck   = com->lc_lfsck;
1530         struct lfsck_layout             *lo      = com->lc_file_ram;
1531         struct lfsck_layout_master_data *llmd    = com->lc_data;
1532         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
1533         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
1534         struct l_wait_info               lwi     = { 0 };
1535         int                              rc;
1536         ENTRY;
1537
1538
1539         llmd->llmd_post_result = result;
1540         llmd->llmd_to_post = 1;
1541         if (llmd->llmd_post_result <= 0)
1542                 llmd->llmd_exit = 1;
1543
1544         wake_up_all(&athread->t_ctl_waitq);
1545         l_wait_event(mthread->t_ctl_waitq,
1546                      (result > 0 && list_empty(&llmd->llmd_req_list) &&
1547                       atomic_read(&llmd->llmd_rpcs_in_flight) == 0) ||
1548                      thread_is_stopped(athread),
1549                      &lwi);
1550
1551         if (llmd->llmd_assistant_status < 0)
1552                 result = llmd->llmd_assistant_status;
1553
1554         down_write(&com->lc_sem);
1555         spin_lock(&lfsck->li_lock);
1556         /* When LFSCK failed, there may be some prefetched objects those are
1557          * not been processed yet, we do not know the exactly position, then
1558          * just restart from last check-point next time. */
1559         if (!init && !llmd->llmd_exit)
1560                 lo->ll_pos_last_checkpoint =
1561                                         lfsck->li_pos_current.lp_oit_cookie;
1562
1563         if (result > 0) {
1564                 lo->ll_status = LS_SCANNING_PHASE2;
1565                 lo->ll_flags |= LF_SCANNED_ONCE;
1566                 lo->ll_flags &= ~LF_UPGRADE;
1567                 list_del_init(&com->lc_link);
1568                 list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
1569         } else if (result == 0) {
1570                 lo->ll_status = lfsck->li_status;
1571                 if (lo->ll_status == 0)
1572                         lo->ll_status = LS_STOPPED;
1573                 if (lo->ll_status != LS_PAUSED) {
1574                         list_del_init(&com->lc_link);
1575                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1576                 }
1577         } else {
1578                 lo->ll_status = LS_FAILED;
1579                 list_del_init(&com->lc_link);
1580                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1581         }
1582         spin_unlock(&lfsck->li_lock);
1583
1584         if (!init) {
1585                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1586                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1587                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
1588                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
1589                 com->lc_new_checked = 0;
1590         }
1591
1592         rc = lfsck_layout_store(env, com);
1593         up_write(&com->lc_sem);
1594
1595         RETURN(rc);
1596 }
1597
1598 static int lfsck_layout_slave_post(const struct lu_env *env,
1599                                    struct lfsck_component *com,
1600                                    int result, bool init)
1601 {
1602         struct lfsck_instance   *lfsck = com->lc_lfsck;
1603         struct lfsck_layout     *lo    = com->lc_file_ram;
1604         int                      rc;
1605         bool                     done  = false;
1606
1607         rc = lfsck_layout_lastid_store(env, com);
1608         if (rc != 0)
1609                 result = rc;
1610
1611         LASSERT(lfsck->li_out_notify != NULL);
1612
1613         down_write(&com->lc_sem);
1614
1615         spin_lock(&lfsck->li_lock);
1616         if (!init)
1617                 lo->ll_pos_last_checkpoint =
1618                                         lfsck->li_pos_current.lp_oit_cookie;
1619         if (result > 0) {
1620                 lo->ll_status = LS_SCANNING_PHASE2;
1621                 lo->ll_flags |= LF_SCANNED_ONCE;
1622                 if (lo->ll_flags & LF_CRASHED_LASTID) {
1623                         done = true;
1624                         lo->ll_flags &= ~LF_CRASHED_LASTID;
1625                 }
1626                 lo->ll_flags &= ~LF_UPGRADE;
1627                 list_del_init(&com->lc_link);
1628                 list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
1629         } else if (result == 0) {
1630                 lo->ll_status = lfsck->li_status;
1631                 if (lo->ll_status == 0)
1632                         lo->ll_status = LS_STOPPED;
1633                 if (lo->ll_status != LS_PAUSED) {
1634                         list_del_init(&com->lc_link);
1635                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1636                 }
1637         } else {
1638                 lo->ll_status = LS_FAILED;
1639                 list_del_init(&com->lc_link);
1640                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1641         }
1642         spin_unlock(&lfsck->li_lock);
1643
1644         if (done)
1645                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1646                                      LE_LASTID_REBUILT);
1647
1648         if (!init) {
1649                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1650                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1651                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
1652                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
1653                 com->lc_new_checked = 0;
1654         }
1655
1656         rc = lfsck_layout_store(env, com);
1657
1658         up_write(&com->lc_sem);
1659
1660         return rc;
1661 }
1662
1663 static int lfsck_layout_dump(const struct lu_env *env,
1664                              struct lfsck_component *com, char *buf, int len)
1665 {
1666         struct lfsck_instance   *lfsck = com->lc_lfsck;
1667         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1668         struct lfsck_layout     *lo    = com->lc_file_ram;
1669         int                      save  = len;
1670         int                      ret   = -ENOSPC;
1671         int                      rc;
1672
1673         down_read(&com->lc_sem);
1674         rc = snprintf(buf, len,
1675                       "name: lfsck_layout\n"
1676                       "magic: %#x\n"
1677                       "version: %d\n"
1678                       "status: %s\n",
1679                       lo->ll_magic,
1680                       bk->lb_version,
1681                       lfsck_status2names(lo->ll_status));
1682         if (rc <= 0)
1683                 goto out;
1684
1685         buf += rc;
1686         len -= rc;
1687         rc = lfsck_bits_dump(&buf, &len, lo->ll_flags, lfsck_flags_names,
1688                              "flags");
1689         if (rc < 0)
1690                 goto out;
1691
1692         rc = lfsck_bits_dump(&buf, &len, bk->lb_param, lfsck_param_names,
1693                              "param");
1694         if (rc < 0)
1695                 goto out;
1696
1697         rc = lfsck_time_dump(&buf, &len, lo->ll_time_last_complete,
1698                              "time_since_last_completed");
1699         if (rc < 0)
1700                 goto out;
1701
1702         rc = lfsck_time_dump(&buf, &len, lo->ll_time_latest_start,
1703                              "time_since_latest_start");
1704         if (rc < 0)
1705                 goto out;
1706
1707         rc = lfsck_time_dump(&buf, &len, lo->ll_time_last_checkpoint,
1708                              "time_since_last_checkpoint");
1709         if (rc < 0)
1710                 goto out;
1711
1712         rc = snprintf(buf, len,
1713                       "latest_start_position: "LPU64"\n"
1714                       "last_checkpoint_position: "LPU64"\n"
1715                       "first_failure_position: "LPU64"\n",
1716                       lo->ll_pos_latest_start,
1717                       lo->ll_pos_last_checkpoint,
1718                       lo->ll_pos_first_inconsistent);
1719         if (rc <= 0)
1720                 goto out;
1721
1722         buf += rc;
1723         len -= rc;
1724
1725         rc = snprintf(buf, len,
1726                       "success_count: %u\n"
1727                       "repaired_dangling: "LPU64"\n"
1728                       "repaired_unmatched_pair: "LPU64"\n"
1729                       "repaired_multiple_referenced: "LPU64"\n"
1730                       "repaired_orphan: "LPU64"\n"
1731                       "repaired_inconsistent_owner: "LPU64"\n"
1732                       "repaired_others: "LPU64"\n"
1733                       "skipped: "LPU64"\n"
1734                       "failed_phase1: "LPU64"\n"
1735                       "failed_phase2: "LPU64"\n",
1736                       lo->ll_success_count,
1737                       lo->ll_objs_repaired[LLIT_DANGLING - 1],
1738                       lo->ll_objs_repaired[LLIT_UNMATCHED_PAIR - 1],
1739                       lo->ll_objs_repaired[LLIT_MULTIPLE_REFERENCED - 1],
1740                       lo->ll_objs_repaired[LLIT_ORPHAN - 1],
1741                       lo->ll_objs_repaired[LLIT_INCONSISTENT_OWNER - 1],
1742                       lo->ll_objs_repaired[LLIT_OTHERS - 1],
1743                       lo->ll_objs_skipped,
1744                       lo->ll_objs_failed_phase1,
1745                       lo->ll_objs_failed_phase2);
1746         if (rc <= 0)
1747                 goto out;
1748
1749         buf += rc;
1750         len -= rc;
1751
1752         if (lo->ll_status == LS_SCANNING_PHASE1) {
1753                 __u64 pos;
1754                 const struct dt_it_ops *iops;
1755                 cfs_duration_t duration = cfs_time_current() -
1756                                           lfsck->li_time_last_checkpoint;
1757                 __u64 checked = lo->ll_objs_checked_phase1 + com->lc_new_checked;
1758                 __u64 speed = checked;
1759                 __u64 new_checked = com->lc_new_checked * HZ;
1760                 __u32 rtime = lo->ll_run_time_phase1 +
1761                               cfs_duration_sec(duration + HALF_SEC);
1762
1763                 if (duration != 0)
1764                         do_div(new_checked, duration);
1765                 if (rtime != 0)
1766                         do_div(speed, rtime);
1767                 rc = snprintf(buf, len,
1768                               "checked_phase1: "LPU64"\n"
1769                               "checked_phase2: "LPU64"\n"
1770                               "run_time_phase1: %u seconds\n"
1771                               "run_time_phase2: %u seconds\n"
1772                               "average_speed_phase1: "LPU64" items/sec\n"
1773                               "average_speed_phase2: N/A\n"
1774                               "real-time_speed_phase1: "LPU64" items/sec\n"
1775                               "real-time_speed_phase2: N/A\n",
1776                               checked,
1777                               lo->ll_objs_checked_phase2,
1778                               rtime,
1779                               lo->ll_run_time_phase2,
1780                               speed,
1781                               new_checked);
1782                 if (rc <= 0)
1783                         goto out;
1784
1785                 buf += rc;
1786                 len -= rc;
1787
1788                 LASSERT(lfsck->li_di_oit != NULL);
1789
1790                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1791
1792                 /* The low layer otable-based iteration position may NOT
1793                  * exactly match the layout-based directory traversal
1794                  * cookie. Generally, it is not a serious issue. But the
1795                  * caller should NOT make assumption on that. */
1796                 pos = iops->store(env, lfsck->li_di_oit);
1797                 if (!lfsck->li_current_oit_processed)
1798                         pos--;
1799                 rc = snprintf(buf, len, "current_position: "LPU64"\n", pos);
1800                 if (rc <= 0)
1801                         goto out;
1802
1803                 buf += rc;
1804                 len -= rc;
1805         } else {
1806                 /* XXX: LS_SCANNING_PHASE2 will be handled in the future. */
1807                 __u64 speed1 = lo->ll_objs_checked_phase1;
1808                 __u64 speed2 = lo->ll_objs_checked_phase2;
1809
1810                 if (lo->ll_run_time_phase1 != 0)
1811                         do_div(speed1, lo->ll_run_time_phase1);
1812                 if (lo->ll_run_time_phase2 != 0)
1813                         do_div(speed2, lo->ll_run_time_phase2);
1814                 rc = snprintf(buf, len,
1815                               "checked_phase1: "LPU64"\n"
1816                               "checked_phase2: "LPU64"\n"
1817                               "run_time_phase1: %u seconds\n"
1818                               "run_time_phase2: %u seconds\n"
1819                               "average_speed_phase1: "LPU64" items/sec\n"
1820                               "average_speed_phase2: "LPU64" objs/sec\n"
1821                               "real-time_speed_phase1: N/A\n"
1822                               "real-time_speed_phase2: N/A\n"
1823                               "current_position: N/A\n",
1824                               lo->ll_objs_checked_phase1,
1825                               lo->ll_objs_checked_phase2,
1826                               lo->ll_run_time_phase1,
1827                               lo->ll_run_time_phase2,
1828                               speed1,
1829                               speed2);
1830                 if (rc <= 0)
1831                         goto out;
1832
1833                 buf += rc;
1834                 len -= rc;
1835         }
1836         ret = save - len;
1837
1838 out:
1839         up_read(&com->lc_sem);
1840
1841         return ret;
1842 }
1843
1844 static int lfsck_layout_master_double_scan(const struct lu_env *env,
1845                                            struct lfsck_component *com)
1846 {
1847         struct lfsck_layout_master_data *llmd    = com->lc_data;
1848         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
1849         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
1850         struct lfsck_layout             *lo      = com->lc_file_ram;
1851         struct l_wait_info               lwi     = { 0 };
1852
1853         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2))
1854                 return 0;
1855
1856         llmd->llmd_to_double_scan = 1;
1857         wake_up_all(&athread->t_ctl_waitq);
1858         l_wait_event(mthread->t_ctl_waitq,
1859                      llmd->llmd_in_double_scan ||
1860                      thread_is_stopped(athread),
1861                      &lwi);
1862         if (llmd->llmd_assistant_status < 0)
1863                 return llmd->llmd_assistant_status;
1864
1865         return 0;
1866 }
1867
1868 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
1869                                           struct lfsck_component *com)
1870 {
1871         struct lfsck_instance   *lfsck = com->lc_lfsck;
1872         struct lfsck_layout     *lo    = com->lc_file_ram;
1873         int                      rc    = 1;
1874
1875         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2))
1876                 return 0;
1877
1878         atomic_inc(&lfsck->li_double_scan_count);
1879
1880         com->lc_new_checked = 0;
1881         com->lc_new_scanned = 0;
1882         com->lc_time_last_checkpoint = cfs_time_current();
1883         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
1884                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1885
1886         rc = lfsck_layout_double_scan_result(env, com, rc);
1887
1888         if (atomic_dec_and_test(&lfsck->li_double_scan_count))
1889                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
1890
1891         return rc;
1892 }
1893
1894 static void lfsck_layout_master_data_release(const struct lu_env *env,
1895                                              struct lfsck_component *com)
1896 {
1897         struct lfsck_layout_master_data *llmd   = com->lc_data;
1898         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1899         struct lfsck_tgt_descs          *ltds;
1900         struct lfsck_tgt_desc           *ltd;
1901         struct lfsck_tgt_desc           *next;
1902
1903         LASSERT(llmd != NULL);
1904         LASSERT(thread_is_init(&llmd->llmd_thread) ||
1905                 thread_is_stopped(&llmd->llmd_thread));
1906         LASSERT(list_empty(&llmd->llmd_req_list));
1907         LASSERT(atomic_read(&llmd->llmd_rpcs_in_flight) == 0);
1908
1909         com->lc_data = NULL;
1910
1911         ltds = &lfsck->li_ost_descs;
1912         spin_lock(&ltds->ltd_lock);
1913         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase1_list,
1914                                  ltd_layout_phase_list) {
1915                 list_del_init(&ltd->ltd_layout_phase_list);
1916         }
1917         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase2_list,
1918                                  ltd_layout_phase_list) {
1919                 list_del_init(&ltd->ltd_layout_phase_list);
1920         }
1921         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_list,
1922                                  ltd_layout_list) {
1923                 list_del_init(&ltd->ltd_layout_list);
1924         }
1925         spin_unlock(&ltds->ltd_lock);
1926
1927         OBD_FREE_PTR(llmd);
1928 }
1929
1930 static void lfsck_layout_slave_data_release(const struct lu_env *env,
1931                                             struct lfsck_component *com)
1932 {
1933         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
1934         struct lfsck_layout_seq         *lls;
1935         struct lfsck_layout_seq         *next;
1936
1937         LASSERT(llsd != NULL);
1938
1939         com->lc_data = NULL;
1940
1941         list_for_each_entry_safe(lls, next, &llsd->llsd_seq_list,
1942                                      lls_list) {
1943                 list_del_init(&lls->lls_list);
1944                 lfsck_object_put(env, lls->lls_lastid_obj);
1945                 OBD_FREE_PTR(lls);
1946         }
1947
1948         OBD_FREE_PTR(llsd);
1949 }
1950
1951 static void lfsck_layout_master_quit(const struct lu_env *env,
1952                                      struct lfsck_component *com)
1953 {
1954         struct lfsck_layout_master_data *llmd    = com->lc_data;
1955         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
1956         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
1957         struct l_wait_info               lwi     = { 0 };
1958
1959         llmd->llmd_exit = 1;
1960         wake_up_all(&athread->t_ctl_waitq);
1961         l_wait_event(mthread->t_ctl_waitq,
1962                      thread_is_init(athread) ||
1963                      thread_is_stopped(athread),
1964                      &lwi);
1965 }
1966
/*
 * Handler for incoming notifications on the layout master.
 *
 * Currently a placeholder: the request is ignored and 0 is returned.
 */
static int lfsck_layout_master_in_notify(const struct lu_env *env,
                                         struct lfsck_component *com,
                                         struct lfsck_request *lr)
{
        /* XXX: to record the event from layout slave on the OST. */
        return 0;
}
1974
/*
 * Handler for incoming notifications on the layout slave.
 *
 * Currently a placeholder: the request is ignored and 0 is returned.
 */
static int lfsck_layout_slave_in_notify(const struct lu_env *env,
                                        struct lfsck_component *com,
                                        struct lfsck_request *lr)
{
        /* XXX: to record the event from layout master on the MDT. */
        return 0;
}
1982
1983 static int lfsck_layout_query(const struct lu_env *env,
1984                               struct lfsck_component *com)
1985 {
1986         struct lfsck_layout *lo = com->lc_file_ram;
1987
1988         return lo->ll_status;
1989 }
1990
1991 static int lfsck_layout_master_stop_notify(const struct lu_env *env,
1992                                            struct lfsck_component *com,
1993                                            struct lfsck_tgt_descs *ltds,
1994                                            struct lfsck_tgt_desc *ltd,
1995                                            struct ptlrpc_request_set *set)
1996 {
1997         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1998         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1999         struct lfsck_request              *lr    = &info->lti_lr;
2000         struct lfsck_instance             *lfsck = com->lc_lfsck;
2001         int                                rc;
2002
2003         LASSERT(list_empty(&ltd->ltd_layout_list));
2004         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
2005
2006         memset(lr, 0, sizeof(*lr));
2007         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2008         lr->lr_event = LE_STOP;
2009         lr->lr_active = LT_LAYOUT;
2010         lr->lr_status = LS_CO_STOPPED;
2011
2012         laia->laia_com = com;
2013         laia->laia_ltds = ltds;
2014         laia->laia_ltd = ltd;
2015         laia->laia_lr = lr;
2016
2017         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2018                                  lfsck_layout_master_async_interpret,
2019                                  laia, LFSCK_NOTIFY);
2020         if (rc != 0)
2021                 CERROR("%s: Fail to notify OST %x for stop: rc = %d\n",
2022                        lfsck_lfsck2name(lfsck), ltd->ltd_index, rc);
2023
2024         return rc;
2025 }
2026
2027 static struct lfsck_operations lfsck_layout_master_ops = {
2028         .lfsck_reset            = lfsck_layout_reset,
2029         .lfsck_fail             = lfsck_layout_fail,
2030         .lfsck_checkpoint       = lfsck_layout_master_checkpoint,
2031         .lfsck_prep             = lfsck_layout_master_prep,
2032         .lfsck_exec_oit         = lfsck_layout_master_exec_oit,
2033         .lfsck_exec_dir         = lfsck_layout_exec_dir,
2034         .lfsck_post             = lfsck_layout_master_post,
2035         .lfsck_dump             = lfsck_layout_dump,
2036         .lfsck_double_scan      = lfsck_layout_master_double_scan,
2037         .lfsck_data_release     = lfsck_layout_master_data_release,
2038         .lfsck_quit             = lfsck_layout_master_quit,
2039         .lfsck_in_notify        = lfsck_layout_master_in_notify,
2040         .lfsck_query            = lfsck_layout_query,
2041         .lfsck_stop_notify      = lfsck_layout_master_stop_notify,
2042 };
2043
2044 static struct lfsck_operations lfsck_layout_slave_ops = {
2045         .lfsck_reset            = lfsck_layout_reset,
2046         .lfsck_fail             = lfsck_layout_fail,
2047         .lfsck_checkpoint       = lfsck_layout_slave_checkpoint,
2048         .lfsck_prep             = lfsck_layout_slave_prep,
2049         .lfsck_exec_oit         = lfsck_layout_slave_exec_oit,
2050         .lfsck_exec_dir         = lfsck_layout_exec_dir,
2051         .lfsck_post             = lfsck_layout_slave_post,
2052         .lfsck_dump             = lfsck_layout_dump,
2053         .lfsck_double_scan      = lfsck_layout_slave_double_scan,
2054         .lfsck_data_release     = lfsck_layout_slave_data_release,
2055         .lfsck_in_notify        = lfsck_layout_slave_in_notify,
2056         .lfsck_query            = lfsck_layout_query,
2057 };
2058
2059 int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
2060 {
2061         struct lfsck_component  *com;
2062         struct lfsck_layout     *lo;
2063         struct dt_object        *root = NULL;
2064         struct dt_object        *obj;
2065         int                      rc;
2066         ENTRY;
2067
2068         OBD_ALLOC_PTR(com);
2069         if (com == NULL)
2070                 RETURN(-ENOMEM);
2071
2072         INIT_LIST_HEAD(&com->lc_link);
2073         INIT_LIST_HEAD(&com->lc_link_dir);
2074         init_rwsem(&com->lc_sem);
2075         atomic_set(&com->lc_ref, 1);
2076         com->lc_lfsck = lfsck;
2077         com->lc_type = LT_LAYOUT;
2078         if (lfsck->li_master) {
2079                 struct lfsck_layout_master_data *llmd;
2080
2081                 com->lc_ops = &lfsck_layout_master_ops;
2082                 OBD_ALLOC_PTR(llmd);
2083                 if (llmd == NULL)
2084                         GOTO(out, rc = -ENOMEM);
2085
2086                 INIT_LIST_HEAD(&llmd->llmd_req_list);
2087                 INIT_LIST_HEAD(&llmd->llmd_ost_list);
2088                 INIT_LIST_HEAD(&llmd->llmd_ost_phase1_list);
2089                 INIT_LIST_HEAD(&llmd->llmd_ost_phase2_list);
2090                 spin_lock_init(&llmd->llmd_lock);
2091                 init_waitqueue_head(&llmd->llmd_thread.t_ctl_waitq);
2092                 atomic_set(&llmd->llmd_rpcs_in_flight, 0);
2093                 com->lc_data = llmd;
2094         } else {
2095                 struct lfsck_layout_slave_data *llsd;
2096
2097                 com->lc_ops = &lfsck_layout_slave_ops;
2098                 OBD_ALLOC_PTR(llsd);
2099                 if (llsd == NULL)
2100                         GOTO(out, rc = -ENOMEM);
2101
2102                 INIT_LIST_HEAD(&llsd->llsd_seq_list);
2103                 com->lc_data = llsd;
2104         }
2105         com->lc_file_size = sizeof(*lo);
2106         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
2107         if (com->lc_file_ram == NULL)
2108                 GOTO(out, rc = -ENOMEM);
2109
2110         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
2111         if (com->lc_file_disk == NULL)
2112                 GOTO(out, rc = -ENOMEM);
2113
2114         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
2115         if (IS_ERR(root))
2116                 GOTO(out, rc = PTR_ERR(root));
2117
2118         if (unlikely(!dt_try_as_dir(env, root)))
2119                 GOTO(out, rc = -ENOTDIR);
2120
2121         obj = local_file_find_or_create(env, lfsck->li_los, root,
2122                                         lfsck_layout_name,
2123                                         S_IFREG | S_IRUGO | S_IWUSR);
2124         if (IS_ERR(obj))
2125                 GOTO(out, rc = PTR_ERR(obj));
2126
2127         com->lc_obj = obj;
2128         rc = lfsck_layout_load(env, com);
2129         if (rc > 0)
2130                 rc = lfsck_layout_reset(env, com, true);
2131         else if (rc == -ENOENT)
2132                 rc = lfsck_layout_init(env, com);
2133
2134         if (rc != 0)
2135                 GOTO(out, rc);
2136
2137         lo = com->lc_file_ram;
2138         switch (lo->ll_status) {
2139         case LS_INIT:
2140         case LS_COMPLETED:
2141         case LS_FAILED:
2142         case LS_STOPPED:
2143         case LS_PARTIAL:
2144                 spin_lock(&lfsck->li_lock);
2145                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
2146                 spin_unlock(&lfsck->li_lock);
2147                 break;
2148         default:
2149                 CERROR("%s: unknown lfsck_layout status: rc = %u\n",
2150                        lfsck_lfsck2name(lfsck), lo->ll_status);
2151                 /* fall through */
2152         case LS_SCANNING_PHASE1:
2153         case LS_SCANNING_PHASE2:
2154                 /* No need to store the status to disk right now.
2155                  * If the system crashed before the status stored,
2156                  * it will be loaded back when next time. */
2157                 lo->ll_status = LS_CRASHED;
2158                 lo->ll_flags |= LF_INCOMPLETE;
2159                 /* fall through */
2160         case LS_PAUSED:
2161         case LS_CRASHED:
2162         case LS_CO_FAILED:
2163         case LS_CO_STOPPED:
2164         case LS_CO_PAUSED:
2165                 spin_lock(&lfsck->li_lock);
2166                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
2167                 spin_unlock(&lfsck->li_lock);
2168                 break;
2169         }
2170
2171         if (lo->ll_flags & LF_CRASHED_LASTID) {
2172                 LASSERT(lfsck->li_out_notify != NULL);
2173
2174                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
2175                                      LE_LASTID_REBUILDING);
2176         }
2177
2178         GOTO(out, rc = 0);
2179
2180 out:
2181         if (root != NULL && !IS_ERR(root))
2182                 lu_object_put(env, &root->do_lu);
2183
2184         if (rc != 0)
2185                 lfsck_component_cleanup(env, com);
2186
2187         return rc;
2188 }