Whamcloud - gitweb
LU-17010 lfsck: don't create trans in dryrun mode
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2017, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <linux/kthread.h>
34 #include <linux/sched.h>
35 #include <linux/list.h>
36 #include <linux/delay.h>
37 #include <lu_object.h>
38 #include <dt_object.h>
39 #include <md_object.h>
40 #include <lustre_fld.h>
41 #include <lustre_lib.h>
42 #include <lustre_net.h>
43 #include <lustre_lfsck.h>
44 #include <lu_target.h>
45
46 #include "lfsck_internal.h"
47
48 #define LFSCK_CHECKPOINT_SKIP   1
49
50 /* define lfsck thread key */
51 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
52
53 static void lfsck_key_fini(const struct lu_context *ctx,
54                            struct lu_context_key *key, void *data)
55 {
56         struct lfsck_thread_info *info = data;
57
58         lu_buf_free(&info->lti_linkea_buf);
59         lu_buf_free(&info->lti_linkea_buf2);
60         lu_buf_free(&info->lti_big_buf);
61         OBD_FREE_PTR(info);
62 }
63
64 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
65 LU_KEY_INIT_GENERIC(lfsck);
66
/* File-scope list heads. NOTE(review): lfsck_instance_list presumably links
 * all LFSCK instances - its users are outside this part of the file. */
static LIST_HEAD(lfsck_instance_list);
/* Target descriptors not yet attached to an instance; entries are matched
 * against lfsck_instance::li_bottom via ltd_key and moved to the instance
 * by lfsck_add_target_from_orphan(). */
static LIST_HEAD(lfsck_ost_orphan_list);
static LIST_HEAD(lfsck_mdt_orphan_list);
/* Held while walking the global orphan lists above. */
static DEFINE_SPINLOCK(lfsck_instance_lock);
71
/* NULL-terminated table of printable LFSCK flag names.
 * NOTE(review): presumably indexed by flag bit position - confirm against
 * the flag definitions and the code that prints them. */
const char *const lfsck_flags_names[] = {
        "scanned-once",
        "inconsistent",
        "upgrade",
        "incomplete",
        "crashed_lastid",
        NULL
};
80
/* Table of printable LFSCK parameter names. Slots 0 and 8 are NULL (no
 * name), and the last NULL ends the table, so a consumer cannot simply stop
 * at the first NULL.
 * NOTE(review): presumably indexed by LPF_* bit position - confirm. */
const char *const lfsck_param_names[] = {
        NULL,
        "failout",
        "dryrun",
        "all_targets",
        "broadcast",
        "orphan",
        "create_ostobj",
        "create_mdtobj",
        NULL,
        "delay_create_ostobj",
        NULL
};
94
/* Selector for how the lost+found directory is verified.
 * NOTE(review): judging by the names, either via the FID stored in the
 * bookmark or via the name entry - the users are outside this part of
 * the file, confirm there. */
enum lfsck_verify_lpf_types {
        LVLT_BY_BOOKMARK        = 0,
        LVLT_BY_NAMEENTRY       = 1,
};
99
100 static inline void
101 lfsck_reset_ltd_status(struct lfsck_tgt_desc *ltd, enum lfsck_type type)
102 {
103         if (type == LFSCK_TYPE_LAYOUT) {
104                 ltd->ltd_layout_status = LS_MAX;
105                 ltd->ltd_layout_repaired = 0;
106         } else {
107                 ltd->ltd_namespace_status = LS_MAX;
108                 ltd->ltd_namespace_repaired = 0;
109         }
110 }
111
112 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
113 {
114         spin_lock_init(&ltds->ltd_lock);
115         init_rwsem(&ltds->ltd_rw_sem);
116         INIT_LIST_HEAD(&ltds->ltd_orphan);
117         ltds->ltd_tgts_bitmap = bitmap_zalloc(BITS_PER_LONG, GFP_KERNEL);
118         if (!ltds->ltd_tgts_bitmap)
119                 return -ENOMEM;
120
121         return 0;
122 }
123
124 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
125 {
126         struct lfsck_tgt_desc *ltd;
127         struct lfsck_tgt_desc *next;
128         int idx;
129
130         down_write(&ltds->ltd_rw_sem);
131
132         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
133                                  ltd_orphan_list) {
134                 list_del_init(&ltd->ltd_orphan_list);
135                 lfsck_tgt_put(ltd);
136         }
137
138         if (unlikely(!ltds->ltd_tgts_bitmap)) {
139                 up_write(&ltds->ltd_rw_sem);
140
141                 return;
142         }
143
144         for_each_set_bit(idx, ltds->ltd_tgts_bitmap, ltds->ltd_tgts_mask_len) {
145                 ltd = lfsck_ltd2tgt(ltds, idx);
146                 if (likely(ltd != NULL)) {
147                         LASSERT(list_empty(&ltd->ltd_layout_list));
148                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
149                         LASSERT(list_empty(&ltd->ltd_namespace_list));
150                         LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
151
152                         ltds->ltd_tgtnr--;
153                         clear_bit(idx, ltds->ltd_tgts_bitmap);
154                         lfsck_assign_tgt(ltds, NULL, idx);
155                         lfsck_tgt_put(ltd);
156                 }
157         }
158
159         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
160                  ltds->ltd_tgtnr);
161
162         for (idx = 0; idx < ARRAY_SIZE(ltds->ltd_tgts_idx); idx++) {
163                 if (ltds->ltd_tgts_idx[idx] != NULL) {
164                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
165                         ltds->ltd_tgts_idx[idx] = NULL;
166                 }
167         }
168
169         bitmap_free(ltds->ltd_tgts_bitmap);
170         ltds->ltd_tgts_bitmap = NULL;
171         up_write(&ltds->ltd_rw_sem);
172 }
173
174 static int __lfsck_add_target(const struct lu_env *env,
175                               struct lfsck_instance *lfsck,
176                               struct lfsck_tgt_desc *ltd,
177                               bool for_ost, bool locked)
178 {
179         struct lfsck_tgt_descs *ltds;
180         __u32                   index = ltd->ltd_index;
181         int                     rc    = 0;
182         ENTRY;
183
184         if (for_ost)
185                 ltds = &lfsck->li_ost_descs;
186         else
187                 ltds = &lfsck->li_mdt_descs;
188
189         if (!locked)
190                 down_write(&ltds->ltd_rw_sem);
191
192         LASSERT(ltds->ltd_tgts_bitmap);
193
194         if (index >= ltds->ltd_tgts_mask_len) {
195                 u32 newsize = max_t(u32, ltds->ltd_tgts_mask_len,
196                                     BITS_PER_LONG);
197                 unsigned long *old_bitmap = ltds->ltd_tgts_bitmap;
198                 unsigned long *new_bitmap;
199
200                 while (newsize < index + 1)
201                         newsize <<= 1;
202
203                 new_bitmap = bitmap_zalloc(newsize, GFP_KERNEL);
204                 if (!new_bitmap)
205                         GOTO(unlock, rc = -ENOMEM);
206
207                 if (ltds->ltd_tgtnr > 0) {
208                         bitmap_copy(new_bitmap, old_bitmap,
209                                     ltds->ltd_tgts_mask_len);
210                 }
211                 ltds->ltd_tgts_bitmap = new_bitmap;
212                 ltds->ltd_tgts_mask_len = newsize;
213                 bitmap_free(old_bitmap);
214         }
215
216         if (test_bit(index, ltds->ltd_tgts_bitmap)) {
217                 CERROR("%s: the device %s (%u) is registered already\n",
218                        lfsck_lfsck2name(lfsck),
219                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
220                 GOTO(unlock, rc = -EEXIST);
221         }
222
223         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
224                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
225                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
226                         GOTO(unlock, rc = -ENOMEM);
227         }
228
229         lfsck_assign_tgt(ltds, ltd, index);
230         set_bit(index, ltds->ltd_tgts_bitmap);
231         ltds->ltd_tgtnr++;
232
233         GOTO(unlock, rc = 0);
234
235 unlock:
236         if (!locked)
237                 up_write(&ltds->ltd_rw_sem);
238
239         return rc;
240 }
241
242 static int lfsck_add_target_from_orphan(const struct lu_env *env,
243                                         struct lfsck_instance *lfsck)
244 {
245         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
246         struct lfsck_tgt_desc   *ltd;
247         struct lfsck_tgt_desc   *next;
248         struct list_head        *head    = &lfsck_ost_orphan_list;
249         int                      rc;
250         bool                     for_ost = true;
251
252 again:
253         spin_lock(&lfsck_instance_lock);
254         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
255                 if (ltd->ltd_key == lfsck->li_bottom)
256                         list_move_tail(&ltd->ltd_orphan_list,
257                                        &ltds->ltd_orphan);
258         }
259         spin_unlock(&lfsck_instance_lock);
260
261         down_write(&ltds->ltd_rw_sem);
262         while (!list_empty(&ltds->ltd_orphan)) {
263                 ltd = list_first_entry(&ltds->ltd_orphan,
264                                        struct lfsck_tgt_desc,
265                                        ltd_orphan_list);
266                 list_del_init(&ltd->ltd_orphan_list);
267                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
268                 /* Do not hold the semaphore for too long time. */
269                 up_write(&ltds->ltd_rw_sem);
270                 if (rc != 0)
271                         return rc;
272
273                 down_write(&ltds->ltd_rw_sem);
274         }
275         up_write(&ltds->ltd_rw_sem);
276
277         if (for_ost) {
278                 ltds = &lfsck->li_mdt_descs;
279                 head = &lfsck_mdt_orphan_list;
280                 for_ost = false;
281                 goto again;
282         }
283
284         return 0;
285 }
286
287 static inline struct lfsck_component *
288 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
289                        struct list_head *list)
290 {
291         struct lfsck_component *com;
292
293         list_for_each_entry(com, list, lc_link) {
294                 if (com->lc_type == type)
295                         return com;
296         }
297         return NULL;
298 }
299
300 struct lfsck_component *
301 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
302 {
303         struct lfsck_component *com;
304
305         spin_lock(&lfsck->li_lock);
306         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
307         if (com != NULL)
308                 goto unlock;
309
310         com = __lfsck_component_find(lfsck, type,
311                                      &lfsck->li_list_double_scan);
312         if (com != NULL)
313                 goto unlock;
314
315         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
316
317 unlock:
318         if (com != NULL)
319                 lfsck_component_get(com);
320         spin_unlock(&lfsck->li_lock);
321         return com;
322 }
323
324 void lfsck_component_cleanup(const struct lu_env *env,
325                              struct lfsck_component *com)
326 {
327         if (!list_empty(&com->lc_link))
328                 list_del_init(&com->lc_link);
329         if (!list_empty(&com->lc_link_dir))
330                 list_del_init(&com->lc_link_dir);
331
332         lfsck_component_put(env, com);
333 }
334
335 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
336                     struct lu_fid *fid, bool locked)
337 {
338         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
339         int                      rc = 0;
340         ENTRY;
341
342         if (!locked)
343                 mutex_lock(&lfsck->li_mutex);
344
345         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
346         if (rc >= 0) {
347                 bk->lb_last_fid = *fid;
348                 /* We do not care about whether the subsequent sub-operations
349                  * failed or not. The worst case is that one FID is lost that
350                  * is not a big issue for the LFSCK since it is relative rare
351                  * for LFSCK create. */
352                 rc = lfsck_bookmark_store(env, lfsck);
353         }
354
355         if (!locked)
356                 mutex_unlock(&lfsck->li_mutex);
357
358         RETURN(rc);
359 }
360
361 static int __lfsck_ibits_lock(const struct lu_env *env,
362                               struct lfsck_instance *lfsck,
363                               struct dt_object *obj, struct ldlm_res_id *resid,
364                               struct lustre_handle *lh, __u64 bits,
365                               enum ldlm_mode mode)
366 {
367         struct lfsck_thread_info        *info   = lfsck_env_info(env);
368         union ldlm_policy_data          *policy = &info->lti_policy;
369         __u64                            flags  = LDLM_FL_ATOMIC_CB;
370         int                              rc;
371
372         LASSERT(lfsck->li_namespace != NULL);
373
374         memset(policy, 0, sizeof(*policy));
375         policy->l_inodebits.bits = bits;
376         policy->l_inodebits.li_initiator_id = lfsck_dev_idx(lfsck);
377         if (dt_object_remote(obj)) {
378                 struct ldlm_enqueue_info *einfo = &info->lti_einfo;
379
380                 memset(einfo, 0, sizeof(*einfo));
381                 einfo->ei_type = LDLM_IBITS;
382                 einfo->ei_mode = mode;
383                 einfo->ei_cb_bl = ldlm_blocking_ast;
384                 einfo->ei_cb_cp = ldlm_completion_ast;
385                 einfo->ei_res_id = resid;
386                 einfo->ei_req_slot = 1;
387
388                 rc = dt_object_lock(env, obj, lh, einfo, policy);
389                 /* for regular checks LFSCK doesn't use LDLM locking,
390                  * so the state isn't coherent. here we just took LDLM
391                  * lock for coherency and it's time to invalidate
392                  * previous state */
393                 if (rc == ELDLM_OK)
394                         dt_invalidate(env, obj);
395         } else {
396                 rc = ldlm_cli_enqueue_local(env, lfsck->li_namespace, resid,
397                                             LDLM_IBITS, policy, mode,
398                                             &flags, ldlm_blocking_ast,
399                                             ldlm_completion_ast, NULL, NULL,
400                                             0, LVB_T_NONE, NULL, lh);
401         }
402
403         if (rc == ELDLM_OK) {
404                 rc = 0;
405         } else {
406                 memset(lh, 0, sizeof(*lh));
407                 rc = -EIO;
408         }
409
410         return rc;
411 }
412
413 /**
414  * Request the specified ibits lock for the given object.
415  *
416  * Before the LFSCK modifying on the namespace visible object,
417  * it needs to acquire related ibits ldlm lock.
418  *
419  * \param[in] env       pointer to the thread context
420  * \param[in] lfsck     pointer to the lfsck instance
421  * \param[in] obj       pointer to the dt_object to be locked
422  * \param[out] lh       pointer to the lock handle
423  * \param[in] bits      the bits for the ldlm lock to be acquired
424  * \param[in] mode      the mode for the ldlm lock to be acquired
425  *
426  * \retval              0 for success
427  * \retval              negative error number on failure
428  */
429 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
430                      struct dt_object *obj, struct lustre_handle *lh,
431                      __u64 bits, enum ldlm_mode mode)
432 {
433         struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
434
435         LASSERT(!lustre_handle_is_used(lh));
436
437         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
438         return __lfsck_ibits_lock(env, lfsck, obj, resid, lh, bits, mode);
439 }
440
441 /**
442  * Request the remote LOOKUP lock for the given object.
443  *
444  * If \a pobj is remote, the LOOKUP lock of \a obj is on the MDT where
445  * \a pobj is, acquire LOOKUP lock there.
446  *
447  * \param[in] env       pointer to the thread context
448  * \param[in] lfsck     pointer to the lfsck instance
449  * \param[in] pobj      pointer to parent dt_object
450  * \param[in] obj       pointer to the dt_object to be locked
451  * \param[out] lh       pointer to the lock handle
452  * \param[in] mode      the mode for the ldlm lock to be acquired
453  *
454  * \retval              0 for success
455  * \retval              negative error number on failure
456  */
457 int lfsck_remote_lookup_lock(const struct lu_env *env,
458                              struct lfsck_instance *lfsck,
459                              struct dt_object *pobj, struct dt_object *obj,
460                              struct lustre_handle *lh, enum ldlm_mode mode)
461 {
462         struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
463
464         LASSERT(!lustre_handle_is_used(lh));
465
466         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
467         return __lfsck_ibits_lock(env, lfsck, pobj, resid, lh,
468                                   MDS_INODELOCK_LOOKUP, mode);
469 }
470
471 /**
472  * Release the the specified ibits lock.
473  *
474  * If the lock has been acquired before, release it
475  * and cleanup the handle. Otherwise, do nothing.
476  *
477  * \param[in] lh        pointer to the lock handle
478  * \param[in] mode      the mode for the ldlm lock to be released
479  */
480 void lfsck_ibits_unlock(struct lustre_handle *lh, enum ldlm_mode mode)
481 {
482         if (lustre_handle_is_used(lh)) {
483                 ldlm_lock_decref(lh, mode);
484                 memset(lh, 0, sizeof(*lh));
485         }
486 }
487
488 /**
489  * Request compound ibits locks for the given <obj, name> pairs.
490  *
491  * Before the LFSCK modifying on the namespace visible object, it needs to
492  * acquire related ibits ldlm lock. Usually, we can use lfsck_ibits_lock for
493  * the lock purpose. But the simple lfsck_ibits_lock for directory-based
494  * modificationis (such as insert name entry to the directory) may be too
495  * coarse-grained and not efficient.
496  *
497  * The lfsck_lock() will request compound ibits locks on the specified
498  * <obj, name> pairs: the PDO (Parallel Directory Operations) ibits (UPDATE)
499  * lock on the directory object, and the regular ibits lock on the name hash.
500  *
501  * \param[in] env       pointer to the thread context
502  * \param[in] lfsck     pointer to the lfsck instance
503  * \param[in] obj       pointer to the dt_object to be locked
504  * \param[in] name      used for building the PDO lock resource
505  * \param[out] llh      pointer to the lfsck_lock_handle
506  * \param[in] bits      the bits for the ldlm lock to be acquired
507  * \param[in] mode      the mode for the ldlm lock to be acquired
508  *
509  * \retval              0 for success
510  * \retval              negative error number on failure
511  */
512 int lfsck_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
513                struct dt_object *obj, const char *name,
514                struct lfsck_lock_handle *llh, __u64 bits, enum ldlm_mode mode)
515 {
516         struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
517         int                 rc;
518
519         LASSERT(S_ISDIR(lfsck_object_type(obj)));
520         LASSERT(name != NULL);
521         LASSERT(name[0] != 0);
522         LASSERT(!lustre_handle_is_used(&llh->llh_pdo_lh));
523         LASSERT(!lustre_handle_is_used(&llh->llh_reg_lh));
524
525         switch (mode) {
526         case LCK_EX:
527                 llh->llh_pdo_mode = LCK_EX;
528                 break;
529         case LCK_PW:
530                 llh->llh_pdo_mode = LCK_CW;
531                 break;
532         case LCK_PR:
533                 llh->llh_pdo_mode = LCK_CR;
534                 break;
535         default:
536                 CDEBUG(D_LFSCK, "%s: unexpected PDO lock mode %u on the obj "
537                        DFID"\n", lfsck_lfsck2name(lfsck), mode,
538                        PFID(lfsck_dto2fid(obj)));
539                 LBUG();
540         }
541
542         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
543         rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_pdo_lh,
544                                 MDS_INODELOCK_UPDATE, llh->llh_pdo_mode);
545         if (rc != 0)
546                 return rc;
547
548         llh->llh_reg_mode = mode;
549         resid->name[LUSTRE_RES_ID_HSH_OFF] = ll_full_name_hash(NULL, name,
550                                                                strlen(name));
551         LASSERT(resid->name[LUSTRE_RES_ID_HSH_OFF] != 0);
552         rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_reg_lh,
553                                 bits, llh->llh_reg_mode);
554         if (rc != 0)
555                 lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode);
556
557         return rc;
558 }
559
560 /**
561  * Release the the compound ibits locks.
562  *
563  * \param[in] llh       pointer to the lfsck_lock_handle to be released
564  */
565 void lfsck_unlock(struct lfsck_lock_handle *llh)
566 {
567         lfsck_ibits_unlock(&llh->llh_reg_lh, llh->llh_reg_mode);
568         lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode);
569 }
570
571 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
572                               struct lfsck_instance *lfsck,
573                               const struct lu_fid *fid)
574 {
575         struct seq_server_site  *ss     = lfsck_dev_site(lfsck);
576         struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
577         int                      rc;
578
579         if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) {
580                 /* "ROOT" is always on the MDT0. */
581                 if (lu_fid_eq(fid, &lfsck->li_global_root_fid))
582                         return 0;
583
584                 return lfsck_dev_idx(lfsck);
585         }
586
587         fld_range_set_mdt(range);
588         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
589         if (rc == 0)
590                 rc = range->lsr_index;
591
592         return rc;
593 }
594
/* Directory entry names. "dot" and "dotdot" have external linkage and are
 * used as keys when populating a new directory; the other two are private
 * to this file. */
