Whamcloud - gitweb
LU-8301 lfsck: handle ROOT fid properly
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2015, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <linux/kthread.h>
34 #include <linux/sched.h>
35 #include <linux/list.h>
36 #include <lu_object.h>
37 #include <dt_object.h>
38 #include <md_object.h>
39 #include <lustre_fld.h>
40 #include <lustre_lib.h>
41 #include <lustre_net.h>
42 #include <lustre_lfsck.h>
43 #include <lustre/lustre_lfsck_user.h>
44
45 #include "lfsck_internal.h"
46
47 #define LFSCK_CHECKPOINT_SKIP   1
48
49 /* define lfsck thread key */
50 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
51
52 static void lfsck_key_fini(const struct lu_context *ctx,
53                            struct lu_context_key *key, void *data)
54 {
55         struct lfsck_thread_info *info = data;
56
57         lu_buf_free(&info->lti_linkea_buf);
58         lu_buf_free(&info->lti_linkea_buf2);
59         lu_buf_free(&info->lti_big_buf);
60         OBD_FREE_PTR(info);
61 }
62
63 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
64 LU_KEY_INIT_GENERIC(lfsck);
65
66 static struct list_head lfsck_instance_list;
67 static struct list_head lfsck_ost_orphan_list;
68 static struct list_head lfsck_mdt_orphan_list;
69 static DEFINE_SPINLOCK(lfsck_instance_lock);
70
/* Printable names for the LFSCK flags; the table is terminated by NULL. */
const char *lfsck_flags_names[] = {
        [0] = "scanned-once",
        [1] = "inconsistent",
        [2] = "upgrade",
        [3] = "incomplete",
        [4] = "crashed_lastid",
        [5] = NULL,
};
79
/*
 * Printable names for the LFSCK parameters. Slot 0 is intentionally left
 * without a name, and the table is terminated by a trailing NULL.
 */
const char *lfsck_param_names[] = {
        [0] = NULL,
        [1] = "failout",
        [2] = "dryrun",
        [3] = "all_targets",
        [4] = "broadcast",
        [5] = "orphan",
        [6] = "create_ostobj",
        [7] = "create_mdtobj",
        [8] = NULL,
};
91
/* Selectors for the LPF verification; the numeric values are 0 and 1. */
enum lfsck_verify_lpf_types {
        LVLT_BY_BOOKMARK = 0,   /* named after the bookmark */
        LVLT_BY_NAMEENTRY,      /* named after the name entry, value 1 */
};
96
97 static inline void
98 lfsck_reset_ltd_status(struct lfsck_tgt_desc *ltd, enum lfsck_type type)
99 {
100         if (type == LFSCK_TYPE_LAYOUT) {
101                 ltd->ltd_layout_status = LS_MAX;
102                 ltd->ltd_layout_repaired = 0;
103         } else {
104                 ltd->ltd_namespace_status = LS_MAX;
105                 ltd->ltd_namespace_repaired = 0;
106         }
107 }
108
109 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
110 {
111         spin_lock_init(&ltds->ltd_lock);
112         init_rwsem(&ltds->ltd_rw_sem);
113         INIT_LIST_HEAD(&ltds->ltd_orphan);
114         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
115         if (ltds->ltd_tgts_bitmap == NULL)
116                 return -ENOMEM;
117
118         return 0;
119 }
120
121 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
122 {
123         struct lfsck_tgt_desc   *ltd;
124         struct lfsck_tgt_desc   *next;
125         int                      idx;
126
127         down_write(&ltds->ltd_rw_sem);
128
129         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
130                                  ltd_orphan_list) {
131                 list_del_init(&ltd->ltd_orphan_list);
132                 lfsck_tgt_put(ltd);
133         }
134
135         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
136                 up_write(&ltds->ltd_rw_sem);
137
138                 return;
139         }
140
141         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
142                 ltd = lfsck_ltd2tgt(ltds, idx);
143                 if (likely(ltd != NULL)) {
144                         LASSERT(list_empty(&ltd->ltd_layout_list));
145                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
146                         LASSERT(list_empty(&ltd->ltd_namespace_list));
147                         LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
148
149                         ltds->ltd_tgtnr--;
150                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
151                         lfsck_assign_tgt(ltds, NULL, idx);
152                         lfsck_tgt_put(ltd);
153                 }
154         }
155
156         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
157                  ltds->ltd_tgtnr);
158
159         for (idx = 0; idx < TGT_PTRS; idx++) {
160                 if (ltds->ltd_tgts_idx[idx] != NULL) {
161                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
162                         ltds->ltd_tgts_idx[idx] = NULL;
163                 }
164         }
165
166         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
167         ltds->ltd_tgts_bitmap = NULL;
168         up_write(&ltds->ltd_rw_sem);
169 }
170
171 static int __lfsck_add_target(const struct lu_env *env,
172                               struct lfsck_instance *lfsck,
173                               struct lfsck_tgt_desc *ltd,
174                               bool for_ost, bool locked)
175 {
176         struct lfsck_tgt_descs *ltds;
177         __u32                   index = ltd->ltd_index;
178         int                     rc    = 0;
179         ENTRY;
180
181         if (for_ost)
182                 ltds = &lfsck->li_ost_descs;
183         else
184                 ltds = &lfsck->li_mdt_descs;
185
186         if (!locked)
187                 down_write(&ltds->ltd_rw_sem);
188
189         LASSERT(ltds->ltd_tgts_bitmap != NULL);
190
191         if (index >= ltds->ltd_tgts_bitmap->size) {
192                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
193                                     (__u32)BITS_PER_LONG);
194                 struct cfs_bitmap *old_bitmap = ltds->ltd_tgts_bitmap;
195                 struct cfs_bitmap *new_bitmap;
196
197                 while (newsize < index + 1)
198                         newsize <<= 1;
199
200                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
201                 if (new_bitmap == NULL)
202                         GOTO(unlock, rc = -ENOMEM);
203
204                 if (ltds->ltd_tgtnr > 0)
205                         cfs_bitmap_copy(new_bitmap, old_bitmap);
206                 ltds->ltd_tgts_bitmap = new_bitmap;
207                 CFS_FREE_BITMAP(old_bitmap);
208         }
209
210         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
211                 CERROR("%s: the device %s (%u) is registered already\n",
212                        lfsck_lfsck2name(lfsck),
213                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
214                 GOTO(unlock, rc = -EEXIST);
215         }
216
217         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
218                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
219                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
220                         GOTO(unlock, rc = -ENOMEM);
221         }
222
223         lfsck_assign_tgt(ltds, ltd, index);
224         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
225         ltds->ltd_tgtnr++;
226
227         GOTO(unlock, rc = 0);
228
229 unlock:
230         if (!locked)
231                 up_write(&ltds->ltd_rw_sem);
232
233         return rc;
234 }
235
236 static int lfsck_add_target_from_orphan(const struct lu_env *env,
237                                         struct lfsck_instance *lfsck)
238 {
239         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
240         struct lfsck_tgt_desc   *ltd;
241         struct lfsck_tgt_desc   *next;
242         struct list_head        *head    = &lfsck_ost_orphan_list;
243         int                      rc;
244         bool                     for_ost = true;
245
246 again:
247         spin_lock(&lfsck_instance_lock);
248         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
249                 if (ltd->ltd_key == lfsck->li_bottom)
250                         list_move_tail(&ltd->ltd_orphan_list,
251                                        &ltds->ltd_orphan);
252         }
253         spin_unlock(&lfsck_instance_lock);
254
255         down_write(&ltds->ltd_rw_sem);
256         while (!list_empty(&ltds->ltd_orphan)) {
257                 ltd = list_entry(ltds->ltd_orphan.next,
258                                  struct lfsck_tgt_desc,
259                                  ltd_orphan_list);
260                 list_del_init(&ltd->ltd_orphan_list);
261                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
262                 /* Do not hold the semaphore for too long time. */
263                 up_write(&ltds->ltd_rw_sem);
264                 if (rc != 0)
265                         return rc;
266
267                 down_write(&ltds->ltd_rw_sem);
268         }
269         up_write(&ltds->ltd_rw_sem);
270
271         if (for_ost) {
272                 ltds = &lfsck->li_mdt_descs;
273                 head = &lfsck_mdt_orphan_list;
274                 for_ost = false;
275                 goto again;
276         }
277
278         return 0;
279 }
280
281 static inline struct lfsck_component *
282 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
283                        struct list_head *list)
284 {
285         struct lfsck_component *com;
286
287         list_for_each_entry(com, list, lc_link) {
288                 if (com->lc_type == type)
289                         return com;
290         }
291         return NULL;
292 }
293
294 struct lfsck_component *
295 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
296 {
297         struct lfsck_component *com;
298
299         spin_lock(&lfsck->li_lock);
300         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
301         if (com != NULL)
302                 goto unlock;
303
304         com = __lfsck_component_find(lfsck, type,
305                                      &lfsck->li_list_double_scan);
306         if (com != NULL)
307                 goto unlock;
308
309         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
310
311 unlock:
312         if (com != NULL)
313                 lfsck_component_get(com);
314         spin_unlock(&lfsck->li_lock);
315         return com;
316 }
317
318 void lfsck_component_cleanup(const struct lu_env *env,
319                              struct lfsck_component *com)
320 {
321         if (!list_empty(&com->lc_link))
322                 list_del_init(&com->lc_link);
323         if (!list_empty(&com->lc_link_dir))
324                 list_del_init(&com->lc_link_dir);
325
326         lfsck_component_put(env, com);
327 }
328
329 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
330                     struct lu_fid *fid, bool locked)
331 {
332         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
333         int                      rc = 0;
334         ENTRY;
335
336         if (!locked)
337                 mutex_lock(&lfsck->li_mutex);
338
339         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
340         if (rc >= 0) {
341                 bk->lb_last_fid = *fid;
342                 /* We do not care about whether the subsequent sub-operations
343                  * failed or not. The worst case is that one FID is lost that
344                  * is not a big issue for the LFSCK since it is relative rare
345                  * for LFSCK create. */
346                 rc = lfsck_bookmark_store(env, lfsck);
347         }
348
349         if (!locked)
350                 mutex_unlock(&lfsck->li_mutex);
351
352         RETURN(rc);
353 }
354
355 static int __lfsck_ibits_lock(const struct lu_env *env,
356                               struct lfsck_instance *lfsck,
357                               struct dt_object *obj, struct ldlm_res_id *resid,
358                               struct lustre_handle *lh, __u64 bits,
359                               enum ldlm_mode mode)
360 {
361         struct lfsck_thread_info        *info   = lfsck_env_info(env);
362         union ldlm_policy_data          *policy = &info->lti_policy;
363         __u64                            flags  = LDLM_FL_ATOMIC_CB;
364         int                              rc;
365
366         LASSERT(lfsck->li_namespace != NULL);
367
368         memset(policy, 0, sizeof(*policy));
369         policy->l_inodebits.bits = bits;
370         if (dt_object_remote(obj)) {
371                 struct ldlm_enqueue_info *einfo = &info->lti_einfo;
372
373                 memset(einfo, 0, sizeof(*einfo));
374                 einfo->ei_type = LDLM_IBITS;
375                 einfo->ei_mode = mode;
376                 einfo->ei_cb_bl = ldlm_blocking_ast;
377                 einfo->ei_cb_cp = ldlm_completion_ast;
378                 einfo->ei_res_id = resid;
379
380                 rc = dt_object_lock(env, obj, lh, einfo, policy);
381         } else {
382                 rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid,
383                                             LDLM_IBITS, policy, mode,
384                                             &flags, ldlm_blocking_ast,
385                                             ldlm_completion_ast, NULL, NULL,
386                                             0, LVB_T_NONE, NULL, lh);
387         }
388
389         if (rc == ELDLM_OK) {
390                 rc = 0;
391         } else {
392                 memset(lh, 0, sizeof(*lh));
393                 rc = -EIO;
394         }
395
396         return rc;
397 }
398
399 /**
400  * Request the specified ibits lock for the given object.
401  *
402  * Before the LFSCK modifying on the namespace visible object,
403  * it needs to acquire related ibits ldlm lock.
404  *
405  * \param[in] env       pointer to the thread context
406  * \param[in] lfsck     pointer to the lfsck instance
407  * \param[in] obj       pointer to the dt_object to be locked
408  * \param[out] lh       pointer to the lock handle
409  * \param[in] bits      the bits for the ldlm lock to be acquired
410  * \param[in] mode      the mode for the ldlm lock to be acquired
411  *
412  * \retval              0 for success
413  * \retval              negative error number on failure
414  */
415 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
416                      struct dt_object *obj, struct lustre_handle *lh,
417                      __u64 bits, enum ldlm_mode mode)
418 {
419         struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
420
421         LASSERT(!lustre_handle_is_used(lh));
422
423         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
424         return __lfsck_ibits_lock(env, lfsck, obj, resid, lh, bits, mode);
425 }
426
427 /**
428  * Release the the specified ibits lock.
429  *
430  * If the lock has been acquired before, release it
431  * and cleanup the handle. Otherwise, do nothing.
432  *
433  * \param[in] lh        pointer to the lock handle
434  * \param[in] mode      the mode for the ldlm lock to be released
435  */
436 void lfsck_ibits_unlock(struct lustre_handle *lh, enum ldlm_mode mode)
437 {
438         if (lustre_handle_is_used(lh)) {
439                 ldlm_lock_decref(lh, mode);
440                 memset(lh, 0, sizeof(*lh));
441         }
442 }
443
444 /**
445  * Request compound ibits locks for the given <obj, name> pairs.
446  *
447  * Before the LFSCK modifying on the namespace visible object, it needs to
448  * acquire related ibits ldlm lock. Usually, we can use lfsck_ibits_lock for
449  * the lock purpose. But the simple lfsck_ibits_lock for directory-based
450  * modificationis (such as insert name entry to the directory) may be too
451  * coarse-grained and not efficient.
452  *
453  * The lfsck_lock() will request compound ibits locks on the specified
454  * <obj, name> pairs: the PDO (Parallel Directory Operations) ibits (UPDATE)
455  * lock on the directory object, and the regular ibits lock on the name hash.
456  *
457  * \param[in] env       pointer to the thread context
458  * \param[in] lfsck     pointer to the lfsck instance
459  * \param[in] obj       pointer to the dt_object to be locked
460  * \param[in] name      used for building the PDO lock resource
461  * \param[out] llh      pointer to the lfsck_lock_handle
462  * \param[in] bits      the bits for the ldlm lock to be acquired
463  * \param[in] mode      the mode for the ldlm lock to be acquired
464  *
465  * \retval              0 for success
466  * \retval              negative error number on failure
467  */
468 int lfsck_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
469                struct dt_object *obj, const char *name,
470                struct lfsck_lock_handle *llh, __u64 bits, enum ldlm_mode mode)
471 {
472         struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
473         int                 rc;
474
475         LASSERT(S_ISDIR(lfsck_object_type(obj)));
476         LASSERT(name != NULL);
477         LASSERT(name[0] != 0);
478         LASSERT(!lustre_handle_is_used(&llh->llh_pdo_lh));
479         LASSERT(!lustre_handle_is_used(&llh->llh_reg_lh));
480
481         switch (mode) {
482         case LCK_EX:
483                 llh->llh_pdo_mode = LCK_EX;
484                 break;
485         case LCK_PW:
486                 llh->llh_pdo_mode = LCK_CW;
487                 break;
488         case LCK_PR:
489                 llh->llh_pdo_mode = LCK_CR;
490                 break;
491         default:
492                 CDEBUG(D_LFSCK, "%s: unexpected PDO lock mode %u on the obj "
493                        DFID"\n", lfsck_lfsck2name(lfsck), mode,
494                        PFID(lfsck_dto2fid(obj)));
495                 LBUG();
496         }
497
498         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
499         rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_pdo_lh,
500                                 MDS_INODELOCK_UPDATE, llh->llh_pdo_mode);
501         if (rc != 0)
502                 return rc;
503
504         llh->llh_reg_mode = mode;
505         resid->name[LUSTRE_RES_ID_HSH_OFF] = full_name_hash(name, strlen(name));
506         LASSERT(resid->name[LUSTRE_RES_ID_HSH_OFF] != 0);
507         rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_reg_lh,
508                                 bits, llh->llh_reg_mode);
509         if (rc != 0)
510                 lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode);
511
512         return rc;
513 }
514
515 /**
516  * Release the the compound ibits locks.
517  *
518  * \param[in] llh       pointer to the lfsck_lock_handle to be released
519  */
520 void lfsck_unlock(struct lfsck_lock_handle *llh)
521 {
522         lfsck_ibits_unlock(&llh->llh_reg_lh, llh->llh_reg_mode);
523         lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode);
524 }
525
526 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
527                               struct lfsck_instance *lfsck,
528                               const struct lu_fid *fid)
529 {
530         struct seq_server_site  *ss     = lfsck_dev_site(lfsck);
531         struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
532         int                      rc;
533
534         if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) {
535                 /* "ROOT" is always on the MDT0. */
536                 if (lu_fid_eq(fid, &lfsck->li_global_root_fid))
537                         return 0;
538
539                 return lfsck_dev_idx(lfsck);
540         }
541
542         fld_range_set_mdt(range);
543         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
544         if (rc == 0)
545                 rc = range->lsr_index;
546
547         return rc;
548 }
549
/* Well-known directory entry names; the first two have external linkage. */
const char dot[]                = ".";
const char dotdot[]             = "..";

