Whamcloud - gitweb
LU-17744 ldiskfs: mballoc stats fixes
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2017, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <linux/kthread.h>
34 #include <linux/sched.h>
35 #include <linux/list.h>
36 #include <linux/delay.h>
37 #include <lu_object.h>
38 #include <dt_object.h>
39 #include <md_object.h>
40 #include <lustre_fld.h>
41 #include <lustre_lib.h>
42 #include <lustre_net.h>
43 #include <lustre_lfsck.h>
44 #include <lu_target.h>
45
46 #include "lfsck_internal.h"
47
48 #define LFSCK_CHECKPOINT_SKIP   1
49
50 /* define lfsck thread key */
51 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
52
53 static void lfsck_key_fini(const struct lu_context *ctx,
54                            struct lu_context_key *key, void *data)
55 {
56         struct lfsck_thread_info *info = data;
57
58         lu_buf_free(&info->lti_linkea_buf);
59         lu_buf_free(&info->lti_linkea_buf2);
60         lu_buf_free(&info->lti_big_buf);
61         OBD_FREE_PTR(info);
62 }
63
64 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
65 LU_KEY_INIT_GENERIC(lfsck);
66
/*
 * File-scope LFSCK state.
 *
 * lfsck_instance_lock is held while the two orphan lists are scanned in
 * lfsck_add_target_from_orphan(); entries on those lists are target
 * descriptors linked through ltd_orphan_list.
 */
static DEFINE_SPINLOCK(lfsck_instance_lock);
static LIST_HEAD(lfsck_instance_list);
static LIST_HEAD(lfsck_ost_orphan_list);
static LIST_HEAD(lfsck_mdt_orphan_list);
71
/* NULL-terminated table of LFSCK flag names, indexed by position. */
const char *const lfsck_flags_names[] = {
	[0] = "scanned-once",
	[1] = "inconsistent",
	[2] = "upgrade",
	[3] = "incomplete",
	[4] = "crashed_lastid",
	[5] = NULL
};
80
/*
 * Table of LFSCK parameter names, indexed by position. Slots 0 and 8 carry
 * no name (NULL); the final NULL at slot 10 closes the table.
 */
const char *const lfsck_param_names[] = {
	[0]  = NULL,
	[1]  = "failout",
	[2]  = "dryrun",
	[3]  = "all_targets",
	[4]  = "broadcast",
	[5]  = "orphan",
	[6]  = "create_ostobj",
	[7]  = "create_mdtobj",
	[8]  = NULL,
	[9]  = "delay_create_ostobj",
	[10] = NULL
};
94
/* Selector for lost+found verification: by bookmark or by name entry. */
enum lfsck_verify_lpf_types {
	LVLT_BY_BOOKMARK,	/* == 0 */
	LVLT_BY_NAMEENTRY,	/* == 1 */
};
99
100 static inline void
101 lfsck_reset_ltd_status(struct lfsck_tgt_desc *ltd, enum lfsck_type type)
102 {
103         if (type == LFSCK_TYPE_LAYOUT) {
104                 ltd->ltd_layout_status = LS_MAX;
105                 ltd->ltd_layout_repaired = 0;
106         } else {
107                 ltd->ltd_namespace_status = LS_MAX;
108                 ltd->ltd_namespace_repaired = 0;
109         }
110 }
111
112 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
113 {
114         spin_lock_init(&ltds->ltd_lock);
115         init_rwsem(&ltds->ltd_rw_sem);
116         INIT_LIST_HEAD(&ltds->ltd_orphan);
117         ltds->ltd_tgts_bitmap = bitmap_zalloc(BITS_PER_LONG, GFP_KERNEL);
118         if (!ltds->ltd_tgts_bitmap)
119                 return -ENOMEM;
120
121         return 0;
122 }
123
124 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
125 {
126         struct lfsck_tgt_desc *ltd;
127         struct lfsck_tgt_desc *next;
128         int idx;
129
130         down_write(&ltds->ltd_rw_sem);
131
132         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
133                                  ltd_orphan_list) {
134                 list_del_init(&ltd->ltd_orphan_list);
135                 lfsck_tgt_put(ltd);
136         }
137
138         if (unlikely(!ltds->ltd_tgts_bitmap)) {
139                 up_write(&ltds->ltd_rw_sem);
140
141                 return;
142         }
143
144         for_each_set_bit(idx, ltds->ltd_tgts_bitmap, ltds->ltd_tgts_mask_len) {
145                 ltd = lfsck_ltd2tgt(ltds, idx);
146                 if (likely(ltd != NULL)) {
147                         LASSERT(list_empty(&ltd->ltd_layout_list));
148                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
149                         LASSERT(list_empty(&ltd->ltd_namespace_list));
150                         LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
151
152                         ltds->ltd_tgtnr--;
153                         clear_bit(idx, ltds->ltd_tgts_bitmap);
154                         lfsck_assign_tgt(ltds, NULL, idx);
155                         lfsck_tgt_put(ltd);
156                 }
157         }
158
159         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
160                  ltds->ltd_tgtnr);
161
162         for (idx = 0; idx < ARRAY_SIZE(ltds->ltd_tgts_idx); idx++) {
163                 if (ltds->ltd_tgts_idx[idx] != NULL) {
164                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
165                         ltds->ltd_tgts_idx[idx] = NULL;
166                 }
167         }
168
169         bitmap_free(ltds->ltd_tgts_bitmap);
170         ltds->ltd_tgts_bitmap = NULL;
171         up_write(&ltds->ltd_rw_sem);
172 }
173
174 static int __lfsck_add_target(const struct lu_env *env,
175                               struct lfsck_instance *lfsck,
176                               struct lfsck_tgt_desc *ltd,
177                               bool for_ost, bool locked)
178 {
179         struct lfsck_tgt_descs *ltds;
180         __u32 index = ltd->ltd_index;
181         int rc = 0;
182
183         ENTRY;
184         if (for_ost)
185                 ltds = &lfsck->li_ost_descs;
186         else
187                 ltds = &lfsck->li_mdt_descs;
188
189         if (!locked)
190                 down_write(&ltds->ltd_rw_sem);
191
192         LASSERT(ltds->ltd_tgts_bitmap);
193
194         if (index >= ltds->ltd_tgts_mask_len) {
195                 u32 newsize = max_t(u32, ltds->ltd_tgts_mask_len,
196                                     BITS_PER_LONG);
197                 unsigned long *old_bitmap = ltds->ltd_tgts_bitmap;
198                 unsigned long *new_bitmap;
199
200                 while (newsize < index + 1)
201                         newsize <<= 1;
202
203                 new_bitmap = bitmap_zalloc(newsize, GFP_KERNEL);
204                 if (!new_bitmap)
205                         GOTO(unlock, rc = -ENOMEM);
206
207                 if (ltds->ltd_tgtnr > 0) {
208                         bitmap_copy(new_bitmap, old_bitmap,
209                                     ltds->ltd_tgts_mask_len);
210                 }
211                 ltds->ltd_tgts_bitmap = new_bitmap;
212                 ltds->ltd_tgts_mask_len = newsize;
213                 bitmap_free(old_bitmap);
214         }
215
216         if (test_bit(index, ltds->ltd_tgts_bitmap)) {
217                 CERROR("%s: the device %s (%u) is registered already\n",
218                        lfsck_lfsck2name(lfsck),
219                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
220                 GOTO(unlock, rc = -EEXIST);
221         }
222
223         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
224                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
225                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
226                         GOTO(unlock, rc = -ENOMEM);
227         }
228
229         lfsck_assign_tgt(ltds, ltd, index);
230         set_bit(index, ltds->ltd_tgts_bitmap);
231         ltds->ltd_tgtnr++;
232
233         GOTO(unlock, rc = 0);
234
235 unlock:
236         if (!locked)
237                 up_write(&ltds->ltd_rw_sem);
238
239         return rc;
240 }
241
242 static int lfsck_add_target_from_orphan(const struct lu_env *env,
243                                         struct lfsck_instance *lfsck)
244 {
245         struct lfsck_tgt_descs *ltds = &lfsck->li_ost_descs;
246         struct lfsck_tgt_desc *ltd;
247         struct lfsck_tgt_desc *next;
248         struct list_head *head = &lfsck_ost_orphan_list;
249         int rc;
250         bool for_ost = true;
251
252 again:
253         spin_lock(&lfsck_instance_lock);
254         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
255                 if (ltd->ltd_key == lfsck->li_bottom)
256                         list_move_tail(&ltd->ltd_orphan_list,
257                                        &ltds->ltd_orphan);
258         }
259         spin_unlock(&lfsck_instance_lock);
260
261         down_write(&ltds->ltd_rw_sem);
262         while (!list_empty(&ltds->ltd_orphan)) {
263                 ltd = list_first_entry(&ltds->ltd_orphan,
264                                        struct lfsck_tgt_desc,
265                                        ltd_orphan_list);
266                 list_del_init(&ltd->ltd_orphan_list);
267                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
268                 /* Do not hold the semaphore for too long time. */
269                 up_write(&ltds->ltd_rw_sem);
270                 if (rc != 0)
271                         return rc;
272
273                 down_write(&ltds->ltd_rw_sem);
274         }
275         up_write(&ltds->ltd_rw_sem);
276
277         if (for_ost) {
278                 ltds = &lfsck->li_mdt_descs;
279                 head = &lfsck_mdt_orphan_list;
280                 for_ost = false;
281                 goto again;
282         }
283
284         return 0;
285 }
286
287 static inline struct lfsck_component *
288 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
289                        struct list_head *list)
290 {
291         struct lfsck_component *com;
292
293         list_for_each_entry(com, list, lc_link) {
294                 if (com->lc_type == type)
295                         return com;
296         }
297         return NULL;
298 }
299
300 struct lfsck_component *
301 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
302 {
303         struct lfsck_component *com;
304
305         spin_lock(&lfsck->li_lock);
306         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
307         if (com != NULL)
308                 goto unlock;
309
310         com = __lfsck_component_find(lfsck, type,
311                                      &lfsck->li_list_double_scan);
312         if (com != NULL)
313                 goto unlock;
314
315         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
316
317 unlock:
318         if (com != NULL)
319                 lfsck_component_get(com);
320         spin_unlock(&lfsck->li_lock);
321         return com;
322 }
323
324 void lfsck_component_cleanup(const struct lu_env *env,
325                              struct lfsck_component *com)
326 {
327         if (!list_empty(&com->lc_link))
328                 list_del_init(&com->lc_link);
329         if (!list_empty(&com->lc_link_dir))
330                 list_del_init(&com->lc_link_dir);
331
332         lfsck_component_put(env, com);
333 }
334
335 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
336                     struct lu_fid *fid, bool locked)
337 {
338         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
339         int                      rc = 0;
340
341         ENTRY;
342         if (!locked)
343                 mutex_lock(&lfsck->li_mutex);
344
345         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
346         if (rc >= 0) {
347                 bk->lb_last_fid = *fid;
348                 /* We do not care about whether the subsequent sub-operations
349                  * failed or not. The worst case is that one FID is lost that
350                  * is not a big issue for the LFSCK since it is relative rare
351                  * for LFSCK create.
352                  */
353                 rc = lfsck_bookmark_store(env, lfsck);
354         }
355
356         if (!locked)
357                 mutex_unlock(&lfsck->li_mutex);
358
359         RETURN(rc);
360 }
361
362 static int __lfsck_ibits_lock(const struct lu_env *env,
363                               struct lfsck_instance *lfsck,
364                               struct dt_object *obj, struct ldlm_res_id *resid,
365                               struct lustre_handle *lh, __u64 bits,
366                               enum ldlm_mode mode)
367 {
368         struct lfsck_thread_info *info = lfsck_env_info(env);
369         union ldlm_policy_data *policy = &info->lti_policy;
370         __u64 flags = LDLM_FL_ATOMIC_CB;
371         int rc;
372
373         LASSERT(lfsck->li_namespace != NULL);
374
375         memset(policy, 0, sizeof(*policy));
376         policy->l_inodebits.bits = bits;
377         policy->l_inodebits.li_initiator_id = lfsck_dev_idx(lfsck);
378         if (dt_object_remote(obj)) {
379                 struct ldlm_enqueue_info *einfo = &info->lti_einfo;
380
381                 memset(einfo, 0, sizeof(*einfo));
382                 einfo->ei_type = LDLM_IBITS;
383                 einfo->ei_mode = mode;
384                 einfo->ei_cb_bl = ldlm_blocking_ast;
385                 einfo->ei_cb_cp = ldlm_completion_ast;
386                 einfo->ei_res_id = resid;
387                 einfo->ei_req_slot = 1;
388
389                 rc = dt_object_lock(env, obj, lh, einfo, policy);
390                 /* for regular checks LFSCK doesn't use LDLM locking,
391                  * so the state isn't coherent. here we just took LDLM
392                  * lock for coherency and it's time to invalidate
393                  * previous state
394                  */
395                 if (rc == ELDLM_OK)
396                         dt_invalidate(env, obj);
397         } else {
398                 rc = ldlm_cli_enqueue_local(env, lfsck->li_namespace, resid,
399                                             LDLM_IBITS, policy, mode,
400                                             &flags, ldlm_blocking_ast,
401                                             ldlm_completion_ast, NULL, NULL,
402                                             0, LVB_T_NONE, NULL, lh);
403         }
404
405         if (rc == ELDLM_OK) {
406                 rc = 0;
407         } else {
408                 memset(lh, 0, sizeof(*lh));
409                 rc = -EIO;
410         }
411
412         return rc;
413 }
414
415 /**
416  * Request the specified ibits lock for the given object.
417  *
418  * Before the LFSCK modifying on the namespace visible object,
419  * it needs to acquire related ibits ldlm lock.
420  *
421  * \param[in] env       pointer to the thread context
422  * \param[in] lfsck     pointer to the lfsck instance
423  * \param[in] obj       pointer to the dt_object to be locked
424  * \param[out] lh       pointer to the lock handle
425  * \param[in] bits      the bits for the ldlm lock to be acquired
426  * \param[in] mode      the mode for the ldlm lock to be acquired
427  *
428  * \retval              0 for success
429  * \retval              negative error number on failure
430  */
431 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
432                      struct dt_object *obj, struct lustre_handle *lh,
433                      __u64 bits, enum ldlm_mode mode)
434 {
435         struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
436
437         LASSERT(!lustre_handle_is_used(lh));
438
439         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
440         return __lfsck_ibits_lock(env, lfsck, obj, resid, lh, bits, mode);
441 }
442
443 /**
444  * Request the remote LOOKUP lock for the given object.
445  *
446  * If \a pobj is remote, the LOOKUP lock of \a obj is on the MDT where
447  * \a pobj is, acquire LOOKUP lock there.
448  *
449  * \param[in] env       pointer to the thread context
450  * \param[in] lfsck     pointer to the lfsck instance
451  * \param[in] pobj      pointer to parent dt_object
452  * \param[in] obj       pointer to the dt_object to be locked
453  * \param[out] lh       pointer to the lock handle
454  * \param[in] mode      the mode for the ldlm lock to be acquired
455  *
456  * \retval              0 for success
457  * \retval              negative error number on failure
458  */
459 int lfsck_remote_lookup_lock(const struct lu_env *env,
460                              struct lfsck_instance *lfsck,
461                              struct dt_object *pobj, struct dt_object *obj,
462                              struct lustre_handle *lh, enum ldlm_mode mode)
463 {
464         struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
465
466         LASSERT(!lustre_handle_is_used(lh));
467
468         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
469         return __lfsck_ibits_lock(env, lfsck, pobj, resid, lh,
470                                   MDS_INODELOCK_LOOKUP, mode);
471 }
472
473 /**
474  * Release the the specified ibits lock.
475  *
476  * If the lock has been acquired before, release it
477  * and cleanup the handle. Otherwise, do nothing.
478  *
479  * \param[in] lh        pointer to the lock handle
480  * \param[in] mode      the mode for the ldlm lock to be released
481  */
482 void lfsck_ibits_unlock(struct lustre_handle *lh, enum ldlm_mode mode)
483 {
484         if (lustre_handle_is_used(lh)) {
485                 ldlm_lock_decref(lh, mode);
486                 memset(lh, 0, sizeof(*lh));
487         }
488 }
489
490 /**
491  * Request compound ibits locks for the given <obj, name> pairs.
492  *
493  * Before the LFSCK modifying on the namespace visible object, it needs to
494  * acquire related ibits ldlm lock. Usually, we can use lfsck_ibits_lock for
495  * the lock purpose. But the simple lfsck_ibits_lock for directory-based
496  * modificationis (such as insert name entry to the directory) may be too
497  * coarse-grained and not efficient.
498  *
499  * The lfsck_lock() will request compound ibits locks on the specified
500  * <obj, name> pairs: the PDO (Parallel Directory Operations) ibits (UPDATE)
501  * lock on the directory object, and the regular ibits lock on the name hash.
502  *
503  * \param[in] env       pointer to the thread context
504  * \param[in] lfsck     pointer to the lfsck instance
505  * \param[in] obj       pointer to the dt_object to be locked
506  * \param[in] name      used for building the PDO lock resource
507  * \param[out] llh      pointer to the lfsck_lock_handle
508  * \param[in] bits      the bits for the ldlm lock to be acquired
509  * \param[in] mode      the mode for the ldlm lock to be acquired
510  *
511  * \retval              0 for success
512  * \retval              negative error number on failure
513  */
514 int lfsck_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
515                struct dt_object *obj, const char *name,
516                struct lfsck_lock_handle *llh, __u64 bits, enum ldlm_mode mode)
517 {
518         struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
519         int rc;
520
521         LASSERT(S_ISDIR(lfsck_object_type(obj)));
522         LASSERT(name != NULL);
523         LASSERT(name[0] != 0);
524         LASSERT(!lustre_handle_is_used(&llh->llh_pdo_lh));
525         LASSERT(!lustre_handle_is_used(&llh->llh_reg_lh));
526
527         switch (mode) {
528         case LCK_EX:
529                 llh->llh_pdo_mode = LCK_EX;
530                 break;
531         case LCK_PW:
532                 llh->llh_pdo_mode = LCK_CW;
533                 break;
534         case LCK_PR:
535                 llh->llh_pdo_mode = LCK_CR;
536                 break;
537         default:
538                 CDEBUG(D_LFSCK, "%s: unexpected PDO lock mode %u on the obj "
539                        DFID"\n", lfsck_lfsck2name(lfsck), mode,
540                        PFID(lfsck_dto2fid(obj)));
541                 LBUG();
542         }
543
544         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
545         rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_pdo_lh,
546                                 MDS_INODELOCK_UPDATE, llh->llh_pdo_mode);
547         if (rc != 0)
548                 return rc;
549
550         llh->llh_reg_mode = mode;
551         resid->name[LUSTRE_RES_ID_HSH_OFF] = ll_full_name_hash(NULL, name,
552                                                                strlen(name));
553         LASSERT(resid->name[LUSTRE_RES_ID_HSH_OFF] != 0);
554         rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_reg_lh,
555                                 bits, llh->llh_reg_mode);
556         if (rc != 0)
557                 lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode);
558
559         return rc;
560 }
561
562 /**
563  * Release the the compound ibits locks.
564  *
565  * \param[in] llh       pointer to the lfsck_lock_handle to be released
566  */
567 void lfsck_unlock(struct lfsck_lock_handle *llh)
568 {
569         lfsck_ibits_unlock(&llh->llh_reg_lh, llh->llh_reg_mode);
570         lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode);
571 }
572
573 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
574                               struct lfsck_instance *lfsck,
575                               const struct lu_fid *fid)
576 {
577         struct seq_server_site *ss = lfsck_dev_site(lfsck);
578         struct lu_seq_range *range = &lfsck_env_info(env)->lti_range;
579         int rc;
580
581         if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) {
582                 /* "ROOT" is always on the MDT0. */
583                 if (lu_fid_eq(fid, &lfsck->li_global_root_fid))
584                         return 0;
585
586                 return lfsck_dev_idx(lfsck);
587         }
588
589         fld_range_set_mdt(range);
590         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
591         if (rc == 0)
592                 rc = range->lsr_index;
593
594         return rc;
595 }
596
/* Names private to this file. */
static const char lostfound[] = "lost+found";
static const char dotlustre[] = ".lustre";

