Whamcloud - gitweb
LU-1267 lfsck: enhance API for MDT-OST consistency
[fs/lustre-release.git] / lustre / lfsck / lfsck_layout.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_layout.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <linux/bitops.h>
37
38 #include <lustre/lustre_idl.h>
39 #include <lu_object.h>
40 #include <dt_object.h>
41 #include <lustre_linkea.h>
42 #include <lustre_fid.h>
43 #include <lustre_lib.h>
44 #include <lustre_net.h>
45 #include <lustre/lustre_user.h>
46 #include <md_object.h>
47 #include <obd_class.h>
48
49 #include "lfsck_internal.h"
50
/* Magic stamped into the lfsck_layout trace file by lfsck_layout_init()
 * and verified by lfsck_layout_load(). */
#define LFSCK_LAYOUT_MAGIC 0xB173AE14

/* Name string of the layout component (usage is outside this view). */
static const char lfsck_layout_name[] = "lfsck_layout";
54
55 struct lfsck_layout_seq {
56         struct list_head         lls_list;
57         __u64                    lls_seq;
58         __u64                    lls_lastid;
59         __u64                    lls_lastid_known;
60         struct dt_object        *lls_lastid_obj;
61         unsigned int             lls_dirty:1;
62 };
63
64 struct lfsck_layout_slave_target {
65         /* link into lfsck_layout_slave_data::llsd_master_list. */
66         struct list_head        llst_list;
67         __u64                   llst_gen;
68         atomic_t                llst_ref;
69         __u32                   llst_index;
70 };
71
72 struct lfsck_layout_slave_data {
73         /* list for lfsck_layout_seq */
74         struct list_head         llsd_seq_list;
75
76         /* list for the masters involve layout verification. */
77         struct list_head         llsd_master_list;
78         spinlock_t               llsd_lock;
79         __u64                    llsd_touch_gen;
80 };
81
82 struct lfsck_layout_object {
83         struct dt_object        *llo_obj;
84         struct lu_attr           llo_attr;
85         atomic_t                 llo_ref;
86         __u16                    llo_gen;
87 };
88
89 struct lfsck_layout_req {
90         struct list_head                 llr_list;
91         struct lfsck_layout_object      *llr_parent;
92         struct dt_object                *llr_child;
93         __u32                            llr_ost_idx;
94         __u32                            llr_lov_idx; /* offset in LOV EA */
95 };
96
97 struct lfsck_layout_master_data {
98         spinlock_t              llmd_lock;
99         struct list_head        llmd_req_list;
100
101         /* list for the ost targets involve layout verification. */
102         struct list_head        llmd_ost_list;
103
104         /* list for the ost targets in phase1 scanning. */
105         struct list_head        llmd_ost_phase1_list;
106
107         /* list for the ost targets in phase1 scanning. */
108         struct list_head        llmd_ost_phase2_list;
109
110         /* list for the mdt targets involve layout verification. */
111         struct list_head        llmd_mdt_list;
112
113         /* list for the mdt targets in phase1 scanning. */
114         struct list_head        llmd_mdt_phase1_list;
115
116         /* list for the mdt targets in phase1 scanning. */
117         struct list_head        llmd_mdt_phase2_list;
118
119         struct ptlrpc_thread    llmd_thread;
120         __u32                   llmd_touch_gen;
121         int                     llmd_prefetched;
122         int                     llmd_assistant_status;
123         int                     llmd_post_result;
124         unsigned int            llmd_to_post:1,
125                                 llmd_to_double_scan:1,
126                                 llmd_in_double_scan:1,
127                                 llmd_exit:1;
128 };
129
/* Argument bundle for a slave-side asynchronous request (the consumer
 * is outside this view). */