/* File-local names. */
static const char dotlustre[]   = ".lustre";
static const char lostfound[]   = "lost+found";
554
555 /**
556  * Remove the name entry from the .lustre/lost+found directory.
557  *
558  * No need to care about the object referenced by the name entry,
559  * either the name entry is invalid or redundant, or the referenced
560  * object has been processed or will be handled by others.
561  *
562  * \param[in] env       pointer to the thread context
563  * \param[in] lfsck     pointer to the lfsck instance
564  * \param[in] name      the name for the name entry to be removed
565  *
566  * \retval              0 for success
567  * \retval              negative error number on failure
568  */
569 static int lfsck_lpf_remove_name_entry(const struct lu_env *env,
570                                        struct lfsck_instance *lfsck,
571                                        const char *name)
572 {
573         struct dt_object        *parent = lfsck->li_lpf_root_obj;
574         struct dt_device        *dev    = lfsck_obj2dev(parent);
575         struct thandle          *th;
576         struct lfsck_lock_handle *llh   = &lfsck_env_info(env)->lti_llh;
577         int                      rc;
578         ENTRY;
579
580         rc = lfsck_lock(env, lfsck, parent, name, llh,
581                         MDS_INODELOCK_UPDATE, LCK_PW);
582         if (rc != 0)
583                 RETURN(rc);
584
585         th = dt_trans_create(env, dev);
586         if (IS_ERR(th))
587                 GOTO(unlock, rc = PTR_ERR(th));
588
589         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
590         if (rc != 0)
591                 GOTO(stop, rc);
592
593         rc = dt_declare_ref_del(env, parent, th);
594         if (rc != 0)
595                 GOTO(stop, rc);
596
597         rc = dt_trans_start_local(env, dev, th);
598         if (rc != 0)
599                 GOTO(stop, rc);
600
601         rc = dt_delete(env, parent, (const struct dt_key *)name, th);
602         if (rc != 0)
603                 GOTO(stop, rc);
604
605         dt_write_lock(env, parent, 0);
606         rc = dt_ref_del(env, parent, th);
607         dt_write_unlock(env, parent);
608
609         GOTO(stop, rc);
610
611 stop:
612         dt_trans_stop(env, dev, th);
613
614 unlock:
615         lfsck_unlock(llh);
616
617         CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s: rc = %d\n",
618                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(parent)), name, rc);
619
620         return rc;
621 }
622
623 static int lfsck_create_lpf_local(const struct lu_env *env,
624                                   struct lfsck_instance *lfsck,
625                                   struct dt_object *child,
626                                   struct lu_attr *la,
627                                   struct dt_object_format *dof,
628                                   const char *name)
629 {
630         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
631         struct dt_object        *parent = lfsck->li_lpf_root_obj;
632         struct dt_device        *dev    = lfsck_obj2dev(child);
633         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
634         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
635         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
636         struct thandle          *th     = NULL;
637         struct linkea_data       ldata  = { NULL };
638         struct lu_buf            linkea_buf;
639         const struct lu_name    *cname;
640         loff_t                   pos    = 0;
641         int                      len    = sizeof(struct lfsck_bookmark);
642         int                      rc;
643         ENTRY;
644
645         rc = linkea_data_new(&ldata,
646                              &lfsck_env_info(env)->lti_linkea_buf2);
647         if (rc != 0)
648                 RETURN(rc);
649
650         cname = lfsck_name_get_const(env, name, strlen(name));
651         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
652         if (rc != 0)
653                 RETURN(rc);
654
655         th = dt_trans_create(env, dev);
656         if (IS_ERR(th))
657                 RETURN(PTR_ERR(th));
658
659         /* 1a. create child */
660         rc = dt_declare_create(env, child, la, NULL, dof, th);
661         if (rc != 0)
662                 GOTO(stop, rc);
663
664         if (!dt_try_as_dir(env, child))
665                 GOTO(stop, rc = -ENOTDIR);
666
667         /* 2a. increase child nlink */
668         rc = dt_declare_ref_add(env, child, th);
669         if (rc != 0)
670                 GOTO(stop, rc);
671
672         /* 3a. insert dot into child dir */
673         rec->rec_type = S_IFDIR;
674         rec->rec_fid = cfid;
675         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
676                                (const struct dt_key *)dot, th);
677         if (rc != 0)
678                 GOTO(stop, rc);
679
680         /* 4a. insert dotdot into child dir */
681         rec->rec_fid = &LU_LPF_FID;
682         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
683                                (const struct dt_key *)dotdot, th);
684         if (rc != 0)
685                 GOTO(stop, rc);
686
687         /* 5a. insert linkEA for child */
688         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
689                        ldata.ld_leh->leh_len);
690         rc = dt_declare_xattr_set(env, child, &linkea_buf,
691                                   XATTR_NAME_LINK, 0, th);
692         if (rc != 0)
693                 GOTO(stop, rc);
694
695         /* 6a. insert name into parent dir */
696         rec->rec_type = S_IFDIR;
697         rec->rec_fid = cfid;
698         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
699                                (const struct dt_key *)name, th);
700         if (rc != 0)
701                 GOTO(stop, rc);
702
703         /* 7a. increase parent nlink */
704         rc = dt_declare_ref_add(env, parent, th);
705         if (rc != 0)
706                 GOTO(stop, rc);
707
708         /* 8a. update bookmark */
709         rc = dt_declare_record_write(env, bk_obj,
710                                      lfsck_buf_get(env, bk, len), 0, th);
711         if (rc != 0)
712                 GOTO(stop, rc);
713
714         rc = dt_trans_start_local(env, dev, th);
715         if (rc != 0)
716                 GOTO(stop, rc);
717
718         dt_write_lock(env, child, 0);
719         /* 1b. create child */
720         rc = dt_create(env, child, la, NULL, dof, th);
721         if (rc != 0)
722                 GOTO(unlock, rc);
723
724         /* 2b. increase child nlink */
725         rc = dt_ref_add(env, child, th);
726         if (rc != 0)
727                 GOTO(unlock, rc);
728
729         /* 3b. insert dot into child dir */
730         rec->rec_fid = cfid;
731         rc = dt_insert(env, child, (const struct dt_rec *)rec,
732                        (const struct dt_key *)dot, th, 1);
733         if (rc != 0)
734                 GOTO(unlock, rc);
735
736         /* 4b. insert dotdot into child dir */
737         rec->rec_fid = &LU_LPF_FID;
738         rc = dt_insert(env, child, (const struct dt_rec *)rec,
739                        (const struct dt_key *)dotdot, th, 1);
740         if (rc != 0)
741                 GOTO(unlock, rc);
742
743         /* 5b. insert linkEA for child. */
744         rc = dt_xattr_set(env, child, &linkea_buf,
745                           XATTR_NAME_LINK, 0, th);
746         dt_write_unlock(env, child);
747         if (rc != 0)
748                 GOTO(stop, rc);
749
750         /* 6b. insert name into parent dir */
751         rec->rec_fid = cfid;
752         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
753                        (const struct dt_key *)name, th, 1);
754         if (rc != 0)
755                 GOTO(stop, rc);
756
757         dt_write_lock(env, parent, 0);
758         /* 7b. increase parent nlink */
759         rc = dt_ref_add(env, parent, th);
760         dt_write_unlock(env, parent);
761         if (rc != 0)
762                 GOTO(stop, rc);
763
764         bk->lb_lpf_fid = *cfid;
765         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
766
767         /* 8b. update bookmark */
768         rc = dt_record_write(env, bk_obj,
769                              lfsck_buf_get(env, bk, len), &pos, th);
770
771         GOTO(stop, rc);
772
773 unlock:
774         dt_write_unlock(env, child);
775
776 stop:
777         dt_trans_stop(env, dev, th);
778
779         return rc;
780 }
781
782 static int lfsck_create_lpf_remote(const struct lu_env *env,
783                                    struct lfsck_instance *lfsck,
784                                    struct dt_object *child,
785                                    struct lu_attr *la,
786                                    struct dt_object_format *dof,
787                                    const char *name)
788 {
789         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
790         struct dt_object        *parent = lfsck->li_lpf_root_obj;
791         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
792         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
793         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
794         struct thandle          *th     = NULL;
795         struct linkea_data       ldata  = { NULL };
796         struct lu_buf            linkea_buf;
797         const struct lu_name    *cname;
798         struct dt_device        *dev;
799         loff_t                   pos    = 0;
800         int                      len    = sizeof(struct lfsck_bookmark);
801         int                      rc;
802         ENTRY;
803
804         rc = linkea_data_new(&ldata,
805                              &lfsck_env_info(env)->lti_linkea_buf2);
806         if (rc != 0)
807                 RETURN(rc);
808
809         cname = lfsck_name_get_const(env, name, strlen(name));
810         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
811         if (rc != 0)
812                 RETURN(rc);
813
814         /* Create .lustre/lost+found/MDTxxxx. */
815
816         /* XXX: Currently, cross-MDT create operation needs to create the child
817          *      object firstly, then insert name into the parent directory. For
818          *      this case, the child object resides on current MDT (local), but
819          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
820          *      easy to contain all the sub-modifications orderly within single
821          *      transaction.
822          *
823          *      To avoid more inconsistency, we split the create operation into
824          *      two transactions:
825          *
826          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
827          *         locally.
828          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
829          *         remotely.
830          *
831          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
832          *      repair such inconsistency when LFSCK run next time. */
833
834         /* Transaction I: locally */
835
836         dev = lfsck_obj2dev(child);
837         th = dt_trans_create(env, dev);
838         if (IS_ERR(th))
839                 RETURN(PTR_ERR(th));
840
841         /* 1a. create child */
842         rc = dt_declare_create(env, child, la, NULL, dof, th);
843         if (rc != 0)
844                 GOTO(stop, rc);
845
846         if (!dt_try_as_dir(env, child))
847                 GOTO(stop, rc = -ENOTDIR);
848
849         /* 2a. increase child nlink */
850         rc = dt_declare_ref_add(env, child, th);
851         if (rc != 0)
852                 GOTO(stop, rc);
853
854         /* 3a. insert dot into child dir */
855         rec->rec_type = S_IFDIR;
856         rec->rec_fid = cfid;
857         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
858                                (const struct dt_key *)dot, th);
859         if (rc != 0)
860                 GOTO(stop, rc);
861
862         /* 4a. insert dotdot into child dir */
863         rec->rec_fid = &LU_LPF_FID;
864         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
865                                (const struct dt_key *)dotdot, th);
866         if (rc != 0)
867                 GOTO(stop, rc);
868
869         /* 5a. insert linkEA for child */
870         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
871                        ldata.ld_leh->leh_len);
872         rc = dt_declare_xattr_set(env, child, &linkea_buf,
873                                   XATTR_NAME_LINK, 0, th);
874         if (rc != 0)
875                 GOTO(stop, rc);
876
877         /* 6a. update bookmark */
878         rc = dt_declare_record_write(env, bk_obj,
879                                      lfsck_buf_get(env, bk, len), 0, th);
880         if (rc != 0)
881                 GOTO(stop, rc);
882
883         rc = dt_trans_start_local(env, dev, th);
884         if (rc != 0)
885                 GOTO(stop, rc);
886
887         dt_write_lock(env, child, 0);
888         /* 1b. create child */
889         rc = dt_create(env, child, la, NULL, dof, th);
890         if (rc != 0)
891                 GOTO(unlock, rc);
892
893         /* 2b. increase child nlink */
894         rc = dt_ref_add(env, child, th);
895         if (rc != 0)
896                 GOTO(unlock, rc);
897
898         /* 3b. insert dot into child dir */
899         rec->rec_type = S_IFDIR;
900         rec->rec_fid = cfid;
901         rc = dt_insert(env, child, (const struct dt_rec *)rec,
902                        (const struct dt_key *)dot, th, 1);
903         if (rc != 0)
904                 GOTO(unlock, rc);
905
906         /* 4b. insert dotdot into child dir */
907         rec->rec_fid = &LU_LPF_FID;
908         rc = dt_insert(env, child, (const struct dt_rec *)rec,
909                        (const struct dt_key *)dotdot, th, 1);
910         if (rc != 0)
911                 GOTO(unlock, rc);
912
913         /* 5b. insert linkEA for child */
914         rc = dt_xattr_set(env, child, &linkea_buf,
915                           XATTR_NAME_LINK, 0, th);
916         if (rc != 0)
917                 GOTO(unlock, rc);
918
919         bk->lb_lpf_fid = *cfid;
920         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
921
922         /* 6b. update bookmark */
923         rc = dt_record_write(env, bk_obj,
924                              lfsck_buf_get(env, bk, len), &pos, th);
925
926         dt_write_unlock(env, child);
927         dt_trans_stop(env, dev, th);
928         if (rc != 0)
929                 RETURN(rc);
930
931         /* Transaction II: remotely */
932
933         dev = lfsck_obj2dev(parent);
934         th = dt_trans_create(env, dev);
935         if (IS_ERR(th))
936                 RETURN(PTR_ERR(th));
937
938         th->th_sync = 1;
939         /* 5a. insert name into parent dir */
940         rec->rec_fid = cfid;
941         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
942                                (const struct dt_key *)name, th);
943         if (rc != 0)
944                 GOTO(stop, rc);
945
946         /* 6a. increase parent nlink */
947         rc = dt_declare_ref_add(env, parent, th);
948         if (rc != 0)
949                 GOTO(stop, rc);
950
951         rc = dt_trans_start_local(env, dev, th);
952         if (rc != 0)
953                 GOTO(stop, rc);
954
955         /* 5b. insert name into parent dir */
956         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
957                        (const struct dt_key *)name, th, 1);
958         if (rc != 0)
959                 GOTO(stop, rc);
960
961         dt_write_lock(env, parent, 0);
962         /* 6b. increase parent nlink */
963         rc = dt_ref_add(env, parent, th);
964         dt_write_unlock(env, parent);
965
966         GOTO(stop, rc);
967
968 unlock:
969         dt_write_unlock(env, child);
970 stop:
971         dt_trans_stop(env, dev, th);
972
973         if (rc != 0 && dev == lfsck_obj2dev(parent))
974                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
975                        "for orphans, but failed to insert the name %s "
976                        "to the .lustre/lost+found/. Such inconsistency "
977                        "will be repaired when LFSCK run next time: rc = %d\n",
978                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
979
980         return rc;
981 }
982
983 /**
984  * Create the MDTxxxx directory under /ROOT/.lustre/lost+found/
985  *
986  * The /ROOT/.lustre/lost+found/MDTxxxx/ directory is used for holding
987  * orphans and other uncertain inconsistent objects found during the
988  * LFSCK. Such directory will be created by the LFSCK engine on the
989  * local MDT before the LFSCK scanning.
990  *
991  * \param[in] env       pointer to the thread context
992  * \param[in] lfsck     pointer to the lfsck instance
993  *
994  * \retval              0 for success
995  * \retval              negative error number on failure
996  */
997 static int lfsck_create_lpf(const struct lu_env *env,
998                             struct lfsck_instance *lfsck)
999 {
1000         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
1001         struct lfsck_thread_info *info  = lfsck_env_info(env);
1002         struct lu_fid            *cfid  = &info->lti_fid2;
1003         struct lu_attr           *la    = &info->lti_la;
1004         struct dt_object_format  *dof   = &info->lti_dof;
1005         struct dt_object         *parent = lfsck->li_lpf_root_obj;
1006         struct dt_object         *child = NULL;
1007         struct lfsck_lock_handle *llh   = &info->lti_llh;
1008         char                      name[8];
1009         int                       node  = lfsck_dev_idx(lfsck);
1010         int                       rc    = 0;
1011         ENTRY;
1012
1013         LASSERT(lfsck->li_master);
1014         LASSERT(parent != NULL);
1015         LASSERT(lfsck->li_lpf_obj == NULL);
1016
1017         snprintf(name, 8, "MDT%04x", node);
1018         rc = lfsck_lock(env, lfsck, parent, name, llh,
1019                         MDS_INODELOCK_UPDATE, LCK_PW);
1020         if (rc != 0)
1021                 RETURN(rc);
1022
1023         if (fid_is_zero(&bk->lb_lpf_fid)) {
1024                 /* There is corner case that: in former LFSCK scanning we have
1025                  * created the .lustre/lost+found/MDTxxxx but failed to update
1026                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
1027                  * it from MDT0 firstly. */
1028                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1029                                (const struct dt_key *)name);
1030                 if (rc != 0 && rc != -ENOENT)
1031                         GOTO(unlock, rc);
1032
1033                 if (rc == 0) {
1034                         bk->lb_lpf_fid = *cfid;
1035                         rc = lfsck_bookmark_store(env, lfsck);
1036                 } else {
1037                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
1038                 }
1039                 if (rc != 0)
1040                         GOTO(unlock, rc);
1041         } else {
1042                 *cfid = bk->lb_lpf_fid;
1043         }
1044
1045         child = lfsck_object_find_bottom_new(env, lfsck, cfid);
1046         if (IS_ERR(child))
1047                 GOTO(unlock, rc = PTR_ERR(child));
1048
1049         if (dt_object_exists(child) != 0) {
1050                 if (unlikely(!dt_try_as_dir(env, child)))
1051                         rc = -ENOTDIR;
1052                 else
1053                         lfsck->li_lpf_obj = child;
1054
1055                 GOTO(unlock, rc);
1056         }
1057
1058         memset(la, 0, sizeof(*la));
1059         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
1060         la->la_mode = S_IFDIR | S_IRWXU;
1061         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1062                        LA_UID | LA_GID;
1063         memset(dof, 0, sizeof(*dof));
1064         dof->dof_type = dt_mode_to_dft(S_IFDIR);
1065
1066         if (node == 0)
1067                 rc = lfsck_create_lpf_local(env, lfsck, child, la, dof, name);
1068         else
1069                 rc = lfsck_create_lpf_remote(env, lfsck, child, la, dof, name);
1070         if (rc == 0)
1071                 lfsck->li_lpf_obj = child;
1072
1073         GOTO(unlock, rc);
1074
1075 unlock:
1076         lfsck_unlock(llh);
1077         if (rc != 0 && child != NULL && !IS_ERR(child))
1078                 lfsck_object_put(env, child);
1079
1080         return rc;
1081 }
1082
1083 /**
1084  * Scan .lustre/lost+found for bad name entries and remove them.
1085  *
1086  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
1087  * index in the system. Any other formatted name is invalid and should be
1088  * removed.
1089  *
1090  * \param[in] env       pointer to the thread context
1091  * \param[in] lfsck     pointer to the lfsck instance
1092  *
1093  * \retval              0 for success
1094  * \retval              negative error number on failure
1095  */
1096 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
1097                                       struct lfsck_instance *lfsck)
1098 {
1099         struct dt_object        *parent = lfsck->li_lpf_root_obj;
1100         struct lu_dirent        *ent    =
1101                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
1102         const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
1103         struct dt_it            *it;
1104         int                      rc;
1105         ENTRY;
1106
1107         it = iops->init(env, parent, LUDA_64BITHASH);
1108         if (IS_ERR(it))
1109                 RETURN(PTR_ERR(it));
1110
1111         rc = iops->load(env, it, 0);
1112         if (rc == 0)
1113                 rc = iops->next(env, it);
1114         else if (rc > 0)
1115                 rc = 0;
1116
1117         while (rc == 0) {
1118                 int off = 3;
1119
1120                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
1121                 if (rc != 0)
1122                         break;
1123
1124                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
1125                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1126                         goto next;
1127
1128                 /* name length must be strlen("MDTxxxx") */
1129                 if (ent->lde_namelen != 7)
1130                         goto remove;
1131
1132                 if (memcmp(ent->lde_name, "MDT", off) != 0)
1133                         goto remove;
1134
1135                 while (off < 7 && isxdigit(ent->lde_name[off]))
1136                         off++;
1137
1138                 if (off != 7) {
1139
1140 remove:
1141                         rc = lfsck_lpf_remove_name_entry(env, lfsck,
1142                                                          ent->lde_name);
1143                         if (rc != 0)
1144                                 break;
1145                 }
1146
1147 next:
1148                 rc = iops->next(env, it);
1149         }
1150
1151         iops->put(env, it);
1152         iops->fini(env, it);
1153
1154         RETURN(rc > 0 ? 0 : rc);
1155 }
1156
1157 static int lfsck_update_lpf_entry(const struct lu_env *env,
1158                                   struct lfsck_instance *lfsck,
1159                                   struct dt_object *parent,
1160                                   struct dt_object *child,
1161                                   const char *name,
1162                                   enum lfsck_verify_lpf_types type)
1163 {
1164         int rc;
1165
1166         if (type == LVLT_BY_BOOKMARK) {
1167                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
1168                                              lfsck_dto2fid(child), S_IFDIR);
1169         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1170                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
1171                 rc = lfsck_bookmark_store(env, lfsck);
1172
1173                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
1174                        " in the bookmark file: rc = %d\n",
1175                        lfsck_lfsck2name(lfsck),
1176                        PFID(lfsck_dto2fid(child)), rc);
1177         }
1178
1179         return rc;
1180 }
1181
1182 /**
1183  * Check whether the @child back references the @parent.
1184  *
1185  * Two cases:
1186  * 1) The child's FID is stored in the bookmark file. If the child back
1187  *    references the parent (LU_LPF_FID object) via its ".." entry, then
1188  *    insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
1189  *    the child back references another parent2, then:
1190  * 1.1) If the parent2 recognizes the child, then update the bookmark file;
1191  * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
1192  *      references the child. So keep them there. As the LFSCK processing,
1193  *      the parent3 may be found, then when the LFSCK run next time, the
1194  *      inconsistency can be repaired.
1195  *
1196  * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
1197  *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
1198  *    via its ".." entry, then update the bookmark file, otherwise, if the child
1199  *    back references another parent2, then:
1200  * 2.1) If the parent2 recognizes the child, then remove the sub-directory
1201  *      from .lustre/lost+found/;
1202  * 2.2) Otherwise, if the parent2 does not recognizes the child, trust the
1203  *      sub-directory name entry and update the child;
1204  * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1205  *      or not, then keep them there.
1206  *
1207  * \param[in] env       pointer to the thread context
1208  * \param[in] lfsck     pointer to the lfsck instance
1209  * \param[in] child     pointer to the lost+found sub-directory object
1210  * \param[in] name      the name for lost+found sub-directory object
1211  * \param[out] fid      pointer to the buffer to hold the FID of the object
1212  *                      (called it as parent2) that is referenced via the
1213  *                      child's dotdot entry; it also can be the FID that
1214  *                      is referenced by the name entry under the parent2.
1215  * \param[in] type      to indicate where the child's FID is stored in
1216  *
1217  * \retval              positive number for uncertain inconsistency
1218  * \retval              0 for success
1219  * \retval              negative error number on failure
1220  */
1221 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1222                                   struct lfsck_instance *lfsck,
1223                                   struct dt_object *child, const char *name,
1224                                   struct lu_fid *fid,
1225                                   enum lfsck_verify_lpf_types type)
1226 {
1227         struct dt_object         *parent  = lfsck->li_lpf_root_obj;
1228         struct lfsck_thread_info *info    = lfsck_env_info(env);
1229         char                     *name2   = info->lti_key;
1230         struct lu_fid            *fid2    = &info->lti_fid3;
1231         struct dt_object         *parent2 = NULL;
1232         struct lustre_handle      lh      = { 0 };
1233         int                       rc;
1234         ENTRY;
1235
1236         fid_zero(fid);
1237         rc = dt_lookup(env, child, (struct dt_rec *)fid,
1238                        (const struct dt_key *)dotdot);
1239         if (rc != 0)
1240                 GOTO(linkea, rc);
1241
1242         if (!fid_is_sane(fid))
1243                 GOTO(linkea, rc = -EINVAL);
1244
1245         if (lu_fid_eq(fid, &LU_LPF_FID)) {
1246                 const struct lu_name *cname;
1247
1248                 if (lfsck->li_lpf_obj == NULL) {
1249                         lu_object_get(&child->do_lu);
1250                         lfsck->li_lpf_obj = child;
1251                 }
1252
1253                 cname = lfsck_name_get_const(env, name, strlen(name));
1254                 rc = lfsck_verify_linkea(env, child, cname, &LU_LPF_FID);
1255                 if (rc == 0)
1256                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1257                                                     name, type);
1258
1259                 GOTO(out_done, rc);
1260         }
1261
1262         parent2 = lfsck_object_find_bottom(env, lfsck, fid);
1263         if (IS_ERR(parent2))
1264                 GOTO(linkea, parent2);
1265
1266         if (!dt_object_exists(parent2)) {
1267                 lfsck_object_put(env, parent2);
1268
1269                 GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
1270         }
1271
1272         if (!dt_try_as_dir(env, parent2)) {
1273                 lfsck_object_put(env, parent2);
1274
1275                 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1276         }
1277
1278 linkea:
1279         /* To prevent rename/unlink race */
1280         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1281                               MDS_INODELOCK_UPDATE, LCK_PR);
1282         if (rc != 0)
1283                 GOTO(out_put, rc);
1284
1285         dt_read_lock(env, child, 0);
1286         rc = lfsck_links_get_first(env, child, name2, fid2);
1287         if (rc != 0) {
1288                 dt_read_unlock(env, child);
1289                 lfsck_ibits_unlock(&lh, LCK_PR);
1290
1291                 GOTO(out_put, rc = 1);
1292         }
1293
1294         /* It is almost impossible that the bookmark file (or the name entry)
1295          * and the linkEA hit the same data corruption. Trust the linkEA. */
1296         if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1297                 dt_read_unlock(env, child);
1298                 lfsck_ibits_unlock(&lh, LCK_PR);
1299
1300                 *fid = *fid2;
1301                 if (lfsck->li_lpf_obj == NULL) {
1302                         lu_object_get(&child->do_lu);
1303                         lfsck->li_lpf_obj = child;
1304                 }
1305
1306                 /* Update the child's dotdot entry */
1307                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1308                                              &LU_LPF_FID, S_IFDIR);
1309                 if (rc == 0)
1310                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1311                                                     name, type);
1312
1313                 GOTO(out_put, rc);
1314         }
1315
1316         if (parent2 == NULL || IS_ERR(parent2)) {
1317                 dt_read_unlock(env, child);
1318                 lfsck_ibits_unlock(&lh, LCK_PR);
1319
1320                 GOTO(out_done, rc = 1);
1321         }
1322
1323         rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
1324                        (const struct dt_key *)name2);
1325         dt_read_unlock(env, child);
1326         lfsck_ibits_unlock(&lh, LCK_PR);
1327         if (rc != 0 && rc != -ENOENT)
1328                 GOTO(out_put, rc);
1329
1330         if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1331                 if (type == LVLT_BY_BOOKMARK)
1332                         GOTO(out_put, rc = 1);
1333
1334                 /* Trust the name entry, update the child's dotdot entry. */
1335                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1336                                              &LU_LPF_FID, S_IFDIR);
1337
1338                 GOTO(out_put, rc);
1339         }
1340
1341         if (type == LVLT_BY_BOOKMARK) {
1342                 /* Invalid FID record in the bookmark file, reset it. */
1343                 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1344                 rc = lfsck_bookmark_store(env, lfsck);
1345
1346                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1347                        " in the bookmark file: rc = %d\n",
1348                        lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1349         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1350                 /* The name entry is wrong, remove it. */
1351                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1352         }
1353
1354         GOTO(out_put, rc);
1355
1356 out_put:
1357         if (parent2 != NULL && !IS_ERR(parent2))
1358                 lfsck_object_put(env, parent2);
1359
1360 out_done:
1361         return rc;
1362 }
1363
1364 /**
1365  * Verify the /ROOT/.lustre/lost+found/ directory.
1366  *
1367  * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1368  * the LFSCK does not exactly know how to handle, such as orphans. So before
1369  * the LFSCK scanning the system, the consistency of such directory needs to
1370  * be verified firstly to allow the users to use it during the LFSCK.
1371  *
1372  * \param[in] env       pointer to the thread context
1373  * \param[in] lfsck     pointer to the lfsck instance
1374  *
1375  * \retval              positive number for uncertain inconsistency
1376  * \retval              0 for success
1377  * \retval              negative error number on failure
1378  */
1379 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1380 {
1381         struct lfsck_thread_info *info   = lfsck_env_info(env);
1382         struct lu_fid            *pfid   = &info->lti_fid;
1383         struct lu_fid            *cfid   = &info->lti_fid2;
1384         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
1385         struct dt_object         *parent;
1386         /* child1's FID is in the bookmark file. */
1387         struct dt_object         *child1 = NULL;
1388         /* child2's FID is in the name entry MDTxxxx. */
1389         struct dt_object         *child2 = NULL;
1390         const struct lu_name     *cname;
1391         char                      name[8];
1392         int                       node   = lfsck_dev_idx(lfsck);
1393         int                       rc     = 0;
1394         ENTRY;
1395
1396         LASSERT(lfsck->li_master);
1397
1398         if (lfsck->li_lpf_root_obj != NULL)
1399                 RETURN(0);
1400
1401         if (node == 0) {
1402                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
1403                                                   &LU_LPF_FID);
1404         } else {
1405                 struct lfsck_tgt_desc *ltd;
1406
1407                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1408                 if (unlikely(ltd == NULL))
1409                         RETURN(-ENXIO);
1410
1411                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1412                                                   &LU_LPF_FID);
1413                 lfsck_tgt_put(ltd);
1414         }
1415
1416         if (IS_ERR(parent))
1417                 RETURN(PTR_ERR(parent));
1418
1419         LASSERT(dt_object_exists(parent));
1420
1421         if (unlikely(!dt_try_as_dir(env, parent))) {
1422                 lfsck_object_put(env, parent);
1423
1424                 GOTO(put, rc = -ENOTDIR);
1425         }
1426
1427         lfsck->li_lpf_root_obj = parent;
1428         if (node == 0) {
1429                 rc = lfsck_scan_lpf_bad_entries(env, lfsck);
1430                 if (rc != 0)
1431                         CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
1432                                "for bad sub-directories: rc = %d\n",
1433                                lfsck_lfsck2name(lfsck), rc);
1434         }
1435
1436         /* child2 */
1437         snprintf(name, 8, "MDT%04x", node);
1438         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1439                        (const struct dt_key *)name);
1440         if (rc == -ENOENT) {
1441                 rc = 0;
1442                 goto find_child1;
1443         }
1444
1445         if (rc != 0)
1446                 GOTO(put, rc);
1447
1448         /* Invalid FID in the name entry, remove the name entry. */
1449         if (!fid_is_norm(cfid)) {
1450                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1451                 if (rc != 0)
1452                         GOTO(put, rc);
1453
1454                 goto find_child1;
1455         }
1456
1457         child2 = lfsck_object_find_bottom(env, lfsck, cfid);
1458         if (IS_ERR(child2))
1459                 GOTO(put, rc = PTR_ERR(child2));
1460
1461         if (unlikely(!dt_object_exists(child2) ||
1462                      dt_object_remote(child2)) ||
1463                      !S_ISDIR(lfsck_object_type(child2))) {
1464                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1465                 if (rc != 0)
1466                         GOTO(put, rc);
1467
1468                 goto find_child1;
1469         }
1470
1471         if (unlikely(!dt_try_as_dir(env, child2))) {
1472                 lfsck_object_put(env, child2);
1473                 child2 = NULL;
1474                 rc = -ENOTDIR;
1475         }
1476
1477 find_child1:
1478         if (fid_is_zero(&bk->lb_lpf_fid))
1479                 goto check_child2;
1480
1481         if (likely(lu_fid_eq(cfid, &bk->lb_lpf_fid))) {
1482                 if (lfsck->li_lpf_obj == NULL) {
1483                         lu_object_get(&child2->do_lu);
1484                         lfsck->li_lpf_obj = child2;
1485                 }
1486
1487                 cname = lfsck_name_get_const(env, name, strlen(name));
1488                 rc = lfsck_verify_linkea(env, child2, cname, &LU_LPF_FID);
1489
1490                 GOTO(put, rc);
1491         }
1492
1493         if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
1494                 struct lu_fid tfid = bk->lb_lpf_fid;
1495
1496                 /* Invalid FID record in the bookmark file, reset it. */
1497                 fid_zero(&bk->lb_lpf_fid);
1498                 rc = lfsck_bookmark_store(env, lfsck);
1499
1500                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1501                        " in the bookmark file: rc = %d\n",
1502                        lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
1503
1504                 if (rc != 0)
1505                         GOTO(put, rc);
1506
1507                 goto check_child2;
1508         }
1509
1510         child1 = lfsck_object_find_bottom(env, lfsck, &bk->lb_lpf_fid);
1511         if (IS_ERR(child1)) {
1512                 child1 = NULL;
1513                 goto check_child2;
1514         }
1515
1516         if (unlikely(!dt_object_exists(child1) ||
1517                      dt_object_remote(child1)) ||
1518                      !S_ISDIR(lfsck_object_type(child1))) {
1519                 /* Invalid FID record in the bookmark file, reset it. */
1520                 fid_zero(&bk->lb_lpf_fid);
1521                 rc = lfsck_bookmark_store(env, lfsck);
1522
1523                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1524                        " in the bookmark file: rc = %d\n",
1525                        lfsck_lfsck2name(lfsck),
1526                        PFID(lfsck_dto2fid(child1)), rc);
1527
1528                 if (rc != 0)
1529                         GOTO(put, rc);
1530
1531                 lfsck_object_put(env, child1);
1532                 child1 = NULL;
1533                 goto check_child2;
1534         }
1535
1536         if (unlikely(!dt_try_as_dir(env, child1))) {
1537                 lfsck_object_put(env, child1);
1538                 child1 = NULL;
1539                 rc = -ENOTDIR;
1540                 goto check_child2;
1541         }
1542
1543         rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name, pfid,
1544                                     LVLT_BY_BOOKMARK);
1545         if (lu_fid_eq(pfid, &LU_LPF_FID))
1546                 GOTO(put, rc);
1547
1548 check_child2:
1549         if (child2 != NULL)
1550                 rc = lfsck_verify_lpf_pairs(env, lfsck, child2, name,
1551                                             pfid, LVLT_BY_NAMEENTRY);
1552
1553         GOTO(put, rc);
1554
1555 put:
1556         if (lfsck->li_lpf_obj != NULL) {
1557                 if (unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj))) {
1558                         lfsck_object_put(env, lfsck->li_lpf_obj);
1559                         lfsck->li_lpf_obj = NULL;
1560                         rc = -ENOTDIR;
1561                 }
1562         } else if (rc == 0) {
1563                 rc = lfsck_create_lpf(env, lfsck);
1564         }
1565
1566         if (child2 != NULL && !IS_ERR(child2))
1567                 lfsck_object_put(env, child2);
1568         if (child1 != NULL && !IS_ERR(child1))
1569                 lfsck_object_put(env, child1);
1570
1571         return rc;
1572 }
1573
1574 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1575 {
1576         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1577         struct seq_server_site  *ss     = lfsck_dev_site(lfsck);
1578         char                    *prefix;
1579         int                      rc     = 0;
1580         ENTRY;
1581
1582         if (unlikely(ss == NULL))
1583                 RETURN(-ENXIO);
1584
1585         OBD_ALLOC_PTR(lfsck->li_seq);
1586         if (lfsck->li_seq == NULL)
1587                 RETURN(-ENOMEM);
1588
1589         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1590         if (prefix == NULL)
1591                 GOTO(out, rc = -ENOMEM);
1592
1593         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1594         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1595                              ss->ss_server_seq);
1596         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1597         if (rc != 0)
1598                 GOTO(out, rc);
1599
1600         if (fid_is_sane(&bk->lb_last_fid))
1601                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1602
1603         RETURN(0);
1604
1605 out:
1606         OBD_FREE_PTR(lfsck->li_seq);
1607         lfsck->li_seq = NULL;
1608
1609         return rc;
1610 }
1611
1612 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1613 {
1614         if (lfsck->li_seq != NULL) {
1615                 seq_client_fini(lfsck->li_seq);
1616                 OBD_FREE_PTR(lfsck->li_seq);
1617                 lfsck->li_seq = NULL;
1618         }
1619 }
1620
1621 void lfsck_instance_cleanup(const struct lu_env *env,
1622                             struct lfsck_instance *lfsck)
1623 {
1624         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1625         struct lfsck_component  *com;
1626         struct lfsck_component  *next;
1627         struct lfsck_lmv_unit   *llu;
1628         struct lfsck_lmv_unit   *llu_next;
1629         struct lfsck_lmv        *llmv;
1630         ENTRY;
1631
1632         LASSERT(list_empty(&lfsck->li_link));
1633         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1634
1635         if (lfsck->li_obj_oit != NULL) {
1636                 lfsck_object_put(env, lfsck->li_obj_oit);
1637                 lfsck->li_obj_oit = NULL;
1638         }
1639
1640         LASSERT(lfsck->li_obj_dir == NULL);
1641         LASSERT(lfsck->li_lmv == NULL);
1642
1643         list_for_each_entry_safe(llu, llu_next, &lfsck->li_list_lmv, llu_link) {
1644                 llmv = &llu->llu_lmv;
1645
1646                 LASSERTF(atomic_read(&llmv->ll_ref) == 1,
1647                          "still in using: %u\n",
1648                          atomic_read(&llmv->ll_ref));
1649
1650                 lfsck_lmv_put(env, llmv);
1651         }
1652
1653         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1654                 lfsck_component_cleanup(env, com);
1655         }
1656
1657         LASSERT(list_empty(&lfsck->li_list_dir));
1658
1659         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1660                                  lc_link) {
1661                 lfsck_component_cleanup(env, com);
1662         }
1663
1664         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1665                 lfsck_component_cleanup(env, com);
1666         }
1667
1668         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1669         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1670
1671         if (lfsck->li_lfsck_dir != NULL) {
1672                 lfsck_object_put(env, lfsck->li_lfsck_dir);
1673                 lfsck->li_lfsck_dir = NULL;
1674         }
1675
1676         if (lfsck->li_bookmark_obj != NULL) {
1677                 lfsck_object_put(env, lfsck->li_bookmark_obj);
1678                 lfsck->li_bookmark_obj = NULL;
1679         }
1680
1681         if (lfsck->li_lpf_obj != NULL) {
1682                 lfsck_object_put(env, lfsck->li_lpf_obj);
1683                 lfsck->li_lpf_obj = NULL;
1684         }
1685
1686         if (lfsck->li_lpf_root_obj != NULL) {
1687                 lfsck_object_put(env, lfsck->li_lpf_root_obj);
1688                 lfsck->li_lpf_root_obj = NULL;
1689         }
1690
1691         if (lfsck->li_los != NULL) {
1692                 local_oid_storage_fini(env, lfsck->li_los);
1693                 lfsck->li_los = NULL;
1694         }
1695
1696         lfsck_fid_fini(lfsck);
1697
1698         OBD_FREE_PTR(lfsck);
1699 }
1700
1701 static inline struct lfsck_instance *
1702 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1703 {
1704         struct lfsck_instance *lfsck;
1705
1706         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1707                 if (lfsck->li_bottom == key) {
1708                         if (ref)
1709                                 lfsck_instance_get(lfsck);
1710                         if (unlink)
1711                                 list_del_init(&lfsck->li_link);
1712
1713                         return lfsck;
1714                 }
1715         }
1716
1717         return NULL;
1718 }
1719
1720 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1721                                            bool unlink)
1722 {
1723         struct lfsck_instance *lfsck;
1724
1725         spin_lock(&lfsck_instance_lock);
1726         lfsck = __lfsck_instance_find(key, ref, unlink);
1727         spin_unlock(&lfsck_instance_lock);
1728
1729         return lfsck;
1730 }
1731
1732 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1733 {
1734         struct lfsck_instance *tmp;
1735
1736         spin_lock(&lfsck_instance_lock);
1737         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1738                 if (lfsck->li_bottom == tmp->li_bottom) {
1739                         spin_unlock(&lfsck_instance_lock);
1740                         return -EEXIST;
1741                 }
1742         }
1743
1744         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1745         spin_unlock(&lfsck_instance_lock);
1746         return 0;
1747 }
1748
/*
 * Print "<prefix>: name,name,...\n" for the bits set in \a bits, taking the
 * name of bit N from names[N].  Bits whose name is NULL are consumed but
 * not printed.  With no bits set only "<prefix>:\n" is emitted.
 */
void lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
		     const char *prefix)
{
	bool ended = (bits == 0);
	int idx = 0;

	seq_printf(m, "%s:%c", prefix, ended ? '\n' : ' ');

	while (bits != 0) {
		int mask = 1 << idx;

		if (bits & mask) {
			bits &= ~mask;
			if (names[idx] != NULL) {
				/* The last remaining bit terminates the line. */
				if (bits == 0)
					ended = true;

				seq_printf(m, "%s%c", names[idx],
					   ended ? '\n' : ',');
			}
		}
		idx++;
	}

	/* The final set bit had no name, so the line is still open. */
	if (!ended)
		seq_putc(m, '\n');
}
1774
1775 void lfsck_time_dump(struct seq_file *m, __u64 time, const char *name)
1776 {
1777         if (time == 0) {
1778                 seq_printf(m, "%s_time: N/A\n", name);
1779                 seq_printf(m, "time_since_%s: N/A\n", name);
1780         } else {
1781                 seq_printf(m, "%s_time: %llu\n", name, time);
1782                 seq_printf(m, "time_since_%s: %llu seconds\n",
1783                            name, cfs_time_current_sec() - time);
1784         }
1785 }
1786
1787 void lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1788                     const char *prefix)
1789 {
1790         if (fid_is_zero(&pos->lp_dir_parent)) {
1791                 if (pos->lp_oit_cookie == 0) {
1792                         seq_printf(m, "%s: N/A, N/A, N/A\n", prefix);
1793                         return;
1794                 }
1795                 seq_printf(m, "%s: %llu, N/A, N/A\n",
1796                            prefix, pos->lp_oit_cookie);
1797         } else {
1798                 seq_printf(m, "%s: %llu, "DFID", %#llx\n",
1799                            prefix, pos->lp_oit_cookie,
1800                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1801         }
1802 }
1803
1804 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1805                     struct lfsck_position *pos, bool init)
1806 {
1807         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1808
1809         if (unlikely(lfsck->li_di_oit == NULL)) {
1810                 memset(pos, 0, sizeof(*pos));
1811                 return;
1812         }
1813
1814         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1815         if (!lfsck->li_current_oit_processed && !init)
1816                 pos->lp_oit_cookie--;
1817
1818         LASSERT(pos->lp_oit_cookie > 0);
1819
1820         if (lfsck->li_di_dir != NULL) {
1821                 struct dt_object *dto = lfsck->li_obj_dir;
1822
1823                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1824                                                         lfsck->li_di_dir);
1825
1826                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1827                         fid_zero(&pos->lp_dir_parent);
1828                         pos->lp_dir_cookie = 0;
1829                 } else {
1830                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1831                 }
1832         } else {
1833                 fid_zero(&pos->lp_dir_parent);
1834                 pos->lp_dir_cookie = 0;
1835         }
1836 }
1837
1838 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1839 {
1840         bool dirty = false;
1841
1842         if (limit != LFSCK_SPEED_NO_LIMIT) {
1843                 if (limit > msecs_to_jiffies(MSEC_PER_SEC)) {
1844                         lfsck->li_sleep_rate = limit /
1845                                                msecs_to_jiffies(MSEC_PER_SEC);
1846                         lfsck->li_sleep_jif = 1;
1847                 } else {
1848                         lfsck->li_sleep_rate = 1;
1849                         lfsck->li_sleep_jif = msecs_to_jiffies(MSEC_PER_SEC) /
1850                                               limit;
1851                 }
1852         } else {
1853                 lfsck->li_sleep_jif = 0;
1854                 lfsck->li_sleep_rate = 0;
1855         }
1856
1857         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1858                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1859                 dirty = true;
1860         }
1861
1862         return dirty;
1863 }
1864
1865 void lfsck_control_speed(struct lfsck_instance *lfsck)
1866 {
1867         struct ptlrpc_thread *thread = &lfsck->li_thread;
1868         struct l_wait_info    lwi;
1869
1870         if (lfsck->li_sleep_jif > 0 &&
1871             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1872                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1873                                        LWI_ON_SIGNAL_NOOP, NULL);
1874
1875                 l_wait_event(thread->t_ctl_waitq,
1876                              !thread_is_running(thread),
1877                              &lwi);
1878                 lfsck->li_new_scanned = 0;
1879         }
1880 }
1881
1882 void lfsck_control_speed_by_self(struct lfsck_component *com)
1883 {
1884         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1885         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1886         struct l_wait_info       lwi;
1887
1888         if (lfsck->li_sleep_jif > 0 &&
1889             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1890                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1891                                        LWI_ON_SIGNAL_NOOP, NULL);
1892
1893                 l_wait_event(thread->t_ctl_waitq,
1894                              !thread_is_running(thread),
1895                              &lwi);
1896                 com->lc_new_scanned = 0;
1897         }
1898 }
1899
1900 static struct lfsck_thread_args *
1901 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1902                        struct lfsck_component *com,
1903                        struct lfsck_start_param *lsp)
1904 {
1905         struct lfsck_thread_args *lta;
1906         int                       rc;
1907
1908         OBD_ALLOC_PTR(lta);
1909         if (lta == NULL)
1910                 return ERR_PTR(-ENOMEM);
1911
1912         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1913         if (rc != 0) {
1914                 OBD_FREE_PTR(lta);
1915                 return ERR_PTR(rc);
1916         }
1917
1918         lta->lta_lfsck = lfsck_instance_get(lfsck);
1919         if (com != NULL)
1920                 lta->lta_com = lfsck_component_get(com);
1921
1922         lta->lta_lsp = lsp;
1923
1924         return lta;
1925 }
1926
1927 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1928 {
1929         if (lta->lta_com != NULL)
1930                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1931         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1932         lu_env_fini(&lta->lta_env);
1933         OBD_FREE_PTR(lta);
1934 }
1935
1936 struct lfsck_assistant_data *
1937 lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
1938                           const char *name)
1939 {
1940         struct lfsck_assistant_data *lad;
1941
1942         OBD_ALLOC_PTR(lad);
1943         if (lad != NULL) {
1944                 lad->lad_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
1945                 if (lad->lad_bitmap == NULL) {
1946                         OBD_FREE_PTR(lad);
1947                         return NULL;
1948                 }
1949
1950                 INIT_LIST_HEAD(&lad->lad_req_list);
1951                 spin_lock_init(&lad->lad_lock);
1952                 INIT_LIST_HEAD(&lad->lad_ost_list);
1953                 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1954                 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1955                 INIT_LIST_HEAD(&lad->lad_mdt_list);
1956                 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1957                 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1958                 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1959                 lad->lad_ops = lao;
1960                 lad->lad_name = name;
1961         }
1962
1963         return lad;
1964 }
1965
1966 struct lfsck_assistant_object *
1967 lfsck_assistant_object_init(const struct lu_env *env, const struct lu_fid *fid,
1968                             const struct lu_attr *attr, __u64 cookie,
1969                             bool is_dir)
1970 {
1971         struct lfsck_assistant_object   *lso;
1972
1973         OBD_ALLOC_PTR(lso);
1974         if (lso == NULL)
1975                 return ERR_PTR(-ENOMEM);
1976
1977         lso->lso_fid = *fid;
1978         if (attr != NULL)
1979                 lso->lso_attr = *attr;
1980
1981         atomic_set(&lso->lso_ref, 1);
1982         lso->lso_oit_cookie = cookie;
1983         if (is_dir)
1984                 lso->lso_is_dir = 1;
1985
1986         return lso;
1987 }
1988
1989 struct dt_object *
1990 lfsck_assistant_object_load(const struct lu_env *env,
1991                             struct lfsck_instance *lfsck,
1992                             struct lfsck_assistant_object *lso)
1993 {
1994         struct dt_object *obj;
1995
1996         obj = lfsck_object_find_bottom(env, lfsck, &lso->lso_fid);
1997         if (IS_ERR(obj))
1998                 return obj;
1999
2000         if (unlikely(!dt_object_exists(obj) || lfsck_is_dead_obj(obj))) {
2001                 lso->lso_dead = 1;
2002                 lfsck_object_put(env, obj);
2003
2004                 return ERR_PTR(-ENOENT);
2005         }
2006
2007         if (lso->lso_is_dir && unlikely(!dt_try_as_dir(env, obj))) {
2008                 lfsck_object_put(env, obj);
2009
2010                 return ERR_PTR(-ENOTDIR);
2011         }
2012
2013         return obj;
2014 }
2015
2016 /**
2017  * Generic LFSCK asynchronous communication interpretor function.
2018  * The LFSCK RPC reply for both the event notification and status
2019  * querying will be handled here.
2020  *
2021  * \param[in] env       pointer to the thread context
2022  * \param[in] req       pointer to the LFSCK request
2023  * \param[in] args      pointer to the lfsck_async_interpret_args
2024  * \param[in] rc        the result for handling the LFSCK request
2025  *
2026  * \retval              0 for success
2027  * \retval              negative error number on failure
2028  */
2029 int lfsck_async_interpret_common(const struct lu_env *env,
2030                                  struct ptlrpc_request *req,
2031                                  void *args, int rc)
2032 {
2033         struct lfsck_async_interpret_args *laia = args;
2034         struct lfsck_component            *com  = laia->laia_com;
2035         struct lfsck_assistant_data       *lad  = com->lc_data;
2036         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
2037         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
2038         struct lfsck_request              *lr   = laia->laia_lr;
2039
2040         LASSERT(com->lc_lfsck->li_master);
2041
2042         switch (lr->lr_event) {
2043         case LE_START:
2044                 if (rc != 0) {
2045                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
2046                                "start: rc = %d\n",
2047                                lfsck_lfsck2name(com->lc_lfsck),
2048                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2049                                ltd->ltd_index, lad->lad_name, rc);
2050
2051                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2052                                 struct lfsck_layout *lo = com->lc_file_ram;
2053
2054                                 if (lr->lr_flags & LEF_TO_OST)
2055                                         lfsck_lad_set_bitmap(env, com,
2056                                                              ltd->ltd_index);
2057                                 else
2058                                         lo->ll_flags |= LF_INCOMPLETE;
2059                         } else {
2060                                 struct lfsck_namespace *ns = com->lc_file_ram;
2061
2062                                 /* If some MDT does not join the namespace
2063                                  * LFSCK, then we cannot know whether there
2064                                  * is some name entry on such MDT that with
2065                                  * the referenced MDT-object on this MDT or
2066                                  * not. So the namespace LFSCK on this MDT
2067                                  * cannot handle orphan MDT-objects properly.
2068                                  * So we mark the LFSCK as LF_INCOMPLETE and
2069                                  * skip orphan MDT-objects handling. */
2070                                 ns->ln_flags |= LF_INCOMPLETE;
2071                         }
2072                         break;
2073                 }
2074
2075                 spin_lock(&ltds->ltd_lock);
2076                 if (ltd->ltd_dead) {
2077                         spin_unlock(&ltds->ltd_lock);
2078                         break;
2079                 }
2080
2081                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2082                         struct list_head *list;
2083                         struct list_head *phase_list;
2084
2085                         if (ltd->ltd_layout_done) {
2086                                 spin_unlock(&ltds->ltd_lock);
2087                                 break;
2088                         }
2089
2090                         if (lr->lr_flags & LEF_TO_OST) {
2091                                 list = &lad->lad_ost_list;
2092                                 phase_list = &lad->lad_ost_phase1_list;
2093                         } else {
2094                                 list = &lad->lad_mdt_list;
2095                                 phase_list = &lad->lad_mdt_phase1_list;
2096                         }
2097
2098                         if (list_empty(&ltd->ltd_layout_list))
2099                                 list_add_tail(&ltd->ltd_layout_list, list);
2100                         if (list_empty(&ltd->ltd_layout_phase_list))
2101                                 list_add_tail(&ltd->ltd_layout_phase_list,
2102                                               phase_list);
2103                 } else {
2104                         if (ltd->ltd_namespace_done) {
2105                                 spin_unlock(&ltds->ltd_lock);
2106                                 break;
2107                         }
2108
2109                         if (list_empty(&ltd->ltd_namespace_list))
2110                                 list_add_tail(&ltd->ltd_namespace_list,
2111                                               &lad->lad_mdt_list);
2112                         if (list_empty(&ltd->ltd_namespace_phase_list))
2113                                 list_add_tail(&ltd->ltd_namespace_phase_list,
2114                                               &lad->lad_mdt_phase1_list);
2115                 }
2116                 spin_unlock(&ltds->ltd_lock);
2117                 break;
2118         case LE_STOP:
2119         case LE_PHASE1_DONE:
2120         case LE_PHASE2_DONE:
2121         case LE_PEER_EXIT:
2122                 if (rc != 0 && rc != -EALREADY)
2123                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
2124                               "event = %d, rc = %d\n",
2125                               lfsck_lfsck2name(com->lc_lfsck),
2126                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2127                               ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
2128                 break;
2129         case LE_QUERY: {
2130                 struct lfsck_reply *reply;
2131                 struct list_head *list;
2132                 struct list_head *phase_list;
2133
2134                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2135                         list = &ltd->ltd_layout_list;
2136                         phase_list = &ltd->ltd_layout_phase_list;
2137                 } else {
2138                         list = &ltd->ltd_namespace_list;
2139                         phase_list = &ltd->ltd_namespace_phase_list;
2140                 }
2141
2142                 if (rc != 0) {
2143                         if (lr->lr_flags & LEF_QUERY_ALL) {
2144                                 lfsck_reset_ltd_status(ltd, com->lc_type);
2145                                 break;
2146                         }
2147
2148                         spin_lock(&ltds->ltd_lock);
2149                         list_del_init(phase_list);
2150                         list_del_init(list);
2151                         spin_unlock(&ltds->ltd_lock);
2152                         break;
2153                 }
2154
2155                 reply = req_capsule_server_get(&req->rq_pill,
2156                                                &RMF_LFSCK_REPLY);
2157                 if (reply == NULL) {
2158                         rc = -EPROTO;
2159                         CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
2160                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
2161                                lad->lad_name, rc);
2162
2163                         if (lr->lr_flags & LEF_QUERY_ALL) {
2164                                 lfsck_reset_ltd_status(ltd, com->lc_type);
2165                                 break;
2166                         }
2167
2168                         spin_lock(&ltds->ltd_lock);
2169                         list_del_init(phase_list);
2170                         list_del_init(list);
2171                         spin_unlock(&ltds->ltd_lock);
2172                         break;
2173                 }
2174
2175                 if (lr->lr_flags & LEF_QUERY_ALL) {
2176                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2177                                 ltd->ltd_layout_status = reply->lr_status;
2178                                 ltd->ltd_layout_repaired = reply->lr_repaired;
2179                         } else {
2180                                 ltd->ltd_namespace_status = reply->lr_status;
2181                                 ltd->ltd_namespace_repaired =
2182                                                         reply->lr_repaired;
2183                         }
2184                         break;
2185                 }
2186
2187                 switch (reply->lr_status) {
2188                 case LS_SCANNING_PHASE1:
2189                         break;
2190                 case LS_SCANNING_PHASE2:
2191                         spin_lock(&ltds->ltd_lock);
2192                         list_del_init(phase_list);
2193                         if (ltd->ltd_dead) {
2194                                 spin_unlock(&ltds->ltd_lock);
2195                                 break;
2196                         }
2197
2198                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2199                                 if (ltd->ltd_layout_done) {
2200                                         spin_unlock(&ltds->ltd_lock);
2201                                         break;
2202                                 }
2203
2204                                 if (lr->lr_flags & LEF_TO_OST)
2205                                         list_add_tail(phase_list,
2206                                                 &lad->lad_ost_phase2_list);
2207                                 else
2208                                         list_add_tail(phase_list,
2209                                                 &lad->lad_mdt_phase2_list);
2210                         } else {
2211                                 if (ltd->ltd_namespace_done) {
2212                                         spin_unlock(&ltds->ltd_lock);
2213                                         break;
2214                                 }
2215
2216                                 list_add_tail(phase_list,
2217                                               &lad->lad_mdt_phase2_list);
2218                         }
2219                         spin_unlock(&ltds->ltd_lock);
2220                         break;
2221                 default:
2222                         spin_lock(&ltds->ltd_lock);
2223                         list_del_init(phase_list);
2224                         list_del_init(list);
2225                         spin_unlock(&ltds->ltd_lock);
2226                         break;
2227                 }
2228                 break;
2229         }
2230         default:
2231                 CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
2232                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
2233                 break;
2234         }
2235
2236         if (!laia->laia_shared) {
2237                 lfsck_tgt_put(ltd);
2238                 lfsck_component_put(env, com);
2239         }
2240
2241         return 0;
2242 }
2243
2244 static void lfsck_interpret(const struct lu_env *env,
2245                             struct lfsck_instance *lfsck,
2246                             struct ptlrpc_request *req, void *args, int result)
2247 {
2248         struct lfsck_async_interpret_args *laia = args;
2249         struct lfsck_component            *com;
2250
2251         LASSERT(laia->laia_com == NULL);
2252         LASSERT(laia->laia_shared);
2253
2254         spin_lock(&lfsck->li_lock);
2255         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2256                 laia->laia_com = com;
2257                 lfsck_async_interpret_common(env, req, laia, result);
2258         }
2259
2260         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
2261                 laia->laia_com = com;
2262                 lfsck_async_interpret_common(env, req, laia, result);
2263         }
2264         spin_unlock(&lfsck->li_lock);
2265 }
2266
2267 static int lfsck_stop_notify(const struct lu_env *env,
2268                              struct lfsck_instance *lfsck,
2269                              struct lfsck_tgt_descs *ltds,
2270                              struct lfsck_tgt_desc *ltd, __u16 type)
2271 {
2272         struct lfsck_component *com;
2273         int                     rc = 0;
2274         ENTRY;
2275
2276         LASSERT(lfsck->li_master);
2277
2278         spin_lock(&lfsck->li_lock);
2279         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
2280         if (com == NULL)
2281                 com = __lfsck_component_find(lfsck, type,
2282                                              &lfsck->li_list_double_scan);
2283         if (com != NULL)
2284                 lfsck_component_get(com);
2285         spin_unlock(&lfsck->li_lock);
2286
2287         if (com != NULL) {
2288                 struct lfsck_thread_info          *info  = lfsck_env_info(env);
2289                 struct lfsck_async_interpret_args *laia  = &info->lti_laia;
2290                 struct lfsck_request              *lr    = &info->lti_lr;
2291                 struct lfsck_assistant_data       *lad   = com->lc_data;
2292                 struct list_head                  *list;
2293                 struct list_head                  *phase_list;
2294                 struct ptlrpc_request_set         *set;
2295
2296                 set = ptlrpc_prep_set();
2297                 if (set == NULL) {
2298                         lfsck_component_put(env, com);
2299
2300                         RETURN(-ENOMEM);
2301                 }
2302
2303                 if (type == LFSCK_TYPE_LAYOUT) {
2304                         list = &ltd->ltd_layout_list;
2305                         phase_list = &ltd->ltd_layout_phase_list;
2306                 } else {
2307                         list = &ltd->ltd_namespace_list;
2308                         phase_list = &ltd->ltd_namespace_phase_list;
2309                 }
2310
2311                 spin_lock(&ltds->ltd_lock);
2312                 if (list_empty(list)) {
2313                         LASSERT(list_empty(phase_list));
2314                         spin_unlock(&ltds->ltd_lock);
2315                         ptlrpc_set_destroy(set);
2316
2317                         RETURN(0);
2318                 }
2319
2320                 list_del_init(phase_list);
2321                 list_del_init(list);
2322                 spin_unlock(&ltds->ltd_lock);
2323
2324                 memset(lr, 0, sizeof(*lr));
2325                 lr->lr_index = lfsck_dev_idx(lfsck);
2326                 lr->lr_event = LE_PEER_EXIT;
2327                 lr->lr_active = type;
2328                 lr->lr_status = LS_CO_PAUSED;
2329                 if (ltds == &lfsck->li_ost_descs)
2330                         lr->lr_flags = LEF_TO_OST;
2331
2332                 memset(laia, 0, sizeof(*laia));
2333                 laia->laia_com = com;
2334                 laia->laia_ltds = ltds;
2335                 atomic_inc(&ltd->ltd_ref);
2336                 laia->laia_ltd = ltd;
2337                 laia->laia_lr = lr;
2338
2339                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2340                                          lfsck_async_interpret_common,
2341                                          laia, LFSCK_NOTIFY);
2342                 if (rc != 0) {
2343                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2344                                "co-stop for %s: rc = %d\n",
2345                                lfsck_lfsck2name(lfsck),
2346                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2347                                ltd->ltd_index, lad->lad_name, rc);
2348                         lfsck_tgt_put(ltd);
2349                 } else {
2350                         rc = ptlrpc_set_wait(set);
2351                 }
2352
2353                 ptlrpc_set_destroy(set);
2354                 lfsck_component_put(env, com);
2355         }
2356
2357         RETURN(rc);
2358 }
2359
2360 static int lfsck_async_interpret(const struct lu_env *env,
2361                                  struct ptlrpc_request *req,
2362                                  void *args, int rc)
2363 {
2364         struct lfsck_async_interpret_args *laia = args;
2365         struct lfsck_instance             *lfsck;
2366
2367         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
2368                               li_mdt_descs);
2369         lfsck_interpret(env, lfsck, req, laia, rc);
2370         lfsck_tgt_put(laia->laia_ltd);
2371         if (rc != 0 && laia->laia_result != -EALREADY)
2372                 laia->laia_result = rc;
2373
2374         return 0;
2375 }
2376
2377 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2378                         struct lfsck_request *lr,
2379                         struct ptlrpc_request_set *set,
2380                         ptlrpc_interpterer_t interpreter,
2381                         void *args, int request)
2382 {
2383         struct lfsck_async_interpret_args *laia;
2384         struct ptlrpc_request             *req;
2385         struct lfsck_request              *tmp;
2386         struct req_format                 *format;
2387         int                                rc;
2388
2389         switch (request) {
2390         case LFSCK_NOTIFY:
2391                 format = &RQF_LFSCK_NOTIFY;
2392                 break;
2393         case LFSCK_QUERY:
2394                 format = &RQF_LFSCK_QUERY;
2395                 break;
2396         default:
2397                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2398                        exp->exp_obd->obd_name, request, -EINVAL);
2399                 return -EINVAL;
2400         }
2401
2402         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2403         if (req == NULL)
2404                 return -ENOMEM;
2405
2406         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2407         if (rc != 0) {
2408                 ptlrpc_request_free(req);
2409
2410                 return rc;
2411         }
2412
2413         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2414         *tmp = *lr;
2415         ptlrpc_request_set_replen(req);
2416
2417         laia = ptlrpc_req_async_args(req);
2418         *laia = *(struct lfsck_async_interpret_args *)args;
2419         if (laia->laia_com != NULL)
2420                 lfsck_component_get(laia->laia_com);
2421         req->rq_interpret_reply = interpreter;
2422         req->rq_allow_intr = 1;
2423         ptlrpc_set_add_req(set, req);
2424
2425         return 0;
2426 }
2427
2428 int lfsck_query_all(const struct lu_env *env, struct lfsck_component *com)
2429 {
2430         struct lfsck_thread_info          *info  = lfsck_env_info(env);
2431         struct lfsck_request              *lr    = &info->lti_lr;
2432         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
2433         struct lfsck_instance             *lfsck = com->lc_lfsck;
2434         struct lfsck_tgt_descs            *ltds  = &lfsck->li_mdt_descs;
2435         struct lfsck_tgt_desc             *ltd;
2436         struct ptlrpc_request_set         *set;
2437         int                                idx;
2438         int                                rc;
2439         ENTRY;
2440
2441         memset(lr, 0, sizeof(*lr));
2442         lr->lr_event = LE_QUERY;
2443         lr->lr_active = com->lc_type;
2444         lr->lr_flags = LEF_QUERY_ALL;
2445
2446         memset(laia, 0, sizeof(*laia));
2447         laia->laia_com = com;
2448         laia->laia_lr = lr;
2449
2450         set = ptlrpc_prep_set();
2451         if (set == NULL)
2452                 RETURN(-ENOMEM);
2453
2454 again:
2455         laia->laia_ltds = ltds;
2456         down_read(&ltds->ltd_rw_sem);
2457         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2458                 ltd = lfsck_tgt_get(ltds, idx);
2459                 LASSERT(ltd != NULL);
2460
2461                 laia->laia_ltd = ltd;
2462                 up_read(&ltds->ltd_rw_sem);
2463                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2464                                          lfsck_async_interpret_common,
2465                                          laia, LFSCK_QUERY);
2466                 if (rc != 0) {
2467                         struct lfsck_assistant_data *lad = com->lc_data;
2468
2469                         CDEBUG(D_LFSCK, "%s: Fail to query %s %x for stat %s: "
2470                                "rc = %d\n", lfsck_lfsck2name(lfsck),
2471                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2472                                ltd->ltd_index, lad->lad_name, rc);
2473                         lfsck_reset_ltd_status(ltd, com->lc_type);
2474                         lfsck_tgt_put(ltd);
2475                 }
2476                 down_read(&ltds->ltd_rw_sem);
2477         }
2478         up_read(&ltds->ltd_rw_sem);
2479
2480         if (com->lc_type == LFSCK_TYPE_LAYOUT && !(lr->lr_flags & LEF_TO_OST)) {
2481                 ltds = &lfsck->li_ost_descs;
2482                 lr->lr_flags |= LEF_TO_OST;
2483                 goto again;
2484         }
2485
2486         rc = ptlrpc_set_wait(set);
2487         ptlrpc_set_destroy(set);
2488
2489         RETURN(rc);
2490 }
2491
2492 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2493                           struct lfsck_start_param *lsp)
2494 {
2495         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2496         struct lfsck_assistant_data     *lad     = com->lc_data;
2497         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2498         struct ptlrpc_thread            *athread = &lad->lad_thread;
2499         struct lfsck_thread_args        *lta;
2500         struct task_struct              *task;
2501         int                              rc;
2502         ENTRY;
2503
2504         lad->lad_assistant_status = 0;
2505         lad->lad_post_result = 0;
2506         lad->lad_to_post = 0;
2507         lad->lad_to_double_scan = 0;
2508         lad->lad_in_double_scan = 0;
2509         lad->lad_exit = 0;
2510         lad->lad_advance_lock = false;
2511         thread_set_flags(athread, 0);
2512
2513         lta = lfsck_thread_args_init(lfsck, com, lsp);
2514         if (IS_ERR(lta))
2515                 RETURN(PTR_ERR(lta));
2516
2517         task = kthread_run(lfsck_assistant_engine, lta, lad->lad_name);
2518         if (IS_ERR(task)) {
2519                 rc = PTR_ERR(task);
2520                 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2521                        "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2522                 lfsck_thread_args_fini(lta);
2523         } else {
2524                 struct l_wait_info lwi = { 0 };
2525
2526                 l_wait_event(mthread->t_ctl_waitq,
2527                              thread_is_running(athread) ||
2528                              thread_is_stopped(athread),
2529                              &lwi);
2530                 if (unlikely(!thread_is_running(athread)))
2531                         rc = lad->lad_assistant_status;
2532                 else
2533                         rc = 0;
2534         }
2535
2536         RETURN(rc);
2537 }
2538
2539 int lfsck_checkpoint_generic(const struct lu_env *env,
2540                              struct lfsck_component *com)
2541 {
2542         struct lfsck_assistant_data     *lad     = com->lc_data;
2543         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2544         struct ptlrpc_thread            *athread = &lad->lad_thread;
2545         struct l_wait_info               lwi     = { 0 };
2546
2547         l_wait_event(mthread->t_ctl_waitq,
2548                      list_empty(&lad->lad_req_list) ||
2549                      !thread_is_running(mthread) ||
2550                      thread_is_stopped(athread),
2551                      &lwi);
2552
2553         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2554                 return LFSCK_CHECKPOINT_SKIP;
2555
2556         return 0;
2557 }
2558
2559 void lfsck_post_generic(const struct lu_env *env,
2560                         struct lfsck_component *com, int *result)
2561 {
2562         struct lfsck_assistant_data     *lad     = com->lc_data;
2563         struct ptlrpc_thread            *athread = &lad->lad_thread;
2564         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2565         struct l_wait_info               lwi     = { 0 };
2566
2567         lad->lad_post_result = *result;
2568         if (*result <= 0)
2569                 lad->lad_exit = 1;
2570         lad->lad_to_post = 1;
2571
2572         CDEBUG(D_LFSCK, "%s: waiting for assistant to do %s post, rc = %d\n",
2573                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, *result);
2574
2575         wake_up_all(&athread->t_ctl_waitq);
2576         l_wait_event(mthread->t_ctl_waitq,
2577                      (*result > 0 && list_empty(&lad->lad_req_list)) ||
2578                      thread_is_stopped(athread),
2579                      &lwi);
2580
2581         if (lad->lad_assistant_status < 0)
2582                 *result = lad->lad_assistant_status;
2583
2584         CDEBUG(D_LFSCK, "%s: the assistant has done %s post, rc = %d\n",
2585                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, *result);
2586 }
2587
2588 int lfsck_double_scan_generic(const struct lu_env *env,
2589                               struct lfsck_component *com, int status)
2590 {
2591         struct lfsck_assistant_data     *lad     = com->lc_data;
2592         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2593         struct ptlrpc_thread            *athread = &lad->lad_thread;
2594         struct l_wait_info               lwi     = { 0 };
2595
2596         if (status != LS_SCANNING_PHASE2)
2597                 lad->lad_exit = 1;
2598         else
2599                 lad->lad_to_double_scan = 1;
2600
2601         CDEBUG(D_LFSCK, "%s: waiting for assistant to do %s double_scan, "
2602                "status %d\n",
2603                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, status);
2604
2605         wake_up_all(&athread->t_ctl_waitq);
2606         l_wait_event(mthread->t_ctl_waitq,
2607                      lad->lad_in_double_scan ||
2608                      thread_is_stopped(athread),
2609                      &lwi);
2610
2611         CDEBUG(D_LFSCK, "%s: the assistant has done %s double_scan, "
2612                "status %d\n", lfsck_lfsck2name(com->lc_lfsck), lad->lad_name,
2613                lad->lad_assistant_status);
2614
2615         if (lad->lad_assistant_status < 0)
2616                 return lad->lad_assistant_status;
2617
2618         return 0;
2619 }
2620
2621 void lfsck_quit_generic(const struct lu_env *env,
2622                         struct lfsck_component *com)
2623 {
2624         struct lfsck_assistant_data     *lad     = com->lc_data;
2625         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2626         struct ptlrpc_thread            *athread = &lad->lad_thread;
2627         struct l_wait_info               lwi     = { 0 };
2628
2629         lad->lad_exit = 1;
2630         wake_up_all(&athread->t_ctl_waitq);
2631         l_wait_event(mthread->t_ctl_waitq,
2632                      thread_is_init(athread) ||
2633                      thread_is_stopped(athread),
2634                      &lwi);
2635 }
2636
2637 /* external interfaces */
2638
2639 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
2640 {
2641         struct lu_env           env;
2642         struct lfsck_instance  *lfsck;
2643         int                     rc;
2644         ENTRY;
2645
2646         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2647         if (rc != 0)
2648                 RETURN(rc);
2649
2650         lfsck = lfsck_instance_find(key, true, false);
2651         if (likely(lfsck != NULL)) {
2652                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
2653                 lfsck_instance_put(&env, lfsck);
2654         } else {
2655                 rc = -ENXIO;
2656         }
2657
2658         lu_env_fini(&env);
2659
2660         RETURN(rc);
2661 }
2662 EXPORT_SYMBOL(lfsck_get_speed);
2663
2664 int lfsck_set_speed(struct dt_device *key, __u32 val)
2665 {
2666         struct lu_env           env;
2667         struct lfsck_instance  *lfsck;
2668         int                     rc;
2669         ENTRY;
2670
2671         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2672         if (rc != 0)
2673                 RETURN(rc);
2674
2675         lfsck = lfsck_instance_find(key, true, false);
2676         if (likely(lfsck != NULL)) {
2677                 mutex_lock(&lfsck->li_mutex);
2678                 if (__lfsck_set_speed(lfsck, val))
2679                         rc = lfsck_bookmark_store(&env, lfsck);
2680                 mutex_unlock(&lfsck->li_mutex);
2681                 lfsck_instance_put(&env, lfsck);
2682         } else {
2683                 rc = -ENXIO;
2684         }
2685
2686         lu_env_fini(&env);
2687
2688         RETURN(rc);
2689 }
2690 EXPORT_SYMBOL(lfsck_set_speed);
2691
2692 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
2693 {
2694         struct lu_env           env;
2695         struct lfsck_instance  *lfsck;
2696         int                     rc;
2697         ENTRY;
2698
2699         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2700         if (rc != 0)
2701                 RETURN(rc);
2702
2703         lfsck = lfsck_instance_find(key, true, false);
2704         if (likely(lfsck != NULL)) {
2705                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
2706                 lfsck_instance_put(&env, lfsck);
2707         } else {
2708                 rc = -ENXIO;
2709         }
2710
2711         lu_env_fini(&env);
2712
2713         RETURN(rc);
2714 }
2715 EXPORT_SYMBOL(lfsck_get_windows);
2716
2717 int lfsck_set_windows(struct dt_device *key, int val)
2718 {
2719         struct lu_env           env;
2720         struct lfsck_instance  *lfsck;
2721         int                     rc;
2722         ENTRY;
2723
2724         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2725         if (rc != 0)
2726                 RETURN(rc);
2727
2728         lfsck = lfsck_instance_find(key, true, false);
2729         if (likely(lfsck != NULL)) {
2730                 if (val < 1 || val > LFSCK_ASYNC_WIN_MAX) {
2731                         CWARN("%s: invalid async windows size that may "
2732                               "cause memory issues. The valid range is "
2733                               "[1 - %u].\n",
2734                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2735                         rc = -EINVAL;
2736                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2737                         mutex_lock(&lfsck->li_mutex);
2738                         lfsck->li_bookmark_ram.lb_async_windows = val;
2739                         rc = lfsck_bookmark_store(&env, lfsck);
2740                         mutex_unlock(&lfsck->li_mutex);
2741                 }
2742                 lfsck_instance_put(&env, lfsck);
2743         } else {
2744                 rc = -ENXIO;
2745         }
2746
2747         lu_env_fini(&env);
2748
2749         RETURN(rc);
2750 }
2751 EXPORT_SYMBOL(lfsck_set_windows);
2752
2753 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2754 {
2755         struct lu_env           env;
2756         struct lfsck_instance  *lfsck;
2757         struct lfsck_component *com;
2758         int                     rc;
2759         ENTRY;
2760
2761         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2762         if (rc != 0)
2763                 RETURN(rc);
2764
2765         lfsck = lfsck_instance_find(key, true, false);
2766         if (likely(lfsck != NULL)) {
2767                 com = lfsck_component_find(lfsck, type);
2768                 if (likely(com != NULL)) {
2769                         com->lc_ops->lfsck_dump(&env, com, m);
2770                         lfsck_component_put(&env, com);
2771                 } else {
2772                         rc = -ENOTSUPP;
2773                 }
2774
2775                 lfsck_instance_put(&env, lfsck);
2776         } else {
2777                 rc = -ENXIO;
2778         }
2779
2780         lu_env_fini(&env);
2781
2782         RETURN(rc);
2783 }
2784 EXPORT_SYMBOL(lfsck_dump);
2785
2786 static int lfsck_stop_all(const struct lu_env *env,
2787                           struct lfsck_instance *lfsck,
2788                           struct lfsck_stop *stop)
2789 {
2790         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2791         struct lfsck_request              *lr     = &info->lti_lr;
2792         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2793         struct ptlrpc_request_set         *set;
2794         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2795         struct lfsck_tgt_desc             *ltd;
2796         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2797         __u32                              idx;
2798         int                                rc     = 0;
2799         int                                rc1    = 0;
2800         ENTRY;
2801
2802         LASSERT(stop->ls_flags & LPF_BROADCAST);
2803
2804         set = ptlrpc_prep_set();
2805         if (unlikely(set == NULL))
2806                 RETURN(-ENOMEM);
2807
2808         memset(lr, 0, sizeof(*lr));
2809         lr->lr_event = LE_STOP;
2810         lr->lr_index = lfsck_dev_idx(lfsck);
2811         lr->lr_status = stop->ls_status;
2812         lr->lr_version = bk->lb_version;
2813         lr->lr_active = LFSCK_TYPES_ALL;
2814         lr->lr_param = stop->ls_flags;
2815
2816         memset(laia, 0, sizeof(*laia));
2817         laia->laia_ltds = ltds;
2818         laia->laia_lr = lr;
2819         laia->laia_shared = 1;
2820
2821         down_read(&ltds->ltd_rw_sem);
2822         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2823                 ltd = lfsck_tgt_get(ltds, idx);
2824                 LASSERT(ltd != NULL);
2825
2826                 laia->laia_ltd = ltd;
2827                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2828                                          lfsck_async_interpret, laia,
2829                                          LFSCK_NOTIFY);
2830                 if (rc != 0) {
2831                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2832                         lfsck_tgt_put(ltd);
2833                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2834                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2835                         rc1 = rc;
2836                 }
2837         }
2838         up_read(&ltds->ltd_rw_sem);
2839
2840         rc = ptlrpc_set_wait(set);
2841         ptlrpc_set_destroy(set);
2842
2843         if (rc == 0)
2844                 rc = laia->laia_result;
2845
2846         if (rc == -EALREADY)
2847                 rc = 0;
2848
2849         if (rc != 0)
2850                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2851                        lfsck_lfsck2name(lfsck), rc);
2852
2853         RETURN(rc != 0 ? rc : rc1);
2854 }
2855
2856 static int lfsck_start_all(const struct lu_env *env,
2857                            struct lfsck_instance *lfsck,
2858                            struct lfsck_start *start)
2859 {
2860         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2861         struct lfsck_request              *lr     = &info->lti_lr;
2862         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2863         struct ptlrpc_request_set         *set;
2864         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2865         struct lfsck_tgt_desc             *ltd;
2866         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2867         __u32                              idx;
2868         int                                rc     = 0;
2869         ENTRY;
2870
2871         LASSERT(start->ls_flags & LPF_BROADCAST);
2872
2873         set = ptlrpc_prep_set();
2874         if (unlikely(set == NULL))
2875                 RETURN(-ENOMEM);
2876
2877         memset(lr, 0, sizeof(*lr));
2878         lr->lr_event = LE_START;
2879         lr->lr_index = lfsck_dev_idx(lfsck);
2880         lr->lr_speed = bk->lb_speed_limit;
2881         lr->lr_version = bk->lb_version;
2882         lr->lr_active = start->ls_active;
2883         lr->lr_param = start->ls_flags;
2884         lr->lr_async_windows = bk->lb_async_windows;
2885         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2886                        LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ |
2887                        LSV_CREATE_MDTOBJ;
2888
2889         memset(laia, 0, sizeof(*laia));
2890         laia->laia_ltds = ltds;
2891         laia->laia_lr = lr;
2892         laia->laia_shared = 1;
2893
2894         down_read(&ltds->ltd_rw_sem);
2895         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2896                 ltd = lfsck_tgt_get(ltds, idx);
2897                 LASSERT(ltd != NULL);
2898
2899                 laia->laia_ltd = ltd;
2900                 ltd->ltd_layout_done = 0;
2901                 ltd->ltd_namespace_done = 0;
2902                 ltd->ltd_synced_failures = 0;
2903                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2904                                          lfsck_async_interpret, laia,
2905                                          LFSCK_NOTIFY);
2906                 if (rc != 0) {
2907                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2908                         lfsck_tgt_put(ltd);
2909                         CERROR("%s: cannot notify MDT %x for LFSCK "
2910                                "start, failout: rc = %d\n",
2911                                lfsck_lfsck2name(lfsck), idx, rc);
2912                         break;
2913                 }
2914         }
2915         up_read(&ltds->ltd_rw_sem);
2916
2917         if (rc != 0) {
2918                 ptlrpc_set_destroy(set);
2919
2920                 RETURN(rc);
2921         }
2922
2923         rc = ptlrpc_set_wait(set);
2924         ptlrpc_set_destroy(set);
2925
2926         if (rc == 0)
2927                 rc = laia->laia_result;
2928
2929         if (rc != 0) {
2930                 struct lfsck_stop *stop = &info->lti_stop;
2931
2932                 CERROR("%s: cannot start LFSCK on some MDTs, "
2933                        "stop all: rc = %d\n",
2934                        lfsck_lfsck2name(lfsck), rc);
2935                 if (rc != -EALREADY) {
2936                         stop->ls_status = LS_FAILED;
2937                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
2938                         lfsck_stop_all(env, lfsck, stop);
2939                 }
2940         }
2941
2942         RETURN(rc);
2943 }
2944
2945 int lfsck_start(const struct lu_env *env, struct dt_device *key,
2946                 struct lfsck_start_param *lsp)
2947 {
2948         struct lfsck_start              *start  = lsp->lsp_start;
2949         struct lfsck_instance           *lfsck;
2950         struct lfsck_bookmark           *bk;
2951         struct ptlrpc_thread            *thread;
2952         struct lfsck_component          *com;
2953         struct l_wait_info               lwi    = { 0 };
2954         struct lfsck_thread_args        *lta;
2955         struct task_struct              *task;
2956         struct lfsck_tgt_descs          *ltds;
2957         struct lfsck_tgt_desc           *ltd;
2958         __u32                            idx;
2959         int                              rc     = 0;
2960         __u16                            valid  = 0;
2961         __u16                            flags  = 0;
2962         __u16                            type   = 1;
2963         ENTRY;
2964
2965         lfsck = lfsck_instance_find(key, true, false);
2966         if (unlikely(lfsck == NULL))
2967                 RETURN(-ENXIO);
2968
2969         /* System is not ready, try again later. */
2970         if (unlikely(lfsck->li_namespace == NULL))
2971                 GOTO(put, rc = -EAGAIN);
2972
2973         /* start == NULL means auto trigger paused LFSCK. */
2974         if ((start == NULL) &&
2975             (list_empty(&lfsck->li_list_scan) ||
2976              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2977                 GOTO(put, rc = 0);
2978
2979         bk = &lfsck->li_bookmark_ram;
2980         thread = &lfsck->li_thread;
2981         mutex_lock(&lfsck->li_mutex);
2982         spin_lock(&lfsck->li_lock);
2983         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2984                 rc = -EALREADY;
2985                 if (unlikely(start == NULL)) {
2986                         spin_unlock(&lfsck->li_lock);
2987                         GOTO(out, rc);
2988                 }
2989
2990                 while (start->ls_active != 0) {
2991                         if (!(type & start->ls_active)) {
2992                                 type <<= 1;
2993                                 continue;
2994                         }
2995
2996                         com = __lfsck_component_find(lfsck, type,
2997                                                      &lfsck->li_list_scan);
2998                         if (com == NULL)
2999                                 com = __lfsck_component_find(lfsck, type,
3000                                                 &lfsck->li_list_double_scan);
3001                         if (com == NULL) {
3002                                 rc = -EOPNOTSUPP;
3003                                 break;
3004                         }
3005
3006                         if (com->lc_ops->lfsck_join != NULL) {
3007                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
3008                                 if (rc != 0 && rc != -EALREADY)
3009                                         break;
3010                         }
3011                         start->ls_active &= ~type;
3012                         type <<= 1;
3013                 }
3014                 spin_unlock(&lfsck->li_lock);
3015                 GOTO(out, rc);
3016         }
3017         spin_unlock(&lfsck->li_lock);
3018
3019         lfsck->li_status = 0;
3020         lfsck->li_oit_over = 0;
3021         lfsck->li_start_unplug = 0;
3022         lfsck->li_drop_dryrun = 0;
3023         lfsck->li_new_scanned = 0;
3024
3025         /* For auto trigger. */
3026         if (start == NULL)
3027                 goto trigger;
3028
3029         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
3030                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
3031                        lfsck_lfsck2name(lfsck));
3032
3033                 GOTO(out, rc = -EPERM);
3034         }
3035
3036         start->ls_version = bk->lb_version;
3037
3038         if (start->ls_active != 0) {
3039                 struct lfsck_component *next;
3040
3041                 if (start->ls_active == LFSCK_TYPES_ALL)
3042                         start->ls_active = LFSCK_TYPES_SUPPORTED;
3043
3044                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
3045                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
3046                         GOTO(out, rc = -ENOTSUPP);
3047                 }
3048
3049                 list_for_each_entry_safe(com, next,
3050                                          &lfsck->li_list_scan, lc_link) {
3051                         if (!(com->lc_type & start->ls_active)) {
3052                                 rc = com->lc_ops->lfsck_post(env, com, 0,
3053                                                              false);
3054                                 if (rc != 0)
3055                                         GOTO(out, rc);
3056                         }
3057                 }
3058
3059                 while (start->ls_active != 0) {
3060                         if (type & start->ls_active) {
3061                                 com = __lfsck_component_find(lfsck, type,
3062                                                         &lfsck->li_list_idle);
3063                                 if (com != NULL)
3064                                         /* The component status will be updated
3065                                          * when its prep() is called later by
3066                                          * the LFSCK main engine. */
3067                                         list_move_tail(&com->lc_link,
3068                                                        &lfsck->li_list_scan);
3069                                 start->ls_active &= ~type;
3070                         }
3071                         type <<= 1;
3072                 }
3073         }
3074
3075         if (list_empty(&lfsck->li_list_scan)) {
3076                 /* The speed limit will be used to control both the LFSCK and
3077                  * low layer scrub (if applied), need to be handled firstly. */
3078                 if (start->ls_valid & LSV_SPEED_LIMIT) {
3079                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
3080                                 rc = lfsck_bookmark_store(env, lfsck);
3081                                 if (rc != 0)
3082                                         GOTO(out, rc);
3083                         }
3084                 }
3085
3086                 goto trigger;
3087         }
3088
3089         if (start->ls_flags & LPF_RESET)
3090                 flags |= DOIF_RESET;
3091
3092         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
3093         if (rc != 0)
3094                 GOTO(out, rc);
3095
3096         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
3097                 start->ls_active |= com->lc_type;
3098                 if (flags & DOIF_RESET) {
3099                         rc = com->lc_ops->lfsck_reset(env, com, false);
3100                         if (rc != 0)
3101                                 GOTO(out, rc);
3102                 }
3103         }
3104
3105         ltds = &lfsck->li_mdt_descs;
3106         down_read(&ltds->ltd_rw_sem);
3107         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
3108                 ltd = lfsck_ltd2tgt(ltds, idx);
3109                 LASSERT(ltd != NULL);
3110
3111                 ltd->ltd_layout_done = 0;
3112                 ltd->ltd_namespace_done = 0;
3113                 ltd->ltd_synced_failures = 0;
3114                 lfsck_reset_ltd_status(ltd, LFSCK_TYPE_NAMESPACE);
3115                 lfsck_reset_ltd_status(ltd, LFSCK_TYPE_LAYOUT);
3116                 list_del_init(&ltd->ltd_layout_phase_list);
3117                 list_del_init(&ltd->ltd_layout_list);
3118                 list_del_init(&ltd->ltd_namespace_phase_list);
3119                 list_del_init(&ltd->ltd_namespace_list);
3120         }
3121         up_read(&ltds->ltd_rw_sem);
3122
3123         ltds = &lfsck->li_ost_descs;
3124         down_read(&ltds->ltd_rw_sem);
3125         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
3126                 ltd = lfsck_ltd2tgt(ltds, idx);
3127                 LASSERT(ltd != NULL);
3128
3129                 ltd->ltd_layout_done = 0;
3130                 ltd->ltd_synced_failures = 0;
3131                 lfsck_reset_ltd_status(ltd, LFSCK_TYPE_LAYOUT);
3132                 list_del_init(&ltd->ltd_layout_phase_list);
3133                 list_del_init(&ltd->ltd_layout_list);
3134         }
3135         up_read(&ltds->ltd_rw_sem);
3136
3137 trigger:
3138         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
3139         if (bk->lb_param & LPF_DRYRUN)
3140                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
3141
3142         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
3143                 valid |= DOIV_ERROR_HANDLE;
3144                 if (start->ls_flags & LPF_FAILOUT)
3145                         flags |= DOIF_FAILOUT;
3146         }
3147
3148         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
3149                 valid |= DOIV_DRYRUN;
3150                 if (start->ls_flags & LPF_DRYRUN)
3151                         flags |= DOIF_DRYRUN;
3152         }
3153
3154         if (!list_empty(&lfsck->li_list_scan))
3155                 flags |= DOIF_OUTUSED;
3156
3157         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
3158         thread_set_flags(thread, 0);
3159         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
3160         if (IS_ERR(lta))
3161                 GOTO(out, rc = PTR_ERR(lta));
3162
3163         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
3164         task = kthread_run(lfsck_master_engine, lta, "lfsck");
3165         if (IS_ERR(task)) {
3166                 rc = PTR_ERR(task);
3167                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
3168                        lfsck_lfsck2name(lfsck), rc);
3169                 lfsck_thread_args_fini(lta);
3170
3171                 GOTO(out, rc);
3172         }
3173
3174         l_wait_event(thread->t_ctl_waitq,
3175                      thread_is_running(thread) ||
3176                      thread_is_stopped(thread),
3177                      &lwi);
3178         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
3179                 lfsck->li_start_unplug = 1;
3180                 wake_up_all(&thread->t_ctl_waitq);
3181
3182                 GOTO(out, rc = 0);
3183         }
3184
3185         /* release lfsck::li_mutex to avoid deadlock. */
3186         mutex_unlock(&lfsck->li_mutex);
3187         rc = lfsck_start_all(env, lfsck, start);
3188         if (rc != 0) {
3189                 spin_lock(&lfsck->li_lock);
3190                 if (thread_is_stopped(thread)) {
3191                         spin_unlock(&lfsck->li_lock);
3192                 } else {
3193                         lfsck->li_status = LS_FAILED;
3194                         lfsck->li_flags = 0;
3195                         thread_set_flags(thread, SVC_STOPPING);
3196                         spin_unlock(&lfsck->li_lock);
3197
3198                         lfsck->li_start_unplug = 1;
3199                         wake_up_all(&thread->t_ctl_waitq);
3200                         l_wait_event(thread->t_ctl_waitq,
3201                                      thread_is_stopped(thread),
3202                                      &lwi);
3203                 }
3204         } else&nb