/* Directory self and parent entry names; not static, so visible outside. */
const char dotdot[] = "..";
const char dot[] = ".";
601
602 /**
603  * Remove the name entry from the .lustre/lost+found directory.
604  *
605  * No need to care about the object referenced by the name entry,
606  * either the name entry is invalid or redundant, or the referenced
607  * object has been processed or will be handled by others.
608  *
609  * \param[in] env       pointer to the thread context
610  * \param[in] lfsck     pointer to the lfsck instance
611  * \param[in] name      the name for the name entry to be removed
612  *
613  * \retval              0 for success
614  * \retval              negative error number on failure
615  */
616 static int lfsck_lpf_remove_name_entry(const struct lu_env *env,
617                                        struct lfsck_instance *lfsck,
618                                        const char *name)
619 {
620         struct dt_object *parent = lfsck->li_lpf_root_obj;
621         struct dt_device *dev = lfsck_obj2dev(parent);
622         struct thandle *th;
623         struct lfsck_lock_handle *llh = &lfsck_env_info(env)->lti_llh;
624         int rc;
625
626         ENTRY;
627         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
628                 RETURN(0);
629
630         rc = lfsck_lock(env, lfsck, parent, name, llh,
631                         MDS_INODELOCK_UPDATE, LCK_PW);
632         if (rc != 0)
633                 RETURN(rc);
634
635         th = lfsck_trans_create(env, dev, lfsck);
636         if (IS_ERR(th))
637                 GOTO(unlock, rc = PTR_ERR(th));
638
639         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
640         if (rc != 0)
641                 GOTO(stop, rc);
642
643         rc = dt_declare_ref_del(env, parent, th);
644         if (rc != 0)
645                 GOTO(stop, rc);
646
647         rc = dt_trans_start_local(env, dev, th);
648         if (rc != 0)
649                 GOTO(stop, rc);
650
651         rc = dt_delete(env, parent, (const struct dt_key *)name, th);
652         if (rc != 0)
653                 GOTO(stop, rc);
654
655         dt_write_lock(env, parent, 0);
656         rc = dt_ref_del(env, parent, th);
657         dt_write_unlock(env, parent);
658
659         GOTO(stop, rc);
660
661 stop:
662         dt_trans_stop(env, dev, th);
663
664 unlock:
665         lfsck_unlock(llh);
666
667         CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s: rc = %d\n",
668                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(parent)), name, rc);
669
670         return rc;
671 }
672
673 static int lfsck_create_lpf_local(const struct lu_env *env,
674                                   struct lfsck_instance *lfsck,
675                                   struct dt_object *child,
676                                   struct lu_attr *la,
677                                   struct dt_object_format *dof,
678                                   const char *name)
679 {
680         struct dt_insert_rec *rec = &lfsck_env_info(env)->lti_dt_rec;
681         struct dt_object *parent = lfsck->li_lpf_root_obj;
682         struct dt_device *dev = lfsck_obj2dev(child);
683         struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
684         struct dt_object *bk_obj = lfsck->li_bookmark_obj;
685         const struct lu_fid *cfid = lfsck_dto2fid(child);
686         struct thandle *th = NULL;
687         struct linkea_data ldata = { NULL };
688         struct lu_buf linkea_buf;
689         const struct lu_name *cname;
690         loff_t pos = 0;
691         int len = sizeof(struct lfsck_bookmark);
692         int rc;
693
694         ENTRY;
695         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
696                 RETURN(0);
697
698         cname = lfsck_name_get_const(env, name, strlen(name));
699         rc = linkea_links_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf2,
700                               cname, lfsck_dto2fid(parent));
701         if (rc != 0)
702                 RETURN(rc);
703
704         th = lfsck_trans_create(env, dev, lfsck);
705         if (IS_ERR(th))
706                 RETURN(PTR_ERR(th));
707
708         /* 1a. create child */
709         rc = dt_declare_create(env, child, la, NULL, dof, th);
710         if (rc != 0)
711                 GOTO(stop, rc);
712
713         if (!dt_try_as_dir(env, child, false))
714                 GOTO(stop, rc = -ENOTDIR);
715
716         /* 2a. increase child nlink */
717         rc = dt_declare_ref_add(env, child, th);
718         if (rc != 0)
719                 GOTO(stop, rc);
720
721         /* 3a. insert dot into child dir */
722         rec->rec_type = S_IFDIR;
723         rec->rec_fid = cfid;
724         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
725                                (const struct dt_key *)dot, th);
726         if (rc != 0)
727                 GOTO(stop, rc);
728
729         /* 4a. insert dotdot into child dir */
730         rec->rec_fid = &LU_LPF_FID;
731         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
732                                (const struct dt_key *)dotdot, th);
733         if (rc != 0)
734                 GOTO(stop, rc);
735
736         /* 5a. insert linkEA for child */
737         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
738                        ldata.ld_leh->leh_len);
739         rc = dt_declare_xattr_set(env, child, &linkea_buf,
740                                   XATTR_NAME_LINK, 0, th);
741         if (rc != 0)
742                 GOTO(stop, rc);
743
744         /* 6a. insert name into parent dir */
745         rec->rec_type = S_IFDIR;
746         rec->rec_fid = cfid;
747         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
748                                (const struct dt_key *)name, th);
749         if (rc != 0)
750                 GOTO(stop, rc);
751
752         /* 7a. increase parent nlink */
753         rc = dt_declare_ref_add(env, parent, th);
754         if (rc != 0)
755                 GOTO(stop, rc);
756
757         /* 8a. update bookmark */
758         rc = dt_declare_record_write(env, bk_obj,
759                                      lfsck_buf_get(env, bk, len), 0, th);
760         if (rc != 0)
761                 GOTO(stop, rc);
762
763         rc = dt_trans_start_local(env, dev, th);
764         if (rc != 0)
765                 GOTO(stop, rc);
766
767         dt_write_lock(env, child, 0);
768         /* 1b. create child */
769         rc = dt_create(env, child, la, NULL, dof, th);
770         if (rc != 0)
771                 GOTO(unlock, rc);
772
773         /* 2b. increase child nlink */
774         rc = dt_ref_add(env, child, th);
775         if (rc != 0)
776                 GOTO(unlock, rc);
777
778         /* 3b. insert dot into child dir */
779         rec->rec_fid = cfid;
780         rc = dt_insert(env, child, (const struct dt_rec *)rec,
781                        (const struct dt_key *)dot, th);
782         if (rc != 0)
783                 GOTO(unlock, rc);
784
785         /* 4b. insert dotdot into child dir */
786         rec->rec_fid = &LU_LPF_FID;
787         rc = dt_insert(env, child, (const struct dt_rec *)rec,
788                        (const struct dt_key *)dotdot, th);
789         if (rc != 0)
790                 GOTO(unlock, rc);
791
792         /* 5b. insert linkEA for child. */
793         rc = dt_xattr_set(env, child, &linkea_buf,
794                           XATTR_NAME_LINK, 0, th);
795         dt_write_unlock(env, child);
796         if (rc != 0)
797                 GOTO(stop, rc);
798
799         /* 6b. insert name into parent dir */
800         rec->rec_fid = cfid;
801         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
802                        (const struct dt_key *)name, th);
803         if (rc != 0)
804                 GOTO(stop, rc);
805
806         dt_write_lock(env, parent, 0);
807         /* 7b. increase parent nlink */
808         rc = dt_ref_add(env, parent, th);
809         dt_write_unlock(env, parent);
810         if (rc != 0)
811                 GOTO(stop, rc);
812
813         bk->lb_lpf_fid = *cfid;
814         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
815
816         /* 8b. update bookmark */
817         rc = dt_record_write(env, bk_obj,
818                              lfsck_buf_get(env, bk, len), &pos, th);
819
820         GOTO(stop, rc);
821
822 unlock:
823         dt_write_unlock(env, child);
824
825 stop:
826         dt_trans_stop(env, dev, th);
827
828         return rc;
829 }
830
831 static int lfsck_create_lpf_remote(const struct lu_env *env,
832                                    struct lfsck_instance *lfsck,
833                                    struct dt_object *child,
834                                    struct lu_attr *la,
835                                    struct dt_object_format *dof,
836                                    const char *name)
837 {
838         struct dt_insert_rec *rec = &lfsck_env_info(env)->lti_dt_rec;
839         struct dt_object *parent = lfsck->li_lpf_root_obj;
840         struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
841         struct dt_object *bk_obj = lfsck->li_bookmark_obj;
842         const struct lu_fid *cfid = lfsck_dto2fid(child);
843         struct thandle *th = NULL;
844         struct linkea_data ldata = { NULL };
845         struct lu_buf linkea_buf;
846         const struct lu_name *cname;
847         struct dt_device *dev;
848         loff_t pos = 0;
849         int len = sizeof(struct lfsck_bookmark);
850         int rc;
851
852         ENTRY;
853         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
854                 RETURN(0);
855
856         cname = lfsck_name_get_const(env, name, strlen(name));
857         rc = linkea_links_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf2,
858                               cname, lfsck_dto2fid(parent));
859         if (rc != 0)
860                 RETURN(rc);
861
862         /* Create .lustre/lost+found/MDTxxxx. */
863
864         /* XXX: Currently, cross-MDT create operation needs to create the child
865          *      object firstly, then insert name into the parent directory. For
866          *      this case, the child object resides on current MDT (local), but
867          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
868          *      easy to contain all the sub-modifications orderly within single
869          *      transaction.
870          *
871          *      To avoid more inconsistency, we split the create operation into
872          *      two transactions:
873          *
874          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
875          *         locally.
876          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
877          *         remotely.
878          *
879          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
880          *      repair such inconsistency when LFSCK run next time.
881          */
882
883         /* Transaction I: locally */
884
885         dev = lfsck_obj2dev(child);
886         th = lfsck_trans_create(env, dev, lfsck);
887         if (IS_ERR(th))
888                 RETURN(PTR_ERR(th));
889
890         /* 1a. create child */
891         rc = dt_declare_create(env, child, la, NULL, dof, th);
892         if (rc != 0)
893                 GOTO(stop, rc);
894
895         if (!dt_try_as_dir(env, child, false))
896                 GOTO(stop, rc = -ENOTDIR);
897
898         /* 2a. increase child nlink */
899         rc = dt_declare_ref_add(env, child, th);
900         if (rc != 0)
901                 GOTO(stop, rc);
902
903         /* 3a. insert dot into child dir */
904         rec->rec_type = S_IFDIR;
905         rec->rec_fid = cfid;
906         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
907                                (const struct dt_key *)dot, th);
908         if (rc != 0)
909                 GOTO(stop, rc);
910
911         /* 4a. insert dotdot into child dir */
912         rec->rec_fid = &LU_LPF_FID;
913         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
914                                (const struct dt_key *)dotdot, th);
915         if (rc != 0)
916                 GOTO(stop, rc);
917
918         /* 5a. insert linkEA for child */
919         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
920                        ldata.ld_leh->leh_len);
921         rc = dt_declare_xattr_set(env, child, &linkea_buf,
922                                   XATTR_NAME_LINK, 0, th);
923         if (rc != 0)
924                 GOTO(stop, rc);
925
926         /* 6a. update bookmark */
927         rc = dt_declare_record_write(env, bk_obj,
928                                      lfsck_buf_get(env, bk, len), 0, th);
929         if (rc != 0)
930                 GOTO(stop, rc);
931
932         rc = dt_trans_start_local(env, dev, th);
933         if (rc != 0)
934                 GOTO(stop, rc);
935
936         dt_write_lock(env, child, 0);
937         /* 1b. create child */
938         rc = dt_create(env, child, la, NULL, dof, th);
939         if (rc != 0)
940                 GOTO(unlock, rc);
941
942         /* 2b. increase child nlink */
943         rc = dt_ref_add(env, child, th);
944         if (rc != 0)
945                 GOTO(unlock, rc);
946
947         /* 3b. insert dot into child dir */
948         rec->rec_type = S_IFDIR;
949         rec->rec_fid = cfid;
950         rc = dt_insert(env, child, (const struct dt_rec *)rec,
951                        (const struct dt_key *)dot, th);
952         if (rc != 0)
953                 GOTO(unlock, rc);
954
955         /* 4b. insert dotdot into child dir */
956         rec->rec_fid = &LU_LPF_FID;
957         rc = dt_insert(env, child, (const struct dt_rec *)rec,
958                        (const struct dt_key *)dotdot, th);
959         if (rc != 0)
960                 GOTO(unlock, rc);
961
962         /* 5b. insert linkEA for child */
963         rc = dt_xattr_set(env, child, &linkea_buf,
964                           XATTR_NAME_LINK, 0, th);
965         if (rc != 0)
966                 GOTO(unlock, rc);
967
968         bk->lb_lpf_fid = *cfid;
969         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
970
971         /* 6b. update bookmark */
972         rc = dt_record_write(env, bk_obj,
973                              lfsck_buf_get(env, bk, len), &pos, th);
974
975         dt_write_unlock(env, child);
976         dt_trans_stop(env, dev, th);
977         if (rc != 0)
978                 RETURN(rc);
979
980         /* Transaction II: remotely */
981
982         dev = lfsck_obj2dev(parent);
983         th = lfsck_trans_create(env, dev, lfsck);
984         if (IS_ERR(th))
985                 RETURN(PTR_ERR(th));
986
987         th->th_sync = 1;
988         /* 5a. insert name into parent dir */
989         rec->rec_fid = cfid;
990         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
991                                (const struct dt_key *)name, th);
992         if (rc != 0)
993                 GOTO(stop, rc);
994
995         /* 6a. increase parent nlink */
996         rc = dt_declare_ref_add(env, parent, th);
997         if (rc != 0)
998                 GOTO(stop, rc);
999
1000         rc = dt_trans_start_local(env, dev, th);
1001         if (rc != 0)
1002                 GOTO(stop, rc);
1003
1004         /* 5b. insert name into parent dir */
1005         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1006                        (const struct dt_key *)name, th);
1007         if (rc != 0)
1008                 GOTO(stop, rc);
1009
1010         dt_write_lock(env, parent, 0);
1011         /* 6b. increase parent nlink */
1012         rc = dt_ref_add(env, parent, th);
1013         dt_write_unlock(env, parent);
1014
1015         GOTO(stop, rc);
1016
1017 unlock:
1018         dt_write_unlock(env, child);
1019 stop:
1020         dt_trans_stop(env, dev, th);
1021
1022         if (rc != 0 && dev == lfsck_obj2dev(parent))
1023                 CDEBUG(D_LFSCK,
1024                        "%s: partially created the object "DFID"for orphans, but failed to insert the name %s to the .lustre/lost+found/. Such inconsistency will be repaired when LFSCK run next time: rc = %d\n",
1025                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
1026
1027         return rc;
1028 }
1029
1030 /**
1031  * Create the MDTxxxx directory under /ROOT/.lustre/lost+found/
1032  *
1033  * The /ROOT/.lustre/lost+found/MDTxxxx/ directory is used for holding
1034  * orphans and other uncertain inconsistent objects found during the
1035  * LFSCK. Such directory will be created by the LFSCK engine on the
1036  * local MDT before the LFSCK scanning.
1037  *
1038  * \param[in] env       pointer to the thread context
1039  * \param[in] lfsck     pointer to the lfsck instance
1040  *
1041  * \retval              0 for success
1042  * \retval              negative error number on failure
1043  */
1044 static int lfsck_create_lpf(const struct lu_env *env,
1045                             struct lfsck_instance *lfsck)
1046 {
1047         struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
1048         struct lfsck_thread_info *info = lfsck_env_info(env);
1049         struct lu_fid *cfid = &info->lti_fid2;
1050         struct lu_attr *la = &info->lti_la;
1051         struct dt_object_format *dof = &info->lti_dof;
1052         struct dt_object *parent = lfsck->li_lpf_root_obj;
1053         struct dt_object *child = NULL;
1054         struct lfsck_lock_handle *llh = &info->lti_llh;
1055         char name[8];
1056         int node = lfsck_dev_idx(lfsck);
1057         int rc = 0;
1058
1059         ENTRY;
1060         LASSERT(lfsck->li_master);
1061         LASSERT(parent != NULL);
1062         LASSERT(lfsck->li_lpf_obj == NULL);
1063
1064         snprintf(name, 8, "MDT%04x", node);
1065         rc = lfsck_lock(env, lfsck, parent, name, llh,
1066                         MDS_INODELOCK_UPDATE, LCK_PW);
1067         if (rc != 0)
1068                 RETURN(rc);
1069
1070         if (fid_is_zero(&bk->lb_lpf_fid)) {
1071                 /* There is corner case that: in former LFSCK scanning we have
1072                  * created the .lustre/lost+found/MDTxxxx but failed to update
1073                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
1074                  * it from MDT0 firstly.
1075                  */
1076                 rc = dt_lookup_dir(env, parent, name, cfid);
1077                 if (rc != 0 && rc != -ENOENT)
1078                         GOTO(unlock, rc);
1079
1080                 if (rc == 0) {
1081                         bk->lb_lpf_fid = *cfid;
1082                         rc = lfsck_bookmark_store(env, lfsck);
1083                 } else {
1084                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
1085                 }
1086                 if (rc != 0)
1087                         GOTO(unlock, rc);
1088         } else {
1089                 *cfid = bk->lb_lpf_fid;
1090         }
1091
1092         child = lfsck_object_find_bottom_new(env, lfsck, cfid);
1093         if (IS_ERR(child))
1094                 GOTO(unlock, rc = PTR_ERR(child));
1095
1096         if (dt_object_exists(child)) {
1097                 if (unlikely(!dt_try_as_dir(env, child, true)))
1098                         rc = -ENOTDIR;
1099                 else
1100                         lfsck->li_lpf_obj = child;
1101
1102                 GOTO(unlock, rc);
1103         }
1104
1105         memset(la, 0, sizeof(*la));
1106         la->la_atime = la->la_mtime = la->la_ctime = ktime_get_real_seconds();
1107         la->la_mode = S_IFDIR | S_IRWXU;
1108         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1109                        LA_UID | LA_GID | LA_TYPE;
1110         memset(dof, 0, sizeof(*dof));
1111         dof->dof_type = dt_mode_to_dft(S_IFDIR);
1112
1113         if (node == 0)
1114                 rc = lfsck_create_lpf_local(env, lfsck, child, la, dof, name);
1115         else
1116                 rc = lfsck_create_lpf_remote(env, lfsck, child, la, dof, name);
1117         if (rc == 0)
1118                 lfsck->li_lpf_obj = child;
1119
1120         GOTO(unlock, rc);
1121
1122 unlock:
1123         lfsck_unlock(llh);
1124         if (rc != 0 && child != NULL && !IS_ERR(child))
1125                 lfsck_object_put(env, child);
1126
1127         return rc;
1128 }
1129
1130 /**
1131  * Scan .lustre/lost+found for bad name entries and remove them.
1132  *
1133  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
1134  * index in the system. Any other formatted name is invalid and should be
1135  * removed.
1136  *
1137  * \param[in] env       pointer to the thread context
1138  * \param[in] lfsck     pointer to the lfsck instance
1139  *
1140  * \retval              0 for success
1141  * \retval              negative error number on failure
1142  */
1143 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
1144                                       struct lfsck_instance *lfsck)
1145 {
1146         struct dt_object *parent = lfsck->li_lpf_root_obj;
1147         struct lu_dirent *ent =
1148                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
1149         const struct dt_it_ops *iops = &parent->do_index_ops->dio_it;
1150         struct dt_it *it;
1151         int rc;
1152
1153         ENTRY;
1154         it = iops->init(env, parent, LUDA_64BITHASH);
1155         if (IS_ERR(it))
1156                 RETURN(PTR_ERR(it));
1157
1158         rc = iops->load(env, it, 0);
1159         if (rc == 0)
1160                 rc = iops->next(env, it);
1161         else if (rc > 0)
1162                 rc = 0;
1163
1164         while (rc == 0) {
1165                 int off = 3;
1166
1167                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
1168                 if (rc != 0)
1169                         break;
1170
1171                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
1172                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1173                         goto next;
1174
1175                 /* name length must be strlen("MDTxxxx") */
1176                 if (ent->lde_namelen != 7)
1177                         goto remove;
1178
1179                 if (memcmp(ent->lde_name, "MDT", off) != 0)
1180                         goto remove;
1181
1182                 while (off < 7 && isxdigit(ent->lde_name[off]))
1183                         off++;
1184
1185                 if (off != 7) {
1186
1187 remove:
1188                         rc = lfsck_lpf_remove_name_entry(env, lfsck,
1189                                                          ent->lde_name);
1190                         if (rc != 0)
1191                                 break;
1192                 }
1193
1194 next:
1195                 rc = iops->next(env, it);
1196         }
1197
1198         iops->put(env, it);
1199         iops->fini(env, it);
1200
1201         RETURN(rc > 0 ? 0 : rc);
1202 }
1203
1204 static int lfsck_update_lpf_entry(const struct lu_env *env,
1205                                   struct lfsck_instance *lfsck,
1206                                   struct dt_object *parent,
1207                                   struct dt_object *child,
1208                                   const char *name,
1209                                   enum lfsck_verify_lpf_types type)
1210 {
1211         int rc;
1212
1213         if (type == LVLT_BY_BOOKMARK) {
1214                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
1215                                              lfsck_dto2fid(child), S_IFDIR);
1216         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1217                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
1218                 rc = lfsck_bookmark_store(env, lfsck);
1219
1220                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
1221                        " in the bookmark file: rc = %d\n",
1222                        lfsck_lfsck2name(lfsck),
1223                        PFID(lfsck_dto2fid(child)), rc);
1224         }
1225
1226         return rc;
1227 }
1228
1229 /**
1230  * Check whether the @child back references the @parent.
1231  *
1232  * Two cases:
1233  * 1) The child's FID is stored in the bookmark file. If the child back
1234  *    references the parent (LU_LPF_FID object) via its ".." entry, then
1235  *    insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
1236  *    the child back references another parent2, then:
1237  * 1.1) If the parent2 recognizes the child, then update the bookmark file;
1238  * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
1239  *      references the child. So keep them there. As the LFSCK processing,
1240  *      the parent3 may be found, then when the LFSCK run next time, the
1241  *      inconsistency can be repaired.
1242  *
1243  * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
1244  *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
1245  *    via its ".." entry, then update the bookmark file, otherwise, if the child
1246  *    back references another parent2, then:
1247  * 2.1) If the parent2 recognizes the child, then remove the sub-directory
1248  *      from .lustre/lost+found/;
1249  * 2.2) Otherwise, if the parent2 does not recognizes the child, trust the
1250  *      sub-directory name entry and update the child;
1251  * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1252  *      or not, then keep them there.
1253  *
1254  * \param[in] env       pointer to the thread context
1255  * \param[in] lfsck     pointer to the lfsck instance
1256  * \param[in] child     pointer to the lost+found sub-directory object
1257  * \param[in] name      the name for lost+found sub-directory object
1258  * \param[out] fid      pointer to the buffer to hold the FID of the object
1259  *                      (called it as parent2) that is referenced via the
1260  *                      child's dotdot entry; it also can be the FID that
1261  *                      is referenced by the name entry under the parent2.
1262  * \param[in] type      to indicate where the child's FID is stored in
1263  *
1264  * \retval              positive number for uncertain inconsistency
1265  * \retval              0 for success
1266  * \retval              negative error number on failure
1267  */
1268 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1269                                   struct lfsck_instance *lfsck,
1270                                   struct dt_object *child, const char *name,
1271                                   struct lu_fid *fid,
1272                                   enum lfsck_verify_lpf_types type)
1273 {
1274         struct dt_object *parent = lfsck->li_lpf_root_obj;
1275         struct lfsck_thread_info *info = lfsck_env_info(env);
1276         char *name2 = info->lti_key;
1277         struct lu_fid *fid2 = &info->lti_fid3;
1278         struct dt_object *parent2 = NULL;
1279         struct lustre_handle lh = { 0 };
1280         int rc;
1281
1282         ENTRY;
1283         fid_zero(fid);
1284         rc = dt_lookup_dir(env, child, dotdot, fid);
1285         if (rc != 0)
1286                 GOTO(linkea, rc);
1287
1288         if (!fid_is_sane(fid))
1289                 GOTO(linkea, rc = -EINVAL);
1290
1291         if (lu_fid_eq(fid, &LU_LPF_FID)) {
1292                 const struct lu_name *cname;
1293
1294                 if (lfsck->li_lpf_obj == NULL) {
1295                         lu_object_get(&child->do_lu);
1296                         lfsck->li_lpf_obj = child;
1297                 }
1298
1299                 cname = lfsck_name_get_const(env, name, strlen(name));
1300                 rc = lfsck_verify_linkea(env, lfsck, child, cname, &LU_LPF_FID);
1301                 if (rc == 0)
1302                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1303                                                     name, type);
1304
1305                 GOTO(out_done, rc);
1306         }
1307
1308         parent2 = lfsck_object_find_bottom(env, lfsck, fid);
1309         if (IS_ERR(parent2))
1310                 GOTO(linkea, parent2);
1311
1312         if (!dt_try_as_dir(env, parent2, true)) {
1313                 lfsck_object_put(env, parent2);
1314
1315                 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1316         }
1317
1318 linkea:
1319         /* To prevent rename/unlink race */
1320         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1321                               MDS_INODELOCK_UPDATE, LCK_PR);
1322         if (rc != 0)
1323                 GOTO(out_put, rc);
1324
1325         dt_read_lock(env, child, 0);
1326         rc = lfsck_links_get_first(env, child, name2, fid2);
1327         if (rc != 0) {
1328                 dt_read_unlock(env, child);
1329                 lfsck_ibits_unlock(&lh, LCK_PR);
1330
1331                 GOTO(out_put, rc = 1);
1332         }
1333
1334         /* It is almost impossible that the bookmark file (or the name entry)
1335          * and the linkEA hit the same data corruption. Trust the linkEA.
1336          */
1337         if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1338                 dt_read_unlock(env, child);
1339                 lfsck_ibits_unlock(&lh, LCK_PR);
1340
1341                 *fid = *fid2;
1342                 if (lfsck->li_lpf_obj == NULL) {
1343                         lu_object_get(&child->do_lu);
1344                         lfsck->li_lpf_obj = child;
1345                 }
1346
1347                 /* Update the child's dotdot entry */
1348                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1349                                              &LU_LPF_FID, S_IFDIR);
1350                 if (rc == 0)
1351                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1352                                                     name, type);
1353
1354                 GOTO(out_put, rc);
1355         }
1356
1357         if (parent2 == NULL || IS_ERR(parent2)) {
1358                 dt_read_unlock(env, child);
1359                 lfsck_ibits_unlock(&lh, LCK_PR);
1360
1361                 GOTO(out_done, rc = 1);
1362         }
1363
1364         rc = dt_lookup_dir(env, parent2, name2, fid);
1365         dt_read_unlock(env, child);
1366         lfsck_ibits_unlock(&lh, LCK_PR);
1367         if (rc != 0 && rc != -ENOENT)
1368                 GOTO(out_put, rc);
1369
1370         if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1371                 if (type == LVLT_BY_BOOKMARK)
1372                         GOTO(out_put, rc = 1);
1373
1374                 /* Trust the name entry, update the child's dotdot entry. */
1375                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1376                                              &LU_LPF_FID, S_IFDIR);
1377
1378                 GOTO(out_put, rc);
1379         }
1380
1381         if (type == LVLT_BY_BOOKMARK) {
1382                 /* Invalid FID record in the bookmark file, reset it. */
1383                 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1384                 rc = lfsck_bookmark_store(env, lfsck);
1385
1386                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1387                        " in the bookmark file: rc = %d\n",
1388                        lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1389         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1390                 /* The name entry is wrong, remove it. */
1391                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1392         }
1393
1394         GOTO(out_put, rc);
1395
1396 out_put:
1397         if (parent2 != NULL && !IS_ERR(parent2))
1398                 lfsck_object_put(env, parent2);
1399
1400 out_done:
1401         return rc;
1402 }
1403
1404 /**
1405  * Verify the /ROOT/.lustre/lost+found/ directory.
1406  *
1407  * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1408  * the LFSCK does not exactly know how to handle, such as orphans. So before
1409  * the LFSCK scanning the system, the consistency of such directory needs to
1410  * be verified firstly to allow the users to use it during the LFSCK.
1411  *
1412  * \param[in] env       pointer to the thread context
1413  * \param[in] lfsck     pointer to the lfsck instance
1414  *
1415  * \retval              positive number for uncertain inconsistency
1416  * \retval              0 for success
1417  * \retval              negative error number on failure
1418  */
1419 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1420 {
1421         struct lfsck_thread_info *info = lfsck_env_info(env);
1422         struct lu_fid *pfid = &info->lti_fid;
1423         struct lu_fid *cfid = &info->lti_fid2;
1424         struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
1425         struct dt_object *parent;
1426         /* child1's FID is in the bookmark file. */
1427         struct dt_object *child1 = NULL;
1428         /* child2's FID is in the name entry MDTxxxx. */
1429         struct dt_object *child2 = NULL;
1430         const struct lu_name *cname;
1431         char name[8];
1432         int node = lfsck_dev_idx(lfsck);
1433         int rc = 0;
1434
1435         ENTRY;
1436         LASSERT(lfsck->li_master);
1437
1438         if (lfsck_is_dryrun(lfsck))
1439                 RETURN(0);
1440
1441         if (lfsck->li_lpf_root_obj != NULL)
1442                 RETURN(0);
1443
1444         if (node == 0) {
1445                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
1446                                                   &LU_LPF_FID);
1447         } else {
1448                 struct lfsck_tgt_desc *ltd;
1449
1450                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1451                 if (unlikely(ltd == NULL))
1452                         RETURN(-ENXIO);
1453
1454                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1455                                                   &LU_LPF_FID);
1456                 lfsck_tgt_put(ltd);
1457         }
1458
1459         if (IS_ERR(parent))
1460                 RETURN(PTR_ERR(parent));
1461
1462         LASSERT(dt_object_exists(parent));
1463
1464         if (unlikely(!dt_try_as_dir(env, parent, true))) {
1465                 lfsck_object_put(env, parent);
1466
1467                 GOTO(put, rc = -ENOTDIR);
1468         }
1469
1470         lfsck->li_lpf_root_obj = parent;
1471         if (node == 0) {
1472                 rc = lfsck_scan_lpf_bad_entries(env, lfsck);
1473                 if (rc != 0)
1474                         CDEBUG(D_LFSCK,
1475                                "%s: scan .lustre/lost+found/ for bad sub-directories: rc = %d\n",
1476                                lfsck_lfsck2name(lfsck), rc);
1477         }
1478
1479         /* child2 */
1480         snprintf(name, 8, "MDT%04x", node);
1481         rc = dt_lookup_dir(env, parent, name, cfid);
1482         if (rc == -ENOENT) {
1483                 rc = 0;
1484                 goto find_child1;
1485         }
1486
1487         if (rc != 0)
1488                 GOTO(put, rc);
1489
1490         /* Invalid FID in the name entry, remove the name entry. */
1491         if (!fid_is_norm(cfid)) {
1492                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1493                 if (rc != 0)
1494                         GOTO(put, rc);
1495
1496                 goto find_child1;
1497         }
1498
1499         child2 = lfsck_object_find_bottom(env, lfsck, cfid);
1500         if (IS_ERR(child2))
1501                 GOTO(put, rc = PTR_ERR(child2));
1502
1503         if (unlikely(!dt_object_exists(child2) ||
1504                      dt_object_remote(child2)) ||
1505                      !S_ISDIR(lfsck_object_type(child2))) {
1506                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1507                 if (rc != 0)
1508                         GOTO(put, rc);
1509
1510                 goto find_child1;
1511         }
1512
1513         if (unlikely(!dt_try_as_dir(env, child2, true)))
1514                 GOTO(put, rc = -ENOTDIR);
1515
1516 find_child1:
1517         if (fid_is_zero(&bk->lb_lpf_fid))
1518                 goto check_child2;
1519
1520         if (likely(lu_fid_eq(cfid, &bk->lb_lpf_fid))) {
1521                 if (lfsck->li_lpf_obj == NULL) {
1522                         lu_object_get(&child2->do_lu);
1523                         lfsck->li_lpf_obj = child2;
1524                 }
1525
1526                 cname = lfsck_name_get_const(env, name, strlen(name));
1527                 rc = lfsck_verify_linkea(env, lfsck, child2, cname,
1528                                          &LU_LPF_FID);
1529
1530                 GOTO(put, rc);
1531         }
1532
1533         if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
1534                 struct lu_fid tfid = bk->lb_lpf_fid;
1535
1536                 /* Invalid FID record in the bookmark file, reset it. */
1537                 fid_zero(&bk->lb_lpf_fid);
1538                 rc = lfsck_bookmark_store(env, lfsck);
1539
1540                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1541                        " in the bookmark file: rc = %d\n",
1542                        lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
1543
1544                 if (rc != 0)
1545                         GOTO(put, rc);
1546
1547                 goto check_child2;
1548         }
1549
1550         child1 = lfsck_object_find_bottom(env, lfsck, &bk->lb_lpf_fid);
1551         if (IS_ERR(child1)) {
1552                 child1 = NULL;
1553                 goto check_child2;
1554         }
1555
1556         if (unlikely(!dt_object_exists(child1) ||
1557                      dt_object_remote(child1)) ||
1558                      !S_ISDIR(lfsck_object_type(child1))) {
1559                 /* Invalid FID record in the bookmark file, reset it. */
1560                 fid_zero(&bk->lb_lpf_fid);
1561                 rc = lfsck_bookmark_store(env, lfsck);
1562
1563                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1564                        " in the bookmark file: rc = %d\n",
1565                        lfsck_lfsck2name(lfsck),
1566                        PFID(lfsck_dto2fid(child1)), rc);
1567
1568                 if (rc != 0)
1569                         GOTO(put, rc);
1570
1571                 lfsck_object_put(env, child1);
1572                 child1 = NULL;
1573                 goto check_child2;
1574         }
1575
1576         if (unlikely(!dt_try_as_dir(env, child1, true))) {
1577                 lfsck_object_put(env, child1);
1578                 child1 = NULL;
1579                 rc = -ENOTDIR;
1580                 goto check_child2;
1581         }
1582
1583         rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name, pfid,
1584                                     LVLT_BY_BOOKMARK);
1585         if (lu_fid_eq(pfid, &LU_LPF_FID))
1586                 GOTO(put, rc);
1587
1588 check_child2:
1589         if (child2 != NULL)
1590                 rc = lfsck_verify_lpf_pairs(env, lfsck, child2, name,
1591                                             pfid, LVLT_BY_NAMEENTRY);
1592
1593         GOTO(put, rc);
1594
1595 put:
1596         if (lfsck->li_lpf_obj != NULL) {
1597                 if (unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj, true))) {
1598                         lfsck_object_put(env, lfsck->li_lpf_obj);
1599                         lfsck->li_lpf_obj = NULL;
1600                         rc = -ENOTDIR;
1601                 }
1602         } else if (rc == 0) {
1603                 rc = lfsck_create_lpf(env, lfsck);
1604         }
1605
1606         if (child2 != NULL && !IS_ERR(child2))
1607                 lfsck_object_put(env, child2);
1608         if (child1 != NULL && !IS_ERR(child1))
1609                 lfsck_object_put(env, child1);
1610
1611         return rc;
1612 }
1613
1614 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1615 {
1616         struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
1617         struct seq_server_site *ss = lfsck_dev_site(lfsck);
1618         char *prefix;
1619         int rc = 0;
1620
1621         ENTRY;
1622         if (unlikely(ss == NULL))
1623                 RETURN(-ENXIO);
1624
1625         OBD_ALLOC_PTR(lfsck->li_seq);
1626         if (lfsck->li_seq == NULL)
1627                 RETURN(-ENOMEM);
1628
1629         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1630         if (prefix == NULL)
1631                 GOTO(out, rc = -ENOMEM);
1632
1633         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1634         seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1635                              ss->ss_server_seq);
1636         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1637
1638         if (fid_is_sane(&bk->lb_last_fid))
1639                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1640
1641         RETURN(0);
1642
1643 out:
1644         OBD_FREE_PTR(lfsck->li_seq);
1645         lfsck->li_seq = NULL;
1646
1647         return rc;
1648 }
1649
1650 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1651 {
1652         if (lfsck->li_seq != NULL) {
1653                 seq_client_fini(lfsck->li_seq);
1654                 OBD_FREE_PTR(lfsck->li_seq);
1655                 lfsck->li_seq = NULL;
1656         }
1657 }
1658
1659 void lfsck_instance_cleanup(const struct lu_env *env,
1660                             struct lfsck_instance *lfsck)
1661 {
1662         struct ptlrpc_thread *thread = &lfsck->li_thread;
1663         struct lfsck_component *com;
1664         struct lfsck_component *next;
1665         struct lfsck_lmv_unit *llu;
1666         struct lfsck_lmv_unit *llu_next;
1667         struct lfsck_lmv *llmv;
1668
1669         ENTRY;
1670         LASSERT(list_empty(&lfsck->li_link));
1671         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1672
1673         if (lfsck->li_obj_oit != NULL) {
1674                 lfsck_object_put(env, lfsck->li_obj_oit);
1675                 lfsck->li_obj_oit = NULL;
1676         }
1677
1678         list_for_each_entry_safe(llu, llu_next, &lfsck->li_list_lmv, llu_link) {
1679                 llmv = &llu->llu_lmv;
1680
1681                 LASSERTF(atomic_read(&llmv->ll_ref) == 1,
1682                          "still in using: %u\n",
1683                          atomic_read(&llmv->ll_ref));
1684
1685                 lfsck_lmv_put(env, llmv);
1686         }
1687
1688         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1689                 lfsck_component_cleanup(env, com);
1690         }
1691
1692         LASSERT(list_empty(&lfsck->li_list_dir));
1693
1694         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1695                                  lc_link) {
1696                 lfsck_component_cleanup(env, com);
1697         }
1698
1699         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1700                 lfsck_component_cleanup(env, com);
1701         }
1702
1703         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1704         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1705
1706         if (lfsck->li_lfsck_dir != NULL) {
1707                 lfsck_object_put(env, lfsck->li_lfsck_dir);
1708                 lfsck->li_lfsck_dir = NULL;
1709         }
1710
1711         if (lfsck->li_bookmark_obj != NULL) {
1712                 lfsck_object_put(env, lfsck->li_bookmark_obj);
1713                 lfsck->li_bookmark_obj = NULL;
1714         }
1715
1716         if (lfsck->li_lpf_obj != NULL) {
1717                 lfsck_object_put(env, lfsck->li_lpf_obj);
1718                 lfsck->li_lpf_obj = NULL;
1719         }
1720
1721         if (lfsck->li_lpf_root_obj != NULL) {
1722                 lfsck_object_put(env, lfsck->li_lpf_root_obj);
1723                 lfsck->li_lpf_root_obj = NULL;
1724         }
1725
1726         if (lfsck->li_los != NULL) {
1727                 local_oid_storage_fini(env, lfsck->li_los);
1728                 lfsck->li_los = NULL;
1729         }
1730
1731         lfsck_fid_fini(lfsck);
1732
1733         OBD_FREE_PTR(lfsck);
1734 }
1735
1736 static inline struct lfsck_instance *
1737 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1738 {
1739         struct lfsck_instance *lfsck;
1740
1741         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1742                 if (lfsck->li_bottom == key) {
1743                         if (ref)
1744                                 lfsck_instance_get(lfsck);
1745                         if (unlink)
1746                                 list_del_init(&lfsck->li_link);
1747
1748                         return lfsck;
1749                 }
1750         }
1751
1752         return NULL;
1753 }
1754
1755 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1756                                            bool unlink)
1757 {
1758         struct lfsck_instance *lfsck;
1759
1760         spin_lock(&lfsck_instance_lock);
1761         lfsck = __lfsck_instance_find(key, ref, unlink);
1762         spin_unlock(&lfsck_instance_lock);
1763
1764         return lfsck;
1765 }
1766
1767 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1768 {
1769         struct lfsck_instance *tmp;
1770
1771         spin_lock(&lfsck_instance_lock);
1772         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1773                 if (lfsck->li_bottom == tmp->li_bottom) {
1774                         spin_unlock(&lfsck_instance_lock);
1775                         return -EEXIST;
1776                 }
1777         }
1778
1779         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1780         spin_unlock(&lfsck_instance_lock);
1781         return 0;
1782 }
1783
1784 void lfsck_bits_dump(struct seq_file *m, int bits, const char *const names[],
1785                      const char *prefix)
1786 {
1787         int flag;
1788         int i;
1789         bool newline = (bits != 0 ? false : true);
1790
1791         seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
1792
1793         for (i = 0, flag = 1; bits != 0; i++, flag = BIT(i)) {
1794                 if (flag & bits) {
1795                         bits &= ~flag;
1796                         if (names[i] != NULL) {
1797                                 if (bits == 0)
1798                                         newline = true;
1799
1800                                 seq_printf(m, "%s%c", names[i],
1801                                            newline ? '\n' : ',');
1802                         }
1803                 }
1804         }
1805
1806         if (!newline)
1807                 seq_putc(m, '\n');
1808 }
1809
1810 void lfsck_time_dump(struct seq_file *m, time64_t time, const char *name)
1811 {
1812         if (time == 0) {
1813                 seq_printf(m, "%s_time: N/A\n", name);
1814                 seq_printf(m, "time_since_%s: N/A\n", name);
1815         } else {
1816                 seq_printf(m, "%s_time: %lld\n", name, time);
1817                 seq_printf(m, "time_since_%s: %lld seconds\n",
1818                            name, ktime_get_real_seconds() - time);
1819         }
1820 }
1821
1822 void lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1823                     const char *prefix)
1824 {
1825         if (fid_is_zero(&pos->lp_dir_parent)) {
1826                 if (pos->lp_oit_cookie == 0) {
1827                         seq_printf(m, "%s: N/A, N/A, N/A\n", prefix);
1828                         return;
1829                 }
1830                 seq_printf(m, "%s: %llu, N/A, N/A\n",
1831                            prefix, pos->lp_oit_cookie);
1832         } else {
1833                 seq_printf(m, "%s: %llu, "DFID", %#llx\n",
1834                            prefix, pos->lp_oit_cookie,
1835                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1836         }
1837 }
1838
1839 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1840                     struct lfsck_position *pos, bool init)
1841 {
1842         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1843
1844         if (unlikely(lfsck->li_di_oit == NULL)) {
1845                 memset(pos, 0, sizeof(*pos));
1846                 return;
1847         }
1848
1849         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1850         if (!lfsck->li_current_oit_processed && !init)
1851                 pos->lp_oit_cookie--;
1852
1853         if (unlikely(pos->lp_oit_cookie == 0))
1854                 pos->lp_oit_cookie = 1;
1855
1856         spin_lock(&lfsck->li_lock);
1857         if (lfsck->li_di_dir != NULL) {
1858                 struct dt_object *dto = lfsck->li_obj_dir;
1859
1860                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1861                                                         lfsck->li_di_dir);
1862
1863                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1864                         fid_zero(&pos->lp_dir_parent);
1865                         pos->lp_dir_cookie = 0;
1866                 } else {
1867                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1868                 }
1869         } else {
1870                 fid_zero(&pos->lp_dir_parent);
1871                 pos->lp_dir_cookie = 0;
1872         }
1873         spin_unlock(&lfsck->li_lock);
1874 }
1875
1876 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1877 {
1878         bool dirty = false;
1879
1880         if (limit != LFSCK_SPEED_NO_LIMIT) {
1881                 if (limit > cfs_time_seconds(1)) {
1882                         lfsck->li_sleep_rate = limit / cfs_time_seconds(1);
1883                         lfsck->li_sleep_jif = 1;
1884                 } else {
1885                         lfsck->li_sleep_rate = 1;
1886                         lfsck->li_sleep_jif = cfs_time_seconds(1) / limit;
1887                 }
1888         } else {
1889                 lfsck->li_sleep_jif = 0;
1890                 lfsck->li_sleep_rate = 0;
1891         }
1892
1893         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1894                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1895                 dirty = true;
1896         }
1897
1898         return dirty;
1899 }
1900
1901 void lfsck_control_speed(struct lfsck_instance *lfsck)
1902 {
1903         struct ptlrpc_thread *thread = &lfsck->li_thread;
1904
1905         if (lfsck->li_sleep_jif > 0 &&
1906             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1907                 wait_event_idle_timeout(thread->t_ctl_waitq,
1908                                         !thread_is_running(thread),
1909                                         lfsck->li_sleep_jif);
1910                 lfsck->li_new_scanned = 0;
1911         }
1912 }
1913
1914 void lfsck_control_speed_by_self(struct lfsck_component *com)
1915 {
1916         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1917         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1918
1919         if (lfsck->li_sleep_jif > 0 &&
1920             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1921                 wait_event_idle_timeout(thread->t_ctl_waitq,
1922                                         !thread_is_running(thread),
1923                                         lfsck->li_sleep_jif);
1924                 com->lc_new_scanned = 0;
1925         }
1926 }
1927
1928 static struct lfsck_thread_args *
1929 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1930                        struct lfsck_component *com,
1931                        struct lfsck_start_param *lsp)
1932 {
1933         struct lfsck_thread_args *lta;
1934         int rc;
1935
1936         OBD_ALLOC_PTR(lta);
1937         if (lta == NULL)
1938                 return ERR_PTR(-ENOMEM);
1939
1940         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1941         if (rc != 0) {
1942                 OBD_FREE_PTR(lta);
1943                 return ERR_PTR(rc);
1944         }
1945
1946         lta->lta_lfsck = lfsck_instance_get(lfsck);
1947         if (com != NULL)
1948                 lta->lta_com = lfsck_component_get(com);
1949
1950         lta->lta_lsp = lsp;
1951
1952         return lta;
1953 }
1954
1955 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1956 {
1957         if (lta->lta_com != NULL)
1958                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1959         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1960         lu_env_fini(&lta->lta_env);
1961         OBD_FREE_PTR(lta);
1962 }
1963
1964 struct lfsck_assistant_data *
1965 lfsck_assistant_data_init(const struct lfsck_assistant_operations *lao,
1966                           const char *name)
1967 {
1968         struct lfsck_assistant_data *lad;
1969
1970         OBD_ALLOC_PTR(lad);
1971         if (lad != NULL) {
1972                 lad->lad_bitmap = bitmap_zalloc(BITS_PER_LONG, GFP_KERNEL);
1973                 if (lad->lad_bitmap == NULL) {
1974                         OBD_FREE_PTR(lad);
1975                         return NULL;
1976                 }
1977                 lad->lad_bitmap_count = BITS_PER_LONG;
1978
1979                 INIT_LIST_HEAD(&lad->lad_req_list);
1980                 spin_lock_init(&lad->lad_lock);
1981                 INIT_LIST_HEAD(&lad->lad_ost_list);
1982                 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1983                 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1984                 INIT_LIST_HEAD(&lad->lad_mdt_list);
1985                 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1986                 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1987                 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1988                 lad->lad_ops = lao;
1989                 lad->lad_name = name;
1990         }
1991
1992         return lad;
1993 }
1994
1995 struct lfsck_assistant_object *
1996 lfsck_assistant_object_init(const struct lu_env *env, const struct lu_fid *fid,
1997                             const struct lu_attr *attr, __u64 cookie,
1998                             bool is_dir)
1999 {
2000         struct lfsck_assistant_object *lso;
2001
2002         OBD_ALLOC_PTR(lso);
2003         if (lso == NULL)
2004                 return ERR_PTR(-ENOMEM);
2005
2006         lso->lso_fid = *fid;
2007         if (attr != NULL)
2008                 lso->lso_attr = *attr;
2009
2010         kref_init(&lso->lso_ref);
2011         lso->lso_oit_cookie = cookie;
2012         if (is_dir)
2013                 lso->lso_is_dir = 1;
2014
2015         return lso;
2016 }
2017
2018 struct dt_object *
2019 lfsck_assistant_object_load(const struct lu_env *env,
2020                             struct lfsck_instance *lfsck,
2021                             struct lfsck_assistant_object *lso)
2022 {
2023         struct dt_object *obj;
2024
2025         obj = lfsck_object_find_bottom(env, lfsck, &lso->lso_fid);
2026         if (IS_ERR(obj))
2027                 return obj;
2028
2029         if (unlikely(!dt_object_exists(obj) || lfsck_is_dead_obj(obj))) {
2030                 lso->lso_dead = 1;
2031                 lfsck_object_put(env, obj);
2032
2033                 return ERR_PTR(-ENOENT);
2034         }
2035
2036         if (lso->lso_is_dir && unlikely(!dt_try_as_dir(env, obj, true))) {
2037                 lfsck_object_put(env, obj);
2038
2039                 return ERR_PTR(-ENOTDIR);
2040         }
2041
2042         return obj;
2043 }
2044
2045 /**
2046  * Generic LFSCK asynchronous communication interpretor function.
2047  * The LFSCK RPC reply for both the event notification and status
2048  * querying will be handled here.
2049  *
2050  * \param[in] env       pointer to the thread context
2051  * \param[in] req       pointer to the LFSCK request
2052  * \param[in] args      pointer to the lfsck_async_interpret_args
2053  * \param[in] rc        the result for handling the LFSCK request
2054  *
2055  * \retval              0 for success
2056  * \retval              negative error number on failure
2057  */
2058 int lfsck_async_interpret_common(const struct lu_env *env,
2059                                  struct ptlrpc_request *req,
2060                                  void *args, int rc)
2061 {
2062         struct lfsck_async_interpret_args *laia = args;
2063         struct lfsck_component *com = laia->laia_com;
2064         struct lfsck_assistant_data *lad = com->lc_data;
2065         struct lfsck_tgt_descs *ltds = laia->laia_ltds;
2066         struct lfsck_tgt_desc *ltd = laia->laia_ltd;
2067         struct lfsck_request *lr = laia->laia_lr;
2068
2069         LASSERT(com->lc_lfsck->li_master);
2070
2071         switch (lr->lr_event) {
2072         case LE_START:
2073                 if (unlikely(rc == -EINPROGRESS)) {
2074                         ltd->ltd_retry_start = 1;
2075                         break;
2076                 }
2077
2078                 if (rc != 0) {
2079                         CDEBUG(D_LFSCK,
2080                                "%s: fail to notify %s %x for %s start: rc = %d\n",
2081                                lfsck_lfsck2name(com->lc_lfsck),
2082                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2083                                ltd->ltd_index, lad->lad_name, rc);
2084
2085                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2086                                 struct lfsck_layout *lo = com->lc_file_ram;
2087
2088                                 if (lr->lr_flags & LEF_TO_OST)
2089                                         lfsck_lad_set_bitmap(env, com,
2090                                                              ltd->ltd_index);
2091                                 else
2092                                         lo->ll_flags |= LF_INCOMPLETE;
2093                         } else {
2094                                 struct lfsck_namespace *ns = com->lc_file_ram;
2095
2096                                 /* If some MDT does not join the namespace
2097                                  * LFSCK, then we cannot know whether there
2098                                  * is some name entry on such MDT that with
2099                                  * the referenced MDT-object on this MDT or
2100                                  * not. So the namespace LFSCK on this MDT
2101                                  * cannot handle orphan MDT-objects properly.
2102                                  * So we mark the LFSCK as LF_INCOMPLETE and
2103                                  * skip orphan MDT-objects handling.
2104                                  */
2105                                 ns->ln_flags |= LF_INCOMPLETE;
2106                         }
2107                         break;
2108                 }
2109
2110                 spin_lock(&ltds->ltd_lock);
2111                 if (ltd->ltd_dead) {
2112                         spin_unlock(&ltds->ltd_lock);
2113                         break;
2114                 }
2115
2116                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2117                         struct list_head *list;
2118                         struct list_head *phase_list;
2119
2120                         if (ltd->ltd_layout_done) {
2121                                 spin_unlock(&ltds->ltd_lock);
2122                                 break;
2123                         }
2124
2125                         if (lr->lr_flags & LEF_TO_OST) {
2126                                 list = &lad->lad_ost_list;
2127                                 phase_list = &lad->lad_ost_phase1_list;
2128                         } else {
2129                                 list = &lad->lad_mdt_list;
2130                                 phase_list = &lad->lad_mdt_phase1_list;
2131                         }
2132
2133                         if (list_empty(&ltd->ltd_layout_list))
2134                                 list_add_tail(&ltd->ltd_layout_list, list);
2135                         if (list_empty(&ltd->ltd_layout_phase_list))
2136                                 list_add_tail(&ltd->ltd_layout_phase_list,
2137                                               phase_list);
2138                 } else {
2139                         if (ltd->ltd_namespace_done) {
2140                                 spin_unlock(&ltds->ltd_lock);
2141                                 break;
2142                         }
2143
2144                         if (list_empty(&ltd->ltd_namespace_list))
2145                                 list_add_tail(&ltd->ltd_namespace_list,
2146                                               &lad->lad_mdt_list);
2147                         if (list_empty(&ltd->ltd_namespace_phase_list))
2148                                 list_add_tail(&ltd->ltd_namespace_phase_list,
2149                                               &lad->lad_mdt_phase1_list);
2150                 }
2151                 spin_unlock(&ltds->ltd_lock);
2152                 break;
2153         case LE_STOP:
2154         case LE_PHASE1_DONE:
2155         case LE_PHASE2_DONE:
2156         case LE_PEER_EXIT:
2157                 if (rc != 0 && rc != -EALREADY)
2158                         CDEBUG(D_LFSCK,
2159                                "%s: fail to notify %s %x for %s: event = %d, rc = %d\n",
2160                               lfsck_lfsck2name(com->lc_lfsck),
2161                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2162                               ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
2163                 break;
2164         case LE_QUERY: {
2165                 struct lfsck_reply *reply;
2166                 struct list_head *list;
2167                 struct list_head *phase_list;
2168
2169                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2170                         list = &ltd->ltd_layout_list;
2171                         phase_list = &ltd->ltd_layout_phase_list;
2172                 } else {
2173                         list = &ltd->ltd_namespace_list;
2174                         phase_list = &ltd->ltd_namespace_phase_list;
2175                 }
2176
2177                 if (rc != 0) {
2178                         if (lr->lr_flags & LEF_QUERY_ALL) {
2179                                 lfsck_reset_ltd_status(ltd, com->lc_type);
2180                                 break;
2181                         }
2182
2183                         spin_lock(&ltds->ltd_lock);
2184                         list_del_init(phase_list);
2185                         list_del_init(list);
2186                         spin_unlock(&ltds->ltd_lock);
2187                         break;
2188                 }
2189
2190                 reply = req_capsule_server_get(&req->rq_pill,
2191                                                &RMF_LFSCK_REPLY);
2192                 if (reply == NULL) {
2193                         rc = -EPROTO;
2194                         CDEBUG(D_LFSCK,
2195                                "%s: invalid query reply for %s: rc = %d\n",
2196                                lfsck_lfsck2name(com->lc_lfsck),
2197                                lad->lad_name, rc);
2198
2199                         if (lr->lr_flags & LEF_QUERY_ALL) {
2200                                 lfsck_reset_ltd_status(ltd, com->lc_type);
2201                                 break;
2202                         }
2203
2204                         spin_lock(&ltds->ltd_lock);
2205                         list_del_init(phase_list);
2206                         list_del_init(list);
2207                         spin_unlock(&ltds->ltd_lock);
2208                         break;
2209                 }
2210
2211                 if (lr->lr_flags & LEF_QUERY_ALL) {
2212                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2213                                 ltd->ltd_layout_status = reply->lr_status;
2214                                 ltd->ltd_layout_repaired = reply->lr_repaired;
2215                         } else {
2216                                 ltd->ltd_namespace_status = reply->lr_status;
2217                                 ltd->ltd_namespace_repaired =
2218                                                         reply->lr_repaired;
2219                         }
2220                         break;
2221                 }
2222
2223                 switch (reply->lr_status) {
2224                 case LS_SCANNING_PHASE1:
2225                         break;
2226                 case LS_SCANNING_PHASE2:
2227                         spin_lock(&ltds->ltd_lock);
2228                         list_del_init(phase_list);
2229                         if (ltd->ltd_dead) {
2230                                 spin_unlock(&ltds->ltd_lock);
2231                                 break;
2232                         }
2233
2234                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2235                                 if (ltd->ltd_layout_done) {
2236                                         spin_unlock(&ltds->ltd_lock);
2237                                         break;
2238                                 }
2239
2240                                 if (lr->lr_flags & LEF_TO_OST)
2241                                         list_add_tail(phase_list,
2242                                                 &lad->lad_ost_phase2_list);
2243                                 else
2244                                         list_add_tail(phase_list,
2245                                                 &lad->lad_mdt_phase2_list);
2246                         } else {
2247                                 if (ltd->ltd_namespace_done) {
2248                                         spin_unlock(&ltds->ltd_lock);
2249                                         break;
2250                                 }
2251
2252                                 list_add_tail(phase_list,
2253                                               &lad->lad_mdt_phase2_list);
2254                         }
2255                         spin_unlock(&ltds->ltd_lock);
2256                         break;
2257                 default:
2258                         spin_lock(&ltds->ltd_lock);
2259                         list_del_init(phase_list);
2260                         list_del_init(list);
2261                         spin_unlock(&ltds->ltd_lock);
2262                         break;
2263                 }
2264                 break;
2265         }
2266         default:
2267                 CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
2268                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
2269                 break;
2270         }
2271
2272         if (!laia->laia_shared) {
2273                 lfsck_tgt_put(ltd);
2274                 lfsck_component_put(env, com);
2275         }
2276
2277         return 0;
2278 }
2279
2280 static void lfsck_interpret(const struct lu_env *env,
2281                             struct lfsck_instance *lfsck,
2282                             struct ptlrpc_request *req, void *args, int result)
2283 {
2284         struct lfsck_async_interpret_args *laia = args;
2285         struct lfsck_component *com;
2286
2287         LASSERT(laia->laia_com == NULL);
2288         LASSERT(laia->laia_shared);
2289
2290         spin_lock(&lfsck->li_lock);
2291         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2292                 laia->laia_com = com;
2293                 lfsck_async_interpret_common(env, req, laia, result);
2294         }
2295
2296         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
2297                 laia->laia_com = com;
2298                 lfsck_async_interpret_common(env, req, laia, result);
2299         }
2300         spin_unlock(&lfsck->li_lock);
2301 }
2302
2303 static int lfsck_stop_notify(const struct lu_env *env,
2304                              struct lfsck_instance *lfsck,
2305                              struct lfsck_tgt_descs *ltds,
2306                              struct lfsck_tgt_desc *ltd, __u16 type)
2307 {
2308         struct lfsck_component *com;
2309         int rc = 0;
2310
2311         ENTRY;
2312         LASSERT(lfsck->li_master);
2313
2314         spin_lock(&lfsck->li_lock);
2315         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
2316         if (com == NULL)
2317                 com = __lfsck_component_find(lfsck, type,
2318                                              &lfsck->li_list_double_scan);
2319         if (com != NULL)
2320                 lfsck_component_get(com);
2321         spin_unlock(&lfsck->li_lock);
2322
2323         if (com != NULL) {
2324                 struct lfsck_thread_info          *info  = lfsck_env_info(env);
2325                 struct lfsck_async_interpret_args *laia  = &info->lti_laia;
2326                 struct lfsck_request              *lr    = &info->lti_lr;
2327                 struct lfsck_assistant_data       *lad   = com->lc_data;
2328                 struct list_head                  *list;
2329                 struct list_head                  *phase_list;
2330                 struct ptlrpc_request_set         *set;
2331
2332                 set = ptlrpc_prep_set();
2333                 if (set == NULL) {
2334                         lfsck_component_put(env, com);
2335
2336                         RETURN(-ENOMEM);
2337                 }
2338
2339                 if (type == LFSCK_TYPE_LAYOUT) {
2340                         list = &ltd->ltd_layout_list;
2341                         phase_list = &ltd->ltd_layout_phase_list;
2342                 } else {
2343                         list = &ltd->ltd_namespace_list;
2344                         phase_list = &ltd->ltd_namespace_phase_list;
2345                 }
2346
2347                 spin_lock(&ltds->ltd_lock);
2348                 if (list_empty(list)) {
2349                         LASSERT(list_empty(phase_list));
2350                         spin_unlock(&ltds->ltd_lock);
2351                         ptlrpc_set_destroy(set);
2352
2353                         RETURN(0);
2354                 }
2355
2356                 list_del_init(phase_list);
2357                 list_del_init(list);
2358                 spin_unlock(&ltds->ltd_lock);
2359
2360                 memset(lr, 0, sizeof(*lr));
2361                 lr->lr_index = lfsck_dev_idx(lfsck);
2362                 lr->lr_event = LE_PEER_EXIT;
2363                 lr->lr_active = type;
2364                 lr->lr_status = LS_CO_PAUSED;
2365                 if (ltds == &lfsck->li_ost_descs)
2366                         lr->lr_flags = LEF_TO_OST;
2367
2368                 memset(laia, 0, sizeof(*laia));
2369                 laia->laia_com = com;
2370                 laia->laia_ltds = ltds;
2371                 atomic_inc(&ltd->ltd_ref);
2372                 laia->laia_ltd = ltd;
2373                 laia->laia_lr = lr;
2374
2375                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2376                                          lfsck_async_interpret_common,
2377                                          laia, LFSCK_NOTIFY);
2378                 if (rc != 0) {
2379                         CDEBUG(D_LFSCK,
2380                                "%s: fail to notify %s %x for co-stop for %s: rc = %d\n",
2381                                lfsck_lfsck2name(lfsck),
2382                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2383                                ltd->ltd_index, lad->lad_name, rc);
2384                         lfsck_tgt_put(ltd);
2385                 } else {
2386                         rc = ptlrpc_set_wait(env, set);
2387                 }
2388
2389                 ptlrpc_set_destroy(set);
2390                 lfsck_component_put(env, com);
2391         }
2392
2393         RETURN(rc);
2394 }
2395
2396 static int lfsck_async_interpret(const struct lu_env *env,
2397                                  struct ptlrpc_request *req,
2398                                  void *args, int rc)
2399 {
2400         struct lfsck_async_interpret_args *laia = args;
2401         struct lfsck_instance *lfsck;
2402
2403         lfsck = container_of(laia->laia_ltds, struct lfsck_instance,
2404                              li_mdt_descs);
2405         lfsck_interpret(env, lfsck, req, laia, rc);
2406         lfsck_tgt_put(laia->laia_ltd);
2407         if (rc != 0 && laia->laia_result != -EALREADY)
2408                 laia->laia_result = rc;
2409
2410         return 0;
2411 }
2412
/*
 * Build one LFSCK_NOTIFY or LFSCK_QUERY request for the import behind @exp
 * and add it to @set.
 *
 * env:         execution environment (not used in this function)
 * exp:         export whose client import the request is allocated on
 * lr:          request body; copied by value into the request
 * set:         request set the new request is added to
 * interpreter: reply callback stored in rq_interpret_reply
 * args:        struct lfsck_async_interpret_args; copied by value into the
 *              request's async args
 * request:     LFSCK_NOTIFY or LFSCK_QUERY
 *
 * Returns 0 once the request has been added to @set, -EINVAL for any other
 * @request value, -ENOMEM if the request cannot be allocated, or the error
 * from ptlrpc_request_pack(). On every error path nothing is added to @set
 * and no component reference is taken.
 */