struct lfsck_layout_slave_async_args {
        /* export associated with the request */
        struct obd_export *llsaa_exp;
        /* the layout component the request belongs to */
        struct lfsck_component *llsaa_com;
        /* the master target the request is about */
        struct lfsck_layout_slave_target *llsaa_llst;
};
135
136 static inline void
137 lfsck_layout_llst_put(struct lfsck_layout_slave_target *llst)
138 {
139         if (atomic_dec_and_test(&llst->llst_ref)) {
140                 LASSERT(list_empty(&llst->llst_list));
141
142                 OBD_FREE_PTR(llst);
143         }
144 }
145
146 static inline int
147 lfsck_layout_llst_add(struct lfsck_layout_slave_data *llsd, __u32 index)
148 {
149         struct lfsck_layout_slave_target *llst;
150         struct lfsck_layout_slave_target *tmp;
151         int                               rc   = 0;
152
153         OBD_ALLOC_PTR(llst);
154         if (llst == NULL)
155                 return -ENOMEM;
156
157         INIT_LIST_HEAD(&llst->llst_list);
158         llst->llst_gen = 0;
159         llst->llst_index = index;
160         atomic_set(&llst->llst_ref, 1);
161
162         spin_lock(&llsd->llsd_lock);
163         list_for_each_entry(tmp, &llsd->llsd_master_list, llst_list) {
164                 if (tmp->llst_index == index) {
165                         rc = -EALREADY;
166                         break;
167                 }
168         }
169         if (rc == 0)
170                 list_add_tail(&llst->llst_list, &llsd->llsd_master_list);
171         spin_unlock(&llsd->llsd_lock);
172
173         if (rc != 0)
174                 OBD_FREE_PTR(llst);
175
176         return rc;
177 }
178
179 static inline void
180 lfsck_layout_llst_del(struct lfsck_layout_slave_data *llsd,
181                       struct lfsck_layout_slave_target *llst)
182 {
183         bool del = false;
184
185         spin_lock(&llsd->llsd_lock);
186         if (!list_empty(&llst->llst_list)) {
187                 list_del_init(&llst->llst_list);
188                 del = true;
189         }
190         spin_unlock(&llsd->llsd_lock);
191
192         if (del)
193                 lfsck_layout_llst_put(llst);
194 }
195
196 static inline struct lfsck_layout_slave_target *
197 lfsck_layout_llst_find_and_del(struct lfsck_layout_slave_data *llsd,
198                                __u32 index)
199 {
200         struct lfsck_layout_slave_target *llst;
201
202         spin_lock(&llsd->llsd_lock);
203         list_for_each_entry(llst, &llsd->llsd_master_list, llst_list) {
204                 if (llst->llst_index == index) {
205                         list_del_init(&llst->llst_list);
206                         spin_unlock(&llsd->llsd_lock);
207
208                         return llst;
209                 }
210         }
211         spin_unlock(&llsd->llsd_lock);
212
213         return NULL;
214 }
215
216 static inline void lfsck_layout_object_put(const struct lu_env *env,
217                                            struct lfsck_layout_object *llo)
218 {
219         if (atomic_dec_and_test(&llo->llo_ref)) {
220                 lfsck_object_put(env, llo->llo_obj);
221                 OBD_FREE_PTR(llo);
222         }
223 }
224
225 static inline void lfsck_layout_req_fini(const struct lu_env *env,
226                                          struct lfsck_layout_req *llr)
227 {
228         lu_object_put(env, &llr->llr_child->do_lu);
229         lfsck_layout_object_put(env, llr->llr_parent);
230         OBD_FREE_PTR(llr);
231 }
232
233 static inline bool lfsck_layout_req_empty(struct lfsck_layout_master_data *llmd)
234 {
235         bool empty = false;
236
237         spin_lock(&llmd->llmd_lock);
238         if (list_empty(&llmd->llmd_req_list))
239                 empty = true;
240         spin_unlock(&llmd->llmd_lock);
241
242         return empty;
243 }
244
245 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
246                                    const struct lfsck_layout *src)
247 {
248         int i;
249
250         des->ll_magic = le32_to_cpu(src->ll_magic);
251         des->ll_status = le32_to_cpu(src->ll_status);
252         des->ll_flags = le32_to_cpu(src->ll_flags);
253         des->ll_success_count = le32_to_cpu(src->ll_success_count);
254         des->ll_run_time_phase1 = le32_to_cpu(src->ll_run_time_phase1);
255         des->ll_run_time_phase2 = le32_to_cpu(src->ll_run_time_phase2);
256         des->ll_time_last_complete = le64_to_cpu(src->ll_time_last_complete);
257         des->ll_time_latest_start = le64_to_cpu(src->ll_time_latest_start);
258         des->ll_time_last_checkpoint =
259                                 le64_to_cpu(src->ll_time_last_checkpoint);
260         des->ll_pos_latest_start = le64_to_cpu(src->ll_pos_latest_start);
261         des->ll_pos_last_checkpoint = le64_to_cpu(src->ll_pos_last_checkpoint);
262         des->ll_pos_first_inconsistent =
263                         le64_to_cpu(src->ll_pos_first_inconsistent);
264         des->ll_objs_checked_phase1 = le64_to_cpu(src->ll_objs_checked_phase1);
265         des->ll_objs_failed_phase1 = le64_to_cpu(src->ll_objs_failed_phase1);
266         des->ll_objs_checked_phase2 = le64_to_cpu(src->ll_objs_checked_phase2);
267         des->ll_objs_failed_phase2 = le64_to_cpu(src->ll_objs_failed_phase2);
268         for (i = 0; i < LLIT_MAX; i++)
269                 des->ll_objs_repaired[i] =
270                                 le64_to_cpu(src->ll_objs_repaired[i]);
271         des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
272 }
273
274 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
275                                    const struct lfsck_layout *src)
276 {
277         int i;
278
279         des->ll_magic = cpu_to_le32(src->ll_magic);
280         des->ll_status = cpu_to_le32(src->ll_status);
281         des->ll_flags = cpu_to_le32(src->ll_flags);
282         des->ll_success_count = cpu_to_le32(src->ll_success_count);
283         des->ll_run_time_phase1 = cpu_to_le32(src->ll_run_time_phase1);
284         des->ll_run_time_phase2 = cpu_to_le32(src->ll_run_time_phase2);
285         des->ll_time_last_complete = cpu_to_le64(src->ll_time_last_complete);
286         des->ll_time_latest_start = cpu_to_le64(src->ll_time_latest_start);
287         des->ll_time_last_checkpoint =
288                                 cpu_to_le64(src->ll_time_last_checkpoint);
289         des->ll_pos_latest_start = cpu_to_le64(src->ll_pos_latest_start);
290         des->ll_pos_last_checkpoint = cpu_to_le64(src->ll_pos_last_checkpoint);
291         des->ll_pos_first_inconsistent =
292                         cpu_to_le64(src->ll_pos_first_inconsistent);
293         des->ll_objs_checked_phase1 = cpu_to_le64(src->ll_objs_checked_phase1);
294         des->ll_objs_failed_phase1 = cpu_to_le64(src->ll_objs_failed_phase1);
295         des->ll_objs_checked_phase2 = cpu_to_le64(src->ll_objs_checked_phase2);
296         des->ll_objs_failed_phase2 = cpu_to_le64(src->ll_objs_failed_phase2);
297         for (i = 0; i < LLIT_MAX; i++)
298                 des->ll_objs_repaired[i] =
299                                 cpu_to_le64(src->ll_objs_repaired[i]);
300         des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
301 }
302
303 /**
304  * \retval +ve: the lfsck_layout is broken, the caller should reset it.
305  * \retval 0: succeed.
306  * \retval -ve: failed cases.
307  */
308 static int lfsck_layout_load(const struct lu_env *env,
309                              struct lfsck_component *com)
310 {
311         struct lfsck_layout             *lo     = com->lc_file_ram;
312         const struct dt_body_operations *dbo    = com->lc_obj->do_body_ops;
313         ssize_t                          size   = com->lc_file_size;
314         loff_t                           pos    = 0;
315         int                              rc;
316
317         rc = dbo->dbo_read(env, com->lc_obj,
318                            lfsck_buf_get(env, com->lc_file_disk, size), &pos,
319                            BYPASS_CAPA);
320         if (rc == 0) {
321                 return -ENOENT;
322         } else if (rc < 0) {
323                 CWARN("%s: failed to load lfsck_layout: rc = %d\n",
324                       lfsck_lfsck2name(com->lc_lfsck), rc);
325                 return rc;
326         } else if (rc != size) {
327                 CWARN("%s: crashed lfsck_layout, to be reset: rc = %d\n",
328                       lfsck_lfsck2name(com->lc_lfsck), rc);
329                 return 1;
330         }
331
332         lfsck_layout_le_to_cpu(lo, com->lc_file_disk);
333         if (lo->ll_magic != LFSCK_LAYOUT_MAGIC) {
334                 CWARN("%s: invalid lfsck_layout magic %#x != %#x, "
335                       "to be reset\n", lfsck_lfsck2name(com->lc_lfsck),
336                       lo->ll_magic, LFSCK_LAYOUT_MAGIC);
337                 return 1;
338         }
339
340         return 0;
341 }
342
343 static int lfsck_layout_store(const struct lu_env *env,
344                               struct lfsck_component *com)
345 {
346         struct dt_object         *obj           = com->lc_obj;
347         struct lfsck_instance    *lfsck         = com->lc_lfsck;
348         struct lfsck_layout      *lo            = com->lc_file_disk;
349         struct thandle           *handle;
350         ssize_t                   size          = com->lc_file_size;
351         loff_t                    pos           = 0;
352         int                       rc;
353         ENTRY;
354
355         lfsck_layout_cpu_to_le(lo, com->lc_file_ram);
356         handle = dt_trans_create(env, lfsck->li_bottom);
357         if (IS_ERR(handle)) {
358                 rc = PTR_ERR(handle);
359                 CERROR("%s: fail to create trans for storing lfsck_layout: "
360                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
361                 RETURN(rc);
362         }
363
364         rc = dt_declare_record_write(env, obj, size, pos, handle);
365         if (rc != 0) {
366                 CERROR("%s: fail to declare trans for storing lfsck_layout(1): "
367                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
368                 GOTO(out, rc);
369         }
370
371         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
372         if (rc != 0) {
373                 CERROR("%s: fail to start trans for storing lfsck_layout: "
374                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
375                 GOTO(out, rc);
376         }
377
378         rc = dt_record_write(env, obj, lfsck_buf_get(env, lo, size), &pos,
379                              handle);
380         if (rc != 0)
381                 CERROR("%s: fail to store lfsck_layout(1): size = %d, "
382                        "rc = %d\n", lfsck_lfsck2name(lfsck), (int)size, rc);
383
384         GOTO(out, rc);
385
386 out:
387         dt_trans_stop(env, lfsck->li_bottom, handle);
388
389         return rc;
390 }
391
392 static int lfsck_layout_init(const struct lu_env *env,
393                              struct lfsck_component *com)
394 {
395         struct lfsck_layout *lo = com->lc_file_ram;
396         int rc;
397
398         memset(lo, 0, com->lc_file_size);
399         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
400         lo->ll_status = LS_INIT;
401         down_write(&com->lc_sem);
402         rc = lfsck_layout_store(env, com);
403         up_write(&com->lc_sem);
404
405         return rc;
406 }
407
408 static int fid_is_for_ostobj(const struct lu_env *env, struct dt_device *dt,
409                              struct dt_object *obj, const struct lu_fid *fid)
410 {
411         struct seq_server_site  *ss     = lu_site2seq(dt->dd_lu_dev.ld_site);
412         struct lu_seq_range      range  = { 0 };
413         struct lustre_mdt_attrs *lma;
414         int                      rc;
415
416         fld_range_set_any(&range);
417         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), &range);
418         if (rc == 0) {
419                 if (fld_range_is_ost(&range))
420                         return 1;
421
422                 return 0;
423         }
424
425         lma = &lfsck_env_info(env)->lti_lma;
426         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, lma, sizeof(*lma)),
427                           XATTR_NAME_LMA, BYPASS_CAPA);
428         if (rc == sizeof(*lma)) {
429                 lustre_lma_swab(lma);
430
431                 /* Generally, the low layer OSD create handler or OI scrub
432                  * will set the LMAC_FID_ON_OST for all external visible
433                  * OST-objects. But to make the otable-based iteration to
434                  * be independent from OI scrub in spite of it got failure
435                  * or not, we check the LMAC_FID_ON_OST here to guarantee
436                  * that the LFSCK will not repair something by wrong. */
437                 return lma->lma_compat & LMAC_FID_ON_OST ? 1 : 0;
438         }
439
440         rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID, BYPASS_CAPA);
441
442         return rc > 0;
443 }
444
445 static struct lfsck_layout_seq *
446 lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
447 {
448         struct lfsck_layout_seq *lls;
449
450         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
451                 if (lls->lls_seq == seq)
452                         return lls;
453
454                 if (lls->lls_seq > seq)
455                         return NULL;
456         }
457
458         return NULL;
459 }
460
461 static void
462 lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
463                         struct lfsck_layout_seq *lls)
464 {
465         struct lfsck_layout_seq *tmp;
466         struct list_head        *pos = &llsd->llsd_seq_list;
467
468         list_for_each_entry(tmp, &llsd->llsd_seq_list, lls_list) {
469                 if (lls->lls_seq < tmp->lls_seq) {
470                         pos = &tmp->lls_list;
471                         break;
472                 }
473         }
474         list_add_tail(&lls->lls_list, pos);
475 }
476
477 static int
478 lfsck_layout_lastid_create(const struct lu_env *env,
479                            struct lfsck_instance *lfsck,
480                            struct dt_object *obj)
481 {
482         struct lfsck_thread_info *info   = lfsck_env_info(env);
483         struct lu_attr           *la     = &info->lti_la;
484         struct dt_object_format  *dof    = &info->lti_dof;
485         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
486         struct dt_device         *dt     = lfsck->li_bottom;
487         struct thandle           *th;
488         __u64                     lastid = 0;
489         loff_t                    pos    = 0;
490         int                       rc;
491         ENTRY;
492
493         CDEBUG(D_LFSCK, "To create LAST_ID for <seq> "LPX64"\n",
494                fid_seq(lfsck_dto2fid(obj)));
495
496         if (bk->lb_param & LPF_DRYRUN)
497                 return 0;
498
499         memset(la, 0, sizeof(*la));
500         la->la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
501         la->la_valid = LA_MODE | LA_UID | LA_GID;
502         dof->dof_type = dt_mode_to_dft(S_IFREG);
503
504         th = dt_trans_create(env, dt);
505         if (IS_ERR(th))
506                 RETURN(rc = PTR_ERR(th));
507
508         rc = dt_declare_create(env, obj, la, NULL, dof, th);
509         if (rc != 0)
510                 GOTO(stop, rc);
511
512         rc = dt_declare_record_write(env, obj, sizeof(lastid), pos, th);
513         if (rc != 0)
514                 GOTO(stop, rc);
515
516         rc = dt_trans_start_local(env, dt, th);
517         if (rc != 0)
518                 GOTO(stop, rc);
519
520         dt_write_lock(env, obj, 0);
521         if (likely(!dt_object_exists(obj))) {
522                 rc = dt_create(env, obj, la, NULL, dof, th);
523                 if (rc == 0)
524                         rc = dt_record_write(env, obj,
525                                 lfsck_buf_get(env, &lastid, sizeof(lastid)),
526                                 &pos, th);
527         }
528         dt_write_unlock(env, obj);
529
530         GOTO(stop, rc);
531
532 stop:
533         dt_trans_stop(env, dt, th);
534
535         return rc;
536 }
537
538 static int
539 lfsck_layout_lastid_reload(const struct lu_env *env,
540                            struct lfsck_component *com,
541                            struct lfsck_layout_seq *lls)
542 {
543         __u64   lastid;
544         loff_t  pos     = 0;
545         int     rc;
546
547         dt_read_lock(env, lls->lls_lastid_obj, 0);
548         rc = dt_record_read(env, lls->lls_lastid_obj,
549                             lfsck_buf_get(env, &lastid, sizeof(lastid)), &pos);
550         dt_read_unlock(env, lls->lls_lastid_obj);
551         if (unlikely(rc != 0))
552                 return rc;
553
554         lastid = le64_to_cpu(lastid);
555         if (lastid < lls->lls_lastid_known) {
556                 struct lfsck_instance   *lfsck  = com->lc_lfsck;
557                 struct lfsck_layout     *lo     = com->lc_file_ram;
558
559                 lls->lls_lastid = lls->lls_lastid_known;
560                 lls->lls_dirty = 1;
561                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
562                         LASSERT(lfsck->li_out_notify != NULL);
563
564                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
565                                              LE_LASTID_REBUILDING);
566                         lo->ll_flags |= LF_CRASHED_LASTID;
567                 }
568         } else if (lastid >= lls->lls_lastid) {
569                 lls->lls_lastid = lastid;
570                 lls->lls_dirty = 0;
571         }
572
573         return 0;
574 }
575
576 static int
577 lfsck_layout_lastid_store(const struct lu_env *env,
578                           struct lfsck_component *com)
579 {
580         struct lfsck_instance           *lfsck  = com->lc_lfsck;
581         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
582         struct dt_device                *dt     = lfsck->li_bottom;
583         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
584         struct lfsck_layout_seq         *lls;
585         struct thandle                  *th;
586         __u64                            lastid;
587         int                              rc     = 0;
588         int                              rc1    = 0;
589
590         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
591                 loff_t pos = 0;
592
593                 /* XXX: Add the code back if we really found related
594                  *      inconsistent cases in the future. */
595 #if 0
596                 if (!lls->lls_dirty) {
597                         /* In OFD, before the pre-creation, the LAST_ID
598                          * file will be updated firstly, which may hide
599                          * some potential crashed cases. For example:
600                          *
601                          * The old obj1's ID is higher than old LAST_ID
602                          * but lower than the new LAST_ID, but the LFSCK
603                          * have not touch the obj1 until the OFD updated
604                          * the LAST_ID. So the LFSCK does not regard it
605                          * as crashed case. But when OFD does not create
606                          * successfully, it will set the LAST_ID as the
607                          * real created objects' ID, then LFSCK needs to
608                          * found related inconsistency. */
609                         rc = lfsck_layout_lastid_reload(env, com, lls);
610                         if (likely(!lls->lls_dirty))
611                                 continue;
612                 }
613 #endif
614
615                 CDEBUG(D_LFSCK, "To sync the LAST_ID for <seq> "LPX64
616                        " as <oid> "LPU64"\n", lls->lls_seq, lls->lls_lastid);
617
618                 if (bk->lb_param & LPF_DRYRUN) {
619                         lls->lls_dirty = 0;
620                         continue;
621                 }
622
623                 th = dt_trans_create(env, dt);
624                 if (IS_ERR(th)) {
625                         rc1 = PTR_ERR(th);
626                         CERROR("%s: (1) failed to store "LPX64": rc = %d\n",
627                                lfsck_lfsck2name(com->lc_lfsck),
628                                lls->lls_seq, rc1);
629                         continue;
630                 }
631
632                 rc = dt_declare_record_write(env, lls->lls_lastid_obj,
633                                              sizeof(lastid), pos, th);
634                 if (rc != 0)
635                         goto stop;
636
637                 rc = dt_trans_start_local(env, dt, th);
638                 if (rc != 0)
639                         goto stop;
640
641                 lastid = cpu_to_le64(lls->lls_lastid);
642                 dt_write_lock(env, lls->lls_lastid_obj, 0);
643                 rc = dt_record_write(env, lls->lls_lastid_obj,
644                                      lfsck_buf_get(env, &lastid,
645                                      sizeof(lastid)), &pos, th);
646                 dt_write_unlock(env, lls->lls_lastid_obj);
647                 if (rc == 0)
648                         lls->lls_dirty = 0;
649
650 stop:
651                 dt_trans_stop(env, dt, th);
652                 if (rc != 0) {
653                         rc1 = rc;
654                         CERROR("%s: (2) failed to store "LPX64": rc = %d\n",
655                                lfsck_lfsck2name(com->lc_lfsck),
656                                lls->lls_seq, rc1);
657                 }
658         }
659
660         return rc1;
661 }
662
663 static int
664 lfsck_layout_lastid_load(const struct lu_env *env,
665                          struct lfsck_component *com,
666                          struct lfsck_layout_seq *lls)
667 {
668         struct lfsck_instance   *lfsck  = com->lc_lfsck;
669         struct lfsck_layout     *lo     = com->lc_file_ram;
670         struct lu_fid           *fid    = &lfsck_env_info(env)->lti_fid;
671         struct dt_object        *obj;
672         loff_t                   pos    = 0;
673         int                      rc;
674         ENTRY;
675
676         lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck->li_bottom));
677         obj = dt_locate(env, lfsck->li_bottom, fid);
678         if (IS_ERR(obj))
679                 RETURN(PTR_ERR(obj));
680
681         /* LAST_ID crashed, to be rebuilt */
682         if (!dt_object_exists(obj)) {
683                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
684                         LASSERT(lfsck->li_out_notify != NULL);
685
686                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
687                                              LE_LASTID_REBUILDING);
688                         lo->ll_flags |= LF_CRASHED_LASTID;
689
690                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
691                             cfs_fail_val > 0) {
692                                 struct l_wait_info lwi = LWI_TIMEOUT(
693                                                 cfs_time_seconds(cfs_fail_val),
694                                                 NULL, NULL);
695
696                                 up_write(&com->lc_sem);
697                                 l_wait_event(lfsck->li_thread.t_ctl_waitq,
698                                              !thread_is_running(&lfsck->li_thread),
699                                              &lwi);
700                                 down_write(&com->lc_sem);
701                         }
702                 }
703
704                 rc = lfsck_layout_lastid_create(env, lfsck, obj);
705         } else {
706                 dt_read_lock(env, obj, 0);
707                 rc = dt_read(env, obj,
708                         lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
709                         &pos);
710                 dt_read_unlock(env, obj);
711                 if (rc != 0 && rc != sizeof(__u64))
712                         GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
713
714                 if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
715                         LASSERT(lfsck->li_out_notify != NULL);
716
717                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
718                                              LE_LASTID_REBUILDING);
719                         lo->ll_flags |= LF_CRASHED_LASTID;
720                 }
721
722                 lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
723                 rc = 0;
724         }
725
726         GOTO(out, rc);
727
728 out:
729         if (rc != 0)
730                 lfsck_object_put(env, obj);
731         else
732                 lls->lls_lastid_obj = obj;
733
734         return rc;
735 }
736
737 static int lfsck_layout_master_async_interpret(const struct lu_env *env,
738                                                struct ptlrpc_request *req,
739                                                void *args, int rc)
740 {
741         struct lfsck_async_interpret_args *laia = args;
742         struct lfsck_component            *com  = laia->laia_com;
743         struct lfsck_layout_master_data   *llmd = com->lc_data;
744         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
745         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
746         struct lfsck_request              *lr   = laia->laia_lr;
747
748         switch (lr->lr_event) {
749         case LE_START:
750                 if (rc != 0) {
751                         struct lfsck_layout *lo = com->lc_file_ram;
752
753                         lo->ll_flags |= LF_INCOMPLETE;
754                         lfsck_tgt_put(ltd);
755                         break;
756                 }
757
758                 spin_lock(&ltds->ltd_lock);
759                 if (ltd->ltd_dead || ltd->ltd_layout_done) {
760                         spin_unlock(&ltds->ltd_lock);
761                         lfsck_tgt_put(ltd);
762                         break;
763                 }
764
765                 if (lr->lr_flags & LEF_TO_OST) {
766                         if (list_empty(&ltd->ltd_layout_list))
767                                 list_add_tail(&ltd->ltd_layout_list,
768                                               &llmd->llmd_ost_list);
769                         if (list_empty(&ltd->ltd_layout_phase_list))
770                                 list_add_tail(&ltd->ltd_layout_phase_list,
771                                               &llmd->llmd_ost_phase1_list);
772                 } else {
773                         if (list_empty(&ltd->ltd_layout_list))
774                                 list_add_tail(&ltd->ltd_layout_list,
775                                               &llmd->llmd_mdt_list);
776                         if (list_empty(&ltd->ltd_layout_phase_list))
777                                 list_add_tail(&ltd->ltd_layout_phase_list,
778                                               &llmd->llmd_mdt_phase1_list);
779                 }
780                 spin_unlock(&ltds->ltd_lock);
781                 lfsck_tgt_put(ltd);
782                 break;
783         case LE_STOP:
784         case LE_PHASE1_DONE:
785         case LE_PHASE2_DONE:
786                 if (rc != 0)
787                         CERROR("%s: fail to notify %s %x for layout: "
788                                "event = %d, rc = %d\n",
789                                lfsck_lfsck2name(com->lc_lfsck),
790                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
791                                ltd->ltd_index, lr->lr_event, rc);
792                 break;
793         case LE_QUERY: {
794                 struct lfsck_reply *reply;
795
796                 if (rc != 0) {
797                         spin_lock(&ltds->ltd_lock);
798                         list_del_init(&ltd->ltd_layout_phase_list);
799                         list_del_init(&ltd->ltd_layout_list);
800                         spin_unlock(&ltds->ltd_lock);
801                         lfsck_tgt_put(ltd);
802                         break;
803                 }
804
805                 reply = req_capsule_server_get(&req->rq_pill,
806                                                &RMF_LFSCK_REPLY);
807                 if (reply == NULL) {
808                         rc = -EPROTO;
809                         CERROR("%s: invalid return value: rc = %d\n",
810                                lfsck_lfsck2name(com->lc_lfsck), rc);
811                         spin_lock(&ltds->ltd_lock);
812                         list_del_init(&ltd->ltd_layout_phase_list);
813                         list_del_init(&ltd->ltd_layout_list);
814                         spin_unlock(&ltds->ltd_lock);
815                         lfsck_tgt_put(ltd);
816                         break;
817                 }
818
819                 switch (reply->lr_status) {
820                 case LS_SCANNING_PHASE1:
821                         break;
822                 case LS_SCANNING_PHASE2:
823                         spin_lock(&ltds->ltd_lock);
824                         list_del_init(&ltd->ltd_layout_phase_list);
825                         if (ltd->ltd_dead || ltd->ltd_layout_done) {
826                                 spin_unlock(&ltds->ltd_lock);
827                                 break;
828                         }
829
830                         if (lr->lr_flags & LEF_TO_OST)
831                                 list_add_tail(&ltd->ltd_layout_phase_list,
832                                               &llmd->llmd_ost_phase2_list);
833                         else
834                                 list_add_tail(&ltd->ltd_layout_phase_list,
835                                               &llmd->llmd_mdt_phase2_list);
836                         spin_unlock(&ltds->ltd_lock);
837                         break;
838                 default:
839                         spin_lock(&ltds->ltd_lock);
840                         list_del_init(&ltd->ltd_layout_phase_list);
841                         list_del_init(&ltd->ltd_layout_list);
842                         spin_unlock(&ltds->ltd_lock);
843                         break;
844                 }
845                 lfsck_tgt_put(ltd);
846                 break;
847         }
848         default:
849                 CERROR("%s: unexpected event: rc = %d\n",
850                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
851                 break;
852         }
853
854         lfsck_component_put(env, com);
855
856         return 0;
857 }
858
/*
 * Poll the layout LFSCK status of the targets still on the phase1 lists.
 *
 * An LE_QUERY request is sent asynchronously to every target linked on
 * llmd_mdt_phase1_list if that list is not empty; otherwise, or once the
 * MDT queries have been waited for and that list is found empty, to every
 * target linked on llmd_ost_phase1_list. The replies are processed by
 * lfsck_layout_master_async_interpret().
 *
 * Returns -ENOMEM if the request set cannot be allocated, a negative value
 * from ptlrpc_set_wait() if waiting fails, the last error reported by
 * lfsck_async_request() if any query could not be sent, and otherwise the
 * result of the last call made.
 */
859 static int lfsck_layout_master_query_others(const struct lu_env *env,
860                                             struct lfsck_component *com)
861 {
862         struct lfsck_thread_info          *info  = lfsck_env_info(env);
863         struct lfsck_request              *lr    = &info->lti_lr;
864         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
865         struct lfsck_instance             *lfsck = com->lc_lfsck;
866         struct lfsck_layout_master_data   *llmd  = com->lc_data;
867         struct ptlrpc_request_set         *set;
868         struct lfsck_tgt_descs            *ltds;
869         struct lfsck_tgt_desc             *ltd;
870         struct list_head                  *head;
871         __u32                              cnt   = 0;
872         int                                rc    = 0;
873         int                                rc1   = 0;
874         ENTRY;
875
876         set = ptlrpc_prep_set();
877         if (set == NULL)
878                 RETURN(-ENOMEM);
879
            /* Start a new walk: a target whose ltd_layout_gen equals
             * llmd_touch_gen has already been visited during this call. */
880         llmd->llmd_touch_gen++;
881         memset(lr, 0, sizeof(*lr));
882         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
883         lr->lr_event = LE_QUERY;
884         lr->lr_active = LT_LAYOUT;
885         laia->laia_com = com;
886         laia->laia_lr = lr;
887
888         if (!list_empty(&llmd->llmd_mdt_phase1_list)) {
889                 ltds = &lfsck->li_mdt_descs;
890                 lr->lr_flags = 0;
891                 head = &llmd->llmd_mdt_phase1_list;
892         } else {
893
            /* Also entered through "goto again" below, for the OST pass that
             * follows the MDT pass. */
894 again:
895                 ltds = &lfsck->li_ost_descs;
896                 lr->lr_flags = LEF_TO_OST;
897                 head = &llmd->llmd_ost_phase1_list;
898         }
899
900         laia->laia_ltds = ltds;
            /* ltd_lock is held at the top of every iteration and released
             * around lfsck_async_request(). */
901         spin_lock(&ltds->ltd_lock);
902         while (!list_empty(head)) {
903                 ltd = list_entry(head->next,
904                                  struct lfsck_tgt_desc,
905                                  ltd_layout_phase_list);
                    /* The head entry already carries the current generation:
                     * the whole list has been walked. */
906                 if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
907                         break;
908
                    /* Stamp the target, rotate it to the tail so that
                     * head->next becomes the next candidate, and take a
                     * reference for the request about to be sent. */
909                 ltd->ltd_layout_gen = llmd->llmd_touch_gen;
910                 list_del(&ltd->ltd_layout_phase_list);
911                 list_add_tail(&ltd->ltd_layout_phase_list, head);
912                 atomic_inc(&ltd->ltd_ref);
913                 laia->laia_ltd = ltd;
914                 spin_unlock(&ltds->ltd_lock);
915                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
916                                          lfsck_layout_master_async_interpret,
917                                          laia, LFSCK_QUERY);
918                 if (rc != 0) {
919                         CERROR("%s: fail to query %s %x for layout: rc = %d\n",
920                                lfsck_lfsck2name(lfsck),
921                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
922                                ltd->ltd_index, rc);
                            /* Nothing was sent: drop the reference taken
                             * above and remember the error. */
923                         lfsck_tgt_put(ltd);
924                         rc1 = rc;
925                 } else {
926                         cnt++;
927                 }
928                 spin_lock(&ltds->ltd_lock);
929         }
930         spin_unlock(&ltds->ltd_lock);
931
            /* Wait for the queries sent during this pass. */
932         if (cnt > 0) {
933                 rc = ptlrpc_set_wait(set);
934                 if (rc < 0) {
935                         ptlrpc_set_destroy(set);
936                         RETURN(rc);
937                 }
938                 cnt = 0;
939         }
940
            /* The pass just finished was the MDT one and no MDT is left on
             * the phase1 list: go on and query the OSTs. */
941         if (!(lr->lr_flags & LEF_TO_OST) &&
942             list_empty(&llmd->llmd_mdt_phase1_list))
943                 goto again;
944
945         ptlrpc_set_destroy(set);
946
            /* A send failure takes precedence over the last call's result. */
947         RETURN(rc1 != 0 ? rc1 : rc);
948 }
949
950 static inline bool
951 lfsck_layout_master_to_orphan(struct lfsck_layout_master_data *llmd)
952 {
953         return list_empty(&llmd->llmd_mdt_phase1_list) &&
954                (!list_empty(&llmd->llmd_ost_phase2_list) ||
955                 list_empty(&llmd->llmd_ost_phase1_list));
956 }
957
/*
 * Notify the other targets about a layout LFSCK event.
 *
 * The behaviour is selected by lr->lr_event:
 *
 * - LE_START: the request is sent to every OST set in the li_ost_descs
 *   bitmap. If LPF_ALL_MDT is set in \a flags, then either the request is
 *   also sent to every MDT in the li_mdt_descs bitmap (LPF_BROADCAST set),
 *   or the MDT targets are only linked onto the local llmd_mdt_list and
 *   llmd_mdt_phase1_list. A target that cannot be notified sets
 *   LF_INCOMPLETE in the layout file flags.
 *
 * - LE_STOP / LE_PHASE2_DONE: with LPF_ALL_MDT, the MDTs on llmd_mdt_list
 *   are either notified (LPF_BROADCAST set) or just unlinked locally; then
 *   the OSTs on llmd_ost_list are notified. Every notified target is
 *   unlinked from both layout lists first. For LE_STOP with LPF_BROADCAST
 *   the request also carries LEF_FORCE_STOP.
 *
 * - LE_PHASE1_DONE: the request is sent to every MDT on
 *   llmd_mdt_phase1_list; the targets stay on the list.
 *
 * Replies are processed by lfsck_layout_master_async_interpret().
 *
 * Returns -ENOMEM if the request set cannot be allocated, -EINVAL for any
 * other event, a negative value from ptlrpc_set_wait() if an intermediate
 * wait fails, -ENODEV if LE_START otherwise succeeded but llmd_ost_list is
 * still empty, and otherwise the result of the last send or wait.
 */