const char dot[] = ".";
const char dotdot[] = "..";
static const char dotlustre[] = ".lustre";
static const char lostfound[] = "lost+found";
599
600 /**
601  * Remove the name entry from the .lustre/lost+found directory.
602  *
603  * No need to care about the object referenced by the name entry,
604  * either the name entry is invalid or redundant, or the referenced
605  * object has been processed or will be handled by others.
606  *
607  * \param[in] env       pointer to the thread context
608  * \param[in] lfsck     pointer to the lfsck instance
609  * \param[in] name      the name for the name entry to be removed
610  *
611  * \retval              0 for success
612  * \retval              negative error number on failure
613  */
614 static int lfsck_lpf_remove_name_entry(const struct lu_env *env,
615                                        struct lfsck_instance *lfsck,
616                                        const char *name)
617 {
618         struct dt_object        *parent = lfsck->li_lpf_root_obj;
619         struct dt_device        *dev    = lfsck_obj2dev(parent);
620         struct thandle          *th;
621         struct lfsck_lock_handle *llh   = &lfsck_env_info(env)->lti_llh;
622         int                      rc;
623         ENTRY;
624
625         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
626                 RETURN(0);
627
628         rc = lfsck_lock(env, lfsck, parent, name, llh,
629                         MDS_INODELOCK_UPDATE, LCK_PW);
630         if (rc != 0)
631                 RETURN(rc);
632
633         th = lfsck_trans_create(env, dev, lfsck);
634         if (IS_ERR(th))
635                 GOTO(unlock, rc = PTR_ERR(th));
636
637         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
638         if (rc != 0)
639                 GOTO(stop, rc);
640
641         rc = dt_declare_ref_del(env, parent, th);
642         if (rc != 0)
643                 GOTO(stop, rc);
644
645         rc = dt_trans_start_local(env, dev, th);
646         if (rc != 0)
647                 GOTO(stop, rc);
648
649         rc = dt_delete(env, parent, (const struct dt_key *)name, th);
650         if (rc != 0)
651                 GOTO(stop, rc);
652
653         dt_write_lock(env, parent, 0);
654         rc = dt_ref_del(env, parent, th);
655         dt_write_unlock(env, parent);
656
657         GOTO(stop, rc);
658
659 stop:
660         dt_trans_stop(env, dev, th);
661
662 unlock:
663         lfsck_unlock(llh);
664
665         CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s: rc = %d\n",
666                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(parent)), name, rc);
667
668         return rc;
669 }
670
671 static int lfsck_create_lpf_local(const struct lu_env *env,
672                                   struct lfsck_instance *lfsck,
673                                   struct dt_object *child,
674                                   struct lu_attr *la,
675                                   struct dt_object_format *dof,
676                                   const char *name)
677 {
678         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
679         struct dt_object        *parent = lfsck->li_lpf_root_obj;
680         struct dt_device        *dev    = lfsck_obj2dev(child);
681         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
682         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
683         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
684         struct thandle          *th     = NULL;
685         struct linkea_data       ldata  = { NULL };
686         struct lu_buf            linkea_buf;
687         const struct lu_name    *cname;
688         loff_t                   pos    = 0;
689         int                      len    = sizeof(struct lfsck_bookmark);
690         int                      rc;
691         ENTRY;
692
693         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
694                 RETURN(0);
695
696         cname = lfsck_name_get_const(env, name, strlen(name));
697         rc = linkea_links_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf2,
698                               cname, lfsck_dto2fid(parent));
699         if (rc != 0)
700                 RETURN(rc);
701
702         th = lfsck_trans_create(env, dev, lfsck);
703         if (IS_ERR(th))
704                 RETURN(PTR_ERR(th));
705
706         /* 1a. create child */
707         rc = dt_declare_create(env, child, la, NULL, dof, th);
708         if (rc != 0)
709                 GOTO(stop, rc);
710
711         if (!dt_try_as_dir(env, child, false))
712                 GOTO(stop, rc = -ENOTDIR);
713
714         /* 2a. increase child nlink */
715         rc = dt_declare_ref_add(env, child, th);
716         if (rc != 0)
717                 GOTO(stop, rc);
718
719         /* 3a. insert dot into child dir */
720         rec->rec_type = S_IFDIR;
721         rec->rec_fid = cfid;
722         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
723                                (const struct dt_key *)dot, th);
724         if (rc != 0)
725                 GOTO(stop, rc);
726
727         /* 4a. insert dotdot into child dir */
728         rec->rec_fid = &LU_LPF_FID;
729         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
730                                (const struct dt_key *)dotdot, th);
731         if (rc != 0)
732                 GOTO(stop, rc);
733
734         /* 5a. insert linkEA for child */
735         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
736                        ldata.ld_leh->leh_len);
737         rc = dt_declare_xattr_set(env, child, &linkea_buf,
738                                   XATTR_NAME_LINK, 0, th);
739         if (rc != 0)
740                 GOTO(stop, rc);
741
742         /* 6a. insert name into parent dir */
743         rec->rec_type = S_IFDIR;
744         rec->rec_fid = cfid;
745         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
746                                (const struct dt_key *)name, th);
747         if (rc != 0)
748                 GOTO(stop, rc);
749
750         /* 7a. increase parent nlink */
751         rc = dt_declare_ref_add(env, parent, th);
752         if (rc != 0)
753                 GOTO(stop, rc);
754
755         /* 8a. update bookmark */
756         rc = dt_declare_record_write(env, bk_obj,
757                                      lfsck_buf_get(env, bk, len), 0, th);
758         if (rc != 0)
759                 GOTO(stop, rc);
760
761         rc = dt_trans_start_local(env, dev, th);
762         if (rc != 0)
763                 GOTO(stop, rc);
764
765         dt_write_lock(env, child, 0);
766         /* 1b. create child */
767         rc = dt_create(env, child, la, NULL, dof, th);
768         if (rc != 0)
769                 GOTO(unlock, rc);
770
771         /* 2b. increase child nlink */
772         rc = dt_ref_add(env, child, th);
773         if (rc != 0)
774                 GOTO(unlock, rc);
775
776         /* 3b. insert dot into child dir */
777         rec->rec_fid = cfid;
778         rc = dt_insert(env, child, (const struct dt_rec *)rec,
779                        (const struct dt_key *)dot, th);
780         if (rc != 0)
781                 GOTO(unlock, rc);
782
783         /* 4b. insert dotdot into child dir */
784         rec->rec_fid = &LU_LPF_FID;
785         rc = dt_insert(env, child, (const struct dt_rec *)rec,
786                        (const struct dt_key *)dotdot, th);
787         if (rc != 0)
788                 GOTO(unlock, rc);
789
790         /* 5b. insert linkEA for child. */
791         rc = dt_xattr_set(env, child, &linkea_buf,
792                           XATTR_NAME_LINK, 0, th);
793         dt_write_unlock(env, child);
794         if (rc != 0)
795                 GOTO(stop, rc);
796
797         /* 6b. insert name into parent dir */
798         rec->rec_fid = cfid;
799         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
800                        (const struct dt_key *)name, th);
801         if (rc != 0)
802                 GOTO(stop, rc);
803
804         dt_write_lock(env, parent, 0);
805         /* 7b. increase parent nlink */
806         rc = dt_ref_add(env, parent, th);
807         dt_write_unlock(env, parent);
808         if (rc != 0)
809                 GOTO(stop, rc);
810
811         bk->lb_lpf_fid = *cfid;
812         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
813
814         /* 8b. update bookmark */
815         rc = dt_record_write(env, bk_obj,
816                              lfsck_buf_get(env, bk, len), &pos, th);
817
818         GOTO(stop, rc);
819
820 unlock:
821         dt_write_unlock(env, child);
822
823 stop:
824         dt_trans_stop(env, dev, th);
825
826         return rc;
827 }
828
/*
 * Create the MDTxxxx directory object (@child) and link it as @name under
 * lfsck->li_lpf_root_obj, using two separate transactions.
 *
 * Transaction I runs on the child's device: it creates the child, adds a
 * link count, inserts the "." and ".." entries (".." references LU_LPF_FID),
 * sets the linkEA and writes the child FID into the LFSCK bookmark.
 * Transaction II runs on the parent's device: it inserts @name into the
 * parent and increases the parent's link count.
 *
 * Returns 0 immediately, without creating any transaction, when LPF_DRYRUN
 * is set in the bookmark parameters. Otherwise returns 0 on success or the
 * error code of the first step that failed.
 */
829 static int lfsck_create_lpf_remote(const struct lu_env *env,
830                                    struct lfsck_instance *lfsck,
831                                    struct dt_object *child,
832                                    struct lu_attr *la,
833                                    struct dt_object_format *dof,
834                                    const char *name)
835 {
836         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
837         struct dt_object        *parent = lfsck->li_lpf_root_obj;
838         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
839         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
840         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
841         struct thandle          *th     = NULL;
842         struct linkea_data       ldata  = { NULL };
843         struct lu_buf            linkea_buf;
844         const struct lu_name    *cname;
845         struct dt_device        *dev;
846         loff_t                   pos    = 0;
847         int                      len    = sizeof(struct lfsck_bookmark);
848         int                      rc;
849         ENTRY;
850
            /* Dry-run mode: report success without modifying anything. */
851         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
852                 RETURN(0);
853
            /* Build the linkEA content (@name under the parent's FID) up
             * front; it is declared and written in transaction I below. */
854         cname = lfsck_name_get_const(env, name, strlen(name));
855         rc = linkea_links_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf2,
856                               cname, lfsck_dto2fid(parent));
857         if (rc != 0)
858                 RETURN(rc);
859
860         /* Create .lustre/lost+found/MDTxxxx. */
861
862         /* XXX: Currently, cross-MDT create operation needs to create the child
863          *      object firstly, then insert name into the parent directory. For
864          *      this case, the child object resides on current MDT (local), but
865          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
866          *      easy to contain all the sub-modifications orderly within single
867          *      transaction.
868          *
869          *      To avoid more inconsistency, we split the create operation into
870          *      two transactions:
871          *
872          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
873          *         locally.
874          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
875          *         remotely.
876          *
877          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
878          *      repair such inconsistency the next time it runs. */
879
880         /* Transaction I: locally */
881
882         dev = lfsck_obj2dev(child);
883         th = lfsck_trans_create(env, dev, lfsck);
884         if (IS_ERR(th))
885                 RETURN(PTR_ERR(th));
886
            /* Steps 1a-6a declare every modification before the transaction
             * is started; steps 1b-6b then execute them in the same order. */
887         /* 1a. create child */
888         rc = dt_declare_create(env, child, la, NULL, dof, th);
889         if (rc != 0)
890                 GOTO(stop, rc);
891
892         if (!dt_try_as_dir(env, child, false))
893                 GOTO(stop, rc = -ENOTDIR);
894
895         /* 2a. increase child nlink */
896         rc = dt_declare_ref_add(env, child, th);
897         if (rc != 0)
898                 GOTO(stop, rc);
899
900         /* 3a. insert dot into child dir */
901         rec->rec_type = S_IFDIR;
902         rec->rec_fid = cfid;
903         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
904                                (const struct dt_key *)dot, th);
905         if (rc != 0)
906                 GOTO(stop, rc);
907
908         /* 4a. insert dotdot into child dir */
909         rec->rec_fid = &LU_LPF_FID;
910         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
911                                (const struct dt_key *)dotdot, th);
912         if (rc != 0)
913                 GOTO(stop, rc);
914
915         /* 5a. insert linkEA for child */
916         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
917                        ldata.ld_leh->leh_len);
918         rc = dt_declare_xattr_set(env, child, &linkea_buf,
919                                   XATTR_NAME_LINK, 0, th);
920         if (rc != 0)
921                 GOTO(stop, rc);
922
923         /* 6a. update bookmark */
924         rc = dt_declare_record_write(env, bk_obj,
925                                      lfsck_buf_get(env, bk, len), 0, th);
926         if (rc != 0)
927                 GOTO(stop, rc);
928
929         rc = dt_trans_start_local(env, dev, th);
930         if (rc != 0)
931                 GOTO(stop, rc);
932
            /* The child stays write-locked for all of steps 1b-6b; failures
             * in this range must leave through the "unlock" label. */
933         dt_write_lock(env, child, 0);
934         /* 1b. create child */
935         rc = dt_create(env, child, la, NULL, dof, th);
936         if (rc != 0)
937                 GOTO(unlock, rc);
938
939         /* 2b. increase child nlink */
940         rc = dt_ref_add(env, child, th);
941         if (rc != 0)
942                 GOTO(unlock, rc);
943
944         /* 3b. insert dot into child dir */
945         rec->rec_type = S_IFDIR;
946         rec->rec_fid = cfid;
947         rc = dt_insert(env, child, (const struct dt_rec *)rec,
948                        (const struct dt_key *)dot, th);
949         if (rc != 0)
950                 GOTO(unlock, rc);
951
952         /* 4b. insert dotdot into child dir */
953         rec->rec_fid = &LU_LPF_FID;
954         rc = dt_insert(env, child, (const struct dt_rec *)rec,
955                        (const struct dt_key *)dotdot, th);
956         if (rc != 0)
957                 GOTO(unlock, rc);
958
959         /* 5b. insert linkEA for child */
960         rc = dt_xattr_set(env, child, &linkea_buf,
961                           XATTR_NAME_LINK, 0, th);
962         if (rc != 0)
963                 GOTO(unlock, rc);
964
            /* Record the new FID in the in-RAM bookmark and refresh the
             * little-endian on-disk copy before writing it out. */
965         bk->lb_lpf_fid = *cfid;
966         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
967
968         /* 6b. update bookmark */
969         rc = dt_record_write(env, bk_obj,
970                              lfsck_buf_get(env, bk, len), &pos, th);
971
            /* End of transaction I: unlock and stop here (not via the labels)
             * because on success the function continues with transaction II. */
972         dt_write_unlock(env, child);
973         dt_trans_stop(env, dev, th);
974         if (rc != 0)
975                 RETURN(rc);
976
977         /* Transaction II: remotely */
978
            /* From here on "dev" is the parent's device. */
979         dev = lfsck_obj2dev(parent);
980         th = lfsck_trans_create(env, dev, lfsck);
981         if (IS_ERR(th))
982                 RETURN(PTR_ERR(th));
983
            /* NOTE(review): th_sync presumably requests a synchronous commit
             * of the remote name insertion - confirm against struct thandle. */
984         th->th_sync = 1;
985         /* 5a. insert name into parent dir */
986         rec->rec_fid = cfid;
987         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
988                                (const struct dt_key *)name, th);
989         if (rc != 0)
990                 GOTO(stop, rc);
991
992         /* 6a. increase parent nlink */
993         rc = dt_declare_ref_add(env, parent, th);
994         if (rc != 0)
995                 GOTO(stop, rc);
996
997         rc = dt_trans_start_local(env, dev, th);
998         if (rc != 0)
999                 GOTO(stop, rc);
1000
1001         /* 5b. insert name into parent dir */
1002         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1003                        (const struct dt_key *)name, th);
1004         if (rc != 0)
1005                 GOTO(stop, rc);
1006
1007         dt_write_lock(env, parent, 0);
1008         /* 6b. increase parent nlink */
1009         rc = dt_ref_add(env, parent, th);
1010         dt_write_unlock(env, parent);
1011
1012         GOTO(stop, rc);
1013
             /* Only reached from transaction I failures, while the child is
              * still write-locked. */
1014 unlock:
1015         dt_write_unlock(env, child);
1016 stop:
1017         dt_trans_stop(env, dev, th);
1018
             /* "dev" equals the parent's device only after transaction II was
              * set up, so this message is meant for the case where the child
              * was created but the name insertion failed. */