2413 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2414                         struct lfsck_request *lr,
2415                         struct ptlrpc_request_set *set,
2416                         ptlrpc_interpterer_t interpreter,
2417                         void *args, int request)
2418 {
2419         struct lfsck_async_interpret_args *laia;
2420         struct ptlrpc_request *req;
2421         struct lfsck_request *tmp;
2422         struct req_format *format;
2423         int rc;
2424
             /* Pick the request format that matches the opcode. */
2425         switch (request) {
2426         case LFSCK_NOTIFY:
2427                 format = &RQF_LFSCK_NOTIFY;
2428                 break;
2429         case LFSCK_QUERY:
2430                 format = &RQF_LFSCK_QUERY;
2431                 break;
2432         default:
2433                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2434                        exp->exp_obd->obd_name, request, -EINVAL);
2435                 return -EINVAL;
2436         }
2437
2438         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2439         if (req == NULL)
2440                 return -ENOMEM;
2441
2442         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2443         if (rc != 0) {
2444                 ptlrpc_request_free(req);
2445
2446                 return rc;
2447         }
2448
             /* The request body is a by-value copy of *lr. */
2449         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2450         *tmp = *lr;
2451         ptlrpc_request_set_replen(req);
2452
             /* The async args are likewise copied by value into the request,
              * so the caller may reuse its own args buffer for the next
              * request (lfsck_query_all() does, changing only laia_ltd).
              */