958 static int lfsck_layout_master_notify_others(const struct lu_env *env,
959                                              struct lfsck_component *com,
960                                              struct lfsck_request *lr,
961                                              __u32 flags)
962 {
963         struct lfsck_thread_info          *info  = lfsck_env_info(env);
964         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
965         struct lfsck_instance             *lfsck = com->lc_lfsck;
966         struct lfsck_layout_master_data   *llmd  = com->lc_data;
967         struct lfsck_layout               *lo    = com->lc_file_ram;
968         struct ptlrpc_request_set         *set;
969         struct lfsck_tgt_descs            *ltds;
970         struct lfsck_tgt_desc             *ltd;
971         struct lfsck_tgt_desc             *next;
972         struct list_head                  *head;
973         __u32                              idx;
974         __u32                              cnt   = 0;
975         int                                rc    = 0;
976         ENTRY;
977
978         set = ptlrpc_prep_set();
979         if (set == NULL)
980                 RETURN(-ENOMEM);
981
982         lr->lr_active = LT_LAYOUT;
983         laia->laia_com = com;
984         laia->laia_lr = lr;
985         lr->lr_flags = 0;
986         switch (lr->lr_event) {
987         case LE_START:
988                 /* Notify OSTs firstly, then other MDTs if needed. */
989                 lr->lr_flags |= LEF_TO_OST;
990                 ltds = &lfsck->li_ost_descs;
991
            /* Send the request to every target set in the bitmap of ltds.
             * Entered a second time, through the goto below, for the MDTs
             * when the request is broadcast. */
992 lable1:
993                 laia->laia_ltds = ltds;
994                 down_read(&ltds->ltd_rw_sem);
995                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
                            /* This reference is dropped below if the send
                             * fails, otherwise by the interpret callback. */
996                         ltd = lfsck_tgt_get(ltds, idx);
997                         LASSERT(ltd != NULL);
998
999                         laia->laia_ltd = ltd;
1000                         ltd->ltd_layout_done = 0;
1001                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1002                                         lfsck_layout_master_async_interpret,
1003                                         laia, LFSCK_NOTIFY);
1004                         if (rc != 0) {
1005                                 CERROR("%s: fail to notify %s %x for layout "
1006                                        "start: rc = %d\n",
1007                                        lfsck_lfsck2name(lfsck),
1008                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1009                                        "MDT", idx, rc);
1010                                 lfsck_tgt_put(ltd);
1011                                 lo->ll_flags |= LF_INCOMPLETE;
1012                         } else {
1013                                 cnt++;
1014                         }
1015                 }
1016                 up_read(&ltds->ltd_rw_sem);
1017
1018                 /* Sync up */
1019                 if (cnt > 0) {
1020                         rc = ptlrpc_set_wait(set);
1021                         if (rc < 0) {
1022                                 ptlrpc_set_destroy(set);
1023                                 RETURN(rc);
1024                         }
1025                         cnt = 0;
1026                 }
1027
                     /* Done unless the MDTs are involved as well. This test
                      * also ends the second (MDT) pass, because LPF_ALL_MDT
                      * is cleared before jumping back to lable1. */
1028                 if (!(flags & LPF_ALL_MDT))
1029                         break;
1030
1031                 ltds = &lfsck->li_mdt_descs;
1032                 /* The sponsor broadcasts the request to other MDTs. */
1033                 if (flags & LPF_BROADCAST) {
1034                         flags &= ~LPF_ALL_MDT;
1035                         lr->lr_flags &= ~LEF_TO_OST;
1036                         goto lable1;
1037                 }
1038
1039                 /* Non-sponsors link other MDT targets locally. */
1040                 spin_lock(&ltds->ltd_lock);
1041                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1042                         ltd = LTD_TGT(ltds, idx);
1043                         LASSERT(ltd != NULL);
1044
                             /* Already linked on a layout list: skip it. */
1045                         if (!list_empty(&ltd->ltd_layout_list))
1046                                 continue;
1047
1048                         list_add_tail(&ltd->ltd_layout_list,
1049                                       &llmd->llmd_mdt_list);
1050                         list_add_tail(&ltd->ltd_layout_phase_list,
1051                                       &llmd->llmd_mdt_phase1_list);
1052                 }
1053                 spin_unlock(&ltds->ltd_lock);
1054
1055                 break;
1056         case LE_STOP:
1057                 if (flags & LPF_BROADCAST)
1058                         lr->lr_flags |= LEF_FORCE_STOP;
                     /* fallthrough */
1059         case LE_PHASE2_DONE:
1060                 /* Notify other MDTs if needed, then the OSTs. */
1061                 if (flags & LPF_ALL_MDT) {
1062                         /* The sponsor broadcasts the request to other MDTs. */
1063                         if (flags & LPF_BROADCAST) {
1064                                 lr->lr_flags &= ~LEF_TO_OST;
1065                                 head = &llmd->llmd_mdt_list;
1066                                 ltds = &lfsck->li_mdt_descs;
1067                                 goto lable3;
1068                         }
1069
1070                         /* Non-sponsors unlink other MDT targets locally. */
1071                         ltds = &lfsck->li_mdt_descs;
1072                         spin_lock(&ltds->ltd_lock);
1073                         list_for_each_entry_safe(ltd, next,
1074                                                  &llmd->llmd_mdt_list,
1075                                                  ltd_layout_list) {
1076                                 list_del_init(&ltd->ltd_layout_phase_list);
1077                                 list_del_init(&ltd->ltd_layout_list);
1078                         }
1079                         spin_unlock(&ltds->ltd_lock);
1080                 }
1081
             /* OST pass: reached directly, or through the goto that follows
              * a broadcast MDT pass. */
1082 lable2:
1083                 lr->lr_flags |= LEF_TO_OST;
1084                 head = &llmd->llmd_ost_list;
1085                 ltds = &lfsck->li_ost_descs;
1086
             /* Unlink every target on the list selected by head from both
              * layout lists, then notify it. ltd_lock is released around
              * lfsck_async_request(). */
1087 lable3:
1088                 laia->laia_ltds = ltds;
1089                 spin_lock(&ltds->ltd_lock);
1090                 while (!list_empty(head)) {
1091                         ltd = list_entry(head->next, struct lfsck_tgt_desc,
1092                                          ltd_layout_list);
1093                         if (!list_empty(&ltd->ltd_layout_phase_list))
1094                                 list_del_init(&ltd->ltd_layout_phase_list);
1095                         list_del_init(&ltd->ltd_layout_list);
1096                         laia->laia_ltd = ltd;
1097                         spin_unlock(&ltds->ltd_lock);
1098                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1099                                         lfsck_layout_master_async_interpret,
1100                                         laia, LFSCK_NOTIFY);
1101                         if (rc != 0)
1102                                 CERROR("%s: fail to notify %s %x for layout "
1103                                        "stop/phase2: rc = %d\n",
1104                                        lfsck_lfsck2name(lfsck),
1105                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1106                                        "MDT", ltd->ltd_index, rc);
1107                         else
1108                                 cnt++;
1109                         spin_lock(&ltds->ltd_lock);
1110                 }
1111                 spin_unlock(&ltds->ltd_lock);
1112
                     /* Without LPF_BROADCAST, or once it has been cleared
                      * below, this was the last pass; the requests still in
                      * flight are waited for after the switch. */