1019         if (rc != 0 && dev == lfsck_obj2dev(parent))
1020                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
1021                        "for orphans, but failed to insert the name %s "
1022                        "to the .lustre/lost+found/. Such inconsistency "
1023                        "will be repaired when LFSCK run next time: rc = %d\n",
1024                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
1025
1026         return rc;
1027 }
1028
1029 /**
1030  * Create the MDTxxxx directory under /ROOT/.lustre/lost+found/
1031  *
1032  * The /ROOT/.lustre/lost+found/MDTxxxx/ directory is used for holding
1033  * orphans and other uncertain inconsistent objects found during the
1034  * LFSCK. Such directory will be created by the LFSCK engine on the
1035  * local MDT before the LFSCK scanning.
1036  *
1037  * \param[in] env       pointer to the thread context
1038  * \param[in] lfsck     pointer to the lfsck instance
1039  *
1040  * \retval              0 for success
1041  * \retval              negative error number on failure
1042  */
1043 static int lfsck_create_lpf(const struct lu_env *env,
1044                             struct lfsck_instance *lfsck)
1045 {
1046         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
1047         struct lfsck_thread_info *info  = lfsck_env_info(env);
1048         struct lu_fid            *cfid  = &info->lti_fid2;
1049         struct lu_attr           *la    = &info->lti_la;
1050         struct dt_object_format  *dof   = &info->lti_dof;
1051         struct dt_object         *parent = lfsck->li_lpf_root_obj;
1052         struct dt_object         *child = NULL;
1053         struct lfsck_lock_handle *llh   = &info->lti_llh;
             /* Holds "MDT" + 4 hex digits + terminating NUL. */
1054         char                      name[8];
1055         int                       node  = lfsck_dev_idx(lfsck);
1056         int                       rc    = 0;
1057         ENTRY;
1058
             /* Preconditions: master instance, lost+found root already
              * located, and no MDTxxxx object attached yet. */
1059         LASSERT(lfsck->li_master);
1060         LASSERT(parent != NULL);
1061         LASSERT(lfsck->li_lpf_obj == NULL);
1062
             /* Take a PW lock (MDS_INODELOCK_UPDATE) for @name under the
              * parent before looking it up or creating it; the lock is
              * dropped at the "unlock" label. */
1063         snprintf(name, 8, "MDT%04x", node);
1064         rc = lfsck_lock(env, lfsck, parent, name, llh,
1065                         MDS_INODELOCK_UPDATE, LCK_PW);
1066         if (rc != 0)
1067                 RETURN(rc);
1068
             /* Decide which FID the MDTxxxx object uses: the bookmark's FID
              * if set, else the FID of an existing name entry, else a newly
              * allocated one. */
1069         if (fid_is_zero(&bk->lb_lpf_fid)) {
1070                 /* There is corner case that: in former LFSCK scanning we have
1071                  * created the .lustre/lost+found/MDTxxxx but failed to update
1072                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
1073                  * it from MDT0 firstly. */
1074                 rc = dt_lookup_dir(env, parent, name, cfid);
1075                 if (rc != 0 && rc != -ENOENT)
1076                         GOTO(unlock, rc);
1077
1078                 if (rc == 0) {
1079                         bk->lb_lpf_fid = *cfid;
1080                         rc = lfsck_bookmark_store(env, lfsck);
1081                 } else {
1082                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
1083                 }
1084                 if (rc != 0)
1085                         GOTO(unlock, rc);
1086         } else {
1087                 *cfid = bk->lb_lpf_fid;
1088         }
1089
1090         child = lfsck_object_find_bottom_new(env, lfsck, cfid);
1091         if (IS_ERR(child))
1092                 GOTO(unlock, rc = PTR_ERR(child));
1093
             /* The object already exists: just adopt it if it is usable as
              * a directory, no creation needed. */
1094         if (dt_object_exists(child)) {
1095                 if (unlikely(!dt_try_as_dir(env, child, true)))
1096                         rc = -ENOTDIR;
1097                 else
1098                         lfsck->li_lpf_obj = child;
1099
1100                 GOTO(unlock, rc);
1101         }
1102
             /* Attributes for the new directory: current time for all three
              * timestamps, mode S_IFDIR | S_IRWXU; uid/gid stay 0 from the
              * memset but are flagged valid. */
1103         memset(la, 0, sizeof(*la));
1104         la->la_atime = la->la_mtime = la->la_ctime = ktime_get_real_seconds();
1105         la->la_mode = S_IFDIR | S_IRWXU;
1106         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1107                        LA_UID | LA_GID | LA_TYPE;
1108         memset(dof, 0, sizeof(*dof));
1109         dof->dof_type = dt_mode_to_dft(S_IFDIR);
1110
             /* Device index 0 uses the local creation helper; any other
              * index uses the remote (two-transaction) one. */
1111         if (node == 0)
1112                 rc = lfsck_create_lpf_local(env, lfsck, child, la, dof, name);
1113         else
1114                 rc = lfsck_create_lpf_remote(env, lfsck, child, la, dof, name);
1115         if (rc == 0)
1116                 lfsck->li_lpf_obj = child;
1117
1118         GOTO(unlock, rc);
1119
1120 unlock:
1121         lfsck_unlock(llh);
             /* On failure release the child; on success it remains held via
              * lfsck->li_lpf_obj. */
1122         if (rc != 0 && child != NULL && !IS_ERR(child))
1123                 lfsck_object_put(env, child);
1124
1125         return rc;
1126 }
1127
1128 /**
1129  * Scan .lustre/lost+found for bad name entries and remove them.
1130  *
1131  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
1132  * index in the system. Any other formatted name is invalid and should be
1133  * removed.
1134  *
1135  * \param[in] env       pointer to the thread context
1136  * \param[in] lfsck     pointer to the lfsck instance
1137  *
1138  * \retval              0 for success
1139  * \retval              negative error number on failure
1140  */
1141 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
1142                                       struct lfsck_instance *lfsck)
1143 {
1144         struct dt_object        *parent = lfsck->li_lpf_root_obj;
             /* The per-thread key buffer is reused as the directory entry
              * record filled in by iops->rec(). */
1145         struct lu_dirent        *ent    =
1146                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
1147         const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
1148         struct dt_it            *it;
1149         int                      rc;
1150         ENTRY;
1151
1152         it = iops->init(env, parent, LUDA_64BITHASH);
1153         if (IS_ERR(it))
1154                 RETURN(PTR_ERR(it));
1155
             /* Position the iterator at hash 0. When load() returns 0 the
              * iterator is advanced once before the first record is read;
              * a positive return is treated as success without advancing;
              * a negative return skips the loop and is returned below. */
1156         rc = iops->load(env, it, 0);
1157         if (rc == 0)
1158                 rc = iops->next(env, it);
1159         else if (rc > 0)
1160                 rc = 0;
1161
1162         while (rc == 0) {
                     /* strlen("MDT"): offset of the first hex digit. */
1163                 int off = 3;
1164
1165                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
1166                 if (rc != 0)
1167                         break;
1168
                     /* Convert the name length to CPU byte order in place. */
1169                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
1170                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1171                         goto next;
1172
1173                 /* name length must be strlen("MDTxxxx") */
1174                 if (ent->lde_namelen != 7)
1175                         goto remove;
1176
1177                 if (memcmp(ent->lde_name, "MDT", off) != 0)
1178                         goto remove;
1179
                     /* The four characters after "MDT" must all be hex digits;
                      * off stops short of 7 at the first one that is not. */
1180                 while (off < 7 && isxdigit(ent->lde_name[off]))
1181                         off++;
1182
1183                 if (off != 7) {
1184
                     /* The label sits inside the block so the length and
                      * prefix checks above can jump straight to the removal. */
1185 remove:
1186                         rc = lfsck_lpf_remove_name_entry(env, lfsck,
1187                                                          ent->lde_name);
1188                         if (rc != 0)
1189                                 break;
1190                 }
1191
1192 next:
1193                 rc = iops->next(env, it);
1194         }
1195
1196         iops->put(env, it);
1197         iops->fini(env, it);
1198
             /* A positive rc ended the loop without an error (presumably the
              * end of the directory - confirm) and is reported as success. */
1199         RETURN(rc > 0 ? 0 : rc);
1200 }
1201
/*
 * Make the name entry and the bookmark agree on the FID of @child.
 *
 * For LVLT_BY_BOOKMARK (the child's FID came from the bookmark file) the
 * name entry @name under @parent is updated to reference the child. For any
 * other @type the child's FID is stored into the in-RAM bookmark, which is
 * then written out with lfsck_bookmark_store().
 *
 * Returns the result of the update/store call.
 */
1202 static int lfsck_update_lpf_entry(const struct lu_env *env,
1203                                   struct lfsck_instance *lfsck,
1204                                   struct dt_object *parent,
1205                                   struct dt_object *child,
1206                                   const char *name,
1207                                   enum lfsck_verify_lpf_types type)
1208 {
1209         int rc;
1210
1211         if (type == LVLT_BY_BOOKMARK) {
1212                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
1213                                              lfsck_dto2fid(child), S_IFDIR);
1214         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1215                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
1216                 rc = lfsck_bookmark_store(env, lfsck);
1217
1218                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
1219                        " in the bookmark file: rc = %d\n",
1220                        lfsck_lfsck2name(lfsck),
1221                        PFID(lfsck_dto2fid(child)), rc);
1222         }
1223
1224         return rc;
1225 }
1226
1227 /**
1228  * Check whether the @child back references the @parent.
1229  *
1230  * Two cases:
1231  * 1) The child's FID is stored in the bookmark file. If the child back
1232  *    references the parent (LU_LPF_FID object) via its ".." entry, then
1233  *    insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
1234  *    the child back references another parent2, then:
1235  * 1.1) If the parent2 recognizes the child, then update the bookmark file;
1236  * 1.2) Otherwise, the LFSCK cannot know whether there will be a parent3 that
1237  *      references the child, so keep them as they are. As the LFSCK proceeds,
1238  *      parent3 may be found; then the next time the LFSCK runs, the
1239  *      inconsistency can be repaired.
1240  *
1241  * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
1242  *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
1243  *    via its ".." entry, then update the bookmark file, otherwise, if the child
1244  *    back references another parent2, then:
1245  * 2.1) If the parent2 recognizes the child, then remove the sub-directory
1246  *      from .lustre/lost+found/;
1247  * 2.2) Otherwise, if the parent2 does not recognize the child, trust the
1248  *      sub-directory name entry and update the child;
1249  * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1250  *      or not, then keep them there.
1251  *
1252  * \param[in] env       pointer to the thread context
1253  * \param[in] lfsck     pointer to the lfsck instance
1254  * \param[in] child     pointer to the lost+found sub-directory object
1255  * \param[in] name      the name for lost+found sub-directory object
1256  * \param[out] fid      pointer to the buffer to hold the FID of the object
1257  *                      (called it as parent2) that is referenced via the
1258  *                      child's dotdot entry; it also can be the FID that
1259  *                      is referenced by the name entry under the parent2.
1260  * \param[in] type      to indicate where the child's FID is stored in
1261  *
1262  * \retval              positive number for uncertain inconsistency
1263  * \retval              0 for success
1264  * \retval              negative error number on failure
1265  */
1266 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1267                                   struct lfsck_instance *lfsck,
1268                                   struct dt_object *child, const char *name,
1269                                   struct lu_fid *fid,
1270                                   enum lfsck_verify_lpf_types type)
1271 {
1272         struct dt_object         *parent  = lfsck->li_lpf_root_obj;
1273         struct lfsck_thread_info *info    = lfsck_env_info(env);
             /* name2/fid2 receive the first linkEA record of the child. */
1274         char                     *name2   = info->lti_key;
1275         struct lu_fid            *fid2    = &info->lti_fid3;
             /* parent2 is the object referenced by the child's ".." entry;
              * it may stay NULL or become an ERR_PTR, both meaning "none". */
1276         struct dt_object         *parent2 = NULL;
1277         struct lustre_handle      lh      = { 0 };
1278         int                       rc;
1279         ENTRY;
1280
             /* Step 1: find what the child's ".." entry references. A failed
              * or insane lookup falls through to the linkEA check with
              * parent2 still NULL. */
1281         fid_zero(fid);
1282         rc = dt_lookup_dir(env, child, dotdot, fid);
1283         if (rc != 0)
1284                 GOTO(linkea, rc);
1285
1286         if (!fid_is_sane(fid))
1287                 GOTO(linkea, rc = -EINVAL);
1288
             /* ".." already references .lustre/lost+found: adopt the child,
              * verify its linkEA and make bookmark/name entry consistent. */
1289         if (lu_fid_eq(fid, &LU_LPF_FID)) {
1290                 const struct lu_name *cname;
1291
1292                 if (lfsck->li_lpf_obj == NULL) {
1293                         lu_object_get(&child->do_lu);
1294                         lfsck->li_lpf_obj = child;
1295                 }
1296
1297                 cname = lfsck_name_get_const(env, name, strlen(name));
1298                 rc = lfsck_verify_linkea(env, lfsck, child, cname, &LU_LPF_FID);
1299                 if (rc == 0)
1300                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1301                                                     name, type);
1302
1303                 GOTO(out_done, rc);
1304         }
1305
             /* ".." references some other object (parent2). */
1306         parent2 = lfsck_object_find_bottom(env, lfsck, fid);
             /* parent2 is an ERR_PTR in the next GOTO; the IS_ERR() checks
              * further down treat it as "no parent2". */
1307         if (IS_ERR(parent2))
1308                 GOTO(linkea, parent2);
1309
1310         if (!dt_try_as_dir(env, parent2, true)) {
1311                 lfsck_object_put(env, parent2);
1312
1313                 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1314         }
1315
             /* Step 2: consult the child's linkEA. */
1316 linkea:
1317         /* To prevent rename/unlink race */
1318         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1319                               MDS_INODELOCK_UPDATE, LCK_PR);
1320         if (rc != 0)
1321                 GOTO(out_put, rc);
1322
1323         dt_read_lock(env, child, 0);
1324         rc = lfsck_links_get_first(env, child, name2, fid2);
1325         if (rc != 0) {
1326                 dt_read_unlock(env, child);
1327                 lfsck_ibits_unlock(&lh, LCK_PR);
1328
                     /* No usable linkEA record: uncertain inconsistency. */
1329                 GOTO(out_put, rc = 1);
1330         }
1331
1332         /* It is almost impossible that the bookmark file (or the name entry)
1333          * and the linkEA hit the same data corruption. Trust the linkEA. */
1334         if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1335                 dt_read_unlock(env, child);
1336                 lfsck_ibits_unlock(&lh, LCK_PR);
1337
                     /* Report LU_LPF_FID back to the caller through @fid. */
1338                 *fid = *fid2;
1339                 if (lfsck->li_lpf_obj == NULL) {
1340                         lu_object_get(&child->do_lu);
1341                         lfsck->li_lpf_obj = child;
1342                 }
1343
1344                 /* Update the child's dotdot entry */
1345                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1346                                              &LU_LPF_FID, S_IFDIR);
1347                 if (rc == 0)
1348                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1349                                                     name, type);
1350
1351                 GOTO(out_put, rc);
1352         }
1353
             /* The linkEA does not point at lost+found and there is no usable
              * parent2 to ask: uncertain. out_put can be skipped because
              * there is no parent2 object to release. */
1354         if (parent2 == NULL || IS_ERR(parent2)) {
1355                 dt_read_unlock(env, child);
1356                 lfsck_ibits_unlock(&lh, LCK_PR);
1357
1358                 GOTO(out_done, rc = 1);
1359         }
1360
             /* Step 3: look up the linkEA name under parent2; the result
              * overwrites *fid. */
1361         rc = dt_lookup_dir(env, parent2, name2, fid);
1362         dt_read_unlock(env, child);
1363         lfsck_ibits_unlock(&lh, LCK_PR);
1364         if (rc != 0 && rc != -ENOENT)
1365                 GOTO(out_put, rc);
1366
             /* parent2 does not recognize the child (no such name, or the
              * name references a different object). */
1367         if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1368                 if (type == LVLT_BY_BOOKMARK)
1369                         GOTO(out_put, rc = 1);
1370
1371                 /* Trust the name entry, update the child's dotdot entry. */
1372                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1373                                              &LU_LPF_FID, S_IFDIR);
1374
1375                 GOTO(out_put, rc);
1376         }
1377
             /* parent2 recognizes the child, so the child belongs to parent2
              * rather than to lost+found: drop whichever record claimed it. */
1378         if (type == LVLT_BY_BOOKMARK) {
1379                 /* Invalid FID record in the bookmark file, reset it. */
1380                 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1381                 rc = lfsck_bookmark_store(env, lfsck);
1382
1383                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1384                        " in the bookmark file: rc = %d\n",
1385                        lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1386         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1387                 /* The name entry is wrong, remove it. */
1388                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1389         }
1390
1391         GOTO(out_put, rc);
1392
1393 out_put:
1394         if (parent2 != NULL && !IS_ERR(parent2))
1395                 lfsck_object_put(env, parent2);
1396
1397 out_done:
1398         return rc;
1399 }
1400
1401 /**
1402  * Verify the /ROOT/.lustre/lost+found/ directory.
1403  *
1404  * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1405  * the LFSCK does not exactly know how to handle, such as orphans. So before
1406  * the LFSCK scanning the system, the consistency of such directory needs to
1407  * be verified firstly to allow the users to use it during the LFSCK.
1408  *
1409  * \param[in] env       pointer to the thread context
1410  * \param[in] lfsck     pointer to the lfsck instance
1411  *
1412  * \retval              positive number for uncertain inconsistency
1413  * \retval              0 for success
1414  * \retval              negative error number on failure
1415  */
1416 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1417 {
1418         struct lfsck_thread_info *info   = lfsck_env_info(env);
1419         struct lu_fid            *pfid   = &info->lti_fid;
1420         struct lu_fid            *cfid   = &info->lti_fid2;
1421         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
1422         struct dt_object         *parent;
1423         /* child1's FID is in the bookmark file. */
1424         struct dt_object         *child1 = NULL;
1425         /* child2's FID is in the name entry MDTxxxx. */
1426         struct dt_object         *child2 = NULL;
1427         const struct lu_name     *cname;
1428         char                      name[8];
1429         int                       node   = lfsck_dev_idx(lfsck);
1430         int                       rc     = 0;
1431         ENTRY;
1432
1433         LASSERT(lfsck->li_master);
1434
             /* Nothing is verified or created in dry-run mode. */
1435         if (lfsck_is_dryrun(lfsck))
1436                 RETURN(0);
1437
             /* The lost+found root is already attached to this instance, so
              * there is nothing more to do here. */
1438         if (lfsck->li_lpf_root_obj != NULL)
1439                 RETURN(0);
1440
             /* Locate the LU_LPF_FID object: on the local bottom device for
              * device index 0, otherwise through the target descriptor with
              * index 0 in li_mdt_descs. */
1441         if (node == 0) {
1442                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
1443                                                   &LU_LPF_FID);
1444         } else {
1445                 struct lfsck_tgt_desc *ltd;
1446
1447                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1448                 if (unlikely(ltd == NULL))
1449                         RETURN(-ENXIO);
1450
1451                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1452                                                   &LU_LPF_FID);
1453                 lfsck_tgt_put(ltd);
1454         }
1455
1456         if (IS_ERR(parent))
1457                 RETURN(PTR_ERR(parent));
1458
1459         LASSERT(dt_object_exists(parent));
1460
             /* Not a directory: release it without recording it in
              * li_lpf_root_obj; "put" creates nothing because rc != 0. */
1461         if (unlikely(!dt_try_as_dir(env, parent, true))) {
1462                 lfsck_object_put(env, parent);
1463
1464                 GOTO(put, rc = -ENOTDIR);
1465         }
1466
1467         lfsck->li_lpf_root_obj = parent;
             /* Only device index 0 scans for badly named entries; a failure
              * is logged only, since rc is overwritten by the lookup below. */