2453         laia = ptlrpc_req_async_args(laia, req);
2454         *laia = *(struct lfsck_async_interpret_args *)args;
             /* Take a component reference on behalf of the request.
              * NOTE(review): the matching put is not in this function;
              * presumably the interpreter drops it - confirm.
              */
2455         if (laia->laia_com != NULL)
2456                 lfsck_component_get(laia->laia_com);
2457         req->rq_interpret_reply = interpreter;
2458         req->rq_allow_intr = 1;
2459         req->rq_no_delay = 1;
             /* Only queued here; the visible callers follow up with
              * ptlrpc_set_wait() on the set.
              */
2460         ptlrpc_set_add_req(set, req);
2461
2462         return 0;
2463 }
2464
/*
 * Send an LE_QUERY request flagged LEF_QUERY_ALL for @com's type to every
 * target whose bit is set in the MDT target bitmap and, when @com is the
 * layout component, in a second pass to every target in the OST bitmap.
 * Then wait for the whole request set.
 *
 * env: execution environment; its lfsck_thread_info supplies the request
 *      and async-args buffers
 * com: component being queried
 *
 * Returns -ENOMEM if the request set cannot be allocated, otherwise the
 * result of ptlrpc_set_wait(). A failure to queue the request for one
 * target is only logged (and that target's status reset); it does not
 * abort the loop and is not reflected in the return value.
 */