1113                 if (!(flags & LPF_BROADCAST))
1114                         break;
1115
1116                 /* Sync up */
1117                 if (cnt > 0) {
1118                         rc = ptlrpc_set_wait(set);
1119                         if (rc < 0) {
1120                                 ptlrpc_set_destroy(set);
1121                                 RETURN(rc);
1122                         }
1123                         cnt = 0;
1124                 }
1125
1126                 flags &= ~LPF_BROADCAST;
1127                 goto lable2;
1128         case LE_PHASE1_DONE:
                     /* Walk llmd_mdt_phase1_list once: each visited target
                      * is stamped with the new generation and rotated to the
                      * tail, so meeting a stamped head ends the walk. */
1129                 llmd->llmd_touch_gen++;
1130                 lr->lr_flags &= ~LEF_TO_OST;
1131                 ltds = &lfsck->li_mdt_descs;
1132                 laia->laia_ltds = ltds;
1133                 spin_lock(&ltds->ltd_lock);
1134                 while (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1135                         ltd = list_entry(llmd->llmd_mdt_phase1_list.next,
1136                                          struct lfsck_tgt_desc,
1137                                          ltd_layout_phase_list);
1138                         if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1139                                 break;
1140
1141                         ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1142                         list_del_init(&ltd->ltd_layout_phase_list);
1143                         list_add_tail(&ltd->ltd_layout_phase_list,
1144                                       &llmd->llmd_mdt_phase1_list);
1145                         laia->laia_ltd = ltd;
1146                         spin_unlock(&ltds->ltd_lock);
1147                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1148                                         lfsck_layout_master_async_interpret,
1149                                         laia, LFSCK_NOTIFY);
1150                         if (rc != 0)
1151                                 CERROR("%s: fail to notify MDT %x for layout "
1152                                        "phase1 done: rc = %d\n",
1153                                        lfsck_lfsck2name(lfsck),
1154                                        ltd->ltd_index, rc);
1155                         else
1156                                 cnt++;
1157                         spin_lock(&ltds->ltd_lock);
1158                 }
1159                 spin_unlock(&ltds->ltd_lock);
1160                 break;
1161         default:
                     /* NOTE(review): the value printed after "rc =" is
                      * lr->lr_event, not an error code. */
1162                 CERROR("%s: unexpected LFSCK event: rc = %d\n",
1163                        lfsck_lfsck2name(lfsck), lr->lr_event);
1164                 rc = -EINVAL;
1165                 break;
1166         }
1167
             /* Wait for the requests still in flight, then release the set. */
1168         if (cnt > 0)
1169                 rc = ptlrpc_set_wait(set);
1170         ptlrpc_set_destroy(set);
1171
             /* OSTs are linked onto llmd_ost_list by the interpret callback
              * when their LE_START request succeeds; an empty list here
              * means that no OST has joined. */
1172         if (rc == 0 && lr->lr_event == LE_START &&
1173             list_empty(&llmd->llmd_ost_list))
1174                 rc = -ENODEV;
1175
1176         RETURN(rc);
1177 }
1178
1179 static int lfsck_layout_double_scan_result(const struct lu_env *env,
1180                                            struct lfsck_component *com,
1181                                            int rc)
1182 {
1183         struct lfsck_instance   *lfsck = com->lc_lfsck;
1184         struct lfsck_layout     *lo    = com->lc_file_ram;
1185         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1186
1187         down_write(&com->lc_sem);
1188
1189         lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1190                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1191         lo->ll_time_last_checkpoint = cfs_time_current_sec();
1192         lo->ll_objs_checked_phase2 += com->lc_new_checked;
1193
1194         if (rc > 0) {
1195                 com->lc_journal = 0;
1196                 if (lo->ll_flags & LF_INCOMPLETE)
1197                         lo->ll_status = LS_PARTIAL;
1198                 else
1199                         lo->ll_status = LS_COMPLETED;
1200                 if (!(bk->lb_param & LPF_DRYRUN))
1201                         lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1202                 lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
1203                 lo->ll_success_count++;
1204         } else if (rc == 0) {
1205                 lo->ll_status = lfsck->li_status;
1206                 if (lo->ll_status == 0)
1207                         lo->ll_status = LS_STOPPED;
1208         } else {
1209                 lo->ll_status = LS_FAILED;
1210         }
1211
1212         if (lo->ll_status != LS_PAUSED) {
1213                 spin_lock(&lfsck->li_lock);
1214                 list_del_init(&com->lc_link);
1215                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1216                 spin_unlock(&lfsck->li_lock);
1217         }
1218
1219         rc = lfsck_layout_store(env, com);
1220
1221         up_write(&com->lc_sem);
1222
1223         return rc;
1224 }
1225
/*
 * Placeholder for the per-target orphan scan.
 *
 * The assistant thread calls this for each target it takes off
 * llmd_ost_phase2_list. None of the arguments is used yet and success (0)
 * is always reported.
 */
static int lfsck_layout_scan_orphan(const struct lu_env *env,
                                    struct lfsck_component *com,
                                    struct lfsck_tgt_desc *ltd)
{
        int rc = 0;

        /* XXX: To be extended in other patch. */

        return rc;
}
1234
/*
 * Body of the layout LFSCK assistant thread.
 *
 * \a args is a struct lfsck_thread_args; it is released with
 * lfsck_thread_args_fini() before returning. The function:
 *  1) notifies the other targets with LE_START and, if that succeeds,
 *     marks the assistant thread SVC_RUNNING and wakes the master engine;
 *  2) loops taking requests off llmd_req_list and finishing them, then
 *     sleeps until there is a new request or one of llmd_exit,
 *     llmd_to_post or llmd_to_double_scan is set;
 *  3) on llmd_to_post, notifies the others with LE_PHASE1_DONE;
 *  4) on llmd_to_double_scan, polls the other targets and calls
 *     lfsck_layout_scan_orphan() for each OST that reaches phase2, until
 *     no OST is left on the phase1 list;
 *  5) on the way out, notifies the others with LE_PHASE2_DONE or LE_STOP,
 *     stores the result unless llmd_exit is set, publishes
 *     llmd_assistant_status and marks the thread SVC_STOPPED.
 *
 * Returns rc as left by the steps above. Note that llmd_assistant_status
 * is set to rc1 instead whenever rc1 is not zero.
 */
1235 static int lfsck_layout_assistant(void *args)
1236 {
1237         struct lfsck_thread_args        *lta     = args;
1238         struct lu_env                   *env     = &lta->lta_env;
1239         struct lfsck_component          *com     = lta->lta_com;
1240         struct lfsck_instance           *lfsck   = lta->lta_lfsck;
1241         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
1242         struct lfsck_position           *pos     = &com->lc_pos_start;
1243         struct lfsck_thread_info        *info    = lfsck_env_info(env);
1244         struct lfsck_request            *lr      = &info->lti_lr;
1245         struct lfsck_layout_master_data *llmd    = com->lc_data;
1246         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
1247         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
1248         struct lfsck_layout_req         *llr;
1249         struct l_wait_info               lwi     = { 0 };
1250         int                              rc      = 0;
1251         int                              rc1     = 0;
1252         __u32                            flags;
1253         ENTRY;
1254
             /* Prefer the flags given with the start request, if there is
              * one; otherwise use the parameters kept in the bookmark. */
1255         if (lta->lta_lsp->lsp_start != NULL)
1256                 flags  = lta->lta_lsp->lsp_start->ls_flags;
1257         else
1258                 flags = bk->lb_param;
1259         memset(lr, 0, sizeof(*lr));
1260         lr->lr_event = LE_START;
1261         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1262         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
1263                        LSV_ASYNC_WINDOWS;
1264         lr->lr_speed = bk->lb_speed_limit;
1265         lr->lr_version = bk->lb_version;
1266         lr->lr_param = bk->lb_param;
1267         lr->lr_async_windows = bk->lb_async_windows;
             /* NOTE(review): a start cookie <= 1 presumably means that the
              * scan begins from the start, hence the reset -- confirm. */
1268         if (pos->lp_oit_cookie <= 1)
1269                 lr->lr_param |= LPF_RESET;
1270
1271         rc = lfsck_layout_master_notify_others(env, com, lr, flags);
1272         if (rc != 0) {
1273                 CERROR("%s: fail to notify others for layout start: rc = %d\n",
1274                        lfsck_lfsck2name(lfsck), rc);
1275                 GOTO(fini, rc);
1276         }
1277
             /* Mark the assistant as running and wake the master engine. */
1278         spin_lock(&llmd->llmd_lock);
1279         thread_set_flags(athread, SVC_RUNNING);
1280         spin_unlock(&llmd->llmd_lock);
1281         wake_up_all(&mthread->t_ctl_waitq);
1282
1283         while (1) {
1284                 while (!list_empty(&llmd->llmd_req_list)) {
1285                         bool wakeup = false;
1286
1287                         if (unlikely(llmd->llmd_exit))
1288                                 GOTO(cleanup1, rc = llmd->llmd_post_result);
1289
1290                         /* XXX: To be extended in other patch.
1291                          *
1292                          * Compare the OST side attribute with local attribute,
1293                          * and fix it if found inconsistency. */
1294
1295                         spin_lock(&llmd->llmd_lock);
1296                         llr = list_entry(llmd->llmd_req_list.next,
1297                                          struct lfsck_layout_req,
1298                                          llr_list);
1299                         list_del_init(&llr->llr_list);
                             /* The prefetched count had reached the async
                              * window limit: wake the master once this
                              * request is taken off (presumably it
                              * throttles on that window -- confirm). */
1300                         if (bk->lb_async_windows != 0 &&
1301                             llmd->llmd_prefetched >= bk->lb_async_windows)
1302                                 wakeup = true;
1303
1304                         llmd->llmd_prefetched--;
1305                         spin_unlock(&llmd->llmd_lock);
1306                         if (wakeup)
1307                                 wake_up_all(&mthread->t_ctl_waitq);
1308
1309                         lfsck_layout_req_fini(env, llr);
1310                 }
1311
1312                 /* Wakeup the master engine if it is waiting in checkpoint. */
1313                 wake_up_all(&mthread->t_ctl_waitq);
1314
1315                 l_wait_event(athread->t_ctl_waitq,
1316                              !lfsck_layout_req_empty(llmd) ||
1317                              llmd->llmd_exit ||
1318                              llmd->llmd_to_post ||
1319                              llmd->llmd_to_double_scan,
1320                              &lwi);
1321
1322                 if (unlikely(llmd->llmd_exit))
1323                         GOTO(cleanup1, rc = llmd->llmd_post_result);
1324
                     /* New requests are handled before post/double scan. */
1325                 if (!list_empty(&llmd->llmd_req_list))
1326                         continue;
1327
1328                 if (llmd->llmd_to_post) {
1329                         llmd->llmd_to_post = 0;
1330                         LASSERT(llmd->llmd_post_result > 0);
1331
1332                         memset(lr, 0, sizeof(*lr));
1333                         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1334                         lr->lr_event = LE_PHASE1_DONE;
1335                         lr->lr_status = llmd->llmd_post_result;
1336                         rc = lfsck_layout_master_notify_others(env, com, lr, 0);
1337                         if (rc != 0)
1338                                 CERROR("%s: failed to notify others "
1339                                        "for layout post: rc = %d\n",
1340                                        lfsck_lfsck2name(lfsck), rc);
1341
1342                         /* Wakeup the master engine to go ahead. */
1343                         wake_up_all(&mthread->t_ctl_waitq);
1344                 }
1345
1346                 if (llmd->llmd_to_double_scan) {
1347                         llmd->llmd_to_double_scan = 0;
                             /* Counted in li_double_scan_count until the
                              * decrement at the fini label. */
1348                         atomic_inc(&lfsck->li_double_scan_count);
1349                         llmd->llmd_in_double_scan = 1;
1350                         wake_up_all(&mthread->t_ctl_waitq);
1351
1352                         while (llmd->llmd_in_double_scan) {
1353                                 struct lfsck_tgt_descs  *ltds =
1354                                                         &lfsck->li_ost_descs;
1355                                 struct lfsck_tgt_desc   *ltd;
1356
                                     /* The query result is only looked at
                                      * if the orphan step cannot start. */
1357                                 rc = lfsck_layout_master_query_others(env, com);
1358                                 if (lfsck_layout_master_to_orphan(llmd))
1359                                         goto orphan;
1360
1361                                 if (rc < 0)
1362                                         GOTO(cleanup2, rc);
1363
1364                                 /* Pull LFSCK status on related targets once
1365                                  * per 30 seconds if we are not notified. */
1366                                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(30),
1367                                                            cfs_time_seconds(1),
1368                                                            NULL, NULL);
1369                                 rc = l_wait_event(athread->t_ctl_waitq,
1370                                         lfsck_layout_master_to_orphan(llmd) ||
1371                                         llmd->llmd_exit ||
1372                                         !thread_is_running(mthread),
1373                                         &lwi);
1374
1375                                 if (unlikely(llmd->llmd_exit ||
1376                                              !thread_is_running(mthread)))
1377                                         GOTO(cleanup2, rc = 0);
1378
                                     /* Timed out: query the targets again. */
1379                                 if (rc == -ETIMEDOUT)
1380                                         continue;
1381
1382                                 if (rc < 0)
1383                                         GOTO(cleanup2, rc);
1384
                     /* Take each OST off the phase2 list and scan it with
                      * ltd_lock released. */
1385 orphan:
1386                                 spin_lock(&ltds->ltd_lock);
1387                                 while (!list_empty(
1388                                                 &llmd->llmd_ost_phase2_list)) {
1389                                         ltd = list_entry(
1390                                               llmd->llmd_ost_phase2_list.next,
1391                                               struct lfsck_tgt_desc,
1392                                               ltd_layout_phase_list);
1393                                         list_del_init(
1394                                                 &ltd->ltd_layout_phase_list);
1395                                         spin_unlock(&ltds->ltd_lock);
1396
1397                                         rc = lfsck_layout_scan_orphan(env, com,
1398                                                                       ltd);
                                             /* A scan error aborts only when
                                              * LPF_FAILOUT is set. */
1399                                         if (rc != 0 &&
1400                                             bk->lb_param & LPF_FAILOUT)
1401                                                 GOTO(cleanup2, rc);
1402
1403                                         if (unlikely(llmd->llmd_exit ||
1404                                                 !thread_is_running(mthread)))
1405                                                 GOTO(cleanup2, rc = 0);
1406
1407                                         spin_lock(&ltds->ltd_lock);
1408                                 }
1409
                                     /* No OST is left in phase1: the double
                                      * scan is complete (rc = 1). */
1410                                 if (list_empty(&llmd->llmd_ost_phase1_list)) {
1411                                         spin_unlock(&ltds->ltd_lock);
1412                                         GOTO(cleanup2, rc = 1);
1413                                 }
1414                                 spin_unlock(&ltds->ltd_lock);
1415                         }
1416                 }
1417         }
1418
1419 cleanup1:
1420         /* Cleanup the unfinished requests. */
1421         spin_lock(&llmd->llmd_lock);
1422         while (!list_empty(&llmd->llmd_req_list)) {
1423                 llr = list_entry(llmd->llmd_req_list.next,
1424                                  struct lfsck_layout_req,
1425                                  llr_list);
1426                 list_del_init(&llr->llr_list);
1427                 llmd->llmd_prefetched--;
1428                 spin_unlock(&llmd->llmd_lock);
1429                 lfsck_layout_req_fini(env, llr);
1430                 spin_lock(&llmd->llmd_lock);
1431         }
1432         spin_unlock(&llmd->llmd_lock);
1433
1434         LASSERTF(llmd->llmd_prefetched == 0, "unmatched prefeteched objs %d\n",
1435                  llmd->llmd_prefetched);
1436
             /* Tell the others how the assistant ends:
              *   rc > 0  - LE_PHASE2_DONE carrying rc as the status;
              *   rc == 0 - LE_STOP with LS_CO_PAUSED, LS_STOPPED or
              *             LS_CO_STOPPED depending on li_status and flags;
              *   rc < 0  - LE_STOP with LS_CO_FAILED. */