1468         if (node == 0) {
1469                 rc = lfsck_scan_lpf_bad_entries(env, lfsck);
1470                 if (rc != 0)
1471                         CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
1472                                "for bad sub-directories: rc = %d\n",
1473                                lfsck_lfsck2name(lfsck), rc);
1474         }
1475
1476         /* child2 */
1477         snprintf(name, 8, "MDT%04x", node);
1478         rc = dt_lookup_dir(env, parent, name, cfid);
1479         if (rc == -ENOENT) {
1480                 rc = 0;
1481                 goto find_child1;
1482         }
1483
1484         if (rc != 0)
1485                 GOTO(put, rc);
1486
1487         /* Invalid FID in the name entry, remove the name entry. */
1488         if (!fid_is_norm(cfid)) {
1489                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1490                 if (rc != 0)
1491                         GOTO(put, rc);
1492
1493                 goto find_child1;
1494         }
1495
1496         child2 = lfsck_object_find_bottom(env, lfsck, cfid);
1497         if (IS_ERR(child2))
1498                 GOTO(put, rc = PTR_ERR(child2));
1499
             /* The name entry references a missing, remote or non-directory
              * object: remove the name entry.
              * NOTE(review): child2 stays non-NULL on this path, so
              * check_child2 below may still pass it to
              * lfsck_verify_lpf_pairs() - confirm this is intended. */
1500         if (unlikely(!dt_object_exists(child2) ||
1501                      dt_object_remote(child2)) ||
1502                      !S_ISDIR(lfsck_object_type(child2))) {
1503                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1504                 if (rc != 0)
1505                         GOTO(put, rc);
1506
1507                 goto find_child1;
1508         }
1509
1510         if (unlikely(!dt_try_as_dir(env, child2, true)))
1511                 GOTO(put, rc = -ENOTDIR);
1512
1513 find_child1:
             /* No FID recorded in the bookmark: only child2 can be checked. */
1514         if (fid_is_zero(&bk->lb_lpf_fid))
1515                 goto check_child2;
1516
             /* Name entry and bookmark agree: adopt child2 and verify its
              * linkEA.
              * NOTE(review): this label is also reached with child2 == NULL
              * (lookup returned -ENOENT, or the name entry was removed for
              * an invalid FID) while *cfid may still hold a value; if that
              * value equals bk->lb_lpf_fid, &child2->do_lu is computed from
              * a NULL pointer - confirm this cannot happen. */
1517         if (likely(lu_fid_eq(cfid, &bk->lb_lpf_fid))) {
1518                 if (lfsck->li_lpf_obj == NULL) {
1519                         lu_object_get(&child2->do_lu);
1520                         lfsck->li_lpf_obj = child2;
1521                 }
1522
1523                 cname = lfsck_name_get_const(env, name, strlen(name));
1524                 rc = lfsck_verify_linkea(env, lfsck, child2, cname,
1525                                          &LU_LPF_FID);
1526
1527                 GOTO(put, rc);
1528         }
1529
1530         if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
                     /* Keep a copy for the log message before zeroing. */
1531                 struct lu_fid tfid = bk->lb_lpf_fid;
1532
1533                 /* Invalid FID record in the bookmark file, reset it. */
1534                 fid_zero(&bk->lb_lpf_fid);
1535                 rc = lfsck_bookmark_store(env, lfsck);
1536
1537                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1538                        " in the bookmark file: rc = %d\n",
1539                        lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
1540
1541                 if (rc != 0)
1542                         GOTO(put, rc);
1543
1544                 goto check_child2;
1545         }
1546
             /* A failure to find child1 is not propagated: the bookmark FID
              * is simply ignored and only child2 is checked. */
1547         child1 = lfsck_object_find_bottom(env, lfsck, &bk->lb_lpf_fid);
1548         if (IS_ERR(child1)) {
1549                 child1 = NULL;
1550                 goto check_child2;
1551         }
1552
1553         if (unlikely(!dt_object_exists(child1) ||
1554                      dt_object_remote(child1)) ||
1555                      !S_ISDIR(lfsck_object_type(child1))) {
1556                 /* Invalid FID record in the bookmark file, reset it. */
1557                 fid_zero(&bk->lb_lpf_fid);
1558                 rc = lfsck_bookmark_store(env, lfsck);
1559
1560                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1561                        " in the bookmark file: rc = %d\n",
1562                        lfsck_lfsck2name(lfsck),
1563                        PFID(lfsck_dto2fid(child1)), rc);
1564
                     /* On failure child1 is released at the "put" label. */
1565                 if (rc != 0)
1566                         GOTO(put, rc);
1567
1568                 lfsck_object_put(env, child1);
1569                 child1 = NULL;
1570                 goto check_child2;
1571         }
1572
             /* rc = -ENOTDIR set here survives only when child2 is NULL;
              * otherwise check_child2 overwrites it. */
1573         if (unlikely(!dt_try_as_dir(env, child1, true))) {
1574                 lfsck_object_put(env, child1);
1575                 child1 = NULL;
1576                 rc = -ENOTDIR;
1577                 goto check_child2;
1578         }
1579
             /* Done if child1 turned out to belong to lost+found (pfid is
              * LU_LPF_FID); otherwise fall through and try child2. */
1580         rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name, pfid,
1581                                     LVLT_BY_BOOKMARK);
1582         if (lu_fid_eq(pfid, &LU_LPF_FID))
1583                 GOTO(put, rc);
1584
1585 check_child2:
1586         if (child2 != NULL)
1587                 rc = lfsck_verify_lpf_pairs(env, lfsck, child2, name,
1588                                             pfid, LVLT_BY_NAMEENTRY);
1589
1590         GOTO(put, rc);
1591
1592 put:
             /* If an MDTxxxx object was adopted above, make sure it is usable
              * as a directory; if none was adopted and nothing failed, create
              * it now. */
1593         if (lfsck->li_lpf_obj != NULL) {
1594                 if (unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj, true))) {
1595                         lfsck_object_put(env, lfsck->li_lpf_obj);
1596                         lfsck->li_lpf_obj = NULL;
1597                         rc = -ENOTDIR;
1598                 }
1599         } else if (rc == 0) {
1600                 rc = lfsck_create_lpf(env, lfsck);
1601         }
1602
             /* Release the lookup results; where li_lpf_obj was set above,
              * an extra lu_object_get() was taken for it. */
1603         if (child2 != NULL && !IS_ERR(child2))
1604                 lfsck_object_put(env, child2);
1605         if (child1 != NULL && !IS_ERR(child1))
1606                 lfsck_object_put(env, child1);
1607
1608         return rc;
1609 }
1610
/*
 * Allocate and initialize the sequence client lfsck->li_seq.
 *
 * The client is initialized with the prefix "lfsck-<instance name>" and the
 * site's ss_server_seq. If the bookmark holds a sane lb_last_fid, it is
 * copied into the client's lcs_fid.
 *
 * Returns 0 on success, -ENXIO if the device has no seq_server_site, or
 * -ENOMEM on allocation failure (li_seq is then freed and reset to NULL).
 */
1611 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1612 {
1613         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1614         struct seq_server_site  *ss     = lfsck_dev_site(lfsck);
1615         char                    *prefix;
1616         int                      rc     = 0;
1617         ENTRY;
1618
1619         if (unlikely(ss == NULL))
1620                 RETURN(-ENXIO);
1621
1622         OBD_ALLOC_PTR(lfsck->li_seq);
1623         if (lfsck->li_seq == NULL)
1624                 RETURN(-ENOMEM);
1625
             /* The extra 7 bytes cover the 6 characters of "lfsck-" plus
              * the terminating NUL. */
1626         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1627         if (prefix == NULL)
1628                 GOTO(out, rc = -ENOMEM);
1629
1630         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1631         seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1632                              ss->ss_server_seq);
             /* The prefix buffer is freed right after initialization. */
1633         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1634
1635         if (fid_is_sane(&bk->lb_last_fid))
1636                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1637
1638         RETURN(0);
1639
             /* Error path: undo the li_seq allocation. */