2465 int lfsck_query_all(const struct lu_env *env, struct lfsck_component *com)
2466 {
2467         struct lfsck_thread_info *info = lfsck_env_info(env);
2468         struct lfsck_request *lr = &info->lti_lr;
2469         struct lfsck_async_interpret_args *laia = &info->lti_laia;
2470         struct lfsck_instance *lfsck = com->lc_lfsck;
2471         struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs;
2472         struct lfsck_tgt_desc *ltd;
2473         struct ptlrpc_request_set *set;
2474         int idx;
2475         int rc;
2476
2477         ENTRY;
2478         memset(lr, 0, sizeof(*lr));
2479         lr->lr_event = LE_QUERY;
2480         lr->lr_active = com->lc_type;
2481         lr->lr_flags = LEF_QUERY_ALL;
2482
2483         memset(laia, 0, sizeof(*laia));
2484         laia->laia_com = com;
2485         laia->laia_lr = lr;
2486
2487         set = ptlrpc_prep_set();
2488         if (set == NULL)
2489                 RETURN(-ENOMEM);
2490
             /* First pass: ltds is the MDT table. For the layout component a
              * second pass re-enters here with ltds switched to the OST
              * table.
              */
2491 again:
2492         laia->laia_ltds = ltds;
2493         down_read(&ltds->ltd_rw_sem);
2494         for_each_set_bit(idx, ltds->ltd_tgts_bitmap, ltds->ltd_tgts_mask_len) {
2495                 ltd = lfsck_tgt_get(ltds, idx);
2496                 LASSERT(ltd != NULL);
2497
2498                 laia->laia_ltd = ltd;
                     /* ltd_rw_sem is released while the request is built and
                      * queued, then re-taken before the iteration continues.
                      */
2499                 up_read(&ltds->ltd_rw_sem);
2500                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2501                                          lfsck_async_interpret_common,
2502                                          laia, LFSCK_QUERY);
2503                 if (rc != 0) {
2504                         struct lfsck_assistant_data *lad = com->lc_data;
2505
2506                         CDEBUG(D_LFSCK,
2507                                "%s: Fail to query %s %x for stat %s: rc = %d\n",
2508                                lfsck_lfsck2name(lfsck),
2509                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2510                                ltd->ltd_index, lad->lad_name, rc);
                             /* Nothing was queued for this target: reset its
                              * recorded status for this component type and
                              * drop the reference taken above.
                              */
2511                         lfsck_reset_ltd_status(ltd, com->lc_type);
2512                         lfsck_tgt_put(ltd);
2513                 }
2514                 down_read(&ltds->ltd_rw_sem);
2515         }
2516         up_read(&ltds->ltd_rw_sem);
2517
             /* LEF_TO_OST doubles as the "second pass already done" marker. */
2518         if (com->lc_type == LFSCK_TYPE_LAYOUT && !(lr->lr_flags & LEF_TO_OST)) {
2519                 ltds = &lfsck->li_ost_descs;
2520                 lr->lr_flags |= LEF_TO_OST;
2521                 goto again;
2522         }
2523
2524         rc = ptlrpc_set_wait(env, set);
2525         ptlrpc_set_destroy(set);
2526
2527         RETURN(rc);
2528 }
2529
/*
 * Reset the assistant state of @com and start its assistant kernel thread
 * (lfsck_assistant_engine, named after lad_name), then wait until the
 * assistant is running or stopped, or the main LFSCK thread is no longer
 * in the "starting" state.
 *
 * env: execution environment (not used in this function)
 * com: component whose lc_data is the lfsck_assistant_data to start
 * lsp: start parameters, packed into the thread arguments
 *
 * Returns 0 if the assistant is running, the error from
 * lfsck_thread_args_init() or kthread_run() if the thread could not be
 * created, -ESRCH if the main thread left the starting state while waiting,
 * or lad_assistant_status if the assistant is not running after the wait.
 */
2530 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2531                           struct lfsck_start_param *lsp)
2532 {
2533         struct lfsck_instance *lfsck = com->lc_lfsck;
2534         struct lfsck_assistant_data *lad = com->lc_data;
2535         struct ptlrpc_thread *mthread = &lfsck->li_thread;
2536         struct ptlrpc_thread *athread = &lad->lad_thread;
2537         struct lfsck_thread_args *lta;
2538         struct task_struct *task;
2539         int rc;
2540
2541         ENTRY;
             /* Clear state left over from any earlier run. */
2542         lad->lad_assistant_status = 0;
2543         lad->lad_post_result = 0;
2544         lad->lad_flags = 0;
2545         lad->lad_advance_lock = false;
2546         thread_set_flags(athread, 0);
2547
2548         lta = lfsck_thread_args_init(lfsck, com, lsp);
2549         if (IS_ERR(lta))
2550                 RETURN(PTR_ERR(lta));
2551
2552         task = kthread_run(lfsck_assistant_engine, lta, "%s", lad->lad_name);
2553         if (IS_ERR(task)) {
2554                 rc = PTR_ERR(task);
2555                 CERROR("%s: cannot start LFSCK assistant thread for %s: rc = %d\n",
2556                        lfsck_lfsck2name(lfsck), lad->lad_name, rc);
                     /* lta is released here only on failure; presumably the
                      * started thread owns it otherwise - confirm against
                      * lfsck_assistant_engine().
                      */
2557                 lfsck_thread_args_fini(lta);
2558         } else {
                     /* The wait is on the main thread's control waitqueue. */
2559                 wait_event_idle(mthread->t_ctl_waitq,
2560                                 thread_is_running(athread) ||
2561                                 thread_is_stopped(athread) ||
2562                                 !thread_is_starting(mthread));
2563                 if (unlikely(!thread_is_starting(mthread)))
2564                         /* stopped by race */
2565                         rc = -ESRCH;
2566                 else if (unlikely(!thread_is_running(athread)))
2567                         rc = lad->lad_assistant_status;
2568                 else
2569                         rc = 0;
2570         }
2571
2572         RETURN(rc);
2573 }
2574
/*
 * Wait, on the main thread's control waitqueue, until the assistant's
 * request list is empty, the main LFSCK thread is no longer running, or the
 * assistant thread has stopped.
 *
 * env: execution environment (not used in this function)
 * com: component whose lc_data is the lfsck_assistant_data
 *
 * Returns LFSCK_CHECKPOINT_SKIP if, after the wait, the main thread is not
 * running or the assistant has stopped; 0 otherwise.
 */
2575 int lfsck_checkpoint_generic(const struct lu_env *env,
2576                              struct lfsck_component *com)
2577 {
2578         struct lfsck_assistant_data *lad = com->lc_data;
2579         struct ptlrpc_thread *mthread = &com->lc_lfsck->li_thread;
2580         struct ptlrpc_thread *athread = &lad->lad_thread;
2581
2582         wait_event_idle(mthread->t_ctl_waitq,
2583                         list_empty(&lad->lad_req_list) ||
2584                         !thread_is_running(mthread) ||
2585                         thread_is_stopped(athread));
2586
             /* Woken because a thread went away rather than because the
              * request list drained.
              */
2587         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2588                 return LFSCK_CHECKPOINT_SKIP;
2589
2590         return 0;
2591 }
2592
/*
 * Hand the post step over to the assistant thread and wait for it.
 *
 * env:    execution environment (not used in this function)
 * com:    component whose lc_data is the lfsck_assistant_data
 * result: in: result passed on as lad_post_result; out: replaced by
 *         lad_assistant_status when that status is negative after the wait
 *
 * With *result > 0 the wait ends once the assistant's request list is empty
 * or the assistant has stopped; with *result <= 0 LAD_EXIT is also set and
 * the wait ends only when the assistant has stopped.
 */
2593 void lfsck_post_generic(const struct lu_env *env,
2594                         struct lfsck_component *com, int *result)
2595 {
2596         struct lfsck_assistant_data *lad = com->lc_data;
2597         struct ptlrpc_thread *athread = &lad->lad_thread;
2598         struct ptlrpc_thread *mthread = &com->lc_lfsck->li_thread;
2599
2600         lad->lad_post_result = *result;
2601         if (*result <= 0)
2602                 set_bit(LAD_EXIT, &lad->lad_flags);
2603         set_bit(LAD_TO_POST, &lad->lad_flags);
2604
2605         CDEBUG(D_LFSCK, "%s: waiting for assistant to do %s post, rc = %d\n",
2606                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, *result);
2607
             /* The flags are set before the wake-up so the assistant sees
              * them when it runs.
              */
2608         wake_up(&athread->t_ctl_waitq);
2609         wait_event_idle(mthread->t_ctl_waitq,
2610                         (*result > 0 && list_empty(&lad->lad_req_list)) ||
2611                         thread_is_stopped(athread));
2612
2613         if (lad->lad_assistant_status < 0)
2614                 *result = lad->lad_assistant_status;
2615
2616         CDEBUG(D_LFSCK, "%s: the assistant has done %s post, rc = %d\n",
2617                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, *result);
2618 }
2619
/*
 * Tell the assistant thread either to enter the double scan or to exit, and
 * wait until it reports LAD_IN_DOUBLE_SCAN or has stopped.
 *
 * env:    execution environment (not used in this function)
 * com:    component whose lc_data is the lfsck_assistant_data
 * status: LS_SCANNING_PHASE2 requests the double scan (LAD_TO_DOUBLE_SCAN);
 *         any other value requests exit (LAD_EXIT)
 *
 * Returns lad_assistant_status if it is negative after the wait, else 0.
 */
2620 int lfsck_double_scan_generic(const struct lu_env *env,
2621                               struct lfsck_component *com, int status)
2622 {
2623         struct lfsck_assistant_data *lad = com->lc_data;
2624         struct ptlrpc_thread *mthread = &com->lc_lfsck->li_thread;
2625         struct ptlrpc_thread *athread = &lad->lad_thread;
2626
2627         if (status != LS_SCANNING_PHASE2)
2628                 set_bit(LAD_EXIT, &lad->lad_flags);
2629         else
2630                 set_bit(LAD_TO_DOUBLE_SCAN, &lad->lad_flags);
2631
2632         CDEBUG(D_LFSCK,
2633                "%s: waiting for assistant to do %s double_scan, status %d\n",
2634                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, status);
2635
2636         wake_up(&athread->t_ctl_waitq);
2637         wait_event_idle(mthread->t_ctl_waitq,
2638                         test_bit(LAD_IN_DOUBLE_SCAN, &lad->lad_flags) ||
2639                         thread_is_stopped(athread));
2640
             /* This message prints lad_assistant_status, not the @status
              * argument logged before the wait.
              */
2641         CDEBUG(D_LFSCK,
2642                "%s: the assistant has done %s double_scan, status %d\n",
2643                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name,
2644                lad->lad_assistant_status);
2645
2646         if (lad->lad_assistant_status < 0)
2647                 return lad->lad_assistant_status;
2648
2649         return 0;
2650 }
2651
/*
 * Ask the assistant thread to exit (LAD_EXIT) and wait, on the main
 * thread's control waitqueue, until the assistant thread is in the init or
 * stopped state.
 *
 * env: execution environment (not used in this function)
 * com: component whose lc_data is the lfsck_assistant_data
 */