1437 cleanup2:
1438         memset(lr, 0, sizeof(*lr));
1439         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1440         if (rc > 0) {
1441                 lr->lr_event = LE_PHASE2_DONE;
1442                 flags = 0;
1443                 lr->lr_status = rc;
1444         } else if (rc == 0) {
1445                 lr->lr_event = LE_STOP;
1446                 if (lfsck->li_status == LS_PAUSED ||
1447                     lfsck->li_status == LS_CO_PAUSED) {
1448                         flags = 0;
1449                         lr->lr_status = LS_CO_PAUSED;
1450                 } else if (lfsck->li_status == LS_STOPPED ||
1451                          lfsck->li_status == LS_CO_STOPPED) {
1452                         flags = lfsck->li_flags;
1453                         if (flags & LPF_BROADCAST)
1454                                 lr->lr_status = LS_STOPPED;
1455                         else
1456                                 lr->lr_status = LS_CO_STOPPED;
1457                 } else {
1458                         LBUG();
1459                 }
1460         } else {
1461                 lr->lr_event = LE_STOP;
1462                 flags = 0;
1463                 lr->lr_status = LS_CO_FAILED;
1464         }
1465
1466         rc1 = lfsck_layout_master_notify_others(env, com, lr, flags);
1467         if (rc1 != 0) {
1468                 CERROR("%s: failed to notify others for layout quit: rc = %d\n",
1469                        lfsck_lfsck2name(lfsck), rc1);
1470                 rc = rc1;
1471         }
1472
1473         /* Under force exit case, some requests may be just freed without
1474          * verification, those objects should be re-handled when next run.
1475          * So not update the on-disk tracing file under such case. */
1476         if (!llmd->llmd_exit)
1477                 rc1 = lfsck_layout_double_scan_result(env, com, rc);
1478
1479 fini:
1480         if (llmd->llmd_in_double_scan)
1481                 atomic_dec(&lfsck->li_double_scan_count);
1482
             /* Publish the final status and mark the thread as stopped under
              * llmd_lock, waking the master engine. */
1483         spin_lock(&llmd->llmd_lock);
1484         llmd->llmd_assistant_status = (rc1 != 0 ? rc1 : rc);
1485         thread_set_flags(athread, SVC_STOPPED);
1486         wake_up_all(&mthread->t_ctl_waitq);
1487         spin_unlock(&llmd->llmd_lock);
1488         lfsck_thread_args_fini(lta);
1489
1490         return rc;
1491 }
1492
1493 static int
1494 lfsck_layout_slave_async_interpret(const struct lu_env *env,
1495                                    struct ptlrpc_request *req,
1496                                    void *args, int rc)
1497 {
1498         struct lfsck_layout_slave_async_args *llsaa = args;
1499         struct obd_export                    *exp   = llsaa->llsaa_exp;
1500         struct lfsck_component               *com   = llsaa->llsaa_com;
1501         struct lfsck_layout_slave_target     *llst  = llsaa->llsaa_llst;
1502         struct lfsck_layout_slave_data       *llsd  = com->lc_data;
1503         bool                                  done  = false;
1504
1505         if (rc != 0) {
1506                 /* It is quite probably caused by target crash,
1507                  * to make the LFSCK can go ahead, assume that
1508                  * the target finished the LFSCK prcoessing. */
1509                 done = true;
1510         } else {
1511                 struct lfsck_reply *lr;
1512
1513                 lr = req_capsule_server_get(&req->rq_pill, &RMF_LFSCK_REPLY);
1514                 if (lr->lr_status != LS_SCANNING_PHASE1 &&
1515                     lr->lr_status != LS_SCANNING_PHASE2)
1516                         done = true;
1517         }
1518         if (done)
1519                 lfsck_layout_llst_del(llsd, llst);
1520         lfsck_layout_llst_put(llst);
1521         lfsck_component_put(env, com);
1522         class_export_put(exp);
1523
1524         return 0;
1525 }
1526
1527 static int lfsck_layout_async_query(const struct lu_env *env,
1528                                     struct lfsck_component *com,
1529                                     struct obd_export *exp,
1530                                     struct lfsck_layout_slave_target *llst,
1531                                     struct lfsck_request *lr,
1532                                     struct ptlrpc_request_set *set)
1533 {
1534         struct lfsck_layout_slave_async_args *llsaa;
1535         struct ptlrpc_request                *req;
1536         struct lfsck_request                 *tmp;
1537         int                                   rc;
1538         ENTRY;
1539
1540         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_QUERY);
1541         if (req == NULL)
1542                 RETURN(-ENOMEM);
1543
1544         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_QUERY);
1545         if (rc != 0) {
1546                 ptlrpc_request_free(req);
1547                 RETURN(rc);
1548         }
1549
1550         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1551         *tmp = *lr;
1552         ptlrpc_request_set_replen(req);
1553
1554         llsaa = ptlrpc_req_async_args(req);
1555         llsaa->llsaa_exp = exp;
1556         llsaa->llsaa_com = lfsck_component_get(com);
1557         llsaa->llsaa_llst = llst;
1558         req->rq_interpret_reply = lfsck_layout_slave_async_interpret;
1559         ptlrpc_set_add_req(set, req);
1560
1561         RETURN(0);
1562 }
1563
1564 static int lfsck_layout_async_notify(const struct lu_env *env,
1565                                      struct obd_export *exp,
1566                                      struct lfsck_request *lr,
1567                                      struct ptlrpc_request_set *set)
1568 {
1569         struct ptlrpc_request   *req;
1570         struct lfsck_request    *tmp;
1571         int                      rc;
1572         ENTRY;
1573
1574         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
1575         if (req == NULL)
1576                 RETURN(-ENOMEM);
1577
1578         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
1579         if (rc != 0) {
1580                 ptlrpc_request_free(req);
1581                 RETURN(rc);
1582         }
1583
1584         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1585         *tmp = *lr;
1586         ptlrpc_request_set_replen(req);
1587         ptlrpc_set_add_req(set, req);
1588
1589         RETURN(0);
1590 }
1591
1592 static int
1593 lfsck_layout_slave_query_master(const struct lu_env *env,
1594                                 struct lfsck_component *com)
1595 {
1596         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
1597         struct lfsck_instance            *lfsck = com->lc_lfsck;
1598         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
1599         struct lfsck_layout_slave_target *llst;
1600         struct obd_export                *exp;
1601         struct ptlrpc_request_set        *set;
1602         int                               cnt   = 0;
1603         int                               rc    = 0;
1604         int                               rc1   = 0;
1605         ENTRY;
1606
1607         set = ptlrpc_prep_set();
1608         if (set == NULL)
1609                 RETURN(-ENOMEM);
1610
1611         memset(lr, 0, sizeof(*lr));
1612         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1613         lr->lr_event = LE_QUERY;
1614         lr->lr_active = LT_LAYOUT;
1615
1616         llsd->llsd_touch_gen++;
1617         spin_lock(&llsd->llsd_lock);
1618         while (!list_empty(&llsd->llsd_master_list)) {
1619                 llst = list_entry(llsd->llsd_master_list.next,
1620                                   struct lfsck_layout_slave_target,
1621                                   llst_list);
1622                 if (llst->llst_gen == llsd->llsd_touch_gen)
1623                         break;
1624
1625                 llst->llst_gen = llsd->llsd_touch_gen;
1626                 list_del(&llst->llst_list);
1627                 list_add_tail(&llst->llst_list,
1628                               &llsd->llsd_master_list);
1629                 atomic_inc(&llst->llst_ref);
1630                 spin_unlock(&llsd->llsd_lock);
1631
1632                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
1633                                                llst->llst_index);
1634                 if (exp == NULL) {
1635                         lfsck_layout_llst_del(llsd, llst);
1636                         lfsck_layout_llst_put(llst);
1637                         spin_lock(&llsd->llsd_lock);
1638                         continue;
1639                 }
1640
1641                 rc = lfsck_layout_async_query(env, com, exp, llst, lr, set);
1642                 if (rc != 0) {
1643                         CERROR("%s: slave fail to query %s for layout: "
1644                                "rc = %d\n", lfsck_lfsck2name(lfsck),
1645                                exp->exp_obd->obd_name, rc);
1646                         rc1 = rc;
1647                         lfsck_layout_llst_put(llst);
1648                         class_export_put(exp);
1649                 } else {
1650                         cnt++;
1651                 }
1652                 spin_lock(&llsd->llsd_lock);
1653         }
1654         spin_unlock(&llsd->llsd_lock);
1655
1656         if (cnt > 0)
1657                 rc = ptlrpc_set_wait(set);
1658         ptlrpc_set_destroy(set);
1659
1660         RETURN(rc1 != 0 ? rc1 : rc);
1661 }
1662
1663 static void
1664 lfsck_layout_slave_notify_master(const struct lu_env *env,
1665                                  struct lfsck_component *com,
1666                                  enum lfsck_events event, int result)
1667 {
1668         struct lfsck_instance            *lfsck = com->lc_lfsck;
1669         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
1670         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
1671         struct lfsck_layout_slave_target *llst;
1672         struct obd_export                *exp;
1673         struct ptlrpc_request_set        *set;
1674         int                               cnt   = 0;
1675         int                               rc;
1676         ENTRY;
1677
1678         set = ptlrpc_prep_set();
1679         if (set == NULL)
1680                 RETURN_EXIT;
1681
1682         memset(lr, 0, sizeof(*lr));
1683         lr->lr_event = event;
1684         lr->lr_flags = LEF_FROM_OST;
1685         lr->lr_status = result;
1686         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1687         lr->lr_active = LT_LAYOUT;
1688         llsd->llsd_touch_gen++;
1689         spin_lock(&llsd->llsd_lock);
1690         while (!list_empty(&llsd->llsd_master_list)) {
1691                 llst = list_entry(llsd->llsd_master_list.next,
1692                                   struct lfsck_layout_slave_target,
1693                                   llst_list);
1694                 if (llst->llst_gen == llsd->llsd_touch_gen)
1695                         break;
1696
1697                 llst->llst_gen = llsd->llsd_touch_gen;
1698                 list_del(&llst->llst_list);
1699                 list_add_tail(&llst->llst_list,
1700                               &llsd->llsd_master_list);
1701                 atomic_inc(&llst->llst_ref);
1702                 spin_unlock(&llsd->llsd_lock);
1703
1704                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
1705                                                llst->llst_index);
1706                 if (exp == NULL) {
1707                         lfsck_layout_llst_del(llsd, llst);
1708                         lfsck_layout_llst_put(llst);
1709                         spin_lock(&llsd->llsd_lock);
1710                         continue;
1711                 }
1712
1713                 rc = lfsck_layout_async_notify(env, exp, lr, set);
1714                 if (rc != 0)
1715                         CERROR("%s: slave fail to notify %s for layout: "
1716                                "rc = %d\n", lfsck_lfsck2name(lfsck),
1717                                exp->exp_obd->obd_name, rc);
1718                 else
1719                         cnt++;
1720                 lfsck_layout_llst_put(llst);
1721                 class_export_put(exp);
1722                 spin_lock(&llsd->llsd_lock);
1723         }
1724         spin_unlock(&llsd->llsd_lock);
1725
1726         if (cnt > 0)
1727                 rc = ptlrpc_set_wait(set);
1728
1729         ptlrpc_set_destroy(set);
1730
1731         RETURN_EXIT;
1732 }
1733
1734 /* layout APIs */
1735
1736 static int lfsck_layout_reset(const struct lu_env *env,
1737                               struct lfsck_component *com, bool init)
1738 {
1739         struct lfsck_layout     *lo    = com->lc_file_ram;
1740         int                      rc;
1741
1742         down_write(&com->lc_sem);
1743         if (init) {
1744                 memset(lo, 0, com->lc_file_size);
1745         } else {
1746                 __u32 count = lo->ll_success_count;
1747                 __u64 last_time = lo->ll_time_last_complete;
1748
1749                 memset(lo, 0, com->lc_file_size);
1750                 lo->ll_success_count = count;
1751                 lo->ll_time_last_complete = last_time;
1752         }
1753
1754         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
1755         lo->ll_status = LS_INIT;
1756
1757         rc = lfsck_layout_store(env, com);
1758         up_write(&com->lc_sem);
1759
1760         return rc;
1761 }
1762
1763 static void lfsck_layout_fail(const struct lu_env *env,
1764                               struct lfsck_component *com, bool new_checked)
1765 {
1766         struct lfsck_layout *lo = com->lc_file_ram;
1767
1768         down_write(&com->lc_sem);
1769         if (new_checked)
1770                 com->lc_new_checked++;
1771         lo->ll_objs_failed_phase1++;
1772         if (lo->ll_pos_first_inconsistent == 0) {
1773                 struct lfsck_instance *lfsck = com->lc_lfsck;
1774
1775                 lo->ll_pos_first_inconsistent =
1776                         lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
1777                                                         lfsck->li_di_oit);
1778         }
1779         up_write(&com->lc_sem);
1780 }
1781
1782 static int lfsck_layout_master_checkpoint(const struct lu_env *env,
1783                                           struct lfsck_component *com, bool init)
1784 {
1785         struct lfsck_instance           *lfsck   = com->lc_lfsck;
1786         struct lfsck_layout             *lo      = com->lc_file_ram;
1787         struct lfsck_layout_master_data *llmd    = com->lc_data;
1788         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
1789         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
1790         struct l_wait_info               lwi     = { 0 };
1791         int                              rc;
1792
1793         if (com->lc_new_checked == 0 && !init)
1794                 return 0;
1795
1796         l_wait_event(mthread->t_ctl_waitq,
1797                      list_empty(&llmd->llmd_req_list) ||
1798                      !thread_is_running(mthread) ||
1799                      thread_is_stopped(athread),
1800                      &lwi);
1801
1802         if (!thread_is_running(mthread) || thread_is_stopped(athread))
1803                 return 0;
1804
1805         down_write(&com->lc_sem);
1806         if (init) {
1807                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
1808         } else {
1809                 lo->ll_pos_last_checkpoint =
1810                                         lfsck->li_pos_current.lp_oit_cookie;
1811                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1812                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1813                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
1814                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
1815                 com->lc_new_checked = 0;
1816         }
1817
1818         rc = lfsck_layout_store(env, com);
1819         up_write(&com->lc_sem);
1820
1821         return rc;
1822 }
1823
1824 static int lfsck_layout_slave_checkpoint(const struct lu_env *env,
1825                                          struct lfsck_component *com, bool init)
1826 {
1827         struct lfsck_instance   *lfsck = com->lc_lfsck;
1828         struct lfsck_layout     *lo    = com->lc_file_ram;
1829         int                      rc;
1830
1831         if (com->lc_new_checked == 0 && !init)
1832                 return 0;
1833
1834         down_write(&com->lc_sem);
1835
1836         if (init) {
1837                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
1838         } else {
1839                 lo->ll_pos_last_checkpoint =
1840                                         lfsck->li_pos_current.lp_oit_cookie;
1841                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1842                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1843                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
1844                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
1845                 com->lc_new_checked = 0;
1846         }
1847
1848         rc = lfsck_layout_store(env, com);
1849
1850         up_write(&com->lc_sem);
1851
1852         return rc;
1853 }
1854
1855 static int lfsck_layout_prep(const struct lu_env *env,
1856                              struct lfsck_component *com)
1857 {
1858         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1859         struct lfsck_layout     *lo     = com->lc_file_ram;
1860         struct lfsck_position   *pos    = &com->lc_pos_start;
1861
1862         fid_zero(&pos->lp_dir_parent);
1863         pos->lp_dir_cookie = 0;
1864         if (lo->ll_status == LS_COMPLETED ||
1865             lo->ll_status == LS_PARTIAL) {
1866                 int rc;
1867
1868                 rc = lfsck_layout_reset(env, com, false);
1869                 if (rc != 0)
1870                         return rc;
1871         }
1872
1873         down_write(&com->lc_sem);
1874
1875         lo->ll_time_latest_start = cfs_time_current_sec();
1876
1877         spin_lock(&lfsck->li_lock);
1878         if (lo->ll_flags & LF_SCANNED_ONCE) {
1879                 if (!lfsck->li_drop_dryrun ||
1880                     lo->ll_pos_first_inconsistent == 0) {
1881                         lo->ll_status = LS_SCANNING_PHASE2;
1882                         list_del_init(&com->lc_link);
1883                         list_add_tail(&com->lc_link,
1884                                       &lfsck->li_list_double_scan);
1885                         pos->lp_oit_cookie = 0;
1886                 } else {
1887                         int i;
1888
1889                         lo->ll_status = LS_SCANNING_PHASE1;
1890                         lo->ll_run_time_phase1 = 0;
1891                         lo->ll_run_time_phase2 = 0;
1892                         lo->ll_objs_checked_phase1 = 0;
1893                         lo->ll_objs_checked_phase2 = 0;
1894                         lo->ll_objs_failed_phase1 = 0;
1895                         lo->ll_objs_failed_phase2 = 0;
1896                         for (i = 0; i < LLIT_MAX; i++)
1897                                 lo->ll_objs_repaired[i] = 0;
1898
1899                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
1900                 }
1901         } else {
1902                 lo->ll_status = LS_SCANNING_PHASE1;
1903                 if (!lfsck->li_drop_dryrun ||
1904                     lo->ll_pos_first_inconsistent == 0)
1905                         pos->lp_oit_cookie = lo->ll_pos_last_checkpoint + 1;
1906                 else
1907                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
1908         }
1909         spin_unlock(&lfsck->li_lock);
1910
1911         up_write(&com->lc_sem);
1912
1913         return 0;
1914 }
1915
1916 static int lfsck_layout_slave_prep(const struct lu_env *env,
1917                                    struct lfsck_component *com,
1918                                    struct lfsck_start_param *lsp)
1919 {
1920         struct lfsck_layout             *lo     = com->lc_file_ram;
1921         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
1922         int                              rc;
1923
1924         /* XXX: For a new scanning, generate OST-objects
1925          *      bitmap for orphan detection. */
1926
1927         rc = lfsck_layout_prep(env, com);
1928         if (rc != 0 || lo->ll_status != LS_SCANNING_PHASE1 ||
1929             !lsp->lsp_index_valid)
1930                 return rc;
1931
1932         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
1933
1934         return rc;
1935 }
1936
1937 static int lfsck_layout_master_prep(const struct lu_env *env,
1938                                     struct lfsck_component *com,
1939                                     struct lfsck_start_param *lsp)
1940 {
1941         struct lfsck_instance           *lfsck   = com->lc_lfsck;
1942         struct lfsck_layout_master_data *llmd    = com->lc_data;
1943         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
1944         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
1945         struct lfsck_thread_args        *lta;
1946         long                             rc;
1947         ENTRY;
1948
1949         rc = lfsck_layout_prep(env, com);
1950         if (rc != 0)
1951                 RETURN(rc);
1952
1953         llmd->llmd_assistant_status = 0;
1954         llmd->llmd_post_result = 0;
1955         llmd->llmd_to_post = 0;
1956         llmd->llmd_to_double_scan = 0;
1957         llmd->llmd_in_double_scan = 0;
1958         llmd->llmd_exit = 0;
1959         thread_set_flags(athread, 0);
1960
1961         lta = lfsck_thread_args_init(lfsck, com, lsp);
1962         if (IS_ERR(lta))
1963                 RETURN(PTR_ERR(lta));
1964
1965         rc = PTR_ERR(kthread_run(lfsck_layout_assistant, lta, "lfsck_layout"));
1966         if (IS_ERR_VALUE(rc)) {
1967                 CERROR("%s: Cannot start LFSCK layout assistant thread: "
1968                        "rc = %ld\n", lfsck_lfsck2name(lfsck), rc);
1969                 lfsck_thread_args_fini(lta);
1970         } else {
1971                 struct l_wait_info lwi = { 0 };
1972
1973                 l_wait_event(mthread->t_ctl_waitq,
1974                              thread_is_running(athread) ||
1975                              thread_is_stopped(athread),
1976                              &lwi);
1977                 if (unlikely(!thread_is_running(athread)))
1978                         rc = llmd->llmd_assistant_status;
1979                 else
1980                         rc = 0;
1981         }
1982
1983         RETURN(rc);
1984 }
1985
/*
 * Master-side per-object handler for the object table iteration.
 *
 * Currently a placeholder that accepts every object and returns 0.
 */