1640 out:
1641         OBD_FREE_PTR(lfsck->li_seq);
1642         lfsck->li_seq = NULL;
1643
1644         return rc;
1645 }
1646
1647 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1648 {
1649         if (lfsck->li_seq != NULL) {
1650                 seq_client_fini(lfsck->li_seq);
1651                 OBD_FREE_PTR(lfsck->li_seq);
1652                 lfsck->li_seq = NULL;
1653         }
1654 }
1655
1656 void lfsck_instance_cleanup(const struct lu_env *env,
1657                             struct lfsck_instance *lfsck)
1658 {
1659         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1660         struct lfsck_component  *com;
1661         struct lfsck_component  *next;
1662         struct lfsck_lmv_unit   *llu;
1663         struct lfsck_lmv_unit   *llu_next;
1664         struct lfsck_lmv        *llmv;
1665         ENTRY;
1666
1667         LASSERT(list_empty(&lfsck->li_link));
1668         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1669
1670         if (lfsck->li_obj_oit != NULL) {
1671                 lfsck_object_put(env, lfsck->li_obj_oit);
1672                 lfsck->li_obj_oit = NULL;
1673         }
1674
1675         list_for_each_entry_safe(llu, llu_next, &lfsck->li_list_lmv, llu_link) {
1676                 llmv = &llu->llu_lmv;
1677
1678                 LASSERTF(atomic_read(&llmv->ll_ref) == 1,
1679                          "still in using: %u\n",
1680                          atomic_read(&llmv->ll_ref));
1681
1682                 lfsck_lmv_put(env, llmv);
1683         }
1684
1685         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1686                 lfsck_component_cleanup(env, com);
1687         }
1688
1689         LASSERT(list_empty(&lfsck->li_list_dir));
1690
1691         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1692                                  lc_link) {
1693                 lfsck_component_cleanup(env, com);
1694         }
1695
1696         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1697                 lfsck_component_cleanup(env, com);
1698         }
1699
1700         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1701         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1702
1703         if (lfsck->li_lfsck_dir != NULL) {
1704                 lfsck_object_put(env, lfsck->li_lfsck_dir);
1705                 lfsck->li_lfsck_dir = NULL;
1706         }
1707
1708         if (lfsck->li_bookmark_obj != NULL) {
1709                 lfsck_object_put(env, lfsck->li_bookmark_obj);
1710                 lfsck->li_bookmark_obj = NULL;
1711         }
1712
1713         if (lfsck->li_lpf_obj != NULL) {
1714                 lfsck_object_put(env, lfsck->li_lpf_obj);
1715                 lfsck->li_lpf_obj = NULL;
1716         }
1717
1718         if (lfsck->li_lpf_root_obj != NULL) {
1719                 lfsck_object_put(env, lfsck->li_lpf_root_obj);
1720                 lfsck->li_lpf_root_obj = NULL;
1721         }
1722
1723         if (lfsck->li_los != NULL) {
1724                 local_oid_storage_fini(env, lfsck->li_los);
1725                 lfsck->li_los = NULL;
1726         }
1727
1728         lfsck_fid_fini(lfsck);
1729
1730         OBD_FREE_PTR(lfsck);
1731 }
1732
1733 static inline struct lfsck_instance *
1734 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1735 {
1736         struct lfsck_instance *lfsck;
1737
1738         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1739                 if (lfsck->li_bottom == key) {
1740                         if (ref)
1741                                 lfsck_instance_get(lfsck);
1742                         if (unlink)
1743                                 list_del_init(&lfsck->li_link);
1744
1745                         return lfsck;
1746                 }
1747         }
1748
1749         return NULL;
1750 }
1751
1752 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1753                                            bool unlink)
1754 {
1755         struct lfsck_instance *lfsck;
1756
1757         spin_lock(&lfsck_instance_lock);
1758         lfsck = __lfsck_instance_find(key, ref, unlink);
1759         spin_unlock(&lfsck_instance_lock);
1760
1761         return lfsck;
1762 }
1763
1764 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1765 {
1766         struct lfsck_instance *tmp;
1767
1768         spin_lock(&lfsck_instance_lock);
1769         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1770                 if (lfsck->li_bottom == tmp->li_bottom) {
1771                         spin_unlock(&lfsck_instance_lock);
1772                         return -EEXIST;
1773                 }
1774         }
1775
1776         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1777         spin_unlock(&lfsck_instance_lock);
1778         return 0;
1779 }
1780
1781 void lfsck_bits_dump(struct seq_file *m, int bits, const char *const names[],
1782                      const char *prefix)
1783 {
1784         int flag;
1785         int i;
1786         bool newline = (bits != 0 ? false : true);
1787
1788         seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
1789
1790         for (i = 0, flag = 1; bits != 0; i++, flag = BIT(i)) {
1791                 if (flag & bits) {
1792                         bits &= ~flag;
1793                         if (names[i] != NULL) {
1794                                 if (bits == 0)
1795                                         newline = true;
1796
1797                                 seq_printf(m, "%s%c", names[i],
1798                                            newline ? '\n' : ',');
1799                         }
1800                 }
1801         }
1802
1803         if (!newline)
1804                 seq_putc(m, '\n');
1805 }
1806
1807 void lfsck_time_dump(struct seq_file *m, time64_t time, const char *name)
1808 {
1809         if (time == 0) {
1810                 seq_printf(m, "%s_time: N/A\n", name);
1811                 seq_printf(m, "time_since_%s: N/A\n", name);
1812         } else {
1813                 seq_printf(m, "%s_time: %lld\n", name, time);
1814                 seq_printf(m, "time_since_%s: %lld seconds\n",
1815                            name, ktime_get_real_seconds() - time);
1816         }
1817 }
1818
1819 void lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1820                     const char *prefix)
1821 {
1822         if (fid_is_zero(&pos->lp_dir_parent)) {
1823                 if (pos->lp_oit_cookie == 0) {
1824                         seq_printf(m, "%s: N/A, N/A, N/A\n", prefix);
1825                         return;
1826                 }
1827                 seq_printf(m, "%s: %llu, N/A, N/A\n",
1828                            prefix, pos->lp_oit_cookie);
1829         } else {
1830                 seq_printf(m, "%s: %llu, "DFID", %#llx\n",
1831                            prefix, pos->lp_oit_cookie,
1832                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1833         }
1834 }
1835
1836 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1837                     struct lfsck_position *pos, bool init)
1838 {
1839         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1840
1841         if (unlikely(lfsck->li_di_oit == NULL)) {
1842                 memset(pos, 0, sizeof(*pos));
1843                 return;
1844         }
1845
1846         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1847         if (!lfsck->li_current_oit_processed && !init)
1848                 pos->lp_oit_cookie--;
1849
1850         if (unlikely(pos->lp_oit_cookie == 0))
1851                 pos->lp_oit_cookie = 1;
1852
1853         spin_lock(&lfsck->li_lock);
1854         if (lfsck->li_di_dir != NULL) {
1855                 struct dt_object *dto = lfsck->li_obj_dir;
1856
1857                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1858                                                         lfsck->li_di_dir);
1859
1860                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1861                         fid_zero(&pos->lp_dir_parent);
1862                         pos->lp_dir_cookie = 0;
1863                 } else {
1864                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1865                 }
1866         } else {
1867                 fid_zero(&pos->lp_dir_parent);
1868                 pos->lp_dir_cookie = 0;
1869         }
1870         spin_unlock(&lfsck->li_lock);
1871 }
1872
1873 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1874 {
1875         bool dirty = false;
1876
1877         if (limit != LFSCK_SPEED_NO_LIMIT) {
1878                 if (limit > cfs_time_seconds(1)) {
1879                         lfsck->li_sleep_rate = limit / cfs_time_seconds(1);
1880                         lfsck->li_sleep_jif = 1;
1881                 } else {
1882                         lfsck->li_sleep_rate = 1;
1883                         lfsck->li_sleep_jif = cfs_time_seconds(1) / limit;
1884                 }
1885         } else {
1886                 lfsck->li_sleep_jif = 0;
1887                 lfsck->li_sleep_rate = 0;
1888         }
1889
1890         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1891                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1892                 dirty = true;
1893         }
1894
1895         return dirty;
1896 }
1897
1898 void lfsck_control_speed(struct lfsck_instance *lfsck)
1899 {
1900         struct ptlrpc_thread *thread = &lfsck->li_thread;
1901
1902         if (lfsck->li_sleep_jif > 0 &&
1903             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1904                 wait_event_idle_timeout(thread->t_ctl_waitq,
1905                                         !thread_is_running(thread),
1906                                         lfsck->li_sleep_jif);
1907                 lfsck->li_new_scanned = 0;
1908         }
1909 }
1910
1911 void lfsck_control_speed_by_self(struct lfsck_component *com)
1912 {
1913         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1914         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1915
1916         if (lfsck->li_sleep_jif > 0 &&
1917             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1918                 wait_event_idle_timeout(thread->t_ctl_waitq,
1919                                         !thread_is_running(thread),
1920                                         lfsck->li_sleep_jif);
1921                 com->lc_new_scanned = 0;
1922         }
1923 }
1924
1925 static struct lfsck_thread_args *
1926 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1927                        struct lfsck_component *com,
1928                        struct lfsck_start_param *lsp)
1929 {
1930         struct lfsck_thread_args *lta;
1931         int                       rc;
1932
1933         OBD_ALLOC_PTR(lta);
1934         if (lta == NULL)
1935                 return ERR_PTR(-ENOMEM);
1936
1937         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1938         if (rc != 0) {
1939                 OBD_FREE_PTR(lta);
1940                 return ERR_PTR(rc);
1941         }
1942
1943         lta->lta_lfsck = lfsck_instance_get(lfsck);
1944         if (com != NULL)
1945                 lta->lta_com = lfsck_component_get(com);
1946
1947         lta->lta_lsp = lsp;
1948
1949         return lta;
1950 }
1951
1952 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1953 {
1954         if (lta->lta_com != NULL)
1955                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1956         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1957         lu_env_fini(&lta->lta_env);
1958         OBD_FREE_PTR(lta);
1959 }
1960
1961 struct lfsck_assistant_data *
1962 lfsck_assistant_data_init(const struct lfsck_assistant_operations *lao,
1963                           const char *name)
1964 {
1965         struct lfsck_assistant_data *lad;
1966
1967         OBD_ALLOC_PTR(lad);
1968         if (lad != NULL) {
1969                 lad->lad_bitmap = bitmap_zalloc(BITS_PER_LONG, GFP_KERNEL);
1970                 if (lad->lad_bitmap == NULL) {
1971                         OBD_FREE_PTR(lad);
1972                         return NULL;
1973                 }
1974                 lad->lad_bitmap_count = BITS_PER_LONG;
1975
1976                 INIT_LIST_HEAD(&lad->lad_req_list);
1977                 spin_lock_init(&lad->lad_lock);
1978                 INIT_LIST_HEAD(&lad->lad_ost_list);
1979                 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1980                 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1981                 INIT_LIST_HEAD(&lad->lad_mdt_list);
1982                 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1983                 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1984                 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1985                 lad->lad_ops = lao;
1986                 lad->lad_name = name;
1987         }
1988
1989         return lad;
1990 }
1991
1992 struct lfsck_assistant_object *
1993 lfsck_assistant_object_init(const struct lu_env *env, const struct lu_fid *fid,
1994                             const struct lu_attr *attr, __u64 cookie,
1995                             bool is_dir)
1996 {
1997         struct lfsck_assistant_object   *lso;
1998
1999         OBD_ALLOC_PTR(lso);
2000         if (lso == NULL)
2001                 return ERR_PTR(-ENOMEM);
2002
2003         lso->lso_fid = *fid;
2004         if (attr != NULL)
2005                 lso->lso_attr = *attr;
2006
2007         atomic_set(&lso->lso_ref, 1);
2008         lso->lso_oit_cookie = cookie;
2009         if (is_dir)
2010                 lso->lso_is_dir = 1;
2011
2012         return lso;
2013 }
2014
2015 struct dt_object *
2016 lfsck_assistant_object_load(const struct lu_env *env,
2017                             struct lfsck_instance *lfsck,
2018                             struct lfsck_assistant_object *lso)
2019 {
2020         struct dt_object *obj;
2021
2022         obj = lfsck_object_find_bottom(env, lfsck, &lso->lso_fid);
2023         if (IS_ERR(obj))
2024                 return obj;
2025
2026         if (unlikely(!dt_object_exists(obj) || lfsck_is_dead_obj(obj))) {
2027                 lso->lso_dead = 1;
2028                 lfsck_object_put(env, obj);
2029
2030                 return ERR_PTR(-ENOENT);
2031         }
2032
2033         if (lso->lso_is_dir && unlikely(!dt_try_as_dir(env, obj, true))) {
2034                 lfsck_object_put(env, obj);
2035
2036                 return ERR_PTR(-ENOTDIR);
2037         }
2038
2039         return obj;
2040 }
2041
2042 /**
2043  * Generic LFSCK asynchronous communication interpretor function.
2044  * The LFSCK RPC reply for both the event notification and status
2045  * querying will be handled here.
2046  *
2047  * \param[in] env       pointer to the thread context
2048  * \param[in] req       pointer to the LFSCK request
2049  * \param[in] args      pointer to the lfsck_async_interpret_args
2050  * \param[in] rc        the result for handling the LFSCK request
2051  *
2052  * \retval              0 for success
2053  * \retval              negative error number on failure
2054  */
2055 int lfsck_async_interpret_common(const struct lu_env *env,
2056                                  struct ptlrpc_request *req,
2057                                  void *args, int rc)
2058 {
2059         struct lfsck_async_interpret_args *laia = args;
2060         struct lfsck_component            *com  = laia->laia_com;
2061         struct lfsck_assistant_data       *lad  = com->lc_data;
2062         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
2063         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
2064         struct lfsck_request              *lr   = laia->laia_lr;
2065
2066         LASSERT(com->lc_lfsck->li_master);
2067
2068         switch (lr->lr_event) {
2069         case LE_START:
2070                 if (unlikely(rc == -EINPROGRESS)) {
2071                         ltd->ltd_retry_start = 1;
2072                         break;
2073                 }
2074
2075                 if (rc != 0) {
2076                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
2077                                "start: rc = %d\n",
2078                                lfsck_lfsck2name(com->lc_lfsck),
2079                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2080                                ltd->ltd_index, lad->lad_name, rc);
2081
2082                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2083                                 struct lfsck_layout *lo = com->lc_file_ram;
2084
2085                                 if (lr->lr_flags & LEF_TO_OST)
2086                                         lfsck_lad_set_bitmap(env, com,
2087                                                              ltd->ltd_index);
2088                                 else
2089                                         lo->ll_flags |= LF_INCOMPLETE;
2090                         } else {
2091                                 struct lfsck_namespace *ns = com->lc_file_ram;
2092
2093                                 /* If some MDT does not join the namespace
2094                                  * LFSCK, then we cannot know whether there
2095                                  * is some name entry on such MDT that with
2096                                  * the referenced MDT-object on this MDT or
2097                                  * not. So the namespace LFSCK on this MDT
2098                                  * cannot handle orphan MDT-objects properly.
2099                                  * So we mark the LFSCK as LF_INCOMPLETE and
2100                                  * skip orphan MDT-objects handling. */
2101                                 ns->ln_flags |= LF_INCOMPLETE;
2102                         }
2103                         break;
2104                 }
2105
2106                 spin_lock(&ltds->ltd_lock);
2107                 if (ltd->ltd_dead) {
2108                         spin_unlock(&ltds->ltd_lock);
2109                         break;
2110                 }
2111
2112                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2113                         struct list_head *list;
2114                         struct list_head *phase_list;
2115
2116                         if (ltd->ltd_layout_done) {
2117                                 spin_unlock(&ltds->ltd_lock);
2118                                 break;
2119                         }
2120
2121                         if (lr->lr_flags & LEF_TO_OST) {
2122                                 list = &lad->lad_ost_list;
2123                                 phase_list = &lad->lad_ost_phase1_list;
2124                         } else {
2125                                 list = &lad->lad_mdt_list;
2126                                 phase_list = &lad->lad_mdt_phase1_list;
2127                         }
2128
2129                         if (list_empty(&ltd->ltd_layout_list))
2130                                 list_add_tail(&ltd->ltd_layout_list, list);
2131                         if (list_empty(&ltd->ltd_layout_phase_list))
2132                                 list_add_tail(&ltd->ltd_layout_phase_list,
2133                                               phase_list);
2134                 } else {
2135                         if (ltd->ltd_namespace_done) {
2136                                 spin_unlock(&ltds->ltd_lock);
2137                                 break;
2138                         }
2139
2140                         if (list_empty(&ltd->ltd_namespace_list))
2141                                 list_add_tail(&ltd->ltd_namespace_list,
2142                                               &lad->lad_mdt_list);
2143                         if (list_empty(&ltd->ltd_namespace_phase_list))
2144                                 list_add_tail(&ltd->ltd_namespace_phase_list,
2145                                               &lad->lad_mdt_phase1_list);
2146                 }
2147                 spin_unlock(&ltds->ltd_lock);
2148                 break;
2149         case LE_STOP:
2150         case LE_PHASE1_DONE:
2151         case LE_PHASE2_DONE:
2152         case LE_PEER_EXIT:
2153                 if (rc != 0 && rc != -EALREADY)
2154                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
2155                               "event = %d, rc = %d\n",
2156                               lfsck_lfsck2name(com->lc_lfsck),
2157                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2158                               ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
2159                 break;
2160         case LE_QUERY: {
2161                 struct lfsck_reply *reply;
2162                 struct list_head *list;
2163                 struct list_head *phase_list;
2164
2165                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2166                         list = &ltd->ltd_layout_list;
2167                         phase_list = &ltd->ltd_layout_phase_list;
2168                 } else {
2169                         list = &ltd->ltd_namespace_list;
2170                         phase_list = &ltd->ltd_namespace_phase_list;
2171                 }
2172
2173                 if (rc != 0) {
2174                         if (lr->lr_flags & LEF_QUERY_ALL) {
2175                                 lfsck_reset_ltd_status(ltd, com->lc_type);
2176                                 break;
2177                         }
2178
2179                         spin_lock(&ltds->ltd_lock);
2180                         list_del_init(phase_list);
2181                         list_del_init(list);
2182                         spin_unlock(&ltds->ltd_lock);
2183                         break;
2184                 }
2185
2186                 reply = req_capsule_server_get(&req->rq_pill,
2187                                                &RMF_LFSCK_REPLY);
2188                 if (reply == NULL) {
2189                         rc = -EPROTO;
2190                         CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
2191                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
2192                                lad->lad_name, rc);
2193
2194                         if (lr->lr_flags & LEF_QUERY_ALL) {
2195                                 lfsck_reset_ltd_status(ltd, com->lc_type);
2196                                 break;
2197                         }
2198
2199                         spin_lock(&ltds->ltd_lock);
2200                         list_del_init(phase_list);
2201                         list_del_init(list);
2202                         spin_unlock(&ltds->ltd_lock);
2203                         break;
2204                 }
2205
2206                 if (lr->lr_flags & LEF_QUERY_ALL) {
2207                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2208                                 ltd->ltd_layout_status = reply->lr_status;
2209                                 ltd->ltd_layout_repaired = reply->lr_repaired;
2210                         } else {
2211                                 ltd->ltd_namespace_status = reply->lr_status;
2212                                 ltd->ltd_namespace_repaired =
2213                                                         reply->lr_repaired;
2214                         }
2215                         break;
2216                 }
2217
2218                 switch (reply->lr_status) {
2219                 case LS_SCANNING_PHASE1:
2220                         break;
2221                 case LS_SCANNING_PHASE2:
2222                         spin_lock(&ltds->ltd_lock);
2223                         list_del_init(phase_list);
2224                         if (ltd->ltd_dead) {
2225                                 spin_unlock(&ltds->ltd_lock);
2226                                 break;
2227                         }
2228
2229                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2230                                 if (ltd->ltd_layout_done) {
2231                                         spin_unlock(&ltds->ltd_lock);
2232                                         break;
2233                                 }
2234
2235                                 if (lr->lr_flags & LEF_TO_OST)
2236                                         list_add_tail(phase_list,
2237                                                 &lad->lad_ost_phase2_list);
2238                                 else
2239                                         list_add_tail(phase_list,
2240                                                 &lad->lad_mdt_phase2_list);
2241                         } else {
2242                                 if (ltd->ltd_namespace_done) {
2243                                         spin_unlock(&ltds->ltd_lock);
2244                                         break;
2245                                 }
2246
2247                                 list_add_tail(phase_list,
2248                                               &lad->lad_mdt_phase2_list);
2249                         }
2250                         spin_unlock(&ltds->ltd_lock);
2251                         break;
2252                 default:
2253                         spin_lock(&ltds->ltd_lock);
2254                         list_del_init(phase_list);
2255                         list_del_init(list);
2256                         spin_unlock(&ltds->ltd_lock);
2257                         break;
2258                 }
2259                 break;
2260         }
2261         default:
2262                 CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
2263                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
2264                 break;
2265         }
2266
2267         if (!laia->laia_shared) {
2268                 lfsck_tgt_put(ltd);
2269                 lfsck_component_put(env, com);
2270         }
2271
2272         return 0;
2273 }
2274
2275 static void lfsck_interpret(const struct lu_env *env,
2276                             struct lfsck_instance *lfsck,
2277                             struct ptlrpc_request *req, void *args, int result)
2278 {
2279         struct lfsck_async_interpret_args *laia = args;
2280         struct lfsck_component            *com;
2281
2282         LASSERT(laia->laia_com == NULL);
2283         LASSERT(laia->laia_shared);
2284
2285         spin_lock(&lfsck->li_lock);
2286         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2287                 laia->laia_com = com;
2288                 lfsck_async_interpret_common(env, req, laia, result);
2289         }
2290
2291         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
2292                 laia->laia_com = com;
2293                 lfsck_async_interpret_common(env, req, laia, result);
2294         }
2295         spin_unlock(&lfsck->li_lock);
2296 }
2297
2298 static int lfsck_stop_notify(const struct lu_env *env,
2299                              struct lfsck_instance *lfsck,
2300                              struct lfsck_tgt_descs *ltds,
2301                              struct lfsck_tgt_desc *ltd, __u16 type)
2302 {
2303         struct lfsck_component *com;
2304         int                     rc = 0;
2305         ENTRY;
2306
2307         LASSERT(lfsck->li_master);
2308
2309         spin_lock(&lfsck->li_lock);
2310         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
2311         if (com == NULL)
2312                 com = __lfsck_component_find(lfsck, type,
2313                                              &lfsck->li_list_double_scan);
2314         if (com != NULL)
2315                 lfsck_component_get(com);
2316         spin_unlock(&lfsck->li_lock);
2317
2318         if (com != NULL) {
2319                 struct lfsck_thread_info          *info  = lfsck_env_info(env);
2320                 struct lfsck_async_interpret_args *laia  = &info->lti_laia;
2321                 struct lfsck_request              *lr    = &info->lti_lr;
2322                 struct lfsck_assistant_data       *lad   = com->lc_data;
2323                 struct list_head                  *list;
2324                 struct list_head                  *phase_list;
2325                 struct ptlrpc_request_set         *set;
2326
2327                 set = ptlrpc_prep_set();
2328                 if (set == NULL) {
2329                         lfsck_component_put(env, com);
2330
2331                         RETURN(-ENOMEM);
2332                 }
2333
2334                 if (type == LFSCK_TYPE_LAYOUT) {
2335                         list = &ltd->ltd_layout_list;
2336                         phase_list = &ltd->ltd_layout_phase_list;
2337                 } else {
2338                         list = &ltd->ltd_namespace_list;
2339                         phase_list = &ltd->ltd_namespace_phase_list;
2340                 }
2341
2342                 spin_lock(&ltds->ltd_lock);
2343                 if (list_empty(list)) {
2344                         LASSERT(list_empty(phase_list));
2345                         spin_unlock(&ltds->ltd_lock);
2346                         ptlrpc_set_destroy(set);
2347
2348                         RETURN(0);
2349                 }
2350
2351                 list_del_init(phase_list);
2352                 list_del_init(list);
2353                 spin_unlock(&ltds->ltd_lock);
2354
2355                 memset(lr, 0, sizeof(*lr));
2356                 lr->lr_index = lfsck_dev_idx(lfsck);
2357                 lr->lr_event = LE_PEER_EXIT;
2358                 lr->lr_active = type;
2359                 lr->lr_status = LS_CO_PAUSED;
2360                 if (ltds == &lfsck->li_ost_descs)
2361                         lr->lr_flags = LEF_TO_OST;
2362
2363                 memset(laia, 0, sizeof(*laia));
2364                 laia->laia_com = com;
2365                 laia->laia_ltds = ltds;
2366                 atomic_inc(&ltd->ltd_ref);
2367                 laia->laia_ltd = ltd;
2368                 laia->laia_lr = lr;
2369
2370                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2371                                          lfsck_async_interpret_common,
2372                                          laia, LFSCK_NOTIFY);
2373                 if (rc != 0) {
2374                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2375                                "co-stop for %s: rc = %d\n",
2376                                lfsck_lfsck2name(lfsck),
2377                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2378                                ltd->ltd_index, lad->lad_name, rc);
2379                         lfsck_tgt_put(ltd);
2380                 } else {
2381                         rc = ptlrpc_set_wait(env, set);
2382                 }
2383
2384                 ptlrpc_set_destroy(set);
2385                 lfsck_component_put(env, com);
2386         }
2387
2388         RETURN(rc);
2389 }
2390
2391 static int lfsck_async_interpret(const struct lu_env *env,
2392                                  struct ptlrpc_request *req,
2393                                  void *args, int rc)
2394 {
2395         struct lfsck_async_interpret_args *laia = args;
2396         struct lfsck_instance             *lfsck;
2397
2398         lfsck = container_of(laia->laia_ltds, struct lfsck_instance,
2399                              li_mdt_descs);
2400         lfsck_interpret(env, lfsck, req, laia, rc);
2401         lfsck_tgt_put(laia->laia_ltd);
2402         if (rc != 0 && laia->laia_result != -EALREADY)
2403                 laia->laia_result = rc;
2404
2405         return 0;
2406 }
2407
2408 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2409                         struct lfsck_request *lr,
2410                         struct ptlrpc_request_set *set,
2411                         ptlrpc_interpterer_t interpreter,
2412                         void *args, int request)
2413 {
2414         struct lfsck_async_interpret_args *laia;
2415         struct ptlrpc_request             *req;
2416         struct lfsck_request              *tmp;
2417         struct req_format                 *format;
2418         int                                rc;
2419
2420         switch (request) {
2421         case LFSCK_NOTIFY:
2422                 format = &RQF_LFSCK_NOTIFY;
2423                 break;
2424         case LFSCK_QUERY:
2425                 format = &RQF_LFSCK_QUERY;
2426                 break;
2427         default:
2428                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2429                        exp->exp_obd->obd_name, request, -EINVAL);
2430                 return -EINVAL;
2431         }
2432
2433         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2434         if (req == NULL)
2435                 return -ENOMEM;
2436
2437         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2438         if (rc != 0) {
2439                 ptlrpc_request_free(req);
2440
2441                 return rc;
2442         }
2443
2444         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2445         *tmp = *lr;
2446         ptlrpc_request_set_replen(req);
2447
2448         laia = ptlrpc_req_async_args(laia, req);
2449         *laia = *(struct lfsck_async_interpret_args *)args;
2450         if (laia->laia_com != NULL)
2451                 lfsck_component_get(laia->laia_com);
2452         req->rq_interpret_reply = interpreter;
2453         req->rq_allow_intr = 1;
2454         req->rq_no_delay = 1;
2455         ptlrpc_set_add_req(set, req);
2456
2457         return 0;
2458 }
2459
2460 int lfsck_query_all(const struct lu_env *env, struct lfsck_component *com)
2461 {
2462         struct lfsck_thread_info *info = lfsck_env_info(env);
2463         struct lfsck_request *lr = &info->lti_lr;
2464         struct lfsck_async_interpret_args *laia = &info->lti_laia;
2465         struct lfsck_instance *lfsck = com->lc_lfsck;
2466         struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs;
2467         struct lfsck_tgt_desc *ltd;
2468         struct ptlrpc_request_set *set;
2469         int idx;
2470         int rc;
2471
2472         ENTRY;
2473         memset(lr, 0, sizeof(*lr));
2474         lr->lr_event = LE_QUERY;
2475         lr->lr_active = com->lc_type;
2476         lr->lr_flags = LEF_QUERY_ALL;
2477
2478         memset(laia, 0, sizeof(*laia));
2479         laia->laia_com = com;
2480         laia->laia_lr = lr;
2481
2482         set = ptlrpc_prep_set();
2483         if (set == NULL)
2484                 RETURN(-ENOMEM);
2485
2486 again:
2487         laia->laia_ltds = ltds;
2488         down_read(&ltds->ltd_rw_sem);
2489         for_each_set_bit(idx, ltds->ltd_tgts_bitmap, ltds->ltd_tgts_mask_len) {
2490                 ltd = lfsck_tgt_get(ltds, idx);
2491                 LASSERT(ltd != NULL);
2492
2493                 laia->laia_ltd = ltd;
2494                 up_read(&ltds->ltd_rw_sem);
2495                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2496                                          lfsck_async_interpret_common,
2497                                          laia, LFSCK_QUERY);
2498                 if (rc != 0) {
2499                         struct lfsck_assistant_data *lad = com->lc_data;
2500
2501                         CDEBUG(D_LFSCK, "%s: Fail to query %s %x for stat %s: "
2502                                "rc = %d\n", lfsck_lfsck2name(lfsck),
2503                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2504                                ltd->ltd_index, lad->lad_name, rc);
2505                         lfsck_reset_ltd_status(ltd, com->lc_type);
2506                         lfsck_tgt_put(ltd);
2507                 }
2508                 down_read(&ltds->ltd_rw_sem);
2509         }
2510         up_read(&ltds->ltd_rw_sem);
2511
2512         if (com->lc_type == LFSCK_TYPE_LAYOUT && !(lr->lr_flags & LEF_TO_OST)) {
2513                 ltds = &lfsck->li_ost_descs;
2514                 lr->lr_flags |= LEF_TO_OST;
2515                 goto again;
2516         }
2517
2518         rc = ptlrpc_set_wait(env, set);
2519         ptlrpc_set_destroy(set);
2520
2521         RETURN(rc);
2522 }
2523
2524 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2525                           struct lfsck_start_param *lsp)
2526 {
2527         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2528         struct lfsck_assistant_data     *lad     = com->lc_data;
2529         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2530         struct ptlrpc_thread            *athread = &lad->lad_thread;
2531         struct lfsck_thread_args        *lta;
2532         struct task_struct              *task;
2533         int                              rc;
2534         ENTRY;
2535
2536         lad->lad_assistant_status = 0;
2537         lad->lad_post_result = 0;
2538         lad->lad_flags = 0;
2539         lad->lad_advance_lock = false;
2540         thread_set_flags(athread, 0);
2541
2542         lta = lfsck_thread_args_init(lfsck, com, lsp);
2543         if (IS_ERR(lta))
2544                 RETURN(PTR_ERR(lta));
2545
2546         task = kthread_run(lfsck_assistant_engine, lta, "%s", lad->lad_name);
2547         if (IS_ERR(task)) {
2548                 rc = PTR_ERR(task);
2549                 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2550                        "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2551                 lfsck_thread_args_fini(lta);
2552         } else {
2553                 wait_event_idle(mthread->t_ctl_waitq,
2554                                 thread_is_running(athread) ||
2555                                 thread_is_stopped(athread) ||
2556                                 !thread_is_starting(mthread));
2557                 if (unlikely(!thread_is_starting(mthread)))
2558                         /* stopped by race */
2559                         rc = -ESRCH;
2560                 else if (unlikely(!thread_is_running(athread)))
2561                         rc = lad->lad_assistant_status;
2562                 else
2563                         rc = 0;
2564         }
2565
2566         RETURN(rc);
2567 }
2568
2569 int lfsck_checkpoint_generic(const struct lu_env *env,
2570                              struct lfsck_component *com)
2571 {
2572         struct lfsck_assistant_data     *lad     = com->lc_data;
2573         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2574         struct ptlrpc_thread            *athread = &lad->lad_thread;
2575
2576         wait_event_idle(mthread->t_ctl_waitq,
2577                         list_empty(&lad->lad_req_list) ||
2578                         !thread_is_running(mthread) ||
2579                         thread_is_stopped(athread));
2580
2581         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2582                 return LFSCK_CHECKPOINT_SKIP;
2583
2584         return 0;
2585 }
2586
2587 void lfsck_post_generic(const struct lu_env *env,
2588                         struct lfsck_component *com, int *result)
2589 {
2590         struct lfsck_assistant_data     *lad     = com->lc_data;
2591         struct ptlrpc_thread            *athread = &lad->lad_thread;
2592         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2593
2594         lad->lad_post_result = *result;
2595         if (*result <= 0)
2596                 set_bit(LAD_EXIT, &lad->lad_flags);
2597         set_bit(LAD_TO_POST, &lad->lad_flags);
2598
2599         CDEBUG(D_LFSCK, "%s: waiting for assistant to do %s post, rc = %d\n",
2600                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, *result);
2601
2602         wake_up(&athread->t_ctl_waitq);
2603         wait_event_idle(mthread->t_ctl_waitq,
2604                         (*result > 0 && list_empty(&lad->lad_req_list)) ||
2605                         thread_is_stopped(athread));
2606
2607         if (lad->lad_assistant_status < 0)
2608                 *result = lad->lad_assistant_status;
2609
2610         CDEBUG(D_LFSCK, "%s: the assistant has done %s post, rc = %d\n",
2611                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, *result);
2612 }
2613
2614 int lfsck_double_scan_generic(const struct lu_env *env,
2615                               struct lfsck_component *com, int status)
2616 {
2617         struct lfsck_assistant_data     *lad     = com->lc_data;
2618         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2619         struct ptlrpc_thread            *athread = &lad->lad_thread;
2620
2621         if (status != LS_SCANNING_PHASE2)
2622                 set_bit(LAD_EXIT, &lad->lad_flags);
2623         else
2624                 set_bit(LAD_TO_DOUBLE_SCAN, &lad->lad_flags);
2625
2626         CDEBUG(D_LFSCK, "%s: waiting for assistant to do %s double_scan, "
2627                "status %d\n",
2628                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, status);
2629
2630         wake_up(&athread->t_ctl_waitq);
2631         wait_event_idle(mthread->t_ctl_waitq,
2632                         test_bit(LAD_IN_DOUBLE_SCAN, &lad->lad_flags) ||
2633                         thread_is_stopped(athread));
2634
2635         CDEBUG(D_LFSCK, "%s: the assistant has done %s double_scan, "
2636                "status %d\n", lfsck_lfsck2name(com->lc_lfsck), lad->lad_name,
2637                lad->lad_assistant_status);
2638
2639         if (lad->lad_assistant_status < 0)
2640                 return lad->lad_assistant_status;
2641
2642         return 0;
2643 }
2644
2645 void lfsck_quit_generic(const struct lu_env *env,
2646                         struct lfsck_component *com)
2647 {
2648         struct lfsck_assistant_data     *lad     = com->lc_data;
2649         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2650         struct ptlrpc_thread            *athread = &lad->lad_thread;
2651
2652         set_bit(LAD_EXIT, &lad->lad_flags);
2653         wake_up(&athread->t_ctl_waitq);
2654         wait_event_idle(mthread->t_ctl_waitq,
2655                         thread_is_init(athread) ||
2656                         thread_is_stopped(athread));
2657 }
2658
2659 int lfsck_load_one_trace_file(const struct lu_env *env,
2660                               struct lfsck_component *com,
2661                               struct dt_object *parent,
2662                               struct dt_object **child,
2663                               const struct dt_index_features *ft,
2664                               const char *name, bool reset)
2665 {
2666         struct lfsck_instance *lfsck = com->lc_lfsck;
2667         struct dt_object *obj;
2668         int rc;
2669         ENTRY;
2670
2671         if (*child != NULL) {
2672                 struct dt_it *it;
2673                 const struct dt_it_ops *iops;
2674                 struct lu_fid *fid = &lfsck_env_info(env)->lti_fid3;
2675
2676                 if (!reset)
2677                         RETURN(0);
2678
2679                 obj = *child;
2680                 rc = obj->do_ops->do_index_try(env, obj, ft);
2681                 if (rc)
2682                         /* unlink by force */
2683                         goto unlink;
2684
2685                 iops = &obj->do_index_ops->dio_it;
2686                 it = iops->init(env, obj, 0);
2687                 if (IS_ERR(it))
2688                         /* unlink by force */
2689                         goto unlink;
2690
2691                 fid_zero(fid);
2692                 rc = iops->get(env, it, (const struct dt_key *)fid);
2693                 if (rc >= 0) {
2694                         rc = iops->next(env, it);
2695                         iops->put(env, it);
2696                 }
2697                 iops->fini(env, it);
2698                 if (rc > 0)
2699                         /* "rc > 0" means the index file is empty. */
2700                         RETURN(0);
2701
2702 unlink:
2703                 /* The old index is not empty, remove it firstly. */
2704                 rc = local_object_unlink(env, lfsck->li_bottom, parent, name);
2705                 CDEBUG_LIMIT(rc ? D_ERROR : D_LFSCK,
2706                              "%s: unlink lfsck sub trace file %s: rc = %d\n",
2707                              lfsck_lfsck2name(com->lc_lfsck), name, rc);
2708                 if (rc)
2709                         RETURN(rc);
2710
2711                 if (*child) {
2712                         lfsck_object_put(env, *child);
2713                         *child = NULL;
2714                 }
2715         } else if (reset) {
2716                 goto unlink;
2717         }
2718
2719         obj = local_index_find_or_create(env, lfsck->li_los, parent, name,
2720                                          S_IFREG | S_IRUGO | S_IWUSR, ft);
2721         if (IS_ERR(obj))
2722                 RETURN(PTR_ERR(obj));
2723
2724         rc = obj->do_ops->do_index_try(env, obj, ft);
2725         if (rc) {
2726                 lfsck_object_put(env, obj);
2727                 CDEBUG(D_LFSCK, "%s: LFSCK fail to load "
2728                        "sub trace file %s: rc = %d\n",
2729                        lfsck_lfsck2name(com->lc_lfsck), name, rc);
2730         } else {
2731                 *child = obj;
2732         }
2733
2734         RETURN(rc);
2735 }
2736
2737 int lfsck_load_sub_trace_files(const struct lu_env *env,
2738                                struct lfsck_component *com,
2739                                const struct dt_index_features *ft,
2740                                const char *prefix, bool reset)
2741 {
2742         char *name = lfsck_env_info(env)->lti_key;
2743         struct lfsck_sub_trace_obj *lsto;
2744         int rc;
2745         int i;
2746
2747         for (i = 0, rc = 0, lsto = &com->lc_sub_trace_objs[0];
2748              i < LFSCK_STF_COUNT && rc == 0; i++, lsto++) {
2749                 snprintf(name, NAME_MAX, "%s_%02d", prefix, i);
2750                 rc = lfsck_load_one_trace_file(env, com,
2751                                 com->lc_lfsck->li_lfsck_dir,
2752                                 &lsto->lsto_obj, ft, name, reset);
2753         }
2754
2755         return rc;
2756 }
2757
2758 /* external interfaces */
2759 int lfsck_get_speed(char *buf, struct dt_device *key)
2760 {
2761         struct lu_env           env;
2762         struct lfsck_instance  *lfsck;
2763         int                     rc;
2764         ENTRY;
2765
2766         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2767         if (rc != 0)
2768                 RETURN(rc);
2769
2770         lfsck = lfsck_instance_find(key, true, false);
2771         if (lfsck && buf) {
2772                 rc = sprintf(buf, "%u\n",
2773                              lfsck->li_bookmark_ram.lb_speed_limit);
2774                 lfsck_instance_put(&env, lfsck);
2775         } else {
2776                 rc = -ENXIO;
2777         }
2778
2779         lu_env_fini(&env);
2780
2781         RETURN(rc);
2782 }
2783 EXPORT_SYMBOL(lfsck_get_speed);
2784
2785 int lfsck_set_speed(struct dt_device *key, __u32 val)
2786 {
2787         struct lu_env           env;
2788         struct lfsck_instance  *lfsck;
2789         int                     rc;
2790         ENTRY;
2791
2792         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2793         if (rc != 0)
2794                 RETURN(rc);
2795
2796         lfsck = lfsck_instance_find(key, true, false);
2797         if (likely(lfsck != NULL)) {
2798                 mutex_lock(&lfsck->li_mutex);
2799                 if (__lfsck_set_speed(lfsck, val))
2800                         rc = lfsck_bookmark_store(&env, lfsck);
2801                 mutex_unlock(&lfsck->li_mutex);
2802                 lfsck_instance_put(&env, lfsck);
2803         } else {
2804                 rc = -ENXIO;
2805         }
2806
2807         lu_env_fini(&env);
2808
2809         RETURN(rc);
2810 }
2811 EXPORT_SYMBOL(lfsck_set_speed);
2812
2813 int lfsck_get_windows(char *buf, struct dt_device *key)
2814 {
2815         struct lu_env           env;
2816         struct lfsck_instance  *lfsck;
2817         int                     rc;
2818         ENTRY;
2819
2820         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2821         if (rc != 0)
2822                 RETURN(rc);
2823
2824         lfsck = lfsck_instance_find(key, true, false);
2825         if (likely(lfsck != NULL)) {
2826                 rc = sprintf(buf, "%u\n",
2827                              lfsck->li_bookmark_ram.lb_async_windows);
2828                 lfsck_instance_put(&env, lfsck);
2829         } else {
2830                 rc = -ENXIO;
2831         }
2832
2833         lu_env_fini(&env);
2834
2835         RETURN(rc);
2836 }
2837 EXPORT_SYMBOL(lfsck_get_windows);
2838
2839 int lfsck_set_windows(struct dt_device *key, unsigned int val)
2840 {
2841         struct lu_env           env;
2842         struct lfsck_instance  *lfsck;
2843         int                     rc;
2844         ENTRY;
2845
2846         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2847         if (rc != 0)
2848                 RETURN(rc);
2849
2850         lfsck = lfsck_instance_find(key, true, false);
2851         if (likely(lfsck != NULL)) {
2852                 if (val < 1 || val > LFSCK_ASYNC_WIN_MAX) {
2853                         CWARN("%s: invalid async windows size that may "
2854                               "cause memory issues. The valid range is "
2855                               "[1 - %u].\n",
2856                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2857                         rc = -EINVAL;
2858                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2859                         mutex_lock(&lfsck->li_mutex);
2860                         lfsck->li_bookmark_ram.lb_async_windows = val;
2861                         rc = lfsck_bookmark_store(&env, lfsck);
2862                         mutex_unlock(&lfsck->li_mutex);
2863                 }
2864                 lfsck_instance_put(&env, lfsck);
2865         } else {
2866                 rc = -ENXIO;
2867         }
2868
2869         lu_env_fini(&env);
2870
2871         RETURN(rc);
2872 }
2873 EXPORT_SYMBOL(lfsck_set_windows);
2874
2875 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2876 {
2877         struct lu_env           env;
2878         struct lfsck_instance  *lfsck;
2879         struct lfsck_component *com;
2880         int                     rc;
2881         ENTRY;
2882
2883         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2884         if (rc != 0)
2885                 RETURN(rc);
2886
2887         lfsck = lfsck_instance_find(key, true, false);
2888         if (likely(lfsck != NULL)) {
2889                 com = lfsck_component_find(lfsck, type);
2890                 if (likely(com != NULL)) {
2891                         com->lc_ops->lfsck_dump(&env, com, m);
2892                         lfsck_component_put(&env, com);
2893                 } else {
2894                         rc = -ENOTSUPP;
2895                 }
2896
2897                 lfsck_instance_put(&env, lfsck);
2898         } else {
2899                 rc = -ENXIO;
2900         }
2901
2902         lu_env_fini(&env);
2903
2904         RETURN(rc);
2905 }
2906 EXPORT_SYMBOL(lfsck_dump);
2907
2908 static int lfsck_stop_all(const struct lu_env *env,
2909                           struct lfsck_instance *lfsck,
2910                           struct lfsck_stop *stop)
2911 {
2912         struct lfsck_thread_info *info = lfsck_env_info(env);
2913         struct lfsck_request *lr = &info->lti_lr;
2914         struct lfsck_async_interpret_args *laia = &info->lti_laia;
2915         struct ptlrpc_request_set *set;
2916         struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs;
2917         struct lfsck_tgt_desc *ltd;
2918         struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
2919         int idx;
2920         int rc = 0;
2921         int rc1 = 0;
2922
2923         ENTRY;
2924         LASSERT(stop->ls_flags & LPF_BROADCAST);
2925
2926         set = ptlrpc_prep_set();
2927         if (unlikely(set == NULL))
2928                 RETURN(-ENOMEM);
2929
2930         memset(lr, 0, sizeof(*lr));
2931         lr->lr_event = LE_STOP;
2932         lr->lr_index = lfsck_dev_idx(lfsck);
2933         lr->lr_status = stop->ls_status;
2934         lr->lr_version = bk->lb_version;
2935         lr->lr_active = LFSCK_TYPES_ALL;
2936         lr->lr_param = stop->ls_flags;
2937
2938         memset(laia, 0, sizeof(*laia));
2939         laia->laia_ltds = ltds;
2940         laia->laia_lr = lr;
2941         laia->laia_shared = 1;
2942
2943         down_read(&ltds->ltd_rw_sem);
2944         for_each_set_bit(idx, ltds->ltd_tgts_bitmap, ltds->ltd_tgts_mask_len) {
2945                 ltd = lfsck_tgt_get(ltds, idx);
2946                 LASSERT(ltd != NULL);
2947
2948                 laia->laia_ltd = ltd;
2949                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2950                                          lfsck_async_interpret, laia,
2951                                          LFSCK_NOTIFY);
2952                 if (rc != 0) {
2953                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2954                         lfsck_tgt_put(ltd);
2955                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2956                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2957                         rc1 = rc;
2958                 }
2959         }
2960         up_read(&ltds->ltd_rw_sem);
2961
2962         rc = ptlrpc_set_wait(env, set);
2963         ptlrpc_set_destroy(set);
2964
2965         if (rc == 0)
2966                 rc = laia->laia_result;
2967
2968         if (rc == -EALREADY)
2969                 rc = 0;
2970
2971         if (rc != 0)
2972                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2973                        lfsck_lfsck2name(lfsck), rc);
2974
2975         RETURN(rc != 0 ? rc : rc1);
2976 }
2977
2978 static int lfsck_start_all(const struct lu_env *env,
2979                            struct lfsck_instance *lfsck,
2980                            struct lfsck_start *start)
2981 {
2982         struct lfsck_thread_info *info = lfsck_env_info(env);
2983         struct lfsck_request *lr = &info->lti_lr;
2984         struct lfsck_async_interpret_args *laia = &info->lti_laia;
2985         struct ptlrpc_request_set *set;
2986         struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs;
2987         struct lfsck_tgt_desc *ltd;
2988         struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
2989         int idx;
2990         int rc = 0;
2991         bool retry = false;
2992         ENTRY;
2993
2994         LASSERT(start->ls_flags & LPF_BROADCAST);
2995
2996         memset(lr, 0, sizeof(*lr));
2997         lr->lr_event = LE_START;
2998         lr->lr_index = lfsck_dev_idx(lfsck);
2999         lr->lr_speed = bk->lb_speed_limit;
3000         lr->lr_version = bk->lb_version;
3001         lr->lr_active = start->ls_active;
3002         lr->lr_param = start->ls_flags;
3003         lr->lr_async_windows = bk->lb_async_windows;
3004         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
3005                        LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ |
3006                        LSV_CREATE_MDTOBJ;
3007
3008         memset(laia, 0, sizeof(*laia));
3009         laia->laia_ltds = ltds;
3010         laia->laia_lr = lr;
3011         laia->laia_shared = 1;
3012
3013 again:
3014         set = ptlrpc_prep_set();
3015         if (unlikely(!set))
3016                 RETURN(-ENOMEM);
3017
3018         down_read(&ltds->ltd_rw_sem);
3019         for_each_set_bit(idx, ltds->ltd_tgts_bitmap, ltds->ltd_tgts_mask_len) {
3020                 ltd = lfsck_tgt_get(ltds, idx);
3021                 LASSERT(ltd != NULL);
3022
3023                 if (retry && !ltd->ltd_retry_start) {
3024                         lfsck_tgt_put(ltd);
3025                         continue;
3026                 }
3027
3028                 laia->laia_ltd = ltd;
3029                 ltd->ltd_retry_start = 0;
3030                 ltd->ltd_layout_done = 0;
3031                 ltd->ltd_namespace_done = 0;
3032                 ltd->ltd_synced_failures = 0;
3033                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
3034                                          lfsck_async_interpret, laia,
3035                                          LFSCK_NOTIFY);
3036                 if (rc != 0) {
3037                         lfsck_interpret(env, lfsck, NULL, laia, rc);
3038                         lfsck_tgt_put(ltd);
3039                         CERROR("%s: cannot notify MDT %x for LFSCK "
3040                                "start, failout: rc = %d\n",
3041                                lfsck_lfsck2name(lfsck), idx, rc);
3042                         break;
3043                 }
3044         }
3045         up_read(&ltds->ltd_rw_sem);
3046
3047         if (rc != 0) {
3048                 ptlrpc_set_destroy(set);
3049
3050                 RETURN(rc);
3051         }
3052
3053         rc = ptlrpc_set_wait(env, set);
3054         ptlrpc_set_destroy(set);
3055
3056         if (rc == 0)
3057                 rc = laia->laia_result;
3058
3059         if (unlikely(rc == -EINPROGRESS)) {
3060                 retry = true;
3061                 schedule_timeout_interruptible(cfs_time_seconds(1));
3062                 set_current_state(TASK_RUNNING);
3063                 if (!signal_pending(current) &&
3064                     thread_is_running(&lfsck->li_thread))
3065                         goto again;
3066
3067                 rc = -EINTR;
3068         }
3069
3070         if (rc != 0) {
3071                 struct lfsck_stop *stop = &info->lti_stop;
3072
3073                 CERROR("%s: cannot start LFSCK on some MDTs, "
3074                        "stop all: rc = %d\n",
3075                        lfsck_lfsck2name(lfsck), rc);
3076                 if (rc != -EALREADY) {
3077                         stop->ls_status = LS_FAILED;
3078                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
3079                         lfsck_stop_all(env, lfsck, stop);
3080                 }
3081         }
3082
3083         RETURN(rc);
3084 }
3085
3086 int lfsck_start(const struct lu_env *env, struct dt_device *key,
3087                 struct lfsck_start_param *lsp)
3088 {
3089         struct lfsck_start *start = lsp->lsp_start;
3090         struct lfsck_instance *lfsck;
3091         struct lfsck_bookmark *bk;
3092         struct ptlrpc_thread *thread;
3093         struct lfsck_component *com;
3094         struct lfsck_thread_args *lta;
3095         struct task_struct *task;
3096         struct lfsck_tgt_descs *ltds;
3097         struct lfsck_tgt_desc *ltd;
3098         int idx;
3099         int rc = 0;
3100         __u16 valid  = 0;
3101         __u16 flags  = 0;
3102         __u16 type   = 1;
3103
3104         ENTRY;
3105         if (key->dd_rdonly)
3106                 RETURN(-EROFS);
3107
3108         lfsck = lfsck_instance_find(key, true, false);
3109         if (unlikely(lfsck == NULL))
3110                 RETURN(-ENXIO);
3111
3112         if (unlikely(lfsck->li_stopping))
3113                 GOTO(put, rc = -ENXIO);
3114
3115         /* System is not ready, try again later. */
3116         if (unlikely(lfsck->li_namespace == NULL ||
3117                      lfsck_dev_site(lfsck)->ss_server_fld == NULL))
3118                 GOTO(put, rc = -EINPROGRESS);
3119
3120         /* start == NULL means auto trigger paused LFSCK. */
3121         if (!start) {
3122                 if (list_empty(&lfsck->li_list_scan) ||
3123                     CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO))
3124                         GOTO(put, rc = 0);
3125         } else if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
3126                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
3127                        lfsck_lfsck2name(lfsck));
3128
3129                 GOTO(put, rc = -EPERM);
3130         }
3131
3132         bk = &lfsck->li_bookmark_ram;
3133         thread = &lfsck->li_thread;
3134         mutex_lock(&lfsck->li_mutex);
3135         spin_lock(&lfsck->li_lock);
3136         if (unlikely(thread_is_stopping(thread))) {
3137                 /* Someone is stopping the LFSCK. */
3138                 spin_unlock(&lfsck->li_lock);
3139                 GOTO(out, rc = -EBUSY);
3140         }
3141
3142         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
3143                 rc = -EALREADY;
3144                 if (unlikely(start == NULL)) {
3145                         spin_unlock(&lfsck->li_lock);
3146                         GOTO(out, rc);
3147                 }
3148
3149                 while (start->ls_active != 0) {
3150                         if (!(type & start->ls_active)) {
3151                                 type <<= 1;
3152                                 continue;
3153                         }
3154
3155                         com = __lfsck_component_find(lfsck, type,
3156                                                      &lfsck->li_list_scan);
3157                         if (com == NULL)
3158                                 com = __lfsck_component_find(lfsck, type,
3159                                                 &lfsck->li_list_double_scan);
3160                         if (com == NULL) {
3161                                 rc = -EOPNOTSUPP;
3162                                 break;
3163                         }
3164
3165                         if (com->lc_ops->lfsck_join != NULL) {
3166                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
3167                                 if (rc != 0 && rc != -EALREADY)
3168                                         break;
3169                         }
3170                         start->ls_active &= ~type;
3171                         type <<= 1;
3172                 }
3173                 spin_unlock(&lfsck->li_lock);
3174                 GOTO(out, rc);
3175         }
3176         spin_unlock(&lfsck->li_lock);
3177
3178         lfsck->li_status = 0;
3179         lfsck->li_oit_over = 0;
3180         lfsck->li_start_unplug = 0;
3181         lfsck->li_drop_dryrun = 0;
3182         lfsck->li_new_scanned = 0;
3183
3184         /* For auto trigger. */
3185         if (start == NULL)
3186                 goto trigger;
3187
3188         start->ls_version = bk->lb_version;
3189
3190         if (start->ls_active != 0) {
3191                 struct lfsck_component *next;
3192
3193                 if (start->ls_active == LFSCK_TYPES_ALL)
3194                         start->ls_active = LFSCK_TYPES_SUPPORTED;
3195
3196                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
3197                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
3198                         GOTO(out, rc = -ENOTSUPP);
3199                 }
3200
3201                 list_for_each_entry_safe(com, next,
3202                                          &lfsck->li_list_scan, lc_link) {
3203                         if (!(com->lc_type & start->ls_active)) {
3204                                 rc = com->lc_ops->lfsck_post(env, com, 0,
3205                                                              false);
3206                                 if (rc != 0)
3207                                         GOTO(out, rc);
3208                         }
3209                 }
3210
3211                 while (start->ls_active != 0) {
3212                         if (type & start->ls_active) {
3213                                 com = __lfsck_component_find(lfsck, type,
3214                                                         &lfsck->li_list_idle);
3215                                 if (com != NULL)
3216                                         /* The component status will be updated
3217                                          * when its prep() is called later by
3218                                          * the LFSCK main engine. */
3219                                         list_move_tail(&com->lc_link,
3220                                                        &lfsck->li_list_scan);
3221                                 start->ls_active &= ~type;
3222                         }
3223                         type <<= 1;
3224                 }
3225         }
3226
3227         if (list_empty(&lfsck->li_list_scan)) {
3228                 /* The speed limit will be used to control both the LFSCK and
3229                  * low layer scrub (if applied), need to be handled firstly. */
3230                 if (start->ls_valid & LSV_SPEED_LIMIT) {
3231                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
3232                                 rc = lfsck_bookmark_store(env, lfsck);
3233                                 if (rc != 0)
3234                                         GOTO(out, rc);
3235                         }
3236                 }
3237
3238                 goto trigger;
3239         }
3240
3241         if (start->ls_flags & LPF_RESET)
3242                 flags |= DOIF_RESET;
3243
3244         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
3245         if (rc != 0)
3246                 GOTO(out, rc);
3247
3248         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
3249                 start->ls_active |= com->lc_type;
3250                 if (flags & DOIF_RESET) {
3251                         rc = com->lc_ops->lfsck_reset(env, com, false);
3252                         if (rc != 0)
3253                                 GOTO(out, rc);
3254                 }
3255         }
3256
3257         ltds = &lfsck->li_mdt_descs;
3258         down_read(&ltds->ltd_rw_sem);
3259         for_each_set_bit(idx, ltds->ltd_tgts_bitmap, ltds->ltd_tgts_mask_len) {
3260                 ltd = lfsck_ltd2tgt(ltds, idx);
3261                 LASSERT(ltd != NULL);
3262
3263                 ltd->ltd_layout_done = 0;
3264                 ltd->ltd_namespace_done = 0;
3265                 ltd->ltd_synced_failures = 0;
3266                 lfsck_reset_ltd_status(ltd, LFSCK_TYPE_NAMESPACE);
3267                 lfsck_reset_ltd_status(ltd, LFSCK_TYPE_LAYOUT);
3268                 list_del_init(&ltd->ltd_layout_phase_list);
3269                 list_del_init(&ltd->ltd_layout_list);
3270                 list_del_init(&ltd->ltd_namespace_phase_list);
3271                 list_del_init(&ltd->ltd_namespace_list);
3272         }
3273         up_read(&ltds->ltd_rw_sem);
3274
3275         ltds = &lfsck->li_ost_descs;
3276         down_read(&ltds->ltd_rw_sem);
3277         for_each_set_bit(idx, ltds->ltd_tgts_bitmap, ltds->ltd_tgts_mask_len) {
3278                 ltd = lfsck_ltd2tgt(ltds, idx);
3279                 LASSERT(ltd != NULL);
3280
3281                 ltd->ltd_layout_done = 0;
3282                 ltd->ltd_synced_failures = 0;
3283                 lfsck_reset_ltd_status(ltd, LFSCK_TYPE_LAYOUT);
3284                 list_del_init(&ltd->ltd_layout_phase_list);
3285                 list_del_init(&ltd->ltd_layout_list);
3286         }
3287         up_read(&ltds->ltd_rw_sem);
3288
3289 trigger:
3290         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
3291         if (bk->lb_param & LPF_DRYRUN)
3292                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
3293
3294         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
3295                 valid |= DOIV_ERROR_HANDLE;
3296                 if (start->ls_flags & LPF_FAILOUT)
3297                         flags |= DOIF_FAILOUT;
3298         }
3299
3300         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
3301                 valid |= DOIV_DRYRUN;
3302                 if (start->ls_flags & LPF_DRYRUN)
3303                         flags |= DOIF_DRYRUN;
3304         }
3305
3306         if (!list_empty(&lfsck->li_list_scan))
3307                 flags |= DOIF_OUTUSED;
3308
3309         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
3310         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
3311         if (IS_ERR(lta))
3312                 GOTO(out, rc = PTR_ERR(lta));
3313
3314         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
3315         spin_lock(&lfsck->li_lock);
3316         thread_set_flags(thread, SVC_STARTING);
3317         spin_unlock(&lfsck->li_lock);
3318         task = kthread_run(lfsck_master_engine, lta, "lfsck");
3319         if (IS_ERR(task)) {
3320                 rc = PTR_ERR(task);
3321                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
3322                        lfsck_lfsck2name(lfsck), rc);
3323                 lfsck_thread_args_fini(lta);
3324
3325                 GOTO(out, rc);
3326         }
3327
3328         wait_event_idle(thread->t_ctl_waitq,
3329                         thread_is_running(thread) ||
3330                         thread_is_stopped(thread));
3331         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
3332                 lfsck->li_start_unplug = 1;
3333                 wake_up(&thread->t_ctl_waitq);
3334
3335                 GOTO(out, rc = 0);
3336         }
3337
3338         /* release lfsck::li_mutex to avoid deadlock. */
3339         mutex_unlock(&lfsck->li_mutex);
3340         rc = lfsck_start_all(env, lfsck, start);
3341         if (rc != 0) {
3342                 spin_lock(&lfsck->li_lock);
3343                 if (thread_is_stopped(thread)) {
3344                         spin_unlock(&lfsck->li_lock);
3345                 } else {
3346                         lfsck->li_status = LS_FAILED;
3347                         lfsck->li_flags = 0;
3348                         thread_set_flags(thread, SVC_STOPPING);
3349                         spin_unlock(&lfsck->li_lock);
3350
3351                         lfsck->li_start_unplug = 1;
3352                         wake_up(&thread->t_ctl_waitq);
3353                         wait_event_idle(thread->t_ctl_waitq,
3354                                         thread_is_stopped(thread));
3355                 }
3356         } else {
3357                 lfsck->li_start_unplug = 1;
3358                 wake_up(&thread->t_ctl_waitq);
3359         }
3360
3361         GOTO(put, rc);
3362
3363 out:
3364         mutex_unlock(&lfsck->li_mutex);
3365
3366 put:
3367         lfsck_instance_put(env, lfsck);
3368
3369         return rc < 0 ? rc : 0;
3370 }
3371 EXPORT_SYMBOL(lfsck_start);
3372
3373 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
3374                struct lfsck_stop *stop)
3375 {
3376         struct lfsck_instance   *lfsck;
3377         struct ptlrpc_thread    *thread;
3378         int                      rc     = 0;
3379         int                      rc1    = 0;
3380         ENTRY;
3381
3382         lfsck = lfsck_instance_find(key, true, false);
3383         if (unlikely(lfsck == NULL))
3384                 RETURN(-ENXIO);
3385
3386         thread = &lfsck->li_thread;
3387         if (stop && stop->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
3388                 CERROR("%s: only allow to specify '-A' via MDS\n",
3389                        lfsck_lfsck2name(lfsck));
3390                 GOTO(put, rc = -EPERM);
3391         }
3392
3393         spin_lock(&lfsck->li_lock);
3394         /* The target is umounted */
3395         if (stop && stop->ls_status == LS_PAUSED)
3396                 lfsck->li_stopping = 1;
3397
3398         if (thread_is_init(thread) || thread_is_stopped(thread))
3399                 /* no error if LFSCK stopped already, or not started */
3400                 GOTO(unlock, rc = 0);
3401
3402         if (thread_is_stopping(thread))
3403                 /* Someone is stopping LFSCK. */
3404                 GOTO(unlock, rc = -EINPROGRESS);
3405
3406         if (stop) {
3407                 lfsck->li_status = stop->ls_status;
3408                 lfsck->li_flags = stop->ls_flags;
3409         } else {
3410                 lfsck->li_status = LS_STOPPED;
3411                 lfsck->li_flags = 0;
3412         }
3413
3414         thread_set_flags(thread, SVC_STOPPING);
3415
3416         LASSERT(lfsck->li_task);
3417         send_sig(SIGINT, lfsck->li_task, 1);
3418
3419         if (lfsck->li_master) {
3420                 struct lfsck_component *com;
3421                 struct lfsck_assistant_data *lad;
3422
3423                 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
3424                         lad = com->lc_data;
3425                         spin_lock(&lad->lad_lock);
3426                         if (lad->lad_task)
3427                                 send_sig(SIGINT, lad->lad_task, 1);
3428                         spin_unlock(&lad->lad_lock);
3429                 }
3430
3431                 list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
3432                         lad = com->lc_data;
3433                         spin_lock(&lad->lad_lock);
3434                         if (lad->lad_task)
3435                                 send_sig(SIGINT, lad->lad_task, 1);
3436                         spin_unlock(&lad->lad_lock);
3437                 }
3438         }
3439
3440         wake_up(&thread->t_ctl_waitq);
3441         spin_unlock(&lfsck->li_lock);
3442         if (stop && stop->ls_flags & LPF_BROADCAST)
3443                 rc1 = lfsck_stop_all(env, lfsck, stop);
3444
3445         /* It was me set the status as 'stopping' just now, if it is not
3446          * 'stopping' now, then either stopped, or re-started by race. */
3447         wait_event_idle(thread->t_ctl_waitq,
3448                         !thread_is_stopping(thread));
3449
3450         GOTO(put, rc = 0);
3451
3452 unlock:
3453         spin_unlock(&lfsck->li_lock);
3454 put:
3455         lfsck_instance_put(env, lfsck);
3456
3457         return rc != 0 ? rc : rc1;
3458 }
3459 EXPORT_SYMBOL(lfsck_stop);
3460
3461 int lfsck_in_notify_local(const struct lu_env *env, struct dt_device *key,
3462                           struct lfsck_req_local *lrl, struct thandle *th)
3463 {
3464         struct lfsck_instance *lfsck;
3465         struct lfsck_component *com;
3466         int rc = -EOPNOTSUPP;
3467         ENTRY;
3468
3469         lfsck = lfsck_instance_find(key, true, false);
3470         if (unlikely(!lfsck))
3471                 RETURN(-ENXIO);
3472
3473         com = lfsck_component_find(lfsck, lrl->lrl_active);
3474         if (likely(com && com->lc_ops->lfsck_in_notify_local)) {
3475                 rc = com->lc_ops->lfsck_in_notify_local(env, com, lrl, th);
3476                 lfsck_component_put(env, com);
3477         }
3478
3479         lfsck_instance_put(env, lfsck);
3480
3481         RETURN(rc);
3482 }
3483 EXPORT_SYMBOL(lfsck_in_notify_local);
3484
3485 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
3486                     struct lfsck_request *lr)
3487 {
3488         int rc = -EOPNOTSUPP;
3489         ENTRY;
3490
3491         switch (lr->lr_event) {
3492         case LE_START: {
3493                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
3494                 struct lfsck_start_param  lsp;
3495
3496                 memset(start, 0, sizeof(*start));
3497                 start->ls_valid = lr->lr_valid;
3498                 start->ls_speed_limit = lr->lr_speed;
3499                 start->ls_version = lr->lr_version;
3500                 start->ls_active = lr->lr_active;
3501                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3502                 start->ls_async_windows = lr->lr_async_windows;
3503
3504                 lsp.lsp_start = start;
3505                 lsp.lsp_index = lr->lr_index;
3506                 lsp.lsp_index_valid = 1;
3507                 rc = lfsck_start(env, key, &lsp);
3508                 break;
3509         }
3510         case LE_STOP: {
3511                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
3512
3513                 memset(stop, 0, sizeof(*stop));
3514                 stop->ls_status = lr->lr_status;
3515                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3516                 rc = lfsck_stop(env, key, stop);
3517                 break;
3518         }
3519         case LE_PHASE1_DONE:
3520         case LE_PHASE2_DONE:
3521         case LE_PEER_EXIT:
3522         case LE_CONDITIONAL_DESTROY:
3523         case LE_SET_LMV_MASTER:
3524         case LE_SET_LMV_SLAVE:
3525         case LE_PAIRS_VERIFY: {
3526                 struct lfsck_instance  *lfsck;
3527                 struct lfsck_component *com;
3528
3529                 lfsck = lfsck_instance_find(key, true, false);
3530                 if (unlikely(lfsck == NULL))
3531                         RETURN(-ENXIO);
3532
3533                 com = lfsck_component_find(lfsck, lr->lr_active);
3534                 if (likely(com)) {
3535                         rc = com->lc_ops->lfsck_in_notify(env, com, lr);
3536                         lfsck_component_put(env, com);
3537                 }
3538
3539                 lfsck_instance_put(env, lfsck);
3540                 break;
3541         }
3542         default:
3543                 break;
3544         }
3545
3546         RETURN(rc);
3547 }
3548 EXPORT_SYMBOL(lfsck_in_notify);
3549
3550 int lfsck_query(const struct lu_env *env, struct dt_device *key,
3551                 struct lfsck_request *req, struct lfsck_reply *rep,
3552                 struct lfsck_query *que)
3553 {
3554         struct lfsck_instance  *lfsck;
3555         struct lfsck_component *com;
3556         int                     i;
3557         int                     rc = 0;
3558         __u16                   type;
3559         ENTRY;
3560
3561         lfsck = lfsck_instance_find(key, true, false);
3562         if (unlikely(lfsck == NULL))
3563                 RETURN(-ENXIO);
3564
3565         if (que != NULL) {
3566                 if (que->lu_types == LFSCK_TYPES_ALL)
3567                         que->lu_types =
3568                                 LFSCK_TYPES_SUPPORTED & ~LFSCK_TYPE_SCRUB;
3569
3570                 if (que->lu_types & ~LFSCK_TYPES_SUPPORTED) {
3571                         que->lu_types &= ~LFSCK_TYPES_SUPPORTED;
3572
3573                         GOTO(out, rc = -ENOTSUPP);
3574                 }
3575
3576                 for (i = 0, type = BIT(i); i < LFSCK_TYPE_BITS;
3577                      i++, type = BIT(i)) {
3578                         if (!(que->lu_types & type))
3579                                 continue;
3580
3581 again:
3582                         com = lfsck_component_find(lfsck, type);
3583                         if (unlikely(com == NULL))
3584                                 GOTO(out, rc = -ENOTSUPP);
3585
3586                         memset(que->lu_mdts_count[i], 0,
3587                                sizeof(__u32) * (LS_MAX + 1));
3588                         memset(que->lu_osts_count[i], 0,
3589                                sizeof(__u32) * (LS_MAX + 1));
3590                         que->lu_repaired[i] = 0;
3591                         rc = com->lc_ops->lfsck_query(env, com, req, rep,
3592                                                       que, i);
3593                         lfsck_component_put(env, com);
3594                         if  (rc < 0)
3595                                 GOTO(out, rc);
3596                 }
3597
3598                 if (!(que->lu_flags & LPF_WAIT))
3599                         GOTO(out, rc);
3600
3601                 for (i = 0, type = BIT(i); i < LFSCK_TYPE_BITS;
3602                      i++, type = BIT(i)) {
3603                         if (!(que->lu_types & type))
3604                                 continue;
3605
3606                         if (que->lu_mdts_count[i][LS_SCANNING_PHASE1] != 0 ||
3607                             que->lu_mdts_count[i][LS_SCANNING_PHASE2] != 0 ||
3608                             que->lu_osts_count[i][LS_SCANNING_PHASE1] != 0 ||
3609                             que->lu_osts_count[i][LS_SCANNING_PHASE2] != 0) {
3610                                 /* If it is required to wait, then sleep
3611                                  * 3 seconds and try to query again.
3612                                  */
3613                                 unsigned long timeout =
3614                                         msecs_to_jiffies(3000) + 1;
3615                                 while (timeout &&
3616                                        !fatal_signal_pending(current))
3617                                         timeout = schedule_timeout_killable(
3618                                                 timeout);
3619                                 if (timeout == 0)
3620                                         goto again;
3621                         }
3622                 }
3623         } else {
3624                 com = lfsck_component_find(lfsck, req->lr_active);
3625                 if (likely(com != NULL)) {
3626                         rc = com->lc_ops->lfsck_query(env, com, req, rep,
3627                                                       que, -1);
3628                         lfsck_component_put(env, com);
3629                 } else {
3630                         rc = -ENOTSUPP;
3631                 }
3632         }
3633
3634         GOTO(out, rc);
3635
3636 out:
3637         lfsck_instance_put(env, lfsck);
3638         return rc;
3639 }
3640 EXPORT_SYMBOL(lfsck_query);
3641
3642 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
3643                              struct ldlm_namespace *ns)
3644 {
3645         struct lfsck_instance  *lfsck;
3646         int                     rc      = -ENXIO;
3647
3648         lfsck = lfsck_instance_find(key, true, false);
3649         if (likely(lfsck != NULL)) {
3650                 lfsck->li_namespace = ns;
3651                 lfsck_instance_put(env, lfsck);
3652                 rc = 0;
3653         }
3654
3655         return rc;
3656 }
3657 EXPORT_SYMBOL(lfsck_register_namespace);
3658
3659 int lfsck_register(const struct lu_env *env, struct dt_device *key,
3660                    struct dt_device *next, struct obd_device *obd,
3661                    lfsck_out_notify notify, void *notify_data, bool master)
3662 {
3663         struct lfsck_instance   *lfsck;
3664         struct dt_object        *root  = NULL;
3665         struct dt_object        *obj   = NULL;
3666         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
3667         int                      rc;
3668         ENTRY;
3669
3670         lfsck = lfsck_instance_find(key, false, false);
3671         if (unlikely(lfsck != NULL))
3672                 RETURN(-EEXIST);
3673
3674         OBD_ALLOC_PTR(lfsck);
3675         if (lfsck == NULL)
3676                 RETURN(-ENOMEM);
3677
3678         mutex_init(&lfsck->li_mutex);
3679         spin_lock_init(&lfsck->li_lock);
3680         INIT_LIST_HEAD(&lfsck->li_link);
3681         INIT_LIST_HEAD(&lfsck->li_list_scan);
3682         INIT_LIST_HEAD(&lfsck->li_list_dir);
3683         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
3684         INIT_LIST_HEAD(&lfsck->li_list_idle);
3685         INIT_LIST_HEAD(&lfsck->li_list_lmv);
3686         atomic_set(&lfsck->li_ref, 1);
3687         atomic_set(&lfsck->li_double_scan_count, 0);
3688         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
3689         lfsck->li_out_notify = notify;
3690         lfsck->li_out_notify_data = notify_data;
3691         lfsck->li_next = next;
3692         lfsck->li_bottom = key;
3693         lfsck->li_obd = obd;
3694
3695         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
3696         if (rc != 0)
3697                 GOTO(out, rc);
3698
3699         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
3700         if (rc != 0)
3701                 GOTO(out, rc);
3702
3703         fid->f_seq = FID_SEQ_LOCAL_NAME;
3704         fid->f_oid = 1;
3705         fid->f_ver = 0;
3706         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
3707         if (rc != 0)
3708                 GOTO(out, rc);
3709
3710         rc = dt_root_get(env, key, fid);
3711         if (rc != 0)
3712                 GOTO(out, rc);
3713
3714         root = dt_locate(env, key, fid);
3715         if (IS_ERR(root))
3716                 GOTO(out, rc = PTR_ERR(root));
3717
3718         lfsck->li_local_root_fid = *fid;
3719         if (master) {
3720                 lfsck->li_master = 1;
3721                 if (lfsck_dev_idx(lfsck) == 0) {
3722                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
3723                         const struct lu_name *cname;
3724
3725                         rc = dt_lookup_dir(env, root, "ROOT",
3726                                            &lfsck->li_global_root_fid);
3727                         if (rc != 0)
3728                                 GOTO(out, rc);
3729
3730                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
3731                         if (IS_ERR(obj))
3732                                 GOTO(out, rc = PTR_ERR(obj));
3733
3734                         rc = dt_lookup_dir(env, obj, dotlustre, fid);
3735                         if (rc != 0)
3736                                 GOTO(out, rc);
3737
3738                         lfsck_object_put(env, obj);
3739                         obj = dt_locate(env, key, fid);
3740                         if (IS_ERR(obj))
3741                                 GOTO(out, rc = PTR_ERR(obj));
3742
3743                         cname = lfsck_name_get_const(env, dotlustre,
3744                                                      strlen(dotlustre));
3745                         rc = lfsck_verify_linkea(env, lfsck, obj, cname,
3746                                                  &lfsck->li_global_root_fid);
3747                         if (rc != 0)
3748                                 GOTO(out, rc);
3749
3750                         *pfid = *fid;
3751                         rc = dt_lookup_dir(env, obj, lostfound, fid);
3752                         if (rc != 0)
3753                                 GOTO(out, rc);
3754
3755                         lfsck_object_put(env, obj);
3756                         obj = dt_locate(env, key, fid);
3757                         if (IS_ERR(obj))
3758                                 GOTO(out, rc = PTR_ERR(obj));
3759
3760                         cname = lfsck_name_get_const(env, lostfound,
3761                                                      strlen(lostfound));
3762                         rc = lfsck_verify_linkea(env, lfsck, obj, cname, pfid);
3763                         if (rc != 0)
3764                                 GOTO(out, rc);
3765
3766                         lfsck_object_put(env, obj);
3767                         obj = NULL;
3768                 }
3769         }
3770
3771         fid->f_seq = FID_SEQ_LOCAL_FILE;
3772         fid->f_oid = OTABLE_IT_OID;
3773         fid->f_ver = 0;
3774         obj = dt_locate(env, key, fid);
3775         if (IS_ERR(obj))
3776                 GOTO(out, rc = PTR_ERR(obj));
3777
3778         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
3779         if (rc != 0)
3780                 GOTO(out, rc);
3781
3782         lfsck->li_obj_oit = obj;
3783         obj = local_file_find_or_create(env, lfsck->li_los, root, LFSCK_DIR,
3784                                         S_IFDIR | S_IRUGO | S_IWUSR);
3785         if (IS_ERR(obj))
3786                 GOTO(out, rc = PTR_ERR(obj));
3787
3788         lu_object_get(&obj->do_lu);
3789         lfsck->li_lfsck_dir = obj;
3790         rc = lfsck_bookmark_setup(env, lfsck);
3791         if (rc != 0)
3792                 GOTO(out, rc);
3793
3794         if (master) {
3795                 rc = lfsck_fid_init(lfsck);
3796                 if (rc < 0)
3797                         GOTO(out, rc);
3798
3799                 rc = lfsck_namespace_setup(env, lfsck);
3800                 if (rc < 0)
3801                         GOTO(out, rc);
3802         }
3803
3804         rc = lfsck_layout_setup(env, lfsck);
3805         if (rc < 0)
3806                 GOTO(out, rc);
3807
3808         /* XXX: more LFSCK components initialization to be added here. */
3809
3810         rc = lfsck_instance_add(lfsck);
3811         if (rc == 0)
3812                 rc = lfsck_add_target_from_orphan(env, lfsck);
3813 out:
3814         if (obj != NULL && !IS_ERR(obj))
3815                 lfsck_object_put(env, obj);
3816         if (root != NULL && !IS_ERR(root))
3817                 lfsck_object_put(env, root);
3818         if (rc != 0)
3819                 lfsck_instance_cleanup(env, lfsck);
3820         return rc;
3821 }
3822 EXPORT_SYMBOL(lfsck_register);
3823
3824 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
3825 {
3826         struct lfsck_instance *lfsck;
3827
3828         lfsck = lfsck_instance_find(key, false, true);
3829         if (lfsck != NULL)
3830                 lfsck_instance_put(env, lfsck);
3831 }
3832 EXPORT_SYMBOL(lfsck_degister);
3833
3834 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
3835                      struct dt_device *tgt, struct obd_export *exp,
3836                      __u32 index, bool for_ost)
3837 {
3838         struct lfsck_instance   *lfsck;
3839         struct lfsck_tgt_desc   *ltd;
3840         int                      rc;
3841         ENTRY;
3842
3843         OBD_ALLOC_PTR(ltd);
3844         if (ltd == NULL)
3845                 RETURN(-ENOMEM);
3846
3847         ltd->ltd_tgt = tgt;
3848         ltd->ltd_key = key;
3849         ltd->ltd_exp = exp;
3850         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
3851         INIT_LIST_HEAD(&ltd->ltd_layout_list);
3852         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
3853         INIT_LIST_HEAD(&ltd->ltd_namespace_list);
3854         INIT_LIST_HEAD(&ltd->ltd_namespace_phase_list);
3855         atomic_set(&ltd->ltd_ref, 1);
3856         ltd->ltd_index = index;
3857
3858         spin_lock(&lfsck_instance_lock);
3859         lfsck = __lfsck_instance_find(key, true, false);
3860         if (lfsck == NULL) {
3861                 if (for_ost)
3862                         list_add_tail(&ltd->ltd_orphan_list,
3863                                       &lfsck_ost_orphan_list);
3864                 else
3865                         list_add_tail(&ltd->ltd_orphan_list,
3866                                       &lfsck_mdt_orphan_list);
3867                 spin_unlock(&lfsck_instance_lock);
3868
3869                 RETURN(0);
3870         }
3871         spin_unlock(&lfsck_instance_lock);
3872
3873         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
3874         if (rc != 0)
3875                 lfsck_tgt_put(ltd);
3876
3877         lfsck_instance_put(env, lfsck);
3878
3879         RETURN(rc);
3880 }
3881 EXPORT_SYMBOL(lfsck_add_target);
3882
3883 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
3884                       struct dt_device *tgt, __u32 index, bool for_ost)
3885 {
3886         struct lfsck_instance   *lfsck;
3887         struct lfsck_tgt_descs  *ltds;
3888         struct lfsck_tgt_desc   *ltd;
3889         struct list_head        *head;
3890
3891         if (for_ost)
3892                 head = &lfsck_ost_orphan_list;
3893         else
3894                 head = &lfsck_mdt_orphan_list;
3895
3896         spin_lock(&lfsck_instance_lock);
3897         list_for_each_entry(ltd, head, ltd_orphan_list) {
3898                 if (ltd->ltd_tgt == tgt) {
3899                         list_del_init(&ltd->ltd_orphan_list);
3900                         spin_unlock(&lfsck_instance_lock);
3901                         lfsck_tgt_put(ltd);
3902
3903                         return;
3904                 }
3905         }
3906
3907         ltd = NULL;
3908         lfsck = __lfsck_instance_find(key, true, false);
3909         spin_unlock(&lfsck_instance_lock);
3910         if (unlikely(lfsck == NULL))
3911                 return;
3912
3913         if (for_ost)
3914                 ltds = &lfsck->li_ost_descs;
3915         else
3916                 ltds = &lfsck->li_mdt_descs;
3917
3918         down_write(&ltds->ltd_rw_sem);
3919         LASSERT(ltds->ltd_tgts_bitmap);
3920
3921         if (unlikely(index >= ltds->ltd_tgts_mask_len))
3922                 goto unlock;
3923
3924         ltd = lfsck_ltd2tgt(ltds, index);
3925         if (unlikely(ltd == NULL))
3926                 goto unlock;
3927
3928         LASSERT(ltds->ltd_tgtnr > 0);
3929
3930         ltds->ltd_tgtnr--;
3931         set_bit(index, ltds->ltd_tgts_bitmap);
3932         lfsck_assign_tgt(ltds, NULL, index);
3933
3934 unlock:
3935         if (ltd == NULL) {
3936                 if (for_ost)
3937                         head = &lfsck->li_ost_descs.ltd_orphan;
3938                 else
3939                         head = &lfsck->li_mdt_descs.ltd_orphan;
3940
3941                 list_for_each_entry(ltd, head, ltd_orphan_list) {
3942                         if (ltd->ltd_tgt == tgt) {
3943                                 list_del_init(&ltd->ltd_orphan_list);
3944                                 break;
3945                         }
3946                 }
3947         }
3948
3949         up_write(&ltds->ltd_rw_sem);
3950         if (ltd != NULL) {
3951                 spin_lock(&ltds->ltd_lock);
3952                 ltd->ltd_dead = 1;
3953                 spin_unlock(&ltds->ltd_lock);
3954                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_NAMESPACE);
3955                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
3956                 lfsck_tgt_put(ltd);
3957         }
3958
3959         lfsck_instance_put(env, lfsck);
3960 }
3961 EXPORT_SYMBOL(lfsck_del_target);
3962
3963 static int __init lfsck_init(void)
3964 {
3965         int rc;
3966
3967         lfsck_key_init_generic(&lfsck_thread_key, NULL);
3968         rc = lu_context_key_register(&lfsck_thread_key);
3969         if (!rc) {
3970                 tgt_register_lfsck_in_notify_local(lfsck_in_notify_local);
3971                 tgt_register_lfsck_in_notify(lfsck_in_notify);
3972                 tgt_register_lfsck_query(lfsck_query);
3973         }
3974
3975         return rc;
3976 }
3977
3978 static void __exit lfsck_exit(void)
3979 {
3980         struct lfsck_tgt_desc *ltd;
3981         struct lfsck_tgt_desc *next;
3982
3983         LASSERT(list_empty(&lfsck_instance_list));
3984
3985         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
3986                                  ltd_orphan_list) {
3987                 list_del_init(&ltd->ltd_orphan_list);
3988                 lfsck_tgt_put(ltd);
3989         }
3990
3991         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
3992                                  ltd_orphan_list) {
3993                 list_del_init(&ltd->ltd_orphan_list);
3994                 lfsck_tgt_put(ltd);
3995         }
3996
3997         lu_context_key_degister(&lfsck_thread_key);
3998 }
3999
4000 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
4001 MODULE_DESCRIPTION("Lustre File System Checker");
4002 MODULE_VERSION(LUSTRE_VERSION_STRING);
4003 MODULE_LICENSE("GPL");
4004
4005 module_init(lfsck_init);
4006 module_exit(lfsck_exit);