2652 void lfsck_quit_generic(const struct lu_env *env,
2653                         struct lfsck_component *com)
2654 {
2655         struct lfsck_assistant_data *lad = com->lc_data;
2656         struct ptlrpc_thread *mthread = &com->lc_lfsck->li_thread;
2657         struct ptlrpc_thread *athread = &lad->lad_thread;
2658
2659         set_bit(LAD_EXIT, &lad->lad_flags);
2660         wake_up(&athread->t_ctl_waitq);
             /* thread_is_init() also ends the wait, so this returns at once
              * when the assistant is still in its init state.
              */
2661         wait_event_idle(mthread->t_ctl_waitq,
2662                         thread_is_init(athread) ||
2663                         thread_is_stopped(athread));
2664 }
2665
/*
 * Make *child refer to the index object @name under @parent, usable with
 * the index features @ft.
 *
 * env:    execution environment
 * com:    owning component (supplies the lfsck instance)
 * parent: directory object that holds the trace file
 * child:  in/out: currently loaded object, or NULL
 * ft:     index features the object must support
 * name:   name of the trace file under @parent
 * reset:  when true, an existing non-empty (or unusable) index is unlinked
 *         and a new one is created
 *
 * Cases, as implemented below:
 *  - *child set, !reset: nothing to do, returns 0.
 *  - *child set, reset:  if iterating the index reports it empty, keep it
 *                        and return 0; otherwise unlink it, drop the old
 *                        reference and create a new object.
 *  - *child NULL, reset: unlink @name first, then find or create.
 *  - *child NULL, !reset: find or create.
 *
 * Returns 0 on success or a negative errno from the unlink, the
 * find-or-create, or do_index_try() on the new object.
 */
2666 int lfsck_load_one_trace_file(const struct lu_env *env,
2667                               struct lfsck_component *com,
2668                               struct dt_object *parent,
2669                               struct dt_object **child,
2670                               const struct dt_index_features *ft,
2671                               const char *name, bool reset)
2672 {
2673         struct lfsck_instance *lfsck = com->lc_lfsck;
2674         struct dt_object *obj;
2675         int rc;
2676
2677         ENTRY;
2678         if (*child != NULL) {
2679                 struct dt_it *it;
2680                 const struct dt_it_ops *iops;
2681                 struct lu_fid *fid = &lfsck_env_info(env)->lti_fid3;
2682
2683                 if (!reset)
2684                         RETURN(0);
2685
2686                 obj = *child;
2687                 rc = obj->do_ops->do_index_try(env, obj, ft);
2688                 if (rc)
2689                         /* unlink by force */
2690                         goto unlink;
2691
2692                 iops = &obj->do_index_ops->dio_it;
2693                 it = iops->init(env, obj, 0);
2694                 if (IS_ERR(it))
2695                         /* unlink by force */
2696                         goto unlink;
2697
                     /* Position the iterator at the zero FID and step once;
                      * a negative rc from get() skips next()/put() and falls
                      * through to the unlink below.
                      */
2698                 fid_zero(fid);
2699                 rc = iops->get(env, it, (const struct dt_key *)fid);
2700                 if (rc >= 0) {
2701                         rc = iops->next(env, it);
2702                         iops->put(env, it);
2703                 }
2704                 iops->fini(env, it);
2705                 if (rc > 0)
2706                         /* "rc > 0" means the index file is empty. */
2707                         RETURN(0);
2708
                     /* Also reached by goto when do_index_try() or the
                      * iterator init failed, and from the "else if (reset)"
                      * branch below with *child == NULL.
                      */
2709 unlink:
2710                 /* The old index is not empty, remove it firstly. */
2711                 rc = local_object_unlink(env, lfsck->li_bottom, parent, name);
2712                 CDEBUG_LIMIT(rc ? D_ERROR : D_LFSCK,
2713                              "%s: unlink lfsck sub trace file %s: rc = %d\n",
2714                              lfsck_lfsck2name(com->lc_lfsck), name, rc);
2715                 if (rc)
2716                         RETURN(rc);
2717
                     /* *child may be NULL when entered via the reset-only
                      * path, hence the re-check.
                      */
2718                 if (*child) {
2719                         lfsck_object_put(env, *child);
2720                         *child = NULL;
2721                 }
2722         } else if (reset) {
                     /* Jumps into the if-body above to share the unlink. */
2723                 goto unlink;
2724         }
2725
2726         obj = local_index_find_or_create(env, lfsck->li_los, parent, name,
2727                                          S_IFREG | S_IRUGO | S_IWUSR, ft);
2728         if (IS_ERR(obj))
2729                 RETURN(PTR_ERR(obj));
2730
2731         rc = obj->do_ops->do_index_try(env, obj, ft);
2732         if (rc) {
                     /* *child is left untouched (NULL on this path). */
2733                 lfsck_object_put(env, obj);
2734                 CDEBUG(D_LFSCK,
2735                        "%s: LFSCK fail to load sub trace file %s: rc = %d\n",
2736                        lfsck_lfsck2name(com->lc_lfsck), name, rc);
2737         } else {
2738                 *child = obj;
2739         }
2740
2741         RETURN(rc);
2742 }
2743
/*
 * Load (or reset) all LFSCK_STF_COUNT sub trace files of @com.
 *
 * File i is named "<prefix>_NN" (two-digit index), lives under the lfsck
 * instance's li_lfsck_dir and is tracked in com->lc_sub_trace_objs[i].
 *
 * env:    execution environment; its lti_key buffer is used for the name
 * com:    component owning the sub trace objects
 * ft:     index features passed to lfsck_load_one_trace_file()
 * prefix: file name prefix
 * reset:  passed to lfsck_load_one_trace_file()
 *
 * Returns 0 if every file loaded, otherwise the first non-zero result; the
 * loop stops at that file.
 */
2744 int lfsck_load_sub_trace_files(const struct lu_env *env,
2745                                struct lfsck_component *com,
2746                                const struct dt_index_features *ft,
2747                                const char *prefix, bool reset)
2748 {
2749         char *name = lfsck_env_info(env)->lti_key;
2750         struct lfsck_sub_trace_obj *lsto;
2751         int rc;
2752         int i;
2753
             /* "rc == 0" in the loop condition is the early exit on error. */
2754         for (i = 0, rc = 0, lsto = &com->lc_sub_trace_objs[0];
2755              i < LFSCK_STF_COUNT && rc == 0; i++, lsto++) {
                     /* NOTE(review): assumes lti_key holds at least NAME_MAX
                      * bytes - confirm against lfsck_thread_info.
                      */
2756                 snprintf(name, NAME_MAX, "%s_%02d", prefix, i);
2757                 rc = lfsck_load_one_trace_file(env, com,
2758                                 com->lc_lfsck->li_lfsck_dir,
2759                                 &lsto->lsto_obj, ft, name, reset);
2760         }
2761
2762         return rc;
2763 }
2764
2765 /* external interfaces */
/*
 * Print the current LFSCK speed limit of the instance bound to @key into
 * @buf as "%u\n".
 *
 * buf: output buffer; when NULL nothing is printed and -ENXIO is returned
 * key: device used to look up the lfsck instance
 *
 * Returns the number of characters written by sprintf(), the error from
 * lu_env_init(), or -ENXIO when no instance is found or @buf is NULL.
 *
 * The instance reference obtained from lfsck_instance_find() is dropped
 * whenever an instance was found, including the @buf == NULL case, which
 * previously returned -ENXIO without the matching lfsck_instance_put().
 */
2766 int lfsck_get_speed(char *buf, struct dt_device *key)
2767 {
2768         struct lu_env env;
2769         struct lfsck_instance *lfsck;
2770         int rc;
2771
2772         ENTRY;
2773         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2774         if (rc != 0)
2775                 RETURN(rc);
2776
2777         lfsck = lfsck_instance_find(key, true, false);
2778         if (lfsck && buf)
2779                 rc = sprintf(buf, "%u\n",
2780                              lfsck->li_bookmark_ram.lb_speed_limit);
2781         else
2782                 rc = -ENXIO;
2783         if (lfsck)
2784                 lfsck_instance_put(&env, lfsck);
2785
2786         lu_env_fini(&env);
2787
2788         RETURN(rc);
2789 }
2790 EXPORT_SYMBOL(lfsck_get_speed);
2791
/*
 * Set the LFSCK speed limit of the instance bound to @key.
 *
 * key: device used to look up the lfsck instance
 * val: new speed limit, passed to __lfsck_set_speed()
 *
 * The update runs under li_mutex. The bookmark is stored only when
 * __lfsck_set_speed() returns non-zero.
 *
 * Returns the error from lu_env_init(), -ENXIO when no instance is found,
 * the result of lfsck_bookmark_store() when the bookmark is stored, and 0
 * otherwise.
 */
2792 int lfsck_set_speed(struct dt_device *key, __u32 val)
2793 {
2794         struct lu_env env;
2795         struct lfsck_instance *lfsck;
2796         int rc;
2797
2798         ENTRY;
2799         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2800         if (rc != 0)
2801                 RETURN(rc);
2802
2803         lfsck = lfsck_instance_find(key, true, false);
2804         if (likely(lfsck != NULL)) {
2805                 mutex_lock(&lfsck->li_mutex);
                     /* rc is still 0 from lu_env_init() if nothing is stored. */
2806                 if (__lfsck_set_speed(lfsck, val))
2807                         rc = lfsck_bookmark_store(&env, lfsck);
2808                 mutex_unlock(&lfsck->li_mutex);
2809                 lfsck_instance_put(&env, lfsck);
2810         } else {
2811                 rc = -ENXIO;
2812         }
2813
2814         lu_env_fini(&env);
2815
2816         RETURN(rc);
2817 }
2818 EXPORT_SYMBOL(lfsck_set_speed);
2819
/*
 * Print the async windows setting (lb_async_windows) of the instance bound
 * to @key into @buf as "%u\n".
 *
 * buf: output buffer
 * key: device used to look up the lfsck instance
 *
 * Returns the number of characters written by sprintf(), the error from
 * lu_env_init(), or -ENXIO when no instance is found.
 *
 * NOTE(review): unlike lfsck_get_speed(), @buf is not checked for NULL
 * before the sprintf() - confirm callers never pass NULL.
 */
2820 int lfsck_get_windows(char *buf, struct dt_device *key)
2821 {
2822         struct lu_env env;
2823         struct lfsck_instance *lfsck;
2824         int rc;
2825
2826         ENTRY;
2827         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2828         if (rc != 0)
2829                 RETURN(rc);
2830
2831         lfsck = lfsck_instance_find(key, true, false);
2832         if (likely(lfsck != NULL)) {
2833                 rc = sprintf(buf, "%u\n",
2834                              lfsck->li_bookmark_ram.lb_async_windows);
2835                 lfsck_instance_put(&env, lfsck);
2836         } else {
2837                 rc = -ENXIO;
2838         }
2839
2840         lu_env_fini(&env);
2841
2842         RETURN(rc);
2843 }
2844 EXPORT_SYMBOL(lfsck_get_windows);
2845
/*
 * Set the async windows size of the instance bound to @key.
 *
 * key: device used to look up the lfsck instance
 * val: new size; must be within [1, LFSCK_ASYNC_WIN_MAX]
 *
 * Returns the error from lu_env_init(), -ENXIO when no instance is found,
 * -EINVAL when @val is out of range, the result of lfsck_bookmark_store()
 * when the value changed, and 0 when @val equals the current setting.
 */
2846 int lfsck_set_windows(struct dt_device *key, unsigned int val)
2847 {
2848         struct lu_env env;
2849         struct lfsck_instance *lfsck;
2850         int rc;
2851
2852         ENTRY;
2853         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2854         if (rc != 0)
2855                 RETURN(rc);
2856
2857         lfsck = lfsck_instance_find(key, true, false);
2858         if (likely(lfsck != NULL)) {
2859                 if (val < 1 || val > LFSCK_ASYNC_WIN_MAX) {
2860                         CWARN("%s: invalid async windows size that may cause memory issues. The valid range is [1 - %u].\n",
2861                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2862                         rc = -EINVAL;
                     /* NOTE(review): the "changed?" comparison reads
                      * lb_async_windows before li_mutex is taken; only the
                      * update and the bookmark store run under the mutex.
                      */
2863                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2864                         mutex_lock(&lfsck->li_mutex);
2865                         lfsck->li_bookmark_ram.lb_async_windows = val;
2866                         rc = lfsck_bookmark_store(&env, lfsck);
2867                         mutex_unlock(&lfsck->li_mutex);
2868                 }
2869                 lfsck_instance_put(&env, lfsck);
2870         } else {
2871                 rc = -ENXIO;
2872         }
2873
2874         lu_env_fini(&env);
2875
2876         RETURN(rc);
2877 }
2878 EXPORT_SYMBOL(lfsck_set_windows);
2879
/*
 * Dump the state of the @type component of the instance bound to @key into
 * the seq_file @m by calling the component's lfsck_dump operation.
 *
 * m:    seq_file handed to the component's dump operation
 * key:  device used to look up the lfsck instance
 * type: component type to look up
 *
 * Returns the error from lu_env_init(), -ENXIO when no instance is found,
 * -ENOTSUPP when the instance has no component of @type, and 0 otherwise
 * (no result of the dump operation itself is used).
 */
2880 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2881 {
2882         struct lu_env env;
2883         struct lfsck_instance *lfsck;
2884         struct lfsck_component *com;
2885         int rc;
2886
2887         ENTRY;
2888         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2889         if (rc != 0)
2890                 RETURN(rc);
2891
2892         lfsck = lfsck_instance_find(key, true, false);
2893         if (likely(lfsck != NULL)) {
2894                 com = lfsck_component_find(lfsck, type);
2895                 if (likely(com != NULL)) {
2896                         com->lc_ops->lfsck_dump(&env, com, m);
2897                         lfsck_component_put(&env, com);
2898                 } else {
2899                         rc = -ENOTSUPP;
2900                 }
2901
2902                 lfsck_instance_put(&env, lfsck);
2903         } else {
2904                 rc = -ENXIO;
2905         }
2906
2907         lu_env_fini(&env);
2908
2909         RETURN(rc);
2910 }
2911 EXPORT_SYMBOL(lfsck_dump);
2912
/*
 * Broadcast an LE_STOP notification for all LFSCK types to every target
 * whose bit is set in the MDT target bitmap, then wait for the replies.
 *
 * env:   execution environment; its lfsck_thread_info supplies the request
 *        and async-args buffers
 * lfsck: local lfsck instance
 * stop:  stop parameters; ls_flags must contain LPF_BROADCAST (asserted)
 *
 * A failure to queue the request for one target is logged and remembered in
 * rc1, but the loop carries on with the remaining targets.
 *
 * Returns -ENOMEM if the request set cannot be allocated. Otherwise returns
 * the ptlrpc_set_wait() result, or laia_result when the wait returned 0,
 * with -EALREADY mapped to 0; if that is 0, returns the last queueing
 * failure (rc1), which is 0 when there was none.
 */
2913 static int lfsck_stop_all(const struct lu_env *env,
2914                           struct lfsck_instance *lfsck,
2915                           struct lfsck_stop *stop)
2916 {
2917         struct lfsck_thread_info *info = lfsck_env_info(env);
2918         struct lfsck_request *lr = &info->lti_lr;
2919         struct lfsck_async_interpret_args *laia = &info->lti_laia;
2920         struct ptlrpc_request_set *set;
2921         struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs;
2922         struct lfsck_tgt_desc *ltd;
2923         struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
2924         int idx;
2925         int rc = 0;
2926         int rc1 = 0;
2927
2928         ENTRY;
2929         LASSERT(stop->ls_flags & LPF_BROADCAST);
2930
2931         set = ptlrpc_prep_set();
2932         if (unlikely(set == NULL))
2933                 RETURN(-ENOMEM);
2934
2935         memset(lr, 0, sizeof(*lr));
2936         lr->lr_event = LE_STOP;
2937         lr->lr_index = lfsck_dev_idx(lfsck);
2938         lr->lr_status = stop->ls_status;
2939         lr->lr_version = bk->lb_version;
2940         lr->lr_active = LFSCK_TYPES_ALL;
2941         lr->lr_param = stop->ls_flags;
2942
2943         memset(laia, 0, sizeof(*laia));
2944         laia->laia_ltds = ltds;
2945         laia->laia_lr = lr;
2946         laia->laia_shared = 1;
2947
             /* ltd_rw_sem stays held for read across the whole loop here. */
2948         down_read(&ltds->ltd_rw_sem);
2949         for_each_set_bit(idx, ltds->ltd_tgts_bitmap, ltds->ltd_tgts_mask_len) {
2950                 ltd = lfsck_tgt_get(ltds, idx);
2951                 LASSERT(ltd != NULL);
2952
2953                 laia->laia_ltd = ltd;
2954                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2955                                          lfsck_async_interpret, laia,
2956                                          LFSCK_NOTIFY);
2957                 if (rc != 0) {
                             /* Nothing was queued, so the interpreter will not
                              * run for this target: do its work here with a
                              * NULL request and drop the target reference.
                              */
2958                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2959                         lfsck_tgt_put(ltd);
2960                         CERROR("%s: cannot notify MDT %x for LFSCK stop: rc = %d\n",
2961                                lfsck_lfsck2name(lfsck), idx, rc);
2962                         rc1 = rc;
2963                 }
2964         }
2965         up_read(&ltds->ltd_rw_sem);
2966
2967         rc = ptlrpc_set_wait(env, set);
2968         ptlrpc_set_destroy(set);
2969
2970         if (rc == 0)
2971                 rc = laia->laia_result;
2972
             /* A target reporting -EALREADY counts as success for a stop. */
2973         if (rc == -EALREADY)
2974                 rc = 0;
2975
2976         if (rc != 0)
2977                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2978                        lfsck_lfsck2name(lfsck), rc);
2979
2980         RETURN(rc != 0 ? rc : rc1);
2981 }
2982
/*
 * Broadcast an LE_START notification to every target whose bit is set in
 * the MDT target bitmap and wait for the replies, retrying once per second
 * while the overall result is -EINPROGRESS.
 *
 * env:   execution environment; its lfsck_thread_info supplies the request,
 *        async-args and stop buffers
 * lfsck: local lfsck instance
 * start: start parameters; ls_flags must contain LPF_BROADCAST (asserted)
 *
 * On the first pass every target is notified. On a retry pass only targets
 * with ltd_retry_start set are notified again. A failure to queue a request
 * aborts the pass and is returned at once, without waiting on the set and
 * without broadcasting a stop.
 *
 * Returns 0 on success; -ENOMEM if a request set cannot be allocated; the
 * queueing error; -EINTR if a retry is abandoned because a signal is
 * pending or the lfsck thread is no longer running; otherwise the
 * ptlrpc_set_wait() result or laia_result. For any non-zero final result
 * other than -EALREADY, lfsck_stop_all() is called with LS_FAILED before
 * returning.
 */