static int lfsck_layout_master_exec_oit(const struct lu_env *env,
                                        struct lfsck_component *com,
                                        struct dt_object *obj)
{
        /* XXX: To be implemented in other patches.
         *
         * Planned processing for the given object:
         * - read its layout EA locally;
         * - for each stripe, pre-fetch the OST-object's attribute and
         *   queue a lfsck_layout_req on ::llmd_req_list;
         * - the lfsck_layout_assistant thread then compares the OST side
         *   attribute of each queued request with the local attribute
         *   and repairs it if inconsistent.
         *
         * All of the above is meant to run asynchronously as a pipeline. */

        return 0;
}
2004
2005 static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
2006                                        struct lfsck_component *com,
2007                                        struct dt_object *obj)
2008 {
2009         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2010         struct lfsck_layout             *lo     = com->lc_file_ram;
2011         const struct lu_fid             *fid    = lfsck_dto2fid(obj);
2012         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
2013         struct lfsck_layout_seq         *lls;
2014         __u64                            seq;
2015         __u64                            oid;
2016         int                              rc;
2017         ENTRY;
2018
2019         /* XXX: Update OST-objects bitmap for orphan detection. */
2020
2021         LASSERT(llsd != NULL);
2022
2023         down_write(&com->lc_sem);
2024         if (fid_is_idif(fid))
2025                 seq = 0;
2026         else if (!fid_is_norm(fid) ||
2027                  !fid_is_for_ostobj(env, lfsck->li_next, obj, fid))
2028                 GOTO(unlock, rc = 0);
2029         else
2030                 seq = fid_seq(fid);
2031         com->lc_new_checked++;
2032
2033         lls = lfsck_layout_seq_lookup(llsd, seq);
2034         if (lls == NULL) {
2035                 OBD_ALLOC_PTR(lls);
2036                 if (unlikely(lls == NULL))
2037                         GOTO(unlock, rc = -ENOMEM);
2038
2039                 INIT_LIST_HEAD(&lls->lls_list);
2040                 lls->lls_seq = seq;
2041                 rc = lfsck_layout_lastid_load(env, com, lls);
2042                 if (rc != 0) {
2043                         lo->ll_objs_failed_phase1++;
2044                         OBD_FREE_PTR(lls);
2045                         GOTO(unlock, rc);
2046                 }
2047
2048                 lfsck_layout_seq_insert(llsd, lls);
2049         }
2050
2051         if (unlikely(fid_is_last_id(fid)))
2052                 GOTO(unlock, rc = 0);
2053
2054         oid = fid_oid(fid);
2055         if (oid > lls->lls_lastid_known)
2056                 lls->lls_lastid_known = oid;
2057
2058         if (oid > lls->lls_lastid) {
2059                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
2060                         /* OFD may create new objects during LFSCK scanning. */
2061                         rc = lfsck_layout_lastid_reload(env, com, lls);
2062                         if (unlikely(rc != 0))
2063                                 CWARN("%s: failed to reload LAST_ID for "LPX64
2064                                       ": rc = %d\n",
2065                                       lfsck_lfsck2name(com->lc_lfsck),
2066                                       lls->lls_seq, rc);
2067                         if (oid <= lls->lls_lastid)
2068                                 GOTO(unlock, rc = 0);
2069
2070                         LASSERT(lfsck->li_out_notify != NULL);
2071
2072                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
2073                                              LE_LASTID_REBUILDING);
2074                         lo->ll_flags |= LF_CRASHED_LASTID;
2075                 }
2076
2077                 lls->lls_lastid = oid;
2078                 lls->lls_dirty = 1;
2079         }
2080
2081         GOTO(unlock, rc = 0);
2082
2083 unlock:
2084         up_write(&com->lc_sem);
2085
2086         return rc;
2087 }
2088
/*
 * Per-directory-entry handler of the layout component.
 *
 * Does nothing with the entry and always returns 0.
 */
static int lfsck_layout_exec_dir(const struct lu_env *env,
                                 struct lfsck_component *com,
                                 struct dt_object *obj,
                                 struct lu_dirent *ent)
{
        return 0;
}
2096
2097 static int lfsck_layout_master_post(const struct lu_env *env,
2098                                     struct lfsck_component *com,
2099                                     int result, bool init)
2100 {
2101         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2102         struct lfsck_layout             *lo      = com->lc_file_ram;
2103         struct lfsck_layout_master_data *llmd    = com->lc_data;
2104         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2105         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
2106         struct l_wait_info               lwi     = { 0 };
2107         int                              rc;
2108         ENTRY;
2109
2110
2111         llmd->llmd_post_result = result;
2112         llmd->llmd_to_post = 1;
2113         if (llmd->llmd_post_result <= 0)
2114                 llmd->llmd_exit = 1;
2115
2116         wake_up_all(&athread->t_ctl_waitq);
2117         l_wait_event(mthread->t_ctl_waitq,
2118                      (result > 0 && list_empty(&llmd->llmd_req_list)) ||
2119                      thread_is_stopped(athread),
2120                      &lwi);
2121
2122         if (llmd->llmd_assistant_status < 0)
2123                 result = llmd->llmd_assistant_status;
2124
2125         down_write(&com->lc_sem);
2126         spin_lock(&lfsck->li_lock);
2127         /* When LFSCK failed, there may be some prefetched objects those are
2128          * not been processed yet, we do not know the exactly position, then
2129          * just restart from last check-point next time. */
2130         if (!init && !llmd->llmd_exit)
2131                 lo->ll_pos_last_checkpoint =
2132                                         lfsck->li_pos_current.lp_oit_cookie;
2133
2134         if (result > 0) {
2135                 lo->ll_status = LS_SCANNING_PHASE2;
2136                 lo->ll_flags |= LF_SCANNED_ONCE;
2137                 lo->ll_flags &= ~LF_UPGRADE;
2138                 list_del_init(&com->lc_link);
2139                 list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
2140         } else if (result == 0) {
2141                 lo->ll_status = lfsck->li_status;
2142                 if (lo->ll_status == 0)
2143                         lo->ll_status = LS_STOPPED;
2144                 if (lo->ll_status != LS_PAUSED) {
2145                         list_del_init(&com->lc_link);
2146                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
2147                 }
2148         } else {
2149                 lo->ll_status = LS_FAILED;
2150                 list_del_init(&com->lc_link);
2151                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
2152         }
2153         spin_unlock(&lfsck->li_lock);
2154
2155         if (!init) {
2156                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
2157                                 HALF_SEC - lfsck->li_time_last_checkpoint);
2158                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
2159                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
2160                 com->lc_new_checked = 0;
2161         }
2162
2163         rc = lfsck_layout_store(env, com);
2164         up_write(&com->lc_sem);
2165
2166         RETURN(rc);
2167 }
2168
2169 static int lfsck_layout_slave_post(const struct lu_env *env,
2170                                    struct lfsck_component *com,
2171                                    int result, bool init)
2172 {
2173         struct lfsck_instance   *lfsck = com->lc_lfsck;
2174         struct lfsck_layout     *lo    = com->lc_file_ram;
2175         int                      rc;
2176         bool                     done  = false;
2177
2178         rc = lfsck_layout_lastid_store(env, com);
2179         if (rc != 0)
2180                 result = rc;
2181
2182         LASSERT(lfsck->li_out_notify != NULL);
2183
2184         down_write(&com->lc_sem);
2185
2186         spin_lock(&lfsck->li_lock);
2187         if (!init)
2188                 lo->ll_pos_last_checkpoint =
2189                                         lfsck->li_pos_current.lp_oit_cookie;
2190         if (result > 0) {
2191                 lo->ll_status = LS_SCANNING_PHASE2;
2192                 lo->ll_flags |= LF_SCANNED_ONCE;
2193                 if (lo->ll_flags & LF_CRASHED_LASTID) {
2194                         done = true;
2195                         lo->ll_flags &= ~LF_CRASHED_LASTID;
2196                 }
2197                 lo->ll_flags &= ~LF_UPGRADE;
2198                 list_del_init(&com->lc_link);
2199                 list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
2200         } else if (result == 0) {
2201                 lo->ll_status = lfsck->li_status;
2202                 if (lo->ll_status == 0)
2203                         lo->ll_status = LS_STOPPED;
2204                 if (lo->ll_status != LS_PAUSED) {
2205                         list_del_init(&com->lc_link);
2206                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
2207                 }
2208         } else {
2209                 lo->ll_status = LS_FAILED;
2210                 list_del_init(&com->lc_link);
2211                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
2212         }
2213         spin_unlock(&lfsck->li_lock);
2214
2215         if (done)
2216                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
2217                                      LE_LASTID_REBUILT);
2218
2219         if (!init) {
2220                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
2221                                 HALF_SEC - lfsck->li_time_last_checkpoint);
2222                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
2223                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
2224                 com->lc_new_checked = 0;
2225         }
2226
2227         rc = lfsck_layout_store(env, com);
2228
2229         up_write(&com->lc_sem);
2230
2231         lfsck_layout_slave_notify_master(env, com, LE_PHASE1_DONE, result);
2232
2233         return rc;
2234 }
2235
2236 static int lfsck_layout_dump(const struct lu_env *env,
2237                              struct lfsck_component *com, char *buf, int len)
2238 {
2239         struct lfsck_instance   *lfsck = com->lc_lfsck;
2240         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
2241         struct lfsck_layout     *lo    = com->lc_file_ram;
2242         int                      save  = len;
2243         int                      ret   = -ENOSPC;
2244         int                      rc;
2245
2246         down_read(&com->lc_sem);
2247         rc = snprintf(buf, len,
2248                       "name: lfsck_layout\n"
2249                       "magic: %#x\n"
2250                       "version: %d\n"
2251                       "status: %s\n",
2252                       lo->ll_magic,
2253                       bk->lb_version,
2254                       lfsck_status2names(lo->ll_status));
2255         if (rc <= 0)
2256                 goto out;
2257
2258         buf += rc;
2259         len -= rc;
2260         rc = lfsck_bits_dump(&buf, &len, lo->ll_flags, lfsck_flags_names,
2261                              "flags");
2262         if (rc < 0)
2263                 goto out;
2264
2265         rc = lfsck_bits_dump(&buf, &len, bk->lb_param, lfsck_param_names,
2266                              "param");
2267         if (rc < 0)
2268                 goto out;
2269
2270         rc = lfsck_time_dump(&buf, &len, lo->ll_time_last_complete,
2271                              "time_since_last_completed");
2272         if (rc < 0)
2273                 goto out;
2274
2275         rc = lfsck_time_dump(&buf, &len, lo->ll_time_latest_start,
2276                              "time_since_latest_start");
2277         if (rc < 0)
2278                 goto out;
2279
2280         rc = lfsck_time_dump(&buf, &len, lo->ll_time_last_checkpoint,
2281                              "time_since_last_checkpoint");
2282         if (rc < 0)
2283                 goto out;
2284
2285         rc = snprintf(buf, len,
2286                       "latest_start_position: "LPU64"\n"
2287                       "last_checkpoint_position: "LPU64"\n"
2288                       "first_failure_position: "LPU64"\n",
2289                       lo->ll_pos_latest_start,
2290                       lo->ll_pos_last_checkpoint,
2291                       lo->ll_pos_first_inconsistent);
2292         if (rc <= 0)
2293                 goto out;
2294
2295         buf += rc;
2296         len -= rc;
2297
2298         rc = snprintf(buf, len,
2299                       "success_count: %u\n"
2300                       "repaired_dangling: "LPU64"\n"
2301                       "repaired_unmatched_pair: "LPU64"\n"
2302                       "repaired_multiple_referenced: "LPU64"\n"
2303                       "repaired_orphan: "LPU64"\n"
2304                       "repaired_inconsistent_owner: "LPU64"\n"
2305                       "repaired_others: "LPU64"\n"
2306                       "skipped: "LPU64"\n"
2307                       "failed_phase1: "LPU64"\n"
2308                       "failed_phase2: "LPU64"\n",
2309                       lo->ll_success_count,
2310                       lo->ll_objs_repaired[LLIT_DANGLING - 1],
2311                       lo->ll_objs_repaired[LLIT_UNMATCHED_PAIR - 1],
2312                       lo->ll_objs_repaired[LLIT_MULTIPLE_REFERENCED - 1],
2313                       lo->ll_objs_repaired[LLIT_ORPHAN - 1],
2314                       lo->ll_objs_repaired[LLIT_INCONSISTENT_OWNER - 1],
2315                       lo->ll_objs_repaired[LLIT_OTHERS - 1],
2316                       lo->ll_objs_skipped,
2317                       lo->ll_objs_failed_phase1,
2318                       lo->ll_objs_failed_phase2);
2319         if (rc <= 0)
2320                 goto out;
2321
2322         buf += rc;
2323         len -= rc;
2324
2325         if (lo->ll_status == LS_SCANNING_PHASE1) {
2326                 __u64 pos;
2327                 const struct dt_it_ops *iops;
2328                 cfs_duration_t duration = cfs_time_current() -
2329                                           lfsck->li_time_last_checkpoint;
2330                 __u64 checked = lo->ll_objs_checked_phase1 + com->lc_new_checked;
2331                 __u64 speed = checked;
2332                 __u64 new_checked = com->lc_new_checked * HZ;
2333                 __u32 rtime = lo->ll_run_time_phase1 +
2334                               cfs_duration_sec(duration + HALF_SEC);
2335
2336                 if (duration != 0)
2337                         do_div(new_checked, duration);
2338                 if (rtime != 0)
2339                         do_div(speed, rtime);
2340                 rc = snprintf(buf, len,
2341                               "checked_phase1: "LPU64"\n"
2342                               "checked_phase2: "LPU64"\n"
2343                               "run_time_phase1: %u seconds\n"
2344                               "run_time_phase2: %u seconds\n"
2345                               "average_speed_phase1: "LPU64" items/sec\n"
2346                               "average_speed_phase2: N/A\n"
2347                               "real-time_speed_phase1: "LPU64" items/sec\n"
2348                               "real-time_speed_phase2: N/A\n",
2349                               checked,
2350                               lo->ll_objs_checked_phase2,
2351                               rtime,
2352                               lo->ll_run_time_phase2,
2353                               speed,
2354                               new_checked);
2355                 if (rc <= 0)
2356                         goto out;
2357
2358                 buf += rc;
2359                 len -= rc;
2360
2361                 LASSERT(lfsck->li_di_oit != NULL);
2362
2363                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
2364
2365                 /* The low layer otable-based iteration position may NOT
2366                  * exactly match the layout-based directory traversal
2367                  * cookie. Generally, it is not a serious issue. But the
2368                  * caller should NOT make assumption on that. */
2369                 pos = iops->store(env, lfsck->li_di_oit);
2370                 if (!lfsck->li_current_oit_processed)
2371                         pos--;
2372                 rc = snprintf(buf, len, "current_position: "LPU64"\n", pos);
2373                 if (rc <= 0)
2374                         goto out;
2375
2376                 buf += rc;
2377                 len -= rc;
2378         } else {
2379                 /* XXX: LS_SCANNING_PHASE2 will be handled in the future. */
2380                 __u64 speed1 = lo->ll_objs_checked_phase1;
2381                 __u64 speed2 = lo->ll_objs_checked_phase2;
2382
2383                 if (lo->ll_run_time_phase1 != 0)
2384                         do_div(speed1, lo->ll_run_time_phase1);
2385                 if (lo->ll_run_time_phase2 != 0)
2386                         do_div(speed2, lo->ll_run_time_phase2);
2387                 rc = snprintf(buf, len,
2388                               "checked_phase1: "LPU64"\n"
2389                               "checked_phase2: "LPU64"\n"
2390                               "run_time_phase1: %u seconds\n"
2391                               "run_time_phase2: %u seconds\n"
2392                               "average_speed_phase1: "LPU64" items/sec\n"
2393                               "average_speed_phase2: "LPU64" objs/sec\n"
2394                               "real-time_speed_phase1: N/A\n"
2395                               "real-time_speed_phase2: N/A\n"
2396                               "current_position: N/A\n",
2397                               lo->ll_objs_checked_phase1,
2398                               lo->ll_objs_checked_phase2,
2399                               lo->ll_run_time_phase1,
2400                               lo->ll_run_time_phase2,
2401                               speed1,
2402                               speed2);
2403                 if (rc <= 0)
2404                         goto out;
2405
2406                 buf += rc;
2407                 len -= rc;
2408         }
2409         ret = save - len;
2410
2411 out:
2412         up_read(&com->lc_sem);
2413
2414         return ret;
2415 }
2416
2417 static int lfsck_layout_master_double_scan(const struct lu_env *env,
2418                                            struct lfsck_component *com)
2419 {
2420         struct lfsck_layout_master_data *llmd    = com->lc_data;
2421         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2422         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
2423         struct lfsck_layout             *lo      = com->lc_file_ram;
2424         struct l_wait_info               lwi     = { 0 };
2425
2426         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2))
2427                 return 0;
2428
2429         llmd->llmd_to_double_scan = 1;
2430         wake_up_all(&athread->t_ctl_waitq);
2431         l_wait_event(mthread->t_ctl_waitq,
2432                      llmd->llmd_in_double_scan ||
2433                      thread_is_stopped(athread),
2434                      &lwi);
2435         if (llmd->llmd_assistant_status < 0)
2436                 return llmd->llmd_assistant_status;
2437
2438         return 0;
2439 }
2440
2441 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
2442                                           struct lfsck_component *com)
2443 {
2444         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2445         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
2446         struct lfsck_layout             *lo     = com->lc_file_ram;
2447         struct ptlrpc_thread            *thread = &lfsck->li_thread;
2448         int                              rc;
2449         ENTRY;
2450
2451         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2))
2452                 RETURN(0);
2453
2454         atomic_inc(&lfsck->li_double_scan_count);
2455
2456         com->lc_new_checked = 0;
2457         com->lc_new_scanned = 0;
2458         com->lc_time_last_checkpoint = cfs_time_current();
2459         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
2460                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
2461
2462         while (1) {
2463                 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(30),
2464                                                      NULL, NULL);
2465
2466                 rc = lfsck_layout_slave_query_master(env, com);
2467                 if (list_empty(&llsd->llsd_master_list)) {
2468                         if (unlikely(!thread_is_running(thread)))
2469                                 rc = 0;
2470                         else
2471                                 rc = 1;
2472
2473                         GOTO(done, rc);
2474                 }
2475
2476                 if (rc < 0)
2477                         GOTO(done, rc);
2478
2479                 rc = l_wait_event(thread->t_ctl_waitq,
2480                                   !thread_is_running(thread) ||
2481                                   list_empty(&llsd->llsd_master_list),
2482                                   &lwi);
2483                 if (unlikely(!thread_is_running(thread)))
2484                         GOTO(done, rc = 0);
2485
2486                 if (rc == -ETIMEDOUT)
2487                         continue;
2488
2489                 GOTO(done, rc = (rc < 0 ? rc : 1));
2490         }
2491
2492 done:
2493         rc = lfsck_layout_double_scan_result(env, com, rc);
2494
2495         if (atomic_dec_and_test(&lfsck->li_double_scan_count))
2496                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
2497
2498         return rc;
2499 }
2500
2501 static void lfsck_layout_master_data_release(const struct lu_env *env,
2502                                              struct lfsck_component *com)
2503 {
2504         struct lfsck_layout_master_data *llmd   = com->lc_data;
2505         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2506         struct lfsck_tgt_descs          *ltds;
2507         struct lfsck_tgt_desc           *ltd;
2508         struct lfsck_tgt_desc           *next;
2509
2510         LASSERT(llmd != NULL);
2511         LASSERT(thread_is_init(&llmd->llmd_thread) ||
2512                 thread_is_stopped(&llmd->llmd_thread));
2513         LASSERT(list_empty(&llmd->llmd_req_list));
2514
2515         com->lc_data = NULL;
2516
2517         ltds = &lfsck->li_ost_descs;
2518         spin_lock(&ltds->ltd_lock);
2519         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase1_list,
2520                                  ltd_layout_phase_list) {
2521                 list_del_init(&ltd->ltd_layout_phase_list);
2522         }
2523         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase2_list,
2524                                  ltd_layout_phase_list) {
2525                 list_del_init(&ltd->ltd_layout_phase_list);
2526         }
2527         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_list,
2528                                  ltd_layout_list) {
2529                 list_del_init(&ltd->ltd_layout_list);
2530         }
2531         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase1_list,
2532                                  ltd_layout_phase_list) {
2533                 list_del_init(&ltd->ltd_layout_phase_list);
2534         }
2535         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase2_list,
2536                                  ltd_layout_phase_list) {
2537                 list_del_init(&ltd->ltd_layout_phase_list);
2538         }
2539         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_list,
2540                                  ltd_layout_list) {
2541                 list_del_init(&ltd->ltd_layout_list);
2542         }
2543         spin_unlock(&ltds->ltd_lock);
2544
2545         OBD_FREE_PTR(llmd);
2546 }
2547
2548 static void lfsck_layout_slave_data_release(const struct lu_env *env,
2549                                             struct lfsck_component *com)
2550 {
2551         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
2552         struct lfsck_layout_seq          *lls;
2553         struct lfsck_layout_seq          *next;
2554         struct lfsck_layout_slave_target *llst;
2555         struct lfsck_layout_slave_target *tmp;
2556
2557         LASSERT(llsd != NULL);
2558
2559         com->lc_data = NULL;
2560
2561         list_for_each_entry_safe(lls, next, &llsd->llsd_seq_list,
2562                                      lls_list) {
2563                 list_del_init(&lls->lls_list);
2564                 lfsck_object_put(env, lls->lls_lastid_obj);
2565                 OBD_FREE_PTR(lls);
2566         }
2567
2568         list_for_each_entry_safe(llst, tmp, &llsd->llsd_master_list,
2569                                  llst_list) {
2570                 list_del_init(&llst->llst_list);
2571                 OBD_FREE_PTR(llst);
2572         }
2573
2574         OBD_FREE_PTR(llsd);
2575 }
2576
2577 static void lfsck_layout_master_quit(const struct lu_env *env,
2578                                      struct lfsck_component *com)
2579 {
2580         struct lfsck_layout_master_data *llmd    = com->lc_data;
2581         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2582         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
2583         struct l_wait_info               lwi     = { 0 };
2584
2585         llmd->llmd_exit = 1;
2586         wake_up_all(&athread->t_ctl_waitq);
2587         l_wait_event(mthread->t_ctl_waitq,
2588                      thread_is_init(athread) ||
2589                      thread_is_stopped(athread),
2590                      &lwi);
2591 }
2592
2593 static int lfsck_layout_master_in_notify(const struct lu_env *env,
2594                                          struct lfsck_component *com,
2595                                          struct lfsck_request *lr)
2596 {
2597         struct lfsck_instance           *lfsck = com->lc_lfsck;
2598         struct lfsck_layout             *lo    = com->lc_file_ram;
2599         struct lfsck_layout_master_data *llmd  = com->lc_data;
2600         struct lfsck_tgt_descs          *ltds;
2601         struct lfsck_tgt_desc           *ltd;
2602         ENTRY;
2603
2604         if (lr->lr_event != LE_PHASE1_DONE &&
2605             lr->lr_event != LE_PHASE2_DONE &&
2606             lr->lr_event != LE_STOP)
2607                 RETURN(-EINVAL);
2608
2609         if (lr->lr_flags & LEF_FROM_OST)
2610                 ltds = &lfsck->li_ost_descs;
2611         else
2612                 ltds = &lfsck->li_mdt_descs;
2613         spin_lock(&ltds->ltd_lock);
2614         ltd = LTD_TGT(ltds, lr->lr_index);
2615         if (ltd == NULL) {
2616                 spin_unlock(&ltds->ltd_lock);
2617
2618                 RETURN(-ENODEV);
2619         }
2620
2621         list_del_init(&ltd->ltd_layout_phase_list);
2622         switch (lr->lr_event) {
2623         case LE_PHASE1_DONE:
2624                 if (lr->lr_status <= 0) {
2625                         ltd->ltd_layout_done = 1;
2626                         list_del_init(&ltd->ltd_layout_list);
2627                         lo->ll_flags |= LF_INCOMPLETE;
2628                         break;
2629                 }
2630
2631                 if (lr->lr_flags & LEF_FROM_OST) {
2632                         if (list_empty(&ltd->ltd_layout_list))
2633                                 list_add_tail(&ltd->ltd_layout_list,
2634                                               &llmd->llmd_ost_list);
2635                         list_add_tail(&ltd->ltd_layout_phase_list,
2636                                       &llmd->llmd_ost_phase2_list);
2637                 } else {
2638                         if (list_empty(&ltd->ltd_layout_list))
2639                                 list_add_tail(&ltd->ltd_layout_list,
2640                                               &llmd->llmd_mdt_list);
2641                         list_add_tail(&ltd->ltd_layout_phase_list,
2642                                       &llmd->llmd_mdt_phase2_list);
2643                 }
2644                 break;
2645         case LE_PHASE2_DONE:
2646                 ltd->ltd_layout_done = 1;
2647                 list_del_init(&ltd->ltd_layout_list);
2648                 break;
2649         case LE_STOP:
2650                 ltd->ltd_layout_done = 1;
2651                 list_del_init(&ltd->ltd_layout_list);
2652                 if (!(lr->lr_flags & LEF_FORCE_STOP))
2653                         lo->ll_flags |= LF_INCOMPLETE;
2654                 break;
2655         default:
2656                 break;
2657         }
2658         spin_unlock(&ltds->ltd_lock);
2659
2660         if (lr->lr_flags & LEF_FORCE_STOP) {
2661                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2662
2663                 memset(stop, 0, sizeof(*stop));
2664                 stop->ls_status = lr->lr_status;
2665                 stop->ls_flags = lr->lr_param;
2666                 lfsck_stop(env, lfsck->li_bottom, stop);
2667         } else if (lfsck_layout_master_to_orphan(llmd)) {
2668                 wake_up_all(&llmd->llmd_thread.t_ctl_waitq);
2669         }
2670
2671         RETURN(0);
2672 }
2673
2674 static int lfsck_layout_slave_in_notify(const struct lu_env *env,
2675                                         struct lfsck_component *com,
2676                                         struct lfsck_request *lr)
2677 {
2678         struct lfsck_instance            *lfsck = com->lc_lfsck;
2679         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
2680         struct lfsck_layout_slave_target *llst;
2681         ENTRY;
2682
2683         if (lr->lr_event != LE_PHASE2_DONE &&
2684             lr->lr_event != LE_STOP)
2685                 RETURN(-EINVAL);
2686
2687         llst = lfsck_layout_llst_find_and_del(llsd, lr->lr_index);
2688         if (llst == NULL)
2689                 RETURN(-ENODEV);
2690
2691         lfsck_layout_llst_put(llst);
2692         if (list_empty(&llsd->llsd_master_list)) {
2693                 switch (lr->lr_event) {
2694                 case LE_PHASE2_DONE:
2695                         wake_up_all(&lfsck->li_thread.t_ctl_waitq);
2696                         break;
2697                 case LE_STOP: {
2698                         struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2699
2700                         memset(stop, 0, sizeof(*stop));
2701                         stop->ls_status = lr->lr_status;
2702                         stop->ls_flags = lr->lr_param;
2703                         lfsck_stop(env, lfsck->li_bottom, stop);
2704                         break;
2705                 }
2706                 default:
2707                         break;
2708                 }
2709         }
2710
2711         RETURN(0);
2712 }
2713
2714 static int lfsck_layout_query(const struct lu_env *env,
2715                               struct lfsck_component *com)
2716 {
2717         struct lfsck_layout *lo = com->lc_file_ram;
2718
2719         return lo->ll_status;
2720 }
2721
2722 static int lfsck_layout_master_stop_notify(const struct lu_env *env,
2723                                            struct lfsck_component *com,
2724                                            struct lfsck_tgt_descs *ltds,
2725                                            struct lfsck_tgt_desc *ltd,
2726                                            struct ptlrpc_request_set *set)
2727 {
2728         struct lfsck_thread_info          *info  = lfsck_env_info(env);
2729         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
2730         struct lfsck_request              *lr    = &info->lti_lr;
2731         struct lfsck_instance             *lfsck = com->lc_lfsck;
2732         int                                rc;
2733
2734         LASSERT(list_empty(&ltd->ltd_layout_list));
2735         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
2736
2737         memset(lr, 0, sizeof(*lr));
2738         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2739         lr->lr_event = LE_STOP;
2740         lr->lr_active = LT_LAYOUT;
2741         if (ltds == &lfsck->li_ost_descs) {
2742                 lr->lr_flags = LEF_TO_OST;
2743         } else {
2744                 if (ltd->ltd_index == lfsck_dev_idx(lfsck->li_bottom))
2745                         return 0;
2746
2747                 lr->lr_flags = 0;
2748         }
2749         lr->lr_status = LS_CO_STOPPED;
2750
2751         laia->laia_com = com;
2752         laia->laia_ltds = ltds;
2753         laia->laia_ltd = ltd;
2754         laia->laia_lr = lr;
2755
2756         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2757                                  lfsck_layout_master_async_interpret,
2758                                  laia, LFSCK_NOTIFY);
2759         if (rc != 0)
2760                 CERROR("%s: Fail to notify %s %x for co-stop: rc = %d\n",
2761                        lfsck_lfsck2name(lfsck),
2762                        (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2763                        ltd->ltd_index, rc);
2764
2765         return rc;
2766 }
2767
2768 /* with lfsck::li_lock held */
2769 static int lfsck_layout_slave_join(const struct lu_env *env,
2770                                    struct lfsck_component *com,
2771                                    struct lfsck_start_param *lsp)
2772 {
2773         struct lfsck_instance            *lfsck = com->lc_lfsck;
2774         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
2775         struct lfsck_layout_slave_target *llst;
2776         struct lfsck_start               *start = lsp->lsp_start;
2777         int                               rc    = 0;
2778         ENTRY;
2779
2780         if (!lsp->lsp_index_valid || start == NULL ||
2781             !(start->ls_flags & LPF_ALL_MDT))
2782                 RETURN(-EALREADY);
2783
2784         spin_unlock(&lfsck->li_lock);
2785         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
2786         spin_lock(&lfsck->li_lock);
2787         if (rc == 0 && !thread_is_running(&lfsck->li_thread)) {
2788                 spin_unlock(&lfsck->li_lock);
2789                 llst = lfsck_layout_llst_find_and_del(llsd, lsp->lsp_index);
2790                 if (llst != NULL)
2791                         lfsck_layout_llst_put(llst);
2792                 spin_lock(&lfsck->li_lock);
2793                 rc = -EAGAIN;
2794         }
2795
2796         RETURN(rc);
2797 }
2798
2799 static struct lfsck_operations lfsck_layout_master_ops = {
2800         .lfsck_reset            = lfsck_layout_reset,
2801         .lfsck_fail             = lfsck_layout_fail,
2802         .lfsck_checkpoint       = lfsck_layout_master_checkpoint,
2803         .lfsck_prep             = lfsck_layout_master_prep,
2804         .lfsck_exec_oit         = lfsck_layout_master_exec_oit,
2805         .lfsck_exec_dir         = lfsck_layout_exec_dir,
2806         .lfsck_post             = lfsck_layout_master_post,
2807         .lfsck_dump             = lfsck_layout_dump,
2808         .lfsck_double_scan      = lfsck_layout_master_double_scan,
2809         .lfsck_data_release     = lfsck_layout_master_data_release,
2810         .lfsck_quit             = lfsck_layout_master_quit,
2811         .lfsck_in_notify        = lfsck_layout_master_in_notify,
2812         .lfsck_query            = lfsck_layout_query,
2813         .lfsck_stop_notify      = lfsck_layout_master_stop_notify,
2814 };
2815
2816 static struct lfsck_operations lfsck_layout_slave_ops = {
2817         .lfsck_reset            = lfsck_layout_reset,
2818         .lfsck_fail             = lfsck_layout_fail,
2819         .lfsck_checkpoint       = lfsck_layout_slave_checkpoint,
2820         .lfsck_prep             = lfsck_layout_slave_prep,
2821         .lfsck_exec_oit         = lfsck_layout_slave_exec_oit,
2822         .lfsck_exec_dir         = lfsck_layout_exec_dir,
2823         .lfsck_post             = lfsck_layout_slave_post,
2824         .lfsck_dump             = lfsck_layout_dump,
2825         .lfsck_double_scan      = lfsck_layout_slave_double_scan,
2826         .lfsck_data_release     = lfsck_layout_slave_data_release,
2827         .lfsck_in_notify        = lfsck_layout_slave_in_notify,
2828         .lfsck_query            = lfsck_layout_query,
2829         .lfsck_join             = lfsck_layout_slave_join,
2830 };
2831
2832 int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
2833 {
2834         struct lfsck_component  *com;
2835         struct lfsck_layout     *lo;
2836         struct dt_object        *root = NULL;
2837         struct dt_object        *obj;
2838         int                      rc;
2839         ENTRY;
2840
2841         OBD_ALLOC_PTR(com);
2842         if (com == NULL)
2843                 RETURN(-ENOMEM);
2844
2845         INIT_LIST_HEAD(&com->lc_link);
2846         INIT_LIST_HEAD(&com->lc_link_dir);
2847         init_rwsem(&com->lc_sem);
2848         atomic_set(&com->lc_ref, 1);
2849         com->lc_lfsck = lfsck;
2850         com->lc_type = LT_LAYOUT;
2851         if (lfsck->li_master) {
2852                 struct lfsck_layout_master_data *llmd;
2853
2854                 com->lc_ops = &lfsck_layout_master_ops;
2855                 OBD_ALLOC_PTR(llmd);
2856                 if (llmd == NULL)
2857                         GOTO(out, rc = -ENOMEM);
2858
2859                 INIT_LIST_HEAD(&llmd->llmd_req_list);
2860                 spin_lock_init(&llmd->llmd_lock);
2861                 INIT_LIST_HEAD(&llmd->llmd_ost_list);
2862                 INIT_LIST_HEAD(&llmd->llmd_ost_phase1_list);
2863                 INIT_LIST_HEAD(&llmd->llmd_ost_phase2_list);
2864                 INIT_LIST_HEAD(&llmd->llmd_mdt_list);
2865                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase1_list);
2866                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase2_list);
2867                 init_waitqueue_head(&llmd->llmd_thread.t_ctl_waitq);
2868                 com->lc_data = llmd;
2869         } else {
2870                 struct lfsck_layout_slave_data *llsd;
2871
2872                 com->lc_ops = &lfsck_layout_slave_ops;
2873                 OBD_ALLOC_PTR(llsd);
2874                 if (llsd == NULL)
2875                         GOTO(out, rc = -ENOMEM);
2876
2877                 INIT_LIST_HEAD(&llsd->llsd_seq_list);
2878                 INIT_LIST_HEAD(&llsd->llsd_master_list);
2879                 spin_lock_init(&llsd->llsd_lock);
2880                 com->lc_data = llsd;
2881         }
2882         com->lc_file_size = sizeof(*lo);
2883         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
2884         if (com->lc_file_ram == NULL)
2885                 GOTO(out, rc = -ENOMEM);
2886
2887         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
2888         if (com->lc_file_disk == NULL)
2889                 GOTO(out, rc = -ENOMEM);
2890
2891         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
2892         if (IS_ERR(root))
2893                 GOTO(out, rc = PTR_ERR(root));
2894
2895         if (unlikely(!dt_try_as_dir(env, root)))
2896                 GOTO(out, rc = -ENOTDIR);
2897
2898         obj = local_file_find_or_create(env, lfsck->li_los, root,
2899                                         lfsck_layout_name,
2900                                         S_IFREG | S_IRUGO | S_IWUSR);
2901         if (IS_ERR(obj))
2902                 GOTO(out, rc = PTR_ERR(obj));
2903
2904         com->lc_obj = obj;
2905         rc = lfsck_layout_load(env, com);
2906         if (rc > 0)
2907                 rc = lfsck_layout_reset(env, com, true);
2908         else if (rc == -ENOENT)
2909                 rc = lfsck_layout_init(env, com);
2910
2911         if (rc != 0)
2912                 GOTO(out, rc);
2913
2914         lo = com->lc_file_ram;
2915         switch (lo->ll_status) {
2916         case LS_INIT:
2917         case LS_COMPLETED:
2918         case LS_FAILED:
2919         case LS_STOPPED:
2920         case LS_PARTIAL:
2921                 spin_lock(&lfsck->li_lock);
2922                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
2923                 spin_unlock(&lfsck->li_lock);
2924                 break;
2925         default:
2926                 CERROR("%s: unknown lfsck_layout status: rc = %u\n",
2927                        lfsck_lfsck2name(lfsck), lo->ll_status);
2928                 /* fall through */
2929         case LS_SCANNING_PHASE1:
2930         case LS_SCANNING_PHASE2:
2931                 /* No need to store the status to disk right now.
2932                  * If the system crashed before the status stored,
2933                  * it will be loaded back when next time. */
2934                 lo->ll_status = LS_CRASHED;
2935                 lo->ll_flags |= LF_INCOMPLETE;
2936                 /* fall through */
2937         case LS_PAUSED:
2938         case LS_CRASHED:
2939         case LS_CO_FAILED:
2940         case LS_CO_STOPPED:
2941         case LS_CO_PAUSED:
2942                 spin_lock(&lfsck->li_lock);
2943                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
2944                 spin_unlock(&lfsck->li_lock);
2945                 break;
2946         }
2947
2948         if (lo->ll_flags & LF_CRASHED_LASTID) {
2949                 LASSERT(lfsck->li_out_notify != NULL);
2950
2951                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
2952                                      LE_LASTID_REBUILDING);
2953         }
2954
2955         GOTO(out, rc = 0);
2956
2957 out:
2958         if (root != NULL && !IS_ERR(root))
2959                 lu_object_put(env, &root->do_lu);
2960
2961         if (rc != 0)
2962                 lfsck_component_cleanup(env, com);
2963
2964         return rc;
2965 }