2983 static int lfsck_start_all(const struct lu_env *env,
2984                            struct lfsck_instance *lfsck,
2985                            struct lfsck_start *start)
2986 {
2987         struct lfsck_thread_info *info = lfsck_env_info(env);
2988         struct lfsck_request *lr = &info->lti_lr;
2989         struct lfsck_async_interpret_args *laia = &info->lti_laia;
2990         struct ptlrpc_request_set *set;
2991         struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs;
2992         struct lfsck_tgt_desc *ltd;
2993         struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
2994         int idx;
2995         int rc = 0;
2996         bool retry = false;
2997
2998         ENTRY;
2999         LASSERT(start->ls_flags & LPF_BROADCAST);
3000
3001         memset(lr, 0, sizeof(*lr));
3002         lr->lr_event = LE_START;
3003         lr->lr_index = lfsck_dev_idx(lfsck);
3004         lr->lr_speed = bk->lb_speed_limit;
3005         lr->lr_version = bk->lb_version;
3006         lr->lr_active = start->ls_active;
3007         lr->lr_param = start->ls_flags;
3008         lr->lr_async_windows = bk->lb_async_windows;
3009         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
3010                        LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ |
3011                        LSV_CREATE_MDTOBJ;
3012
3013         memset(laia, 0, sizeof(*laia));
3014         laia->laia_ltds = ltds;
3015         laia->laia_lr = lr;
3016         laia->laia_shared = 1;
3017
             /* Each pass (initial or retry) uses a fresh request set. */
3018 again:
3019         set = ptlrpc_prep_set();
3020         if (unlikely(!set))
3021                 RETURN(-ENOMEM);
3022
3023         down_read(&ltds->ltd_rw_sem);
3024         for_each_set_bit(idx, ltds->ltd_tgts_bitmap, ltds->ltd_tgts_mask_len) {
3025                 ltd = lfsck_tgt_get(ltds, idx);
3026                 LASSERT(ltd != NULL);
3027
                     /* On a retry pass, skip targets not flagged for retry.
                      * NOTE(review): ltd_retry_start is not set anywhere in
                      * this function; presumably the reply handling sets it.
                      */
3028                 if (retry && !ltd->ltd_retry_start) {
3029                         lfsck_tgt_put(ltd);
3030                         continue;
3031                 }
3032
3033                 laia->laia_ltd = ltd;
                     /* Reset the per-target state before (re)sending. */
3034                 ltd->ltd_retry_start = 0;
3035                 ltd->ltd_layout_done = 0;
3036                 ltd->ltd_namespace_done = 0;
3037                 ltd->ltd_synced_failures = 0;
3038                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
3039                                          lfsck_async_interpret, laia,
3040                                          LFSCK_NOTIFY);
3041                 if (rc != 0) {
                             /* Nothing was queued for this target: run the
                              * interpret step directly, drop the reference and
                              * abandon the pass.
                              */
3042                         lfsck_interpret(env, lfsck, NULL, laia, rc);
3043                         lfsck_tgt_put(ltd);
3044                         CERROR("%s: cannot notify MDT %x for LFSCK start, failout: rc = %d\n",
3045                                lfsck_lfsck2name(lfsck), idx, rc);
3046                         break;
3047                 }
3048         }
3049         up_read(&ltds->ltd_rw_sem);
3050
             /* rc is non-zero here only when the loop was left via break. */
3051         if (rc != 0) {
3052                 ptlrpc_set_destroy(set);
3053
3054                 RETURN(rc);
3055         }
3056
3057         rc = ptlrpc_set_wait(env, set);
3058         ptlrpc_set_destroy(set);
3059
3060         if (rc == 0)
3061                 rc = laia->laia_result;
3062
3063         if (unlikely(rc == -EINPROGRESS)) {
                     /* Sleep for one second (interruptibly), then retry
                      * unless a signal is pending or the lfsck thread is no
                      * longer running.
                      */
3064                 retry = true;
3065                 schedule_timeout_interruptible(cfs_time_seconds(1));
3066                 set_current_state(TASK_RUNNING);
3067                 if (!signal_pending(current) &&
3068                     thread_is_running(&lfsck->li_thread))
3069                         goto again;
3070
3071                 rc = -EINTR;
3072         }
3073
3074         if (rc != 0) {
3075                 struct lfsck_stop *stop = &info->lti_stop;
3076
3077                 CERROR("%s: cannot start LFSCK on some MDTs, stop all: rc = %d\n",
3078                        lfsck_lfsck2name(lfsck), rc);
                     /* Roll back with a broadcast stop, except for -EALREADY.
                      * The result of lfsck_stop_all() is not used; the
                      * original rc is what gets returned.
                      */
3079                 if (rc != -EALREADY) {
3080                         stop->ls_status = LS_FAILED;
3081                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
3082                         lfsck_stop_all(env, lfsck, stop);
3083                 }
3084         }
3085
3086         RETURN(rc);
3087 }
3088
3089 int lfsck_start(const struct lu_env *env, struct dt_device *key,
3090                 struct lfsck_start_param *lsp)
3091 {
3092         struct lfsck_start *start = lsp->lsp_start;
3093         struct lfsck_instance *lfsck;
3094         struct lfsck_bookmark *bk;
3095         struct ptlrpc_thread *thread;
3096         struct lfsck_component *com;
3097         struct lfsck_thread_args *lta;
3098         struct task_struct *task;
3099         struct lfsck_tgt_descs *ltds;
3100         struct lfsck_tgt_desc *ltd;
3101         int idx;
3102         int rc = 0;
3103         __u16 valid  = 0;
3104         __u16 flags  = 0;
3105         __u16 type   = 1;
3106
3107         ENTRY;
3108         if (key->dd_rdonly)
3109                 RETURN(-EROFS);
3110
3111         lfsck = lfsck_instance_find(key, true, false);
3112         if (unlikely(lfsck == NULL))
3113                 RETURN(-ENXIO);
3114
3115         if (unlikely(lfsck->li_stopping))
3116                 GOTO(put, rc = -ENXIO);
3117
3118         /* System is not ready, try again later. */
3119         if (unlikely(lfsck->li_namespace == NULL ||
3120                      lfsck_dev_site(lfsck)->ss_server_fld == NULL))
3121                 GOTO(put, rc = -EINPROGRESS);
3122
3123         /* start == NULL means auto trigger paused LFSCK. */
3124         if (!start) {
3125                 if (list_empty(&lfsck->li_list_scan) ||
3126                     CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO))
3127                         GOTO(put, rc = 0);
3128         } else if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
3129                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
3130                        lfsck_lfsck2name(lfsck));
3131
3132                 GOTO(put, rc = -EPERM);
3133         }
3134
3135         bk = &lfsck->li_bookmark_ram;
3136         thread = &lfsck->li_thread;
3137         mutex_lock(&lfsck->li_mutex);
3138         spin_lock(&lfsck->li_lock);
3139         if (unlikely(thread_is_stopping(thread))) {
3140                 /* Someone is stopping the LFSCK. */
3141                 spin_unlock(&lfsck->li_lock);
3142                 GOTO(out, rc = -EBUSY);
3143         }
3144
3145         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
3146                 rc = -EALREADY;
3147                 if (unlikely(start == NULL)) {
3148                         spin_unlock(&lfsck->li_lock);
3149                         GOTO(out, rc);
3150                 }
3151
3152                 while (start->ls_active != 0) {
3153                         if (!(type & start->ls_active)) {
3154                                 type <<= 1;
3155                                 continue;
3156                         }
3157
3158                         com = __lfsck_component_find(lfsck, type,
3159                                                      &lfsck->li_list_scan);
3160                         if (com == NULL)
3161                                 com = __lfsck_component_find(lfsck, type,
3162                                                 &lfsck->li_list_double_scan);
3163                         if (com == NULL) {
3164                                 rc = -EOPNOTSUPP;
3165                                 break;
3166                         }
3167
3168                         if (com->lc_ops->lfsck_join != NULL) {
3169                                 rc = com->lc_ops->lfsck_join(env, com, lsp);
3170                                 if (rc != 0 && rc != -EALREADY)
3171                                         break;
3172                         }
3173                         start->ls_active &= ~type;
3174                         type <<= 1;
3175                 }
3176                 spin_unlock(&lfsck->li_lock);
3177                 GOTO(out, rc);
3178         }
3179         spin_unlock(&lfsck->li_lock);
3180
3181         lfsck->li_status = 0;
3182         lfsck->li_oit_over = 0;
3183         lfsck->li_start_unplug = 0;
3184         lfsck->li_drop_dryrun = 0;
3185         lfsck->li_new_scanned = 0;
3186
3187         /* For auto trigger. */
3188         if (start == NULL)
3189                 goto trigger;
3190
3191         start->ls_version = bk->lb_version;
3192
3193         if (start->ls_active != 0) {
3194                 struct lfsck_component *next;
3195
3196                 if (start->ls_active == LFSCK_TYPES_ALL)
3197                         start->ls_active = LFSCK_TYPES_SUPPORTED;
3198
3199                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
3200                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
3201                         GOTO(out, rc = -ENOTSUPP);
3202                 }
3203
3204                 list_for_each_entry_safe(com, next,
3205                                          &lfsck->li_list_scan, lc_link) {
3206                         if (!(com->lc_type & start->ls_active)) {
3207                                 rc = com->lc_ops->lfsck_post(env, com, 0,
3208                                                              false);
3209                                 if (rc != 0)
3210                                         GOTO(out, rc);
3211                         }
3212                 }
3213
3214                 while (start->ls_active != 0) {
3215                         if (type & start->ls_active) {
3216                                 com = __lfsck_component_find(lfsck, type,
3217                                                         &lfsck->li_list_idle);
3218                                 if (com != NULL)
3219                                         /* Component status will be updated when
3220                                          * its prep() is called later by LFSCK
3221                                          * main engine.
3222                                          */
3223                                         list_move_tail(&com->lc_link,
3224                                                        &lfsck->li_list_scan);
3225                                 start->ls_active &= ~type;
3226                         }
3227                         type <<= 1;
3228                 }
3229         }
3230
3231         if (list_empty(&lfsck->li_list_scan)) {
3232                 /* The speed limit will be used to control both the LFSCK and
3233                  * low layer scrub (if applied), need to be handled firstly.
3234                  */
3235                 if (start->ls_valid & LSV_SPEED_LIMIT) {
3236                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
3237                                 rc = lfsck_bookmark_store(env, lfsck);
3238                                 if (rc != 0)
3239                                         GOTO(out, rc);
3240                         }
3241                 }
3242
3243                 goto trigger;
3244         }
3245
3246         if (start->ls_flags & LPF_RESET)
3247                 flags |= DOIF_RESET;
3248
3249         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
3250         if (rc != 0)
3251                 GOTO(out, rc);
3252
3253         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
3254                 start->ls_active |= com->lc_type;
3255                 if (flags & DOIF_RESET) {
3256                         rc = com->lc_ops->lfsck_reset(env, com, false);
3257                         if (rc != 0)
3258                                 GOTO(out, rc);
3259                 }
3260         }
3261
3262         ltds = &lfsck->li_mdt_descs;
3263         down_read(&ltds->ltd_rw_sem);
3264         for_each_set_bit(idx, ltds->ltd_tgts_bitmap, ltds->ltd_tgts_mask_len) {
3265                 ltd = lfsck_ltd2tgt(ltds, idx);
3266                 LASSERT(ltd != NULL);
3267
3268                 ltd->ltd_layout_done = 0;
3269                 ltd->ltd_namespace_done = 0;
3270                 ltd->ltd_synced_failures = 0;
3271                 lfsck_reset_ltd_status(ltd, LFSCK_TYPE_NAMESPACE);
3272                 lfsck_reset_ltd_status(ltd, LFSCK_TYPE_LAYOUT);
3273                 list_del_init(&ltd->ltd_layout_phase_list);
3274                 list_del_init(&ltd->ltd_layout_list);
3275                 list_del_init(&ltd->ltd_namespace_phase_list);
3276                 list_del_init(&ltd->ltd_namespace_list);
3277         }
3278         up_read(&ltds->ltd_rw_sem);
3279
3280         ltds = &lfsck->li_ost_descs;
3281         down_read(&ltds->ltd_rw_sem);
3282         for_each_set_bit(idx, ltds->ltd_tgts_bitmap, ltds->ltd_tgts_mask_len) {
3283                 ltd = lfsck_ltd2tgt(ltds, idx);
3284                 LASSERT(ltd != NULL);
3285
3286                 ltd->ltd_layout_done = 0;
3287                 ltd->ltd_synced_failures = 0;
3288                 lfsck_reset_ltd_status(ltd, LFSCK_TYPE_LAYOUT);
3289                 list_del_init(&ltd->ltd_layout_phase_list);
3290                 list_del_init(&ltd->ltd_layout_list);
3291         }
3292         up_read(&ltds->ltd_rw_sem);
3293
3294 trigger:
3295         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
3296         if (bk->lb_param & LPF_DRYRUN)
3297                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
3298
3299         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
3300                 valid |= DOIV_ERROR_HANDLE;
3301                 if (start->ls_flags & LPF_FAILOUT)
3302                         flags |= DOIF_FAILOUT;
3303         }
3304
3305         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
3306                 valid |= DOIV_DRYRUN;
3307                 if (start->ls_flags & LPF_DRYRUN)
3308                         flags |= DOIF_DRYRUN;
3309         }
3310
3311         if (!list_empty(&lfsck->li_list_scan))
3312                 flags |= DOIF_OUTUSED;
3313
3314         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
3315         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
3316         if (IS_ERR(lta))
3317                 GOTO(out, rc = PTR_ERR(lta));
3318
3319         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
3320         spin_lock(&lfsck->li_lock);
3321         thread_set_flags(thread, SVC_STARTING);
3322         spin_unlock(&lfsck->li_lock);
3323         task = kthread_run(lfsck_master_engine, lta, "lfsck");
3324         if (IS_ERR(task)) {
3325                 rc = PTR_ERR(task);
3326                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
3327                        lfsck_lfsck2name(lfsck), rc);
3328                 lfsck_thread_args_fini(lta);
3329
3330                 GOTO(out, rc);
3331         }
3332
3333         wait_event_idle(thread->t_ctl_waitq,
3334                         thread_is_running(thread) ||
3335                         thread_is_stopped(thread));
3336         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
3337                 lfsck->li_start_unplug = 1;
3338                 wake_up(&thread->t_ctl_waitq);
3339
3340                 GOTO(out, rc = 0);
3341         }
3342
3343         /* release lfsck::li_mutex to avoid deadlock. */
3344         mutex_unlock(&lfsck->li_mutex);
3345         rc = lfsck_start_all(env, lfsck, start);
3346         if (rc != 0) {
3347                 spin_lock(&lfsck->li_lock);
3348                 if (thread_is_stopped(thread)) {
3349                         spin_unlock(&lfsck->li_lock);
3350                 } else {
3351                         lfsck->li_status = LS_FAILED;
3352                         lfsck->li_flags = 0;
3353                         thread_set_flags(thread, SVC_STOPPING);
3354                         spin_unlock(&lfsck->li_lock);
3355
3356                         lfsck->li_start_unplug = 1;
3357                         wake_up(&thread->t_ctl_waitq);
3358                         wait_event_idle(thread->t_ctl_waitq,
3359                                         thread_is_stopped(thread));
3360                 }
3361         } else {
3362                 lfsck->li_start_unplug = 1;
3363                 wake_up(&thread->t_ctl_waitq);
3364         }
3365
3366         GOTO(put, rc);
3367
3368 out:
3369         mutex_unlock(&lfsck->li_mutex);
3370
3371 put:
3372         lfsck_instance_put(env, lfsck);
3373
3374         return rc < 0 ? rc : 0;
3375 }
3376 EXPORT_SYMBOL(lfsck_start);
3377
3378 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
3379                struct lfsck_stop *stop)
3380 {
3381         struct lfsck_instance *lfsck;
3382         struct ptlrpc_thread *thread;
3383         int rc = 0;
3384         int rc1 = 0;
3385
3386         ENTRY;
3387         lfsck = lfsck_instance_find(key, true, false);
3388         if (unlikely(lfsck == NULL))
3389                 RETURN(-ENXIO);
3390
3391         thread = &lfsck->li_thread;
3392         if (stop && stop->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
3393                 CERROR("%s: only allow to specify '-A' via MDS\n",
3394                        lfsck_lfsck2name(lfsck));
3395                 GOTO(put, rc = -EPERM);
3396         }
3397
3398         spin_lock(&lfsck->li_lock);
3399         /* The target is umounted */
3400         if (stop && stop->ls_status == LS_PAUSED)
3401                 lfsck->li_stopping = 1;
3402
3403         if (thread_is_init(thread) || thread_is_stopped(thread))
3404                 /* no error if LFSCK stopped already, or not started */
3405                 GOTO(unlock, rc = 0);
3406
3407         if (thread_is_stopping(thread))
3408                 /* Someone is stopping LFSCK. */
3409                 GOTO(unlock, rc = -EINPROGRESS);
3410
3411         if (stop) {
3412                 lfsck->li_status = stop->ls_status;
3413                 lfsck->li_flags = stop->ls_flags;
3414         } else {
3415                 lfsck->li_status = LS_STOPPED;
3416                 lfsck->li_flags = 0;
3417         }
3418
3419         thread_set_flags(thread, SVC_STOPPING);
3420
3421         LASSERT(lfsck->li_task);
3422         send_sig(SIGINT, lfsck->li_task, 1);
3423
3424         if (lfsck->li_master) {
3425                 struct lfsck_component *com;
3426                 struct lfsck_assistant_data *lad;
3427
3428                 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
3429                         lad = com->lc_data;
3430                         spin_lock(&lad->lad_lock);
3431                         if (lad->lad_task)
3432                                 send_sig(SIGINT, lad->lad_task, 1);
3433                         spin_unlock(&lad->lad_lock);
3434                 }
3435
3436                 list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
3437                         lad = com->lc_data;
3438                         spin_lock(&lad->lad_lock);
3439                         if (lad->lad_task)
3440                                 send_sig(SIGINT, lad->lad_task, 1);
3441                         spin_unlock(&lad->lad_lock);
3442                 }
3443         }
3444
3445         wake_up(&thread->t_ctl_waitq);
3446         spin_unlock(&lfsck->li_lock);
3447         if (stop && stop->ls_flags & LPF_BROADCAST)
3448                 rc1 = lfsck_stop_all(env, lfsck, stop);
3449
3450         /* It was me set the status as 'stopping' just now, if it is not
3451          * 'stopping' now, then either stopped, or re-started by race.
3452          */
3453         wait_event_idle(thread->t_ctl_waitq,
3454                         !thread_is_stopping(thread));
3455
3456         GOTO(put, rc = 0);
3457
3458 unlock:
3459         spin_unlock(&lfsck->li_lock);
3460 put:
3461         lfsck_instance_put(env, lfsck);
3462
3463         return rc != 0 ? rc : rc1;
3464 }
3465 EXPORT_SYMBOL(lfsck_stop);
3466
3467 int lfsck_in_notify_local(const struct lu_env *env, struct dt_device *key,
3468                           struct lfsck_req_local *lrl, struct thandle *th)
3469 {
3470         struct lfsck_instance *lfsck;
3471         struct lfsck_component *com;
3472         int rc = -EOPNOTSUPP;
3473
3474         ENTRY;
3475         lfsck = lfsck_instance_find(key, true, false);
3476         if (unlikely(!lfsck))
3477                 RETURN(-ENXIO);
3478
3479         com = lfsck_component_find(lfsck, lrl->lrl_active);
3480         if (likely(com && com->lc_ops->lfsck_in_notify_local)) {
3481                 rc = com->lc_ops->lfsck_in_notify_local(env, com, lrl, th);
3482                 lfsck_component_put(env, com);
3483         }
3484
3485         lfsck_instance_put(env, lfsck);
3486
3487         RETURN(rc);
3488 }
3489 EXPORT_SYMBOL(lfsck_in_notify_local);
3490
3491 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
3492                     struct lfsck_request *lr)
3493 {
3494         int rc = -EOPNOTSUPP;
3495
3496         ENTRY;
3497         switch (lr->lr_event) {
3498         case LE_START: {
3499                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
3500                 struct lfsck_start_param  lsp;
3501
3502                 memset(start, 0, sizeof(*start));
3503                 start->ls_valid = lr->lr_valid;
3504                 start->ls_speed_limit = lr->lr_speed;
3505                 start->ls_version = lr->lr_version;
3506                 start->ls_active = lr->lr_active;
3507                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3508                 start->ls_async_windows = lr->lr_async_windows;
3509
3510                 lsp.lsp_start = start;
3511                 lsp.lsp_index = lr->lr_index;
3512                 lsp.lsp_index_valid = 1;
3513                 rc = lfsck_start(env, key, &lsp);
3514                 break;
3515         }
3516         case LE_STOP: {
3517                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
3518
3519                 memset(stop, 0, sizeof(*stop));
3520                 stop->ls_status = lr->lr_status;
3521                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3522                 rc = lfsck_stop(env, key, stop);
3523                 break;
3524         }
3525         case LE_PHASE1_DONE:
3526         case LE_PHASE2_DONE:
3527         case LE_PEER_EXIT:
3528         case LE_CONDITIONAL_DESTROY:
3529         case LE_SET_LMV_MASTER:
3530         case LE_SET_LMV_SLAVE:
3531         case LE_PAIRS_VERIFY: {
3532                 struct lfsck_instance  *lfsck;
3533                 struct lfsck_component *com;
3534
3535                 lfsck = lfsck_instance_find(key, true, false);
3536                 if (unlikely(lfsck == NULL))
3537                         RETURN(-ENXIO);
3538
3539                 com = lfsck_component_find(lfsck, lr->lr_active);
3540                 if (likely(com)) {
3541                         rc = com->lc_ops->lfsck_in_notify(env, com, lr);
3542                         lfsck_component_put(env, com);
3543                 }
3544
3545                 lfsck_instance_put(env, lfsck);
3546                 break;
3547         }
3548         default:
3549                 break;
3550         }
3551
3552         RETURN(rc);
3553 }
3554 EXPORT_SYMBOL(lfsck_in_notify);
3555
3556 int lfsck_query(const struct lu_env *env, struct dt_device *key,
3557                 struct lfsck_request *req, struct lfsck_reply *rep,
3558                 struct lfsck_query *que)
3559 {
3560         struct lfsck_instance *lfsck;
3561         struct lfsck_component *com;
3562         int i;
3563         int rc = 0;
3564         __u16 type;
3565
3566         ENTRY;
3567         lfsck = lfsck_instance_find(key, true, false);
3568         if (unlikely(lfsck == NULL))
3569                 RETURN(-ENXIO);
3570
3571         if (que != NULL) {
3572                 if (que->lu_types == LFSCK_TYPES_ALL)
3573                         que->lu_types =
3574                                 LFSCK_TYPES_SUPPORTED & ~LFSCK_TYPE_SCRUB;
3575
3576                 if (que->lu_types & ~LFSCK_TYPES_SUPPORTED) {
3577                         que->lu_types &= ~LFSCK_TYPES_SUPPORTED;
3578
3579                         GOTO(out, rc = -ENOTSUPP);
3580                 }
3581
3582                 for (i = 0, type = BIT(i); i < LFSCK_TYPE_BITS;
3583                      i++, type = BIT(i)) {
3584                         if (!(que->lu_types & type))
3585                                 continue;
3586
3587 again:
3588                         com = lfsck_component_find(lfsck, type);
3589                         if (unlikely(com == NULL))
3590                                 GOTO(out, rc = -ENOTSUPP);
3591
3592                         memset(que->lu_mdts_count[i], 0,
3593                                sizeof(__u32) * (LS_MAX + 1));
3594                         memset(que->lu_osts_count[i], 0,
3595                                sizeof(__u32) * (LS_MAX + 1));
3596                         que->lu_repaired[i] = 0;
3597                         rc = com->lc_ops->lfsck_query(env, com, req, rep,
3598                                                       que, i);
3599                         lfsck_component_put(env, com);
3600                         if  (rc < 0)
3601                                 GOTO(out, rc);
3602                 }
3603
3604                 if (!(que->lu_flags & LPF_WAIT))
3605                         GOTO(out, rc);
3606
3607                 for (i = 0, type = BIT(i); i < LFSCK_TYPE_BITS;
3608                      i++, type = BIT(i)) {
3609                         if (!(que->lu_types & type))
3610                                 continue;
3611
3612                         if (que->lu_mdts_count[i][LS_SCANNING_PHASE1] != 0 ||
3613                             que->lu_mdts_count[i][LS_SCANNING_PHASE2] != 0 ||
3614                             que->lu_osts_count[i][LS_SCANNING_PHASE1] != 0 ||
3615                             que->lu_osts_count[i][LS_SCANNING_PHASE2] != 0) {
3616                                 /* If it is required to wait, then sleep
3617                                  * 3 seconds and try to query again.
3618                                  */
3619                                 unsigned long timeout =
3620                                         msecs_to_jiffies(3000) + 1;
3621                                 while (timeout &&
3622                                        !fatal_signal_pending(current))
3623                                         timeout = schedule_timeout_killable(
3624                                                 timeout);
3625                                 if (timeout == 0)
3626                                         goto again;
3627                         }
3628                 }
3629         } else {
3630                 com = lfsck_component_find(lfsck, req->lr_active);
3631                 if (likely(com != NULL)) {
3632                         rc = com->lc_ops->lfsck_query(env, com, req, rep,
3633                                                       que, -1);
3634                         lfsck_component_put(env, com);
3635                 } else {
3636                         rc = -ENOTSUPP;
3637                 }
3638         }
3639
3640         GOTO(out, rc);
3641
3642 out:
3643         lfsck_instance_put(env, lfsck);
3644         return rc;
3645 }
3646 EXPORT_SYMBOL(lfsck_query);
3647
3648 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
3649                              struct ldlm_namespace *ns)
3650 {
3651         struct lfsck_instance *lfsck;
3652         int rc = -ENXIO;
3653
3654         lfsck = lfsck_instance_find(key, true, false);
3655         if (likely(lfsck != NULL)) {
3656                 lfsck->li_namespace = ns;
3657                 lfsck_instance_put(env, lfsck);
3658                 rc = 0;
3659         }
3660
3661         return rc;
3662 }
3663 EXPORT_SYMBOL(lfsck_register_namespace);
3664
3665 int lfsck_register(const struct lu_env *env, struct dt_device *key,
3666                    struct dt_device *next, struct obd_device *obd,
3667                    lfsck_out_notify notify, void *notify_data, bool master)
3668 {
3669         struct lfsck_instance *lfsck;
3670         struct dt_object *root  = NULL;
3671         struct dt_object *obj   = NULL;
3672         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
3673         int rc;
3674
3675         ENTRY;
3676         lfsck = lfsck_instance_find(key, false, false);
3677         if (unlikely(lfsck != NULL))
3678                 RETURN(-EEXIST);
3679
3680         OBD_ALLOC_PTR(lfsck);
3681         if (lfsck == NULL)
3682                 RETURN(-ENOMEM);
3683
3684         mutex_init(&lfsck->li_mutex);
3685         spin_lock_init(&lfsck->li_lock);
3686         INIT_LIST_HEAD(&lfsck->li_link);
3687         INIT_LIST_HEAD(&lfsck->li_list_scan);
3688         INIT_LIST_HEAD(&lfsck->li_list_dir);
3689         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
3690         INIT_LIST_HEAD(&lfsck->li_list_idle);
3691         INIT_LIST_HEAD(&lfsck->li_list_lmv);
3692         atomic_set(&lfsck->li_ref, 1);
3693         atomic_set(&lfsck->li_double_scan_count, 0);
3694         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
3695         lfsck->li_out_notify = notify;
3696         lfsck->li_out_notify_data = notify_data;
3697         lfsck->li_next = next;
3698         lfsck->li_bottom = key;
3699         lfsck->li_obd = obd;
3700
3701         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
3702         if (rc != 0)
3703                 GOTO(out, rc);
3704
3705         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
3706         if (rc != 0)
3707                 GOTO(out, rc);
3708
3709         fid->f_seq = FID_SEQ_LOCAL_NAME;
3710         fid->f_oid = 1;
3711         fid->f_ver = 0;
3712         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
3713         if (rc != 0)
3714                 GOTO(out, rc);
3715
3716         rc = dt_root_get(env, key, fid);
3717         if (rc != 0)
3718                 GOTO(out, rc);
3719
3720         root = dt_locate(env, key, fid);
3721         if (IS_ERR(root))
3722                 GOTO(out, rc = PTR_ERR(root));
3723
3724         lfsck->li_local_root_fid = *fid;
3725         if (master) {
3726                 lfsck->li_master = 1;
3727                 if (lfsck_dev_idx(lfsck) == 0) {
3728                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
3729                         const struct lu_name *cname;
3730
3731                         rc = dt_lookup_dir(env, root, "ROOT",
3732                                            &lfsck->li_global_root_fid);
3733                         if (rc != 0)
3734                                 GOTO(out, rc);
3735
3736                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
3737                         if (IS_ERR(obj))
3738                                 GOTO(out, rc = PTR_ERR(obj));
3739
3740                         rc = dt_lookup_dir(env, obj, dotlustre, fid);
3741                         if (rc != 0)
3742                                 GOTO(out, rc);
3743
3744                         lfsck_object_put(env, obj);
3745                         obj = dt_locate(env, key, fid);
3746                         if (IS_ERR(obj))
3747                                 GOTO(out, rc = PTR_ERR(obj));
3748
3749                         cname = lfsck_name_get_const(env, dotlustre,
3750                                                      strlen(dotlustre));
3751                         rc = lfsck_verify_linkea(env, lfsck, obj, cname,
3752                                                  &lfsck->li_global_root_fid);
3753                         if (rc != 0)
3754                                 GOTO(out, rc);
3755
3756                         *pfid = *fid;
3757                         rc = dt_lookup_dir(env, obj, lostfound, fid);
3758                         if (rc != 0)
3759                                 GOTO(out, rc);
3760
3761                         lfsck_object_put(env, obj);
3762                         obj = dt_locate(env, key, fid);
3763                         if (IS_ERR(obj))
3764                                 GOTO(out, rc = PTR_ERR(obj));
3765
3766                         cname = lfsck_name_get_const(env, lostfound,
3767                                                      strlen(lostfound));
3768                         rc = lfsck_verify_linkea(env, lfsck, obj, cname, pfid);
3769                         if (rc != 0)
3770                                 GOTO(out, rc);
3771
3772                         lfsck_object_put(env, obj);
3773                         obj = NULL;
3774                 }
3775         }
3776
3777         fid->f_seq = FID_SEQ_LOCAL_FILE;
3778         fid->f_oid = OTABLE_IT_OID;
3779         fid->f_ver = 0;
3780         obj = dt_locate(env, key, fid);
3781         if (IS_ERR(obj))
3782                 GOTO(out, rc = PTR_ERR(obj));
3783
3784         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
3785         if (rc != 0)
3786                 GOTO(out, rc);
3787
3788         lfsck->li_obj_oit = obj;
3789         obj = local_file_find_or_create(env, lfsck->li_los, root, LFSCK_DIR,
3790                                         S_IFDIR | S_IRUGO | S_IWUSR);
3791         if (IS_ERR(obj))
3792                 GOTO(out, rc = PTR_ERR(obj));
3793
3794         lu_object_get(&obj->do_lu);
3795         lfsck->li_lfsck_dir = obj;
3796         rc = lfsck_bookmark_setup(env, lfsck);
3797         if (rc != 0)
3798                 GOTO(out, rc);
3799
3800         if (master) {
3801                 rc = lfsck_fid_init(lfsck);
3802                 if (rc < 0)
3803                         GOTO(out, rc);
3804
3805                 rc = lfsck_namespace_setup(env, lfsck);
3806                 if (rc < 0)
3807                         GOTO(out, rc);
3808         }
3809
3810         rc = lfsck_layout_setup(env, lfsck);
3811         if (rc < 0)
3812                 GOTO(out, rc);
3813
3814         /* XXX: more LFSCK components initialization to be added here. */
3815
3816         rc = lfsck_instance_add(lfsck);
3817         if (rc == 0)
3818                 rc = lfsck_add_target_from_orphan(env, lfsck);
3819 out:
3820         if (obj != NULL && !IS_ERR(obj))
3821                 lfsck_object_put(env, obj);
3822         if (root != NULL && !IS_ERR(root))
3823                 lfsck_object_put(env, root);
3824         if (rc != 0)
3825                 lfsck_instance_cleanup(env, lfsck);
3826         return rc;
3827 }
3828 EXPORT_SYMBOL(lfsck_register);
3829
3830 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
3831 {
3832         struct lfsck_instance *lfsck;
3833
3834         lfsck = lfsck_instance_find(key, false, true);
3835         if (lfsck != NULL)
3836                 lfsck_instance_put(env, lfsck);
3837 }
3838 EXPORT_SYMBOL(lfsck_degister);
3839
3840 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
3841                      struct dt_device *tgt, struct obd_export *exp,
3842                      __u32 index, bool for_ost)
3843 {
3844         struct lfsck_instance *lfsck;
3845         struct lfsck_tgt_desc *ltd;
3846         int rc;
3847
3848         ENTRY;
3849         OBD_ALLOC_PTR(ltd);
3850         if (ltd == NULL)
3851                 RETURN(-ENOMEM);
3852
3853         ltd->ltd_tgt = tgt;
3854         ltd->ltd_key = key;
3855         ltd->ltd_exp = exp;
3856         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
3857         INIT_LIST_HEAD(&ltd->ltd_layout_list);
3858         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
3859         INIT_LIST_HEAD(&ltd->ltd_namespace_list);
3860         INIT_LIST_HEAD(&ltd->ltd_namespace_phase_list);
3861         atomic_set(&ltd->ltd_ref, 1);
3862         ltd->ltd_index = index;
3863
3864         spin_lock(&lfsck_instance_lock);
3865         lfsck = __lfsck_instance_find(key, true, false);
3866         if (lfsck == NULL) {
3867                 if (for_ost)
3868                         list_add_tail(&ltd->ltd_orphan_list,
3869                                       &lfsck_ost_orphan_list);
3870                 else
3871                         list_add_tail(&ltd->ltd_orphan_list,
3872                                       &lfsck_mdt_orphan_list);
3873                 spin_unlock(&lfsck_instance_lock);
3874
3875                 RETURN(0);
3876         }
3877         spin_unlock(&lfsck_instance_lock);
3878
3879         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
3880         if (rc != 0)
3881                 lfsck_tgt_put(ltd);
3882
3883         lfsck_instance_put(env, lfsck);
3884
3885         RETURN(rc);
3886 }
3887 EXPORT_SYMBOL(lfsck_add_target);
3888
3889 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
3890                       struct dt_device *tgt, __u32 index, bool for_ost)
3891 {
3892         struct lfsck_instance *lfsck;
3893         struct lfsck_tgt_descs *ltds;
3894         struct lfsck_tgt_desc *ltd;
3895         struct list_head *head;
3896
3897         if (for_ost)
3898                 head = &lfsck_ost_orphan_list;
3899         else
3900                 head = &lfsck_mdt_orphan_list;
3901
3902         spin_lock(&lfsck_instance_lock);
3903         list_for_each_entry(ltd, head, ltd_orphan_list) {
3904                 if (ltd->ltd_tgt == tgt) {
3905                         list_del_init(&ltd->ltd_orphan_list);
3906                         spin_unlock(&lfsck_instance_lock);
3907                         lfsck_tgt_put(ltd);
3908
3909                         return;
3910                 }
3911         }
3912
3913         ltd = NULL;
3914         lfsck = __lfsck_instance_find(key, true, false);
3915         spin_unlock(&lfsck_instance_lock);
3916         if (unlikely(lfsck == NULL))
3917                 return;
3918
3919         if (for_ost)
3920                 ltds = &lfsck->li_ost_descs;
3921         else
3922                 ltds = &lfsck->li_mdt_descs;
3923
3924         down_write(&ltds->ltd_rw_sem);
3925         LASSERT(ltds->ltd_tgts_bitmap);
3926
3927         if (unlikely(index >= ltds->ltd_tgts_mask_len))
3928                 goto unlock;
3929
3930         ltd = lfsck_ltd2tgt(ltds, index);
3931         if (unlikely(ltd == NULL))
3932                 goto unlock;
3933
3934         LASSERT(ltds->ltd_tgtnr > 0);
3935
3936         ltds->ltd_tgtnr--;
3937         set_bit(index, ltds->ltd_tgts_bitmap);
3938         lfsck_assign_tgt(ltds, NULL, index);
3939
3940 unlock:
3941         if (ltd == NULL) {
3942                 if (for_ost)
3943                         head = &lfsck->li_ost_descs.ltd_orphan;
3944                 else
3945                         head = &lfsck->li_mdt_descs.ltd_orphan;
3946
3947                 list_for_each_entry(ltd, head, ltd_orphan_list) {
3948                         if (ltd->ltd_tgt == tgt) {
3949                                 list_del_init(&ltd->ltd_orphan_list);
3950                                 break;
3951                         }
3952                 }
3953         }
3954
3955         up_write(&ltds->ltd_rw_sem);
3956         if (ltd != NULL) {
3957                 spin_lock(&ltds->ltd_lock);
3958                 ltd->ltd_dead = 1;
3959                 spin_unlock(&ltds->ltd_lock);
3960                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_NAMESPACE);
3961                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
3962                 lfsck_tgt_put(ltd);
3963         }
3964
3965         lfsck_instance_put(env, lfsck);
3966 }
3967 EXPORT_SYMBOL(lfsck_del_target);
3968
3969 static int __init lfsck_init(void)
3970 {
3971         int rc;
3972
3973         rc = libcfs_setup();
3974         if (rc)
3975                 return rc;
3976
3977         lfsck_key_init_generic(&lfsck_thread_key, NULL);
3978         rc = lu_context_key_register(&lfsck_thread_key);
3979         if (!rc) {
3980                 tgt_register_lfsck_in_notify_local(lfsck_in_notify_local);
3981                 tgt_register_lfsck_in_notify(lfsck_in_notify);
3982                 tgt_register_lfsck_query(lfsck_query);
3983         }
3984
3985         return rc;
3986 }
3987
3988 static void __exit lfsck_exit(void)
3989 {
3990         struct lfsck_tgt_desc *ltd;
3991         struct lfsck_tgt_desc *next;
3992
3993         LASSERT(list_empty(&lfsck_instance_list));
3994
3995         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
3996                                  ltd_orphan_list) {
3997                 list_del_init(&ltd->ltd_orphan_list);
3998                 lfsck_tgt_put(ltd);
3999         }
4000
4001         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
4002                                  ltd_orphan_list) {
4003                 list_del_init(&ltd->ltd_orphan_list);
4004                 lfsck_tgt_put(ltd);
4005         }
4006
4007         lu_context_key_degister(&lfsck_thread_key);
4008 }
4009
4010 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
4011 MODULE_DESCRIPTION("Lustre File System Checker");
4012 MODULE_VERSION(LUSTRE_VERSION_STRING);
4013 MODULE_LICENSE("GPL");
4014
4015 module_init(lfsck_init);
4016 module_exit(lfsck_exit);