Whamcloud - gitweb
LU-5517 lfsck: repair invalid nlink count
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 #define LFSCK_CHECKPOINT_SKIP   1
46
47 /* define lfsck thread key */
48 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
49
50 static void lfsck_key_fini(const struct lu_context *ctx,
51                            struct lu_context_key *key, void *data)
52 {
53         struct lfsck_thread_info *info = data;
54
55         lu_buf_free(&info->lti_linkea_buf);
56         lu_buf_free(&info->lti_linkea_buf2);
57         lu_buf_free(&info->lti_big_buf);
58         OBD_FREE_PTR(info);
59 }
60
61 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
62 LU_KEY_INIT_GENERIC(lfsck);
63
64 static struct list_head lfsck_instance_list;
65 static struct list_head lfsck_ost_orphan_list;
66 static struct list_head lfsck_mdt_orphan_list;
67 static DEFINE_SPINLOCK(lfsck_instance_lock);
68
69 static const char *lfsck_status_names[] = {
70         [LS_INIT]               = "init",
71         [LS_SCANNING_PHASE1]    = "scanning-phase1",
72         [LS_SCANNING_PHASE2]    = "scanning-phase2",
73         [LS_COMPLETED]          = "completed",
74         [LS_FAILED]             = "failed",
75         [LS_STOPPED]            = "stopped",
76         [LS_PAUSED]             = "paused",
77         [LS_CRASHED]            = "crashed",
78         [LS_PARTIAL]            = "partial",
79         [LS_CO_FAILED]          = "co-failed",
80         [LS_CO_STOPPED]         = "co-stopped",
81         [LS_CO_PAUSED]          = "co-paused"
82 };
83
84 const char *lfsck_flags_names[] = {
85         "scanned-once",
86         "inconsistent",
87         "upgrade",
88         "incomplete",
89         "crashed_lastid",
90         NULL
91 };
92
93 const char *lfsck_param_names[] = {
94         NULL,
95         "failout",
96         "dryrun",
97         "all_targets",
98         "broadcast",
99         "orphan",
100         "create_ostobj",
101         NULL
102 };
103
104 enum lfsck_verify_lpf_types {
105         LVLT_BY_BOOKMARK        = 0,
106         LVLT_BY_NAMEENTRY       = 1,
107 };
108
109 const char *lfsck_status2names(enum lfsck_status status)
110 {
111         if (unlikely(status < 0 || status >= LS_MAX))
112                 return "unknown";
113
114         return lfsck_status_names[status];
115 }
116
117 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
118 {
119         spin_lock_init(&ltds->ltd_lock);
120         init_rwsem(&ltds->ltd_rw_sem);
121         INIT_LIST_HEAD(&ltds->ltd_orphan);
122         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
123         if (ltds->ltd_tgts_bitmap == NULL)
124                 return -ENOMEM;
125
126         return 0;
127 }
128
129 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
130 {
131         struct lfsck_tgt_desc   *ltd;
132         struct lfsck_tgt_desc   *next;
133         int                      idx;
134
135         down_write(&ltds->ltd_rw_sem);
136
137         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
138                                  ltd_orphan_list) {
139                 list_del_init(&ltd->ltd_orphan_list);
140                 lfsck_tgt_put(ltd);
141         }
142
143         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
144                 up_write(&ltds->ltd_rw_sem);
145
146                 return;
147         }
148
149         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
150                 ltd = LTD_TGT(ltds, idx);
151                 if (likely(ltd != NULL)) {
152                         LASSERT(list_empty(&ltd->ltd_layout_list));
153                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
154                         LASSERT(list_empty(&ltd->ltd_namespace_list));
155                         LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
156
157                         ltds->ltd_tgtnr--;
158                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
159                         LTD_TGT(ltds, idx) = NULL;
160                         lfsck_tgt_put(ltd);
161                 }
162         }
163
164         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
165                  ltds->ltd_tgtnr);
166
167         for (idx = 0; idx < TGT_PTRS; idx++) {
168                 if (ltds->ltd_tgts_idx[idx] != NULL) {
169                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
170                         ltds->ltd_tgts_idx[idx] = NULL;
171                 }
172         }
173
174         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
175         ltds->ltd_tgts_bitmap = NULL;
176         up_write(&ltds->ltd_rw_sem);
177 }
178
179 static int __lfsck_add_target(const struct lu_env *env,
180                               struct lfsck_instance *lfsck,
181                               struct lfsck_tgt_desc *ltd,
182                               bool for_ost, bool locked)
183 {
184         struct lfsck_tgt_descs *ltds;
185         __u32                   index = ltd->ltd_index;
186         int                     rc    = 0;
187         ENTRY;
188
189         if (for_ost)
190                 ltds = &lfsck->li_ost_descs;
191         else
192                 ltds = &lfsck->li_mdt_descs;
193
194         if (!locked)
195                 down_write(&ltds->ltd_rw_sem);
196
197         LASSERT(ltds->ltd_tgts_bitmap != NULL);
198
199         if (index >= ltds->ltd_tgts_bitmap->size) {
200                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
201                                     (__u32)BITS_PER_LONG);
202                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
203                 cfs_bitmap_t *new_bitmap;
204
205                 while (newsize < index + 1)
206                         newsize <<= 1;
207
208                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
209                 if (new_bitmap == NULL)
210                         GOTO(unlock, rc = -ENOMEM);
211
212                 if (ltds->ltd_tgtnr > 0)
213                         cfs_bitmap_copy(new_bitmap, old_bitmap);
214                 ltds->ltd_tgts_bitmap = new_bitmap;
215                 CFS_FREE_BITMAP(old_bitmap);
216         }
217
218         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
219                 CERROR("%s: the device %s (%u) is registered already\n",
220                        lfsck_lfsck2name(lfsck),
221                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
222                 GOTO(unlock, rc = -EEXIST);
223         }
224
225         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
226                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
227                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
228                         GOTO(unlock, rc = -ENOMEM);
229         }
230
231         LTD_TGT(ltds, index) = ltd;
232         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
233         ltds->ltd_tgtnr++;
234
235         GOTO(unlock, rc = 0);
236
237 unlock:
238         if (!locked)
239                 up_write(&ltds->ltd_rw_sem);
240
241         return rc;
242 }
243
244 static int lfsck_add_target_from_orphan(const struct lu_env *env,
245                                         struct lfsck_instance *lfsck)
246 {
247         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
248         struct lfsck_tgt_desc   *ltd;
249         struct lfsck_tgt_desc   *next;
250         struct list_head        *head    = &lfsck_ost_orphan_list;
251         int                      rc;
252         bool                     for_ost = true;
253
254 again:
255         spin_lock(&lfsck_instance_lock);
256         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
257                 if (ltd->ltd_key == lfsck->li_bottom)
258                         list_move_tail(&ltd->ltd_orphan_list,
259                                        &ltds->ltd_orphan);
260         }
261         spin_unlock(&lfsck_instance_lock);
262
263         down_write(&ltds->ltd_rw_sem);
264         while (!list_empty(&ltds->ltd_orphan)) {
265                 ltd = list_entry(ltds->ltd_orphan.next,
266                                  struct lfsck_tgt_desc,
267                                  ltd_orphan_list);
268                 list_del_init(&ltd->ltd_orphan_list);
269                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
270                 /* Do not hold the semaphore for too long time. */
271                 up_write(&ltds->ltd_rw_sem);
272                 if (rc != 0)
273                         return rc;
274
275                 down_write(&ltds->ltd_rw_sem);
276         }
277         up_write(&ltds->ltd_rw_sem);
278
279         if (for_ost) {
280                 ltds = &lfsck->li_mdt_descs;
281                 head = &lfsck_mdt_orphan_list;
282                 for_ost = false;
283                 goto again;
284         }
285
286         return 0;
287 }
288
289 static inline struct lfsck_component *
290 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
291                        struct list_head *list)
292 {
293         struct lfsck_component *com;
294
295         list_for_each_entry(com, list, lc_link) {
296                 if (com->lc_type == type)
297                         return com;
298         }
299         return NULL;
300 }
301
302 struct lfsck_component *
303 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
304 {
305         struct lfsck_component *com;
306
307         spin_lock(&lfsck->li_lock);
308         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
309         if (com != NULL)
310                 goto unlock;
311
312         com = __lfsck_component_find(lfsck, type,
313                                      &lfsck->li_list_double_scan);
314         if (com != NULL)
315                 goto unlock;
316
317         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
318
319 unlock:
320         if (com != NULL)
321                 lfsck_component_get(com);
322         spin_unlock(&lfsck->li_lock);
323         return com;
324 }
325
326 void lfsck_component_cleanup(const struct lu_env *env,
327                              struct lfsck_component *com)
328 {
329         if (!list_empty(&com->lc_link))
330                 list_del_init(&com->lc_link);
331         if (!list_empty(&com->lc_link_dir))
332                 list_del_init(&com->lc_link_dir);
333
334         lfsck_component_put(env, com);
335 }
336
337 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
338                     struct lu_fid *fid, bool locked)
339 {
340         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
341         int                      rc = 0;
342         ENTRY;
343
344         if (!locked)
345                 mutex_lock(&lfsck->li_mutex);
346
347         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
348         if (rc >= 0) {
349                 bk->lb_last_fid = *fid;
350                 /* We do not care about whether the subsequent sub-operations
351                  * failed or not. The worst case is that one FID is lost that
352                  * is not a big issue for the LFSCK since it is relative rare
353                  * for LFSCK create. */
354                 rc = lfsck_bookmark_store(env, lfsck);
355         }
356
357         if (!locked)
358                 mutex_unlock(&lfsck->li_mutex);
359
360         RETURN(rc);
361 }
362
363 /**
364  * Request the specified ibits lock for the given object.
365  *
366  * Before the LFSCK modifying on the namespace visible object,
367  * it needs to acquire related ibits ldlm lock.
368  *
369  * \param[in] env       pointer to the thread context
370  * \param[in] lfsck     pointer to the lfsck instance
371  * \param[in] obj       pointer to the dt_object to be locked
372  * \param[out] lh       pointer to the lock handle
373  * \param[in] ibits     the bits for the ldlm lock to be acquired
374  * \param[in] mode      the mode for the ldlm lock to be acquired
375  *
376  * \retval              0 for success
377  * \retval              negative error number on failure
378  */
379 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
380                      struct dt_object *obj, struct lustre_handle *lh,
381                      __u64 bits, ldlm_mode_t mode)
382 {
383         struct lfsck_thread_info        *info   = lfsck_env_info(env);
384         ldlm_policy_data_t              *policy = &info->lti_policy;
385         struct ldlm_res_id              *resid  = &info->lti_resid;
386         __u64                            flags  = LDLM_FL_ATOMIC_CB;
387         int                              rc;
388
389         LASSERT(lfsck->li_namespace != NULL);
390
391         memset(policy, 0, sizeof(*policy));
392         policy->l_inodebits.bits = bits;
393         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
394         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_IBITS,
395                                     policy, mode, &flags, ldlm_blocking_ast,
396                                     ldlm_completion_ast, NULL, NULL, 0,
397                                     LVB_T_NONE, NULL, lh);
398         if (rc == ELDLM_OK) {
399                 rc = 0;
400         } else {
401                 memset(lh, 0, sizeof(*lh));
402                 rc = -EIO;
403         }
404
405         return rc;
406 }
407
408 /**
409  * Release the the specified ibits lock.
410  *
411  * If the lock has been acquired before, release it
412  * and cleanup the handle. Otherwise, do nothing.
413  *
414  * \param[in] lh        pointer to the lock handle
415  * \param[in] mode      the mode for the ldlm lock to be released
416  */
417 void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
418 {
419         if (lustre_handle_is_used(lh)) {
420                 ldlm_lock_decref(lh, mode);
421                 memset(lh, 0, sizeof(*lh));
422         }
423 }
424
425 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
426                               struct lfsck_instance *lfsck,
427                               const struct lu_fid *fid)
428 {
429         struct seq_server_site  *ss     =
430                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
431         struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
432         int                      rc;
433
434         fld_range_set_mdt(range);
435         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
436         if (rc == 0)
437                 rc = range->lsr_index;
438
439         return rc;
440 }
441
442 const char dot[] = ".";
443 const char dotdot[] = "..";
444 static const char dotlustre[] = ".lustre";
445 static const char lostfound[] = "lost+found";
446
447 static int lfsck_create_lpf_local(const struct lu_env *env,
448                                   struct lfsck_instance *lfsck,
449                                   struct dt_object *parent,
450                                   struct dt_object *child,
451                                   struct lu_attr *la,
452                                   struct dt_object_format *dof,
453                                   const char *name)
454 {
455         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
456         struct dt_device        *dev    = lfsck->li_bottom;
457         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
458         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
459         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
460         struct thandle          *th     = NULL;
461         struct linkea_data       ldata  = { 0 };
462         struct lu_buf            linkea_buf;
463         const struct lu_name    *cname;
464         loff_t                   pos    = 0;
465         int                      len    = sizeof(struct lfsck_bookmark);
466         int                      rc;
467         ENTRY;
468
469         rc = linkea_data_new(&ldata,
470                              &lfsck_env_info(env)->lti_linkea_buf2);
471         if (rc != 0)
472                 RETURN(rc);
473
474         cname = lfsck_name_get_const(env, name, strlen(name));
475         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
476         if (rc != 0)
477                 RETURN(rc);
478
479         th = dt_trans_create(env, dev);
480         if (IS_ERR(th))
481                 RETURN(PTR_ERR(th));
482
483         /* 1a. create child */
484         rc = dt_declare_create(env, child, la, NULL, dof, th);
485         if (rc != 0)
486                 GOTO(stop, rc);
487
488         /* 2a. increase child nlink */
489         rc = dt_declare_ref_add(env, child, th);
490         if (rc != 0)
491                 GOTO(stop, rc);
492
493         /* 3a. insert linkEA for child */
494         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
495                        ldata.ld_leh->leh_len);
496         rc = dt_declare_xattr_set(env, child, &linkea_buf,
497                                   XATTR_NAME_LINK, 0, th);
498         if (rc != 0)
499                 GOTO(stop, rc);
500
501         /* 4a. insert name into parent dir */
502         rec->rec_type = S_IFDIR;
503         rec->rec_fid = cfid;
504         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
505                                (const struct dt_key *)name, th);
506         if (rc != 0)
507                 GOTO(stop, rc);
508
509         /* 5a. increase parent nlink */
510         rc = dt_declare_ref_add(env, parent, th);
511         if (rc != 0)
512                 GOTO(stop, rc);
513
514         /* 6a. update bookmark */
515         rc = dt_declare_record_write(env, bk_obj,
516                                      lfsck_buf_get(env, bk, len), 0, th);
517         if (rc != 0)
518                 GOTO(stop, rc);
519
520         rc = dt_trans_start_local(env, dev, th);
521         if (rc != 0)
522                 GOTO(stop, rc);
523
524         dt_write_lock(env, child, 0);
525         /* 1b.1. create child */
526         rc = dt_create(env, child, la, NULL, dof, th);
527         if (rc != 0)
528                 GOTO(unlock, rc);
529
530         if (unlikely(!dt_try_as_dir(env, child)))
531                 GOTO(unlock, rc = -ENOTDIR);
532
533         /* 1b.2. insert dot into child dir */
534         rec->rec_fid = cfid;
535         rc = dt_insert(env, child, (const struct dt_rec *)rec,
536                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
537         if (rc != 0)
538                 GOTO(unlock, rc);
539
540         /* 1b.3. insert dotdot into child dir */
541         rec->rec_fid = &LU_LPF_FID;
542         rc = dt_insert(env, child, (const struct dt_rec *)rec,
543                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
544         if (rc != 0)
545                 GOTO(unlock, rc);
546
547         /* 2b. increase child nlink */
548         rc = dt_ref_add(env, child, th);
549         if (rc != 0)
550                 GOTO(unlock, rc);
551
552         /* 3b. insert linkEA for child. */
553         rc = dt_xattr_set(env, child, &linkea_buf,
554                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
555         dt_write_unlock(env, child);
556         if (rc != 0)
557                 GOTO(stop, rc);
558
559         /* 4b. insert name into parent dir */
560         rec->rec_fid = cfid;
561         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
562                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
563         if (rc != 0)
564                 GOTO(stop, rc);
565
566         dt_write_lock(env, parent, 0);
567         /* 5b. increase parent nlink */
568         rc = dt_ref_add(env, parent, th);
569         dt_write_unlock(env, parent);
570         if (rc != 0)
571                 GOTO(stop, rc);
572
573         bk->lb_lpf_fid = *cfid;
574         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
575
576         /* 6b. update bookmark */
577         rc = dt_record_write(env, bk_obj,
578                              lfsck_buf_get(env, bk, len), &pos, th);
579
580         GOTO(stop, rc);
581
582 unlock:
583         dt_write_unlock(env, child);
584
585 stop:
586         dt_trans_stop(env, dev, th);
587
588         return rc;
589 }
590
591 static int lfsck_create_lpf_remote(const struct lu_env *env,
592                                    struct lfsck_instance *lfsck,
593                                    struct dt_object *parent,
594                                    struct dt_object *child,
595                                    struct lu_attr *la,
596                                    struct dt_object_format *dof,
597                                    const char *name)
598 {
599         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
600         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
601         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
602         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
603         struct thandle          *th     = NULL;
604         struct linkea_data       ldata  = { 0 };
605         struct lu_buf            linkea_buf;
606         const struct lu_name    *cname;
607         struct dt_device        *dev;
608         loff_t                   pos    = 0;
609         int                      len    = sizeof(struct lfsck_bookmark);
610         int                      rc;
611         ENTRY;
612
613         rc = linkea_data_new(&ldata,
614                              &lfsck_env_info(env)->lti_linkea_buf2);
615         if (rc != 0)
616                 RETURN(rc);
617
618         cname = lfsck_name_get_const(env, name, strlen(name));
619         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
620         if (rc != 0)
621                 RETURN(rc);
622
623         /* Create .lustre/lost+found/MDTxxxx. */
624
625         /* XXX: Currently, cross-MDT create operation needs to create the child
626          *      object firstly, then insert name into the parent directory. For
627          *      this case, the child object resides on current MDT (local), but
628          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
629          *      easy to contain all the sub-modifications orderly within single
630          *      transaction.
631          *
632          *      To avoid more inconsistency, we split the create operation into
633          *      two transactions:
634          *
635          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
636          *         locally.
637          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
638          *         remotely.
639          *
640          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
641          *      repair such inconsistency when LFSCK run next time. */
642
643         /* Transaction I: locally */
644
645         dev = lfsck->li_bottom;
646         th = dt_trans_create(env, dev);
647         if (IS_ERR(th))
648                 RETURN(PTR_ERR(th));
649
650         /* 1a. create child */
651         rc = dt_declare_create(env, child, la, NULL, dof, th);
652         if (rc != 0)
653                 GOTO(stop, rc);
654
655         /* 2a. increase child nlink */
656         rc = dt_declare_ref_add(env, child, th);
657         if (rc != 0)
658                 GOTO(stop, rc);
659
660         /* 3a. insert linkEA for child */
661         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
662                        ldata.ld_leh->leh_len);
663         rc = dt_declare_xattr_set(env, child, &linkea_buf,
664                                   XATTR_NAME_LINK, 0, th);
665         if (rc != 0)
666                 GOTO(stop, rc);
667
668         /* 4a. update bookmark */
669         rc = dt_declare_record_write(env, bk_obj,
670                                      lfsck_buf_get(env, bk, len), 0, th);
671         if (rc != 0)
672                 GOTO(stop, rc);
673
674         rc = dt_trans_start_local(env, dev, th);
675         if (rc != 0)
676                 GOTO(stop, rc);
677
678         dt_write_lock(env, child, 0);
679         /* 1b.1. create child */
680         rc = dt_create(env, child, la, NULL, dof, th);
681         if (rc != 0)
682                 GOTO(unlock, rc);
683
684         if (unlikely(!dt_try_as_dir(env, child)))
685                 GOTO(unlock, rc = -ENOTDIR);
686
687         /* 1b.2. insert dot into child dir */
688         rec->rec_type = S_IFDIR;
689         rec->rec_fid = cfid;
690         rc = dt_insert(env, child, (const struct dt_rec *)rec,
691                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
692         if (rc != 0)
693                 GOTO(unlock, rc);
694
695         /* 1b.3. insert dotdot into child dir */
696         rec->rec_fid = &LU_LPF_FID;
697         rc = dt_insert(env, child, (const struct dt_rec *)rec,
698                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
699         if (rc != 0)
700                 GOTO(unlock, rc);
701
702         /* 2b. increase child nlink */
703         rc = dt_ref_add(env, child, th);
704         if (rc != 0)
705                 GOTO(unlock, rc);
706
707         /* 3b. insert linkEA for child */
708         rc = dt_xattr_set(env, child, &linkea_buf,
709                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
710         if (rc != 0)
711                 GOTO(unlock, rc);
712
713         bk->lb_lpf_fid = *cfid;
714         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
715
716         /* 4b. update bookmark */
717         rc = dt_record_write(env, bk_obj,
718                              lfsck_buf_get(env, bk, len), &pos, th);
719
720         dt_write_unlock(env, child);
721         dt_trans_stop(env, dev, th);
722         if (rc != 0)
723                 RETURN(rc);
724
725         /* Transaction II: remotely */
726
727         dev = lfsck->li_next;
728         th = dt_trans_create(env, dev);
729         if (IS_ERR(th))
730                 RETURN(PTR_ERR(th));
731
732         /* 5a. insert name into parent dir */
733         rec->rec_fid = cfid;
734         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
735                                (const struct dt_key *)name, th);
736         if (rc != 0)
737                 GOTO(stop, rc);
738
739         /* 6a. increase parent nlink */
740         rc = dt_declare_ref_add(env, parent, th);
741         if (rc != 0)
742                 GOTO(stop, rc);
743
744         rc = dt_trans_start(env, dev, th);
745         if (rc != 0)
746                 GOTO(stop, rc);
747
748         /* 5b. insert name into parent dir */
749         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
750                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
751         if (rc != 0)
752                 GOTO(stop, rc);
753
754         dt_write_lock(env, parent, 0);
755         /* 6b. increase parent nlink */
756         rc = dt_ref_add(env, parent, th);
757         dt_write_unlock(env, parent);
758
759         GOTO(stop, rc);
760
761 unlock:
762         dt_write_unlock(env, child);
763 stop:
764         dt_trans_stop(env, dev, th);
765
766         if (rc != 0 && dev == lfsck->li_next)
767                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
768                        "for orphans, but failed to insert the name %s "
769                        "to the .lustre/lost+found/. Such inconsistency "
770                        "will be repaired when LFSCK run next time: rc = %d\n",
771                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
772
773         return rc;
774 }
775
776 /* Do NOT create .lustre/lost+found/MDTxxxx when registering the lfsck
777  * instance, because MDT0 may not be ready for sequence allocation yet. We do
778  * that only when it is required, such as for orphan OST-objects repairing. */
779 int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
780 {
781         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
782         struct lfsck_thread_info *info  = lfsck_env_info(env);
783         struct lu_fid            *cfid  = &info->lti_fid2;
784         struct lu_attr           *la    = &info->lti_la;
785         struct dt_object_format  *dof   = &info->lti_dof;
786         struct dt_object         *parent = NULL;
787         struct dt_object         *child = NULL;
788         struct lustre_handle      lh    = { 0 };
789         char                      name[8];
790         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
791         int                       rc    = 0;
792         ENTRY;
793
794         LASSERT(lfsck->li_master);
795
796         sprintf(name, "MDT%04x", node);
797         if (node == 0) {
798                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
799                                                   &LU_LPF_FID);
800         } else {
801                 struct lfsck_tgt_desc *ltd;
802
803                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
804                 if (unlikely(ltd == NULL))
805                         RETURN(-ENXIO);
806
807                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
808                                                   &LU_LPF_FID);
809                 lfsck_tgt_put(ltd);
810         }
811         if (IS_ERR(parent))
812                 RETURN(PTR_ERR(parent));
813
814         if (lfsck->li_lpf_obj != NULL)
815                 GOTO(out, rc = 0);
816
817         if (unlikely(!dt_try_as_dir(env, parent)))
818                 GOTO(out, rc = -ENOTDIR);
819
820         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
821                               MDS_INODELOCK_UPDATE, LCK_EX);
822         if (rc != 0)
823                 GOTO(out, rc);
824
825         mutex_lock(&lfsck->li_mutex);
826         if (lfsck->li_lpf_obj != NULL)
827                 GOTO(unlock, rc = 0);
828
829         if (fid_is_zero(&bk->lb_lpf_fid)) {
830                 /* There is corner case that: in former LFSCK scanning we have
831                  * created the .lustre/lost+found/MDTxxxx but failed to update
832                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
833                  * it from MDT0 firstly. */
834                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
835                                (const struct dt_key *)name, BYPASS_CAPA);
836                 if (rc != 0 && rc != -ENOENT)
837                         GOTO(unlock, rc);
838
839                 if (rc == 0) {
840                         bk->lb_lpf_fid = *cfid;
841                         rc = lfsck_bookmark_store(env, lfsck);
842                 } else {
843                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
844                 }
845                 if (rc != 0)
846                         GOTO(unlock, rc);
847         } else {
848                 *cfid = bk->lb_lpf_fid;
849         }
850
851         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
852         if (IS_ERR(child))
853                 GOTO(unlock, rc = PTR_ERR(child));
854
855         if (dt_object_exists(child) != 0) {
856                 if (unlikely(!dt_try_as_dir(env, child)))
857                         rc = -ENOTDIR;
858                 else
859                         lfsck->li_lpf_obj = child;
860
861                 GOTO(unlock, rc);
862         }
863
864         memset(la, 0, sizeof(*la));
865         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
866         la->la_mode = S_IFDIR | S_IRWXU;
867         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
868                        LA_UID | LA_GID;
869         memset(dof, 0, sizeof(*dof));
870         dof->dof_type = dt_mode_to_dft(S_IFDIR);
871
872         if (node == 0)
873                 rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
874                                             dof, name);
875         else
876                 rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
877                                              dof, name);
878         if (rc == 0)
879                 lfsck->li_lpf_obj = child;
880
881         GOTO(unlock, rc);
882
883 unlock:
884         mutex_unlock(&lfsck->li_mutex);
885         lfsck_ibits_unlock(&lh, LCK_EX);
886         if (rc != 0 && child != NULL && !IS_ERR(child))
887                 lu_object_put(env, &child->do_lu);
888 out:
889         if (parent != NULL && !IS_ERR(parent))
890                 lu_object_put(env, &parent->do_lu);
891
892         return rc;
893 }
894
895 /**
896  * Scan .lustre/lost+found for bad name entries and remove them.
897  *
898  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
899  * index in the system. Any other formatted name is invalid and should be
900  * removed.
901  *
902  * \param[in] env       pointer to the thread context
903  * \param[in] lfsck     pointer to the lfsck instance
904  * \param[in] parent    pointer to the lost+found object
905  *
906  * \retval              0 for success
907  * \retval              negative error number on failure
908  */
909 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
910                                       struct lfsck_instance *lfsck,
911                                       struct dt_object *parent)
912 {
913         struct lu_dirent        *ent    =
914                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
915         const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
916         struct dt_it            *it;
917         int                      rc;
918         ENTRY;
919
920         it = iops->init(env, parent, LUDA_64BITHASH, BYPASS_CAPA);
921         if (IS_ERR(it))
922                 RETURN(PTR_ERR(it));
923
924         rc = iops->load(env, it, 0);
925         if (rc == 0)
926                 rc = iops->next(env, it);
927         else if (rc > 0)
928                 rc = 0;
929
930         while (rc == 0) {
931                 int off = 3;
932
933                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
934                 if (rc != 0)
935                         break;
936
937                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
938                 if (ent->lde_name[0] == '.') {
939                         if (ent->lde_namelen == 1)
940                                 goto next;
941
942                         if (ent->lde_namelen == 2 && ent->lde_name[1] == '.')
943                                 goto next;
944                 }
945
946                 /* name length must be strlen("MDTxxxx") */
947                 if (ent->lde_namelen != 7)
948                         goto remove;
949
950                 if (memcmp(ent->lde_name, "MDT", off) != 0)
951                         goto remove;
952
953                 while (off < 7 && isxdigit(ent->lde_name[off]))
954                         off++;
955
956                 if (off != 7) {
957
958 remove:
959                         rc = lfsck_remove_name_entry(env, lfsck, parent,
960                                                      ent->lde_name, S_IFDIR);
961                         if (rc != 0)
962                                 break;
963                 }
964
965 next:
966                 rc = iops->next(env, it);
967         }
968
969         iops->put(env, it);
970         iops->fini(env, it);
971
972         RETURN(rc > 0 ? 0 : rc);
973 }
974
975 static int lfsck_update_lpf_entry(const struct lu_env *env,
976                                   struct lfsck_instance *lfsck,
977                                   struct dt_object *parent,
978                                   struct dt_object *child,
979                                   const char *name,
980                                   enum lfsck_verify_lpf_types type)
981 {
982         int rc;
983
984         if (type == LVLT_BY_BOOKMARK) {
985                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
986                                              lfsck_dto2fid(child), S_IFDIR);
987         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
988                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
989                 rc = lfsck_bookmark_store(env, lfsck);
990
991                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
992                        " in the bookmark file: rc = %d\n",
993                        lfsck_lfsck2name(lfsck),
994                        PFID(lfsck_dto2fid(child)), rc);
995         }
996
997         return rc;
998 }
999
/**
 * Check whether the @child back references the @parent.
 *
 * Two cases:
 * 1) The child's FID is stored in the bookmark file. If the child back
 *    references the parent (LU_LPF_FID object) via its ".." entry, then
 *    insert the name (MDTxxxx) into .lustre/lost+found; otherwise, if
 *    the child back references another parent2, then:
 * 1.1) If the parent2 recognizes the child, then reset the FID record in
 *      the bookmark file;
 * 1.2) Otherwise, the LFSCK cannot know whether there will be a parent3 that
 *      references the child. So keep them there. As the LFSCK proceeds,
 *      the parent3 may be found; then, when the LFSCK runs next time, the
 *      inconsistency can be repaired.
 *
 * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
 *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
 *    via its ".." entry, then update the bookmark file; otherwise, if the child
 *    back references another parent2, then:
 * 2.1) If the parent2 recognizes the child, then remove the sub-directory
 *      from .lustre/lost+found/;
 * 2.2) Otherwise, if the parent2 does not recognize the child, trust the
 *      sub-directory name entry and update the child;
 * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
 *      or not, then keep them there.
 *
 * In both cases, if the child's first linkEA record names the LU_LPF_FID
 * object with the same @name, the linkEA is trusted and the child's ".."
 * entry is updated to reference LU_LPF_FID.
 *
 * \param[in] env       pointer to the thread context
 * \param[in] lfsck     pointer to the lfsck instance
 * \param[in] parent    pointer to the lost+found object
 * \param[in] child     pointer to the lost+found sub-directory object
 * \param[in] name      the name for lost+found sub-directory object
 * \param[out] fid      pointer to the buffer to hold the FID of the object
 *                      (called parent2 here) that is referenced via the
 *                      child's dotdot entry; it can also be the FID that
 *                      is referenced by the name entry under the parent2.
 * \param[in] type      to indicate where the child's FID is stored in
 *
 * \retval              positive number for uncertain inconsistency
 * \retval              0 for success
 * \retval              negative error number on failure
 */
static int lfsck_verify_lpf_pairs(const struct lu_env *env,
                                  struct lfsck_instance *lfsck,
                                  struct dt_object *parent,
                                  struct dt_object *child, const char *name,
                                  struct lu_fid *fid,
                                  enum lfsck_verify_lpf_types type)
{
        struct lfsck_thread_info *info    = lfsck_env_info(env);
        /* name2/fid2 receive the first linkEA record of the child. */
        char                     *name2   = info->lti_key;
        struct lu_fid            *fid2    = &info->lti_fid3;
        /* The object referenced by the child's ".." entry; stays NULL if
         * that entry cannot be read, or becomes an ERR_PTR if the object
         * cannot be used as a directory. */
        struct dt_object         *parent2 = NULL;
        struct lustre_handle      lh      = { 0 };
        int                       rc;
        ENTRY;

        /* Read the child's ".." entry into @fid. On any failure fall back
         * to the linkEA check with parent2 still NULL. */
        fid_zero(fid);
        rc = dt_lookup(env, child, (struct dt_rec *)fid,
                       (const struct dt_key *)dotdot, BYPASS_CAPA);
        if (rc != 0)
                GOTO(linkea, rc);

        if (!fid_is_sane(fid))
                GOTO(linkea, rc = -EINVAL);

        if (lu_fid_eq(fid, &LU_LPF_FID)) {
                const struct lu_name *cname;

                /* The child already points back at .lustre/lost+found.
                 * Cache it (with its own reference) as the LPF object
                 * unless one is cached already. */
                if (lfsck->li_lpf_obj == NULL) {
                        lu_object_get(&child->do_lu);
                        lfsck->li_lpf_obj = child;
                }

                /* Check the linkEA against (LU_LPF_FID, name), then bring
                 * the other record (name entry or bookmark, depending on
                 * @type) in line with the child. */
                cname = lfsck_name_get_const(env, name, strlen(name));
                rc = lfsck_verify_linkea(env, lfsck->li_bottom, child, cname,
                                         &LU_LPF_FID);
                if (rc == 0)
                        rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
                                                    name, type);

                /* parent2 was never looked up, so there is nothing to put. */
                GOTO(out_done, rc);
        }

        /* The ".." entry names some other object: load it as parent2. If it
         * cannot be found, does not exist, or is not a directory, parent2
         * is left as an ERR_PTR and only the linkEA is consulted below. */
        parent2 = lfsck_object_find_by_dev(env, lfsck->li_next, fid);
        if (IS_ERR(parent2))
                GOTO(linkea, parent2);

        if (!dt_object_exists(parent2)) {
                lu_object_put(env, &parent2->do_lu);

                GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
        }

        if (!dt_try_as_dir(env, parent2)) {
                lu_object_put(env, &parent2->do_lu);

                GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
        }

linkea:
        /* To prevent rename/unlink race */
        rc = lfsck_ibits_lock(env, lfsck, child, &lh,
                              MDS_INODELOCK_UPDATE, LCK_PR);
        if (rc != 0)
                GOTO(out_put, rc);

        dt_read_lock(env, child, 0);
        rc = lfsck_links_get_first(env, child, name2, fid2);
        if (rc != 0) {
                dt_read_unlock(env, child);
                lfsck_ibits_unlock(&lh, LCK_PR);

                /* No linkEA record to judge by: uncertain inconsistency. */
                GOTO(out_put, rc = 1);
        }

        /* It is almost impossible that the bookmark file (or the name entry)
         * and the linkEA hit the same data corruption. Trust the linkEA. */
        if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
                dt_read_unlock(env, child);
                lfsck_ibits_unlock(&lh, LCK_PR);

                /* Report LU_LPF_FID to the caller as the child's parent. */
                *fid = *fid2;
                if (lfsck->li_lpf_obj == NULL) {
                        lu_object_get(&child->do_lu);
                        lfsck->li_lpf_obj = child;
                }

                /* Update the child's dotdot entry */
                rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
                                             &LU_LPF_FID, S_IFDIR);
                if (rc == 0)
                        rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
                                                    name, type);

                GOTO(out_put, rc);
        }

        /* The linkEA does not point at lost+found and there is no usable
         * parent2 to cross-check against: uncertain inconsistency. */
        if (parent2 == NULL || IS_ERR(parent2)) {
                dt_read_unlock(env, child);
                lfsck_ibits_unlock(&lh, LCK_PR);

                GOTO(out_done, rc = 1);
        }

        /* Does parent2 hold the linkEA name? Note that @fid is overwritten
         * with the FID stored in that name entry. */
        rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
                       (const struct dt_key *)name2, BYPASS_CAPA);
        dt_read_unlock(env, child);
        lfsck_ibits_unlock(&lh, LCK_PR);
        if (rc != 0 && rc != -ENOENT)
                GOTO(out_put, rc);

        /* parent2 does not reference the child under that name. */
        if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
                /* Case 1.2: nothing can be decided from the bookmark. */
                if (type == LVLT_BY_BOOKMARK)
                        GOTO(out_put, rc = 1);

                /* Trust the name entry, update the child's dotdot entry. */
                rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
                                             &LU_LPF_FID, S_IFDIR);

                GOTO(out_put, rc);
        }

        /* parent2 does reference the child, so the record that led us here
         * (bookmark or lost+found name entry) is the wrong one. */
        if (type == LVLT_BY_BOOKMARK) {
                /* Invalid FID record in the bookmark file, reset it. */
                fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
                rc = lfsck_bookmark_store(env, lfsck);

                CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
                       " in the bookmark file: rc = %d\n",
                       lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
        } else /* if (type == LVLT_BY_NAMEENTRY) */ {
                /* The name entry is wrong, remove it. */
                rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
        }

        GOTO(out_put, rc);

out_put:
        /* Drop the reference on parent2 if a real object is still held. */
        if (parent2 != NULL && !IS_ERR(parent2))
                lu_object_put(env, &parent2->do_lu);

out_done:
        return rc;
}
1183
/**
 * Verify the /ROOT/.lustre/lost+found/ directory.
 *
 * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
 * the LFSCK does not exactly know how to handle, such as orphans. So before
 * the LFSCK scans the system, the consistency of this directory needs to
 * be verified first to allow the users to use it during the LFSCK.
 *
 * Two records may reference this MDT's lost+found sub-directory: the FID
 * saved in the bookmark file (child1) and the "MDTxxxx" name entry under
 * .lustre/lost+found (child2). Both are validated, and then cross-checked
 * via lfsck_verify_lpf_pairs().
 *
 * \param[in] env       pointer to the thread context
 * \param[in] lfsck     pointer to the lfsck instance
 *
 * \retval              positive number for uncertain inconsistency
 * \retval              0 for success
 * \retval              negative error number on failure
 */
int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
{
        struct lfsck_thread_info *info   = lfsck_env_info(env);
        struct lu_fid            *pfid   = &info->lti_fid;
        struct lu_fid            *cfid   = &info->lti_fid2;
        struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
        struct dt_object         *parent = NULL;
        /* child1's FID is in the bookmark file. */
        struct dt_object         *child1 = NULL;
        /* child2's FID is in the name entry MDTxxxx. */
        struct dt_object         *child2 = NULL;
        struct dt_device         *dev    = lfsck->li_bottom;
        const struct lu_name     *cname;
        /* Room for "MDTxxxx" plus the terminating NUL. */
        char                      name[8];
        int                       node   = lfsck_dev_idx(dev);
        int                       rc     = 0;
        ENTRY;

        LASSERT(lfsck->li_master);

        /* Locate the LU_LPF_FID object: on the local device when this is
         * node 0, otherwise through the MDT target descriptor at index 0. */
        if (node == 0) {
                parent = lfsck_object_find_by_dev(env, dev, &LU_LPF_FID);
        } else {
                struct lfsck_tgt_desc *ltd;

                ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
                if (unlikely(ltd == NULL))
                        RETURN(-ENXIO);

                parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
                                                  &LU_LPF_FID);
                lfsck_tgt_put(ltd);
        }

        if (IS_ERR(parent))
                RETURN(PTR_ERR(parent));

        LASSERT(dt_object_exists(parent));

        if (unlikely(!dt_try_as_dir(env, parent)))
                GOTO(put, rc = -ENOTDIR);

        /* Only node 0 scans for badly named entries. A failure is only
         * logged; rc is reassigned by the name lookup further down. */
        if (node == 0) {
                rc = lfsck_scan_lpf_bad_entries(env, lfsck, parent);
                if (rc != 0)
                        CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
                               "for bad sub-directories: rc = %d\n",
                               lfsck_lfsck2name(lfsck), rc);
        }

        /* Step 1: validate the FID recorded in the bookmark file. On
         * success child1 holds a referenced directory object; an invalid
         * record is zeroed and stored, leaving child1 NULL. */
        if (!fid_is_zero(&bk->lb_lpf_fid)) {
                if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
                        /* Keep a copy for the log message below. */
                        struct lu_fid tfid = bk->lb_lpf_fid;

                        /* Invalid FID record in the bookmark file, reset it. */
                        fid_zero(&bk->lb_lpf_fid);
                        rc = lfsck_bookmark_store(env, lfsck);

                        CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
                               " in the bookmark file: rc = %d\n",
                               lfsck_lfsck2name(lfsck), PFID(&tfid), rc);

                        if (rc != 0)
                                GOTO(put, rc);
                } else {
                        child1 = lfsck_object_find_by_dev(env, dev,
                                                          &bk->lb_lpf_fid);
                        if (IS_ERR(child1))
                                GOTO(put, rc = PTR_ERR(child1));

                        /* The object must exist, be local and be a
                         * directory; the type is only queried when the
                         * first two checks pass (short-circuit). */
                        if (unlikely(!dt_object_exists(child1) ||
                                     dt_object_remote(child1)) ||
                                     !S_ISDIR(lfsck_object_type(child1))) {
                                /* Invalid FID record in the bookmark file,
                                 * reset it. */
                                fid_zero(&bk->lb_lpf_fid);
                                rc = lfsck_bookmark_store(env, lfsck);

                                CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
                                       " in the bookmark file: rc = %d\n",
                                       lfsck_lfsck2name(lfsck),
                                       PFID(lfsck_dto2fid(child1)), rc);

                                if (rc != 0)
                                        GOTO(put, rc);

                                lu_object_put(env, &child1->do_lu);
                                child1 = NULL;
                        } else if (unlikely(!dt_try_as_dir(env, child1))) {
                                GOTO(put, rc = -ENOTDIR);
                        }
                }
        }

        /* Step 2: look up the "MDTxxxx" name entry under lost+found. */
        snprintf(name, 8, "MDT%04x", node);
        rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
                       (const struct dt_key *)name, BYPASS_CAPA);
        if (rc == -ENOENT) {
                /* No name entry: if the bookmark still holds a FID, verify
                 * child1 on its own; otherwise there is nothing to check. */
                if (!fid_is_zero(&bk->lb_lpf_fid))
                        goto check_child1;

                GOTO(put, rc = 0);
        }

        if (rc != 0)
                GOTO(put, rc);

        /* Invalid FID in the name entry, remove the name entry. */
        if (!fid_is_norm(cfid)) {
                rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
                if (rc != 0)
                        GOTO(put, rc);

                goto check_child1;
        }

        child2 = lfsck_object_find_by_dev(env, dev, cfid);
        if (IS_ERR(child2))
                GOTO(put, rc = PTR_ERR(child2));

        /* Same validity rules as for child1; a name entry that references
         * an unusable object is removed. */
        if (unlikely(!dt_object_exists(child2) ||
                     dt_object_remote(child2)) ||
                     !S_ISDIR(lfsck_object_type(child2))) {
                rc = lfsck_remove_name_entry(env, lfsck, parent, name,
                                             S_IFDIR);
                if (rc != 0)
                        GOTO(put, rc);

                goto check_child1;
        }

        if (unlikely(!dt_try_as_dir(env, child2)))
                GOTO(put, rc = -ENOTDIR);

        /* Step 3: reconcile the two records. */
        if (child1 == NULL) {
                /* Only the name entry exists. */
                rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2, name,
                                            pfid, LVLT_BY_NAMEENTRY);
        } else if (!lu_fid_eq(cfid, &bk->lb_lpf_fid)) {
                /* The two records disagree. Check child1 first; if @pfid
                 * does not come back as LU_LPF_FID, check child2 as well
                 * (its result replaces rc). */
                rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
                                            pfid, LVLT_BY_BOOKMARK);
                if (!lu_fid_eq(pfid, &LU_LPF_FID))
                        rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2,
                                                    name, pfid,
                                                    LVLT_BY_NAMEENTRY);
        } else {
                /* Both records reference the same object: cache it (with
                 * its own reference) and verify its linkEA. */
                if (lfsck->li_lpf_obj == NULL) {
                        lu_object_get(&child2->do_lu);
                        lfsck->li_lpf_obj = child2;
                }

                cname = lfsck_name_get_const(env, name, strlen(name));
                rc = lfsck_verify_linkea(env, dev, child2, cname, &LU_LPF_FID);
        }

        GOTO(put, rc);

check_child1:
        /* No usable name entry; verify the bookmark object if there is one. */
        if (child1 != NULL)
                rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
                                            pfid, LVLT_BY_BOOKMARK);

        GOTO(put, rc);

put:
        /* A cached LPF object must be usable as a directory. */
        if (lfsck->li_lpf_obj != NULL &&
            unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj)))
                rc = -ENOTDIR;

        /* Drop the references taken by the lookups in this function. */
        if (child2 != NULL && !IS_ERR(child2))
                lu_object_put(env, &child2->do_lu);
        if (child1 != NULL && !IS_ERR(child1))
                lu_object_put(env, &child1->do_lu);
        if (parent != NULL && !IS_ERR(parent))
                lu_object_put(env, &parent->do_lu);

        return rc;
}
1376
1377 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1378 {
1379         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1380         struct seq_server_site  *ss;
1381         char                    *prefix;
1382         int                      rc     = 0;
1383         ENTRY;
1384
1385         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
1386         if (unlikely(ss == NULL))
1387                 RETURN(-ENXIO);
1388
1389         OBD_ALLOC_PTR(lfsck->li_seq);
1390         if (lfsck->li_seq == NULL)
1391                 RETURN(-ENOMEM);
1392
1393         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1394         if (prefix == NULL)
1395                 GOTO(out, rc = -ENOMEM);
1396
1397         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1398         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1399                              ss->ss_server_seq);
1400         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1401         if (rc != 0)
1402                 GOTO(out, rc);
1403
1404         if (fid_is_sane(&bk->lb_last_fid))
1405                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1406
1407         RETURN(0);
1408
1409 out:
1410         OBD_FREE_PTR(lfsck->li_seq);
1411         lfsck->li_seq = NULL;
1412
1413         return rc;
1414 }
1415
1416 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1417 {
1418         if (lfsck->li_seq != NULL) {
1419                 seq_client_fini(lfsck->li_seq);
1420                 OBD_FREE_PTR(lfsck->li_seq);
1421                 lfsck->li_seq = NULL;
1422         }
1423 }
1424
1425 void lfsck_instance_cleanup(const struct lu_env *env,
1426                             struct lfsck_instance *lfsck)
1427 {
1428         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1429         struct lfsck_component  *com;
1430         struct lfsck_component  *next;
1431         ENTRY;
1432
1433         LASSERT(list_empty(&lfsck->li_link));
1434         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1435
1436         if (lfsck->li_obj_oit != NULL) {
1437                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
1438                 lfsck->li_obj_oit = NULL;
1439         }
1440
1441         LASSERT(lfsck->li_obj_dir == NULL);
1442
1443         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1444                 lfsck_component_cleanup(env, com);
1445         }
1446
1447         LASSERT(list_empty(&lfsck->li_list_dir));
1448
1449         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1450                                  lc_link) {
1451                 lfsck_component_cleanup(env, com);
1452         }
1453
1454         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1455                 lfsck_component_cleanup(env, com);
1456         }
1457
1458         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1459         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1460
1461         if (lfsck->li_bookmark_obj != NULL) {
1462                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
1463                 lfsck->li_bookmark_obj = NULL;
1464         }
1465
1466         if (lfsck->li_lpf_obj != NULL) {
1467                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
1468                 lfsck->li_lpf_obj = NULL;
1469         }
1470
1471         if (lfsck->li_los != NULL) {
1472                 local_oid_storage_fini(env, lfsck->li_los);
1473                 lfsck->li_los = NULL;
1474         }
1475
1476         lfsck_fid_fini(lfsck);
1477
1478         OBD_FREE_PTR(lfsck);
1479 }
1480
1481 static inline struct lfsck_instance *
1482 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1483 {
1484         struct lfsck_instance *lfsck;
1485
1486         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1487                 if (lfsck->li_bottom == key) {
1488                         if (ref)
1489                                 lfsck_instance_get(lfsck);
1490                         if (unlink)
1491                                 list_del_init(&lfsck->li_link);
1492
1493                         return lfsck;
1494                 }
1495         }
1496
1497         return NULL;
1498 }
1499
1500 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1501                                            bool unlink)
1502 {
1503         struct lfsck_instance *lfsck;
1504
1505         spin_lock(&lfsck_instance_lock);
1506         lfsck = __lfsck_instance_find(key, ref, unlink);
1507         spin_unlock(&lfsck_instance_lock);
1508
1509         return lfsck;
1510 }
1511
1512 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1513 {
1514         struct lfsck_instance *tmp;
1515
1516         spin_lock(&lfsck_instance_lock);
1517         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1518                 if (lfsck->li_bottom == tmp->li_bottom) {
1519                         spin_unlock(&lfsck_instance_lock);
1520                         return -EEXIST;
1521                 }
1522         }
1523
1524         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1525         spin_unlock(&lfsck_instance_lock);
1526         return 0;
1527 }
1528
/**
 * Print the names of the bits set in @bits as one "<prefix>: a,b,c" line.
 *
 * With no bit set only "<prefix>:" and a newline are printed. Set bits whose
 * names[] slot is NULL are skipped silently. The output always ends with a
 * newline. If the highest set bit has a NULL name, the comma printed after
 * the previous name is left in place before that newline.
 *
 * The bits are walked through an unsigned copy, so that testing bit 31
 * never evaluates the signed expression "1 << 31", which is undefined
 * behaviour in C.
 *
 * \param[in] m         the seq_file to print to
 * \param[in] bits      the bitmask to be decoded
 * \param[in] names     bit-index to name table; it is indexed by the
 *                      position of every set bit in @bits
 * \param[in] prefix    the label printed in front of the names
 *
 * \retval              0 always
 */
int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
                    const char *prefix)
{
        unsigned int    rest    = bits;
        unsigned int    flag;
        int             i;
        bool            newline = (rest == 0);

        seq_printf(m, "%s:%c", prefix, rest != 0 ? ' ' : '\n');

        /* rest != 0 guarantees that a set bit is found at some i < 32. */
        for (i = 0; rest != 0; i++) {
                flag = 1U << i;
                if ((rest & flag) == 0)
                        continue;

                rest &= ~flag;
                if (names[i] == NULL)
                        continue;

                /* The last name ends the line, the others get a comma. */
                if (rest == 0)
                        newline = true;

                seq_printf(m, "%s%c", names[i], newline ? '\n' : ',');
        }

        if (!newline)
                seq_printf(m, "\n");
        return 0;
}
1555
1556 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *prefix)
1557 {
1558         if (time != 0)
1559                 seq_printf(m, "%s: "LPU64" seconds\n", prefix,
1560                           cfs_time_current_sec() - time);
1561         else
1562                 seq_printf(m, "%s: N/A\n", prefix);
1563         return 0;
1564 }
1565
1566 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1567                    const char *prefix)
1568 {
1569         if (fid_is_zero(&pos->lp_dir_parent)) {
1570                 if (pos->lp_oit_cookie == 0)
1571                         seq_printf(m, "%s: N/A, N/A, N/A\n",
1572                                    prefix);
1573                 else
1574                         seq_printf(m, "%s: "LPU64", N/A, N/A\n",
1575                                    prefix, pos->lp_oit_cookie);
1576         } else {
1577                 seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
1578                            prefix, pos->lp_oit_cookie,
1579                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1580         }
1581         return 0;
1582 }
1583
1584 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1585                     struct lfsck_position *pos, bool init)
1586 {
1587         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1588
1589         if (unlikely(lfsck->li_di_oit == NULL)) {
1590                 memset(pos, 0, sizeof(*pos));
1591                 return;
1592         }
1593
1594         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1595         if (!lfsck->li_current_oit_processed && !init)
1596                 pos->lp_oit_cookie--;
1597
1598         LASSERT(pos->lp_oit_cookie > 0);
1599
1600         if (lfsck->li_di_dir != NULL) {
1601                 struct dt_object *dto = lfsck->li_obj_dir;
1602
1603                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1604                                                         lfsck->li_di_dir);
1605
1606                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1607                         fid_zero(&pos->lp_dir_parent);
1608                         pos->lp_dir_cookie = 0;
1609                 } else {
1610                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1611                 }
1612         } else {
1613                 fid_zero(&pos->lp_dir_parent);
1614                 pos->lp_dir_cookie = 0;
1615         }
1616 }
1617
1618 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1619 {
1620         bool dirty = false;
1621
1622         if (limit != LFSCK_SPEED_NO_LIMIT) {
1623                 if (limit > HZ) {
1624                         lfsck->li_sleep_rate = limit / HZ;
1625                         lfsck->li_sleep_jif = 1;
1626                 } else {
1627                         lfsck->li_sleep_rate = 1;
1628                         lfsck->li_sleep_jif = HZ / limit;
1629                 }
1630         } else {
1631                 lfsck->li_sleep_jif = 0;
1632                 lfsck->li_sleep_rate = 0;
1633         }
1634
1635         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1636                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1637                 dirty = true;
1638         }
1639
1640         return dirty;
1641 }
1642
1643 void lfsck_control_speed(struct lfsck_instance *lfsck)
1644 {
1645         struct ptlrpc_thread *thread = &lfsck->li_thread;
1646         struct l_wait_info    lwi;
1647
1648         if (lfsck->li_sleep_jif > 0 &&
1649             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1650                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1651                                        LWI_ON_SIGNAL_NOOP, NULL);
1652
1653                 l_wait_event(thread->t_ctl_waitq,
1654                              !thread_is_running(thread),
1655                              &lwi);
1656                 lfsck->li_new_scanned = 0;
1657         }
1658 }
1659
1660 void lfsck_control_speed_by_self(struct lfsck_component *com)
1661 {
1662         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1663         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1664         struct l_wait_info       lwi;
1665
1666         if (lfsck->li_sleep_jif > 0 &&
1667             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1668                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1669                                        LWI_ON_SIGNAL_NOOP, NULL);
1670
1671                 l_wait_event(thread->t_ctl_waitq,
1672                              !thread_is_running(thread),
1673                              &lwi);
1674                 com->lc_new_scanned = 0;
1675         }
1676 }
1677
1678 static struct lfsck_thread_args *
1679 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1680                        struct lfsck_component *com,
1681                        struct lfsck_start_param *lsp)
1682 {
1683         struct lfsck_thread_args *lta;
1684         int                       rc;
1685
1686         OBD_ALLOC_PTR(lta);
1687         if (lta == NULL)
1688                 return ERR_PTR(-ENOMEM);
1689
1690         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1691         if (rc != 0) {
1692                 OBD_FREE_PTR(lta);
1693                 return ERR_PTR(rc);
1694         }
1695
1696         lta->lta_lfsck = lfsck_instance_get(lfsck);
1697         if (com != NULL)
1698                 lta->lta_com = lfsck_component_get(com);
1699
1700         lta->lta_lsp = lsp;
1701
1702         return lta;
1703 }
1704
1705 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1706 {
1707         if (lta->lta_com != NULL)
1708                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1709         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1710         lu_env_fini(&lta->lta_env);
1711         OBD_FREE_PTR(lta);
1712 }
1713
1714 struct lfsck_assistant_data *
1715 lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
1716                           const char *name)
1717 {
1718         struct lfsck_assistant_data *lad;
1719
1720         OBD_ALLOC_PTR(lad);
1721         if (lad != NULL) {
1722                 lad->lad_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
1723                 if (lad->lad_bitmap == NULL) {
1724                         OBD_FREE_PTR(lad);
1725                         return NULL;
1726                 }
1727
1728                 INIT_LIST_HEAD(&lad->lad_req_list);
1729                 spin_lock_init(&lad->lad_lock);
1730                 INIT_LIST_HEAD(&lad->lad_ost_list);
1731                 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1732                 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1733                 INIT_LIST_HEAD(&lad->lad_mdt_list);
1734                 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1735                 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1736                 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1737                 lad->lad_ops = lao;
1738                 lad->lad_name = name;
1739         }
1740
1741         return lad;
1742 }
1743
1744 /**
1745  * Generic LFSCK asynchronous communication interpretor function.
1746  * The LFSCK RPC reply for both the event notification and status
1747  * querying will be handled here.
1748  *
1749  * \param[in] env       pointer to the thread context
1750  * \param[in] req       pointer to the LFSCK request
1751  * \param[in] args      pointer to the lfsck_async_interpret_args
1752  * \param[in] rc        the result for handling the LFSCK request
1753  *
1754  * \retval              0 for success
1755  * \retval              negative error number on failure
1756  */
1757 int lfsck_async_interpret_common(const struct lu_env *env,
1758                                  struct ptlrpc_request *req,
1759                                  void *args, int rc)
1760 {
1761         struct lfsck_async_interpret_args *laia = args;
1762         struct lfsck_component            *com  = laia->laia_com;
1763         struct lfsck_assistant_data       *lad  = com->lc_data;
1764         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1765         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1766         struct lfsck_request              *lr   = laia->laia_lr;
1767
1768         LASSERT(com->lc_lfsck->li_master);
1769
1770         switch (lr->lr_event) {
1771         case LE_START:
1772                 if (rc != 0) {
1773                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
1774                                "start: rc = %d\n",
1775                                lfsck_lfsck2name(com->lc_lfsck),
1776                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1777                                ltd->ltd_index, lad->lad_name, rc);
1778
1779                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1780                                 struct lfsck_layout *lo = com->lc_file_ram;
1781
1782                                 if (lr->lr_flags & LEF_TO_OST)
1783                                         lfsck_lad_set_bitmap(env, com,
1784                                                              ltd->ltd_index);
1785                                 else
1786                                         lo->ll_flags |= LF_INCOMPLETE;
1787                         } else {
1788                                 struct lfsck_namespace *ns = com->lc_file_ram;
1789
1790                                 /* If some MDT does not join the namespace
1791                                  * LFSCK, then we cannot know whether there
1792                                  * is some name entry on such MDT that with
1793                                  * the referenced MDT-object on this MDT or
1794                                  * not. So the namespace LFSCK on this MDT
1795                                  * cannot handle orphan MDT-objects properly.
1796                                  * So we mark the LFSCK as LF_INCOMPLETE and
1797                                  * skip orphan MDT-objects handling. */
1798                                 ns->ln_flags |= LF_INCOMPLETE;
1799                         }
1800                         break;
1801                 }
1802
1803                 spin_lock(&ltds->ltd_lock);
1804                 if (ltd->ltd_dead) {
1805                         spin_unlock(&ltds->ltd_lock);
1806                         break;
1807                 }
1808
1809                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1810                         struct list_head *list;
1811                         struct list_head *phase_list;
1812
1813                         if (ltd->ltd_layout_done) {
1814                                 spin_unlock(&ltds->ltd_lock);
1815                                 break;
1816                         }
1817
1818                         if (lr->lr_flags & LEF_TO_OST) {
1819                                 list = &lad->lad_ost_list;
1820                                 phase_list = &lad->lad_ost_phase1_list;
1821                         } else {
1822                                 list = &lad->lad_mdt_list;
1823                                 phase_list = &lad->lad_mdt_phase1_list;
1824                         }
1825
1826                         if (list_empty(&ltd->ltd_layout_list))
1827                                 list_add_tail(&ltd->ltd_layout_list, list);
1828                         if (list_empty(&ltd->ltd_layout_phase_list))
1829                                 list_add_tail(&ltd->ltd_layout_phase_list,
1830                                               phase_list);
1831                 } else {
1832                         if (ltd->ltd_namespace_done) {
1833                                 spin_unlock(&ltds->ltd_lock);
1834                                 break;
1835                         }
1836
1837                         if (list_empty(&ltd->ltd_namespace_list))
1838                                 list_add_tail(&ltd->ltd_namespace_list,
1839                                               &lad->lad_mdt_list);
1840                         if (list_empty(&ltd->ltd_namespace_phase_list))
1841                                 list_add_tail(&ltd->ltd_namespace_phase_list,
1842                                               &lad->lad_mdt_phase1_list);
1843                 }
1844                 spin_unlock(&ltds->ltd_lock);
1845                 break;
1846         case LE_STOP:
1847         case LE_PHASE1_DONE:
1848         case LE_PHASE2_DONE:
1849         case LE_PEER_EXIT:
1850                 if (rc != 0 && rc != -EALREADY)
1851                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
1852                               "event = %d, rc = %d\n",
1853                               lfsck_lfsck2name(com->lc_lfsck),
1854                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1855                               ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
1856                 break;
1857         case LE_QUERY: {
1858                 struct lfsck_reply *reply;
1859                 struct list_head *list;
1860                 struct list_head *phase_list;
1861
1862                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1863                         list = &ltd->ltd_layout_list;
1864                         phase_list = &ltd->ltd_layout_phase_list;
1865                 } else {
1866                         list = &ltd->ltd_namespace_list;
1867                         phase_list = &ltd->ltd_namespace_phase_list;
1868                 }
1869
1870                 if (rc != 0) {
1871                         spin_lock(&ltds->ltd_lock);
1872                         list_del_init(phase_list);
1873                         list_del_init(list);
1874                         spin_unlock(&ltds->ltd_lock);
1875                         break;
1876                 }
1877
1878                 reply = req_capsule_server_get(&req->rq_pill,
1879                                                &RMF_LFSCK_REPLY);
1880                 if (reply == NULL) {
1881                         rc = -EPROTO;
1882                         CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
1883                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
1884                                lad->lad_name, rc);
1885                         spin_lock(&ltds->ltd_lock);
1886                         list_del_init(phase_list);
1887                         list_del_init(list);
1888                         spin_unlock(&ltds->ltd_lock);
1889                         break;
1890                 }
1891
1892                 switch (reply->lr_status) {
1893                 case LS_SCANNING_PHASE1:
1894                         break;
1895                 case LS_SCANNING_PHASE2:
1896                         spin_lock(&ltds->ltd_lock);
1897                         list_del_init(phase_list);
1898                         if (ltd->ltd_dead) {
1899                                 spin_unlock(&ltds->ltd_lock);
1900                                 break;
1901                         }
1902
1903                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1904                                 if (ltd->ltd_layout_done) {
1905                                         spin_unlock(&ltds->ltd_lock);
1906                                         break;
1907                                 }
1908
1909                                 if (lr->lr_flags & LEF_TO_OST)
1910                                         list_add_tail(phase_list,
1911                                                 &lad->lad_ost_phase2_list);
1912                                 else
1913                                         list_add_tail(phase_list,
1914                                                 &lad->lad_mdt_phase2_list);
1915                         } else {
1916                                 if (ltd->ltd_namespace_done) {
1917                                         spin_unlock(&ltds->ltd_lock);
1918                                         break;
1919                                 }
1920
1921                                 list_add_tail(phase_list,
1922                                               &lad->lad_mdt_phase2_list);
1923                         }
1924                         spin_unlock(&ltds->ltd_lock);
1925                         break;
1926                 default:
1927                         spin_lock(&ltds->ltd_lock);
1928                         list_del_init(phase_list);
1929                         list_del_init(list);
1930                         spin_unlock(&ltds->ltd_lock);
1931                         break;
1932                 }
1933                 break;
1934         }
1935         default:
1936                 CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
1937                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
1938                 break;
1939         }
1940
1941         if (!laia->laia_shared) {
1942                 lfsck_tgt_put(ltd);
1943                 lfsck_component_put(env, com);
1944         }
1945
1946         return 0;
1947 }
1948
1949 static void lfsck_interpret(const struct lu_env *env,
1950                             struct lfsck_instance *lfsck,
1951                             struct ptlrpc_request *req, void *args, int result)
1952 {
1953         struct lfsck_async_interpret_args *laia = args;
1954         struct lfsck_component            *com;
1955
1956         LASSERT(laia->laia_com == NULL);
1957         LASSERT(laia->laia_shared);
1958
1959         spin_lock(&lfsck->li_lock);
1960         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1961                 laia->laia_com = com;
1962                 lfsck_async_interpret_common(env, req, laia, result);
1963         }
1964
1965         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1966                 laia->laia_com = com;
1967                 lfsck_async_interpret_common(env, req, laia, result);
1968         }
1969         spin_unlock(&lfsck->li_lock);
1970 }
1971
1972 static int lfsck_stop_notify(const struct lu_env *env,
1973                              struct lfsck_instance *lfsck,
1974                              struct lfsck_tgt_descs *ltds,
1975                              struct lfsck_tgt_desc *ltd, __u16 type)
1976 {
1977         struct lfsck_component *com;
1978         int                     rc = 0;
1979         ENTRY;
1980
1981         LASSERT(lfsck->li_master);
1982
1983         spin_lock(&lfsck->li_lock);
1984         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
1985         if (com == NULL)
1986                 com = __lfsck_component_find(lfsck, type,
1987                                              &lfsck->li_list_double_scan);
1988         if (com != NULL)
1989                 lfsck_component_get(com);
1990         spin_unlock(&lfsck->li_lock);
1991
1992         if (com != NULL) {
1993                 struct lfsck_thread_info          *info  = lfsck_env_info(env);
1994                 struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1995                 struct lfsck_request              *lr    = &info->lti_lr;
1996                 struct lfsck_assistant_data       *lad   = com->lc_data;
1997                 struct list_head                  *list;
1998                 struct list_head                  *phase_list;
1999                 struct ptlrpc_request_set         *set;
2000
2001                 set = ptlrpc_prep_set();
2002                 if (set == NULL) {
2003                         lfsck_component_put(env, com);
2004
2005                         RETURN(-ENOMEM);
2006                 }
2007
2008                 if (type == LFSCK_TYPE_LAYOUT) {
2009                         list = &ltd->ltd_layout_list;
2010                         phase_list = &ltd->ltd_layout_phase_list;
2011                 } else {
2012                         list = &ltd->ltd_namespace_list;
2013                         phase_list = &ltd->ltd_namespace_phase_list;
2014                 }
2015
2016                 spin_lock(&ltds->ltd_lock);
2017                 if (list_empty(list)) {
2018                         LASSERT(list_empty(phase_list));
2019                         spin_unlock(&ltds->ltd_lock);
2020                         ptlrpc_set_destroy(set);
2021
2022                         RETURN(0);
2023                 }
2024
2025                 list_del_init(phase_list);
2026                 list_del_init(list);
2027                 spin_unlock(&ltds->ltd_lock);
2028
2029                 memset(lr, 0, sizeof(*lr));
2030                 lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2031                 lr->lr_event = LE_PEER_EXIT;
2032                 lr->lr_active = type;
2033                 lr->lr_status = LS_CO_PAUSED;
2034                 if (ltds == &lfsck->li_ost_descs)
2035                         lr->lr_flags = LEF_TO_OST;
2036
2037                 laia->laia_com = com;
2038                 laia->laia_ltds = ltds;
2039                 atomic_inc(&ltd->ltd_ref);
2040                 laia->laia_ltd = ltd;
2041                 laia->laia_lr = lr;
2042                 laia->laia_shared = 0;
2043
2044                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2045                                          lfsck_async_interpret_common,
2046                                          laia, LFSCK_NOTIFY);
2047                 if (rc != 0) {
2048                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2049                                "co-stop for %s: rc = %d\n",
2050                                lfsck_lfsck2name(lfsck),
2051                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2052                                ltd->ltd_index, lad->lad_name, rc);
2053                         lfsck_tgt_put(ltd);
2054                 } else {
2055                         rc = ptlrpc_set_wait(set);
2056                 }
2057
2058                 ptlrpc_set_destroy(set);
2059                 lfsck_component_put(env, com);
2060         }
2061
2062         RETURN(rc);
2063 }
2064
2065 static int lfsck_async_interpret(const struct lu_env *env,
2066                                  struct ptlrpc_request *req,
2067                                  void *args, int rc)
2068 {
2069         struct lfsck_async_interpret_args *laia = args;
2070         struct lfsck_instance             *lfsck;
2071
2072         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
2073                               li_mdt_descs);
2074         lfsck_interpret(env, lfsck, req, laia, rc);
2075         lfsck_tgt_put(laia->laia_ltd);
2076         if (rc != 0 && laia->laia_result != -EALREADY)
2077                 laia->laia_result = rc;
2078
2079         return 0;
2080 }
2081
2082 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2083                         struct lfsck_request *lr,
2084                         struct ptlrpc_request_set *set,
2085                         ptlrpc_interpterer_t interpreter,
2086                         void *args, int request)
2087 {
2088         struct lfsck_async_interpret_args *laia;
2089         struct ptlrpc_request             *req;
2090         struct lfsck_request              *tmp;
2091         struct req_format                 *format;
2092         int                                rc;
2093
2094         switch (request) {
2095         case LFSCK_NOTIFY:
2096                 format = &RQF_LFSCK_NOTIFY;
2097                 break;
2098         case LFSCK_QUERY:
2099                 format = &RQF_LFSCK_QUERY;
2100                 break;
2101         default:
2102                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2103                        exp->exp_obd->obd_name, request, -EINVAL);
2104                 return -EINVAL;
2105         }
2106
2107         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2108         if (req == NULL)
2109                 return -ENOMEM;
2110
2111         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2112         if (rc != 0) {
2113                 ptlrpc_request_free(req);
2114
2115                 return rc;
2116         }
2117
2118         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2119         *tmp = *lr;
2120         ptlrpc_request_set_replen(req);
2121
2122         laia = ptlrpc_req_async_args(req);
2123         *laia = *(struct lfsck_async_interpret_args *)args;
2124         if (laia->laia_com != NULL)
2125                 lfsck_component_get(laia->laia_com);
2126         req->rq_interpret_reply = interpreter;
2127         ptlrpc_set_add_req(set, req);
2128
2129         return 0;
2130 }
2131
2132 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2133                           struct lfsck_start_param *lsp)
2134 {
2135         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2136         struct lfsck_assistant_data     *lad     = com->lc_data;
2137         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2138         struct ptlrpc_thread            *athread = &lad->lad_thread;
2139         struct lfsck_thread_args        *lta;
2140         struct task_struct              *task;
2141         int                              rc;
2142         ENTRY;
2143
2144         lad->lad_assistant_status = 0;
2145         lad->lad_post_result = 0;
2146         lad->lad_to_post = 0;
2147         lad->lad_to_double_scan = 0;
2148         lad->lad_in_double_scan = 0;
2149         lad->lad_exit = 0;
2150         thread_set_flags(athread, 0);
2151
2152         lta = lfsck_thread_args_init(lfsck, com, lsp);
2153         if (IS_ERR(lta))
2154                 RETURN(PTR_ERR(lta));
2155
2156         task = kthread_run(lfsck_assistant_engine, lta, lad->lad_name);
2157         if (IS_ERR(task)) {
2158                 rc = PTR_ERR(task);
2159                 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2160                        "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2161                 lfsck_thread_args_fini(lta);
2162         } else {
2163                 struct l_wait_info lwi = { 0 };
2164
2165                 l_wait_event(mthread->t_ctl_waitq,
2166                              thread_is_running(athread) ||
2167                              thread_is_stopped(athread),
2168                              &lwi);
2169                 if (unlikely(!thread_is_running(athread)))
2170                         rc = lad->lad_assistant_status;
2171                 else
2172                         rc = 0;
2173         }
2174
2175         RETURN(rc);
2176 }
2177
2178 int lfsck_checkpoint_generic(const struct lu_env *env,
2179                              struct lfsck_component *com)
2180 {
2181         struct lfsck_assistant_data     *lad     = com->lc_data;
2182         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2183         struct ptlrpc_thread            *athread = &lad->lad_thread;
2184         struct l_wait_info               lwi     = { 0 };
2185
2186         if (com->lc_new_checked == 0)
2187                 return LFSCK_CHECKPOINT_SKIP;
2188
2189         l_wait_event(mthread->t_ctl_waitq,
2190                      list_empty(&lad->lad_req_list) ||
2191                      !thread_is_running(mthread) ||
2192                      thread_is_stopped(athread),
2193                      &lwi);
2194
2195         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2196                 return LFSCK_CHECKPOINT_SKIP;
2197
2198         return 0;
2199 }
2200
2201 void lfsck_post_generic(const struct lu_env *env,
2202                         struct lfsck_component *com, int *result)
2203 {
2204         struct lfsck_assistant_data     *lad     = com->lc_data;
2205         struct ptlrpc_thread            *athread = &lad->lad_thread;
2206         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2207         struct l_wait_info               lwi     = { 0 };
2208
2209         lad->lad_post_result = *result;
2210         if (*result <= 0)
2211                 lad->lad_exit = 1;
2212         lad->lad_to_post = 1;
2213
2214         wake_up_all(&athread->t_ctl_waitq);
2215         l_wait_event(mthread->t_ctl_waitq,
2216                      (*result > 0 && list_empty(&lad->lad_req_list)) ||
2217                      thread_is_stopped(athread),
2218                      &lwi);
2219
2220         if (lad->lad_assistant_status < 0)
2221                 *result = lad->lad_assistant_status;
2222 }
2223
2224 int lfsck_double_scan_generic(const struct lu_env *env,
2225                               struct lfsck_component *com, int status)
2226 {
2227         struct lfsck_assistant_data     *lad     = com->lc_data;
2228         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2229         struct ptlrpc_thread            *athread = &lad->lad_thread;
2230         struct l_wait_info               lwi     = { 0 };
2231
2232         if (status != LS_SCANNING_PHASE2)
2233                 lad->lad_exit = 1;
2234         else
2235                 lad->lad_to_double_scan = 1;
2236
2237         wake_up_all(&athread->t_ctl_waitq);
2238         l_wait_event(mthread->t_ctl_waitq,
2239                      lad->lad_in_double_scan ||
2240                      thread_is_stopped(athread),
2241                      &lwi);
2242
2243         if (lad->lad_assistant_status < 0)
2244                 return lad->lad_assistant_status;
2245
2246         return 0;
2247 }
2248
2249 void lfsck_quit_generic(const struct lu_env *env,
2250                         struct lfsck_component *com)
2251 {
2252         struct lfsck_assistant_data     *lad     = com->lc_data;
2253         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2254         struct ptlrpc_thread            *athread = &lad->lad_thread;
2255         struct l_wait_info               lwi     = { 0 };
2256
2257         lad->lad_exit = 1;
2258         wake_up_all(&athread->t_ctl_waitq);
2259         l_wait_event(mthread->t_ctl_waitq,
2260                      thread_is_init(athread) ||
2261                      thread_is_stopped(athread),
2262                      &lwi);
2263 }
2264
2265 /* external interfaces */
2266
2267 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
2268 {
2269         struct lu_env           env;
2270         struct lfsck_instance  *lfsck;
2271         int                     rc;
2272         ENTRY;
2273
2274         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2275         if (rc != 0)
2276                 RETURN(rc);
2277
2278         lfsck = lfsck_instance_find(key, true, false);
2279         if (likely(lfsck != NULL)) {
2280                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
2281                 lfsck_instance_put(&env, lfsck);
2282         } else {
2283                 rc = -ENXIO;
2284         }
2285
2286         lu_env_fini(&env);
2287
2288         RETURN(rc);
2289 }
2290 EXPORT_SYMBOL(lfsck_get_speed);
2291
2292 int lfsck_set_speed(struct dt_device *key, int val)
2293 {
2294         struct lu_env           env;
2295         struct lfsck_instance  *lfsck;
2296         int                     rc;
2297         ENTRY;
2298
2299         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2300         if (rc != 0)
2301                 RETURN(rc);
2302
2303         lfsck = lfsck_instance_find(key, true, false);
2304         if (likely(lfsck != NULL)) {
2305                 mutex_lock(&lfsck->li_mutex);
2306                 if (__lfsck_set_speed(lfsck, val))
2307                         rc = lfsck_bookmark_store(&env, lfsck);
2308                 mutex_unlock(&lfsck->li_mutex);
2309                 lfsck_instance_put(&env, lfsck);
2310         } else {
2311                 rc = -ENXIO;
2312         }
2313
2314         lu_env_fini(&env);
2315
2316         RETURN(rc);
2317 }
2318 EXPORT_SYMBOL(lfsck_set_speed);
2319
2320 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
2321 {
2322         struct lu_env           env;
2323         struct lfsck_instance  *lfsck;
2324         int                     rc;
2325         ENTRY;
2326
2327         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2328         if (rc != 0)
2329                 RETURN(rc);
2330
2331         lfsck = lfsck_instance_find(key, true, false);
2332         if (likely(lfsck != NULL)) {
2333                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
2334                 lfsck_instance_put(&env, lfsck);
2335         } else {
2336                 rc = -ENXIO;
2337         }
2338
2339         lu_env_fini(&env);
2340
2341         RETURN(rc);
2342 }
2343 EXPORT_SYMBOL(lfsck_get_windows);
2344
2345 int lfsck_set_windows(struct dt_device *key, int val)
2346 {
2347         struct lu_env           env;
2348         struct lfsck_instance  *lfsck;
2349         int                     rc;
2350         ENTRY;
2351
2352         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2353         if (rc != 0)
2354                 RETURN(rc);
2355
2356         lfsck = lfsck_instance_find(key, true, false);
2357         if (likely(lfsck != NULL)) {
2358                 if (val > LFSCK_ASYNC_WIN_MAX) {
2359                         CWARN("%s: Too large async window size, which "
2360                               "may cause memory issues. The valid range "
2361                               "is [0 - %u]. If you do not want to restrict "
2362                               "the window size for async requests pipeline, "
2363                               "just set it as 0.\n",
2364                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2365                         rc = -EINVAL;
2366                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2367                         mutex_lock(&lfsck->li_mutex);
2368                         lfsck->li_bookmark_ram.lb_async_windows = val;
2369                         rc = lfsck_bookmark_store(&env, lfsck);
2370                         mutex_unlock(&lfsck->li_mutex);
2371                 }
2372                 lfsck_instance_put(&env, lfsck);
2373         } else {
2374                 rc = -ENXIO;
2375         }
2376
2377         lu_env_fini(&env);
2378
2379         RETURN(rc);
2380 }
2381 EXPORT_SYMBOL(lfsck_set_windows);
2382
2383 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2384 {
2385         struct lu_env           env;
2386         struct lfsck_instance  *lfsck;
2387         struct lfsck_component *com;
2388         int                     rc;
2389         ENTRY;
2390
2391         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2392         if (rc != 0)
2393                 RETURN(rc);
2394
2395         lfsck = lfsck_instance_find(key, true, false);
2396         if (likely(lfsck != NULL)) {
2397                 com = lfsck_component_find(lfsck, type);
2398                 if (likely(com != NULL)) {
2399                         rc = com->lc_ops->lfsck_dump(&env, com, m);
2400                         lfsck_component_put(&env, com);
2401                 } else {
2402                         rc = -ENOTSUPP;
2403                 }
2404
2405                 lfsck_instance_put(&env, lfsck);
2406         } else {
2407                 rc = -ENXIO;
2408         }
2409
2410         lu_env_fini(&env);
2411
2412         RETURN(rc);
2413 }
2414 EXPORT_SYMBOL(lfsck_dump);
2415
2416 static int lfsck_stop_all(const struct lu_env *env,
2417                           struct lfsck_instance *lfsck,
2418                           struct lfsck_stop *stop)
2419 {
2420         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2421         struct lfsck_request              *lr     = &info->lti_lr;
2422         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2423         struct ptlrpc_request_set         *set;
2424         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2425         struct lfsck_tgt_desc             *ltd;
2426         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2427         __u32                              idx;
2428         int                                rc     = 0;
2429         int                                rc1    = 0;
2430         ENTRY;
2431
2432         LASSERT(stop->ls_flags & LPF_BROADCAST);
2433
2434         set = ptlrpc_prep_set();
2435         if (unlikely(set == NULL))
2436                 RETURN(-ENOMEM);
2437
2438         memset(lr, 0, sizeof(*lr));
2439         lr->lr_event = LE_STOP;
2440         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2441         lr->lr_status = stop->ls_status;
2442         lr->lr_version = bk->lb_version;
2443         lr->lr_active = LFSCK_TYPES_ALL;
2444         lr->lr_param = stop->ls_flags;
2445
2446         laia->laia_com = NULL;
2447         laia->laia_ltds = ltds;
2448         laia->laia_lr = lr;
2449         laia->laia_result = 0;
2450         laia->laia_shared = 1;
2451
2452         down_read(&ltds->ltd_rw_sem);
2453         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2454                 ltd = lfsck_tgt_get(ltds, idx);
2455                 LASSERT(ltd != NULL);
2456
2457                 laia->laia_ltd = ltd;
2458                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2459                                          lfsck_async_interpret, laia,
2460                                          LFSCK_NOTIFY);
2461                 if (rc != 0) {
2462                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2463                         lfsck_tgt_put(ltd);
2464                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2465                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2466                         rc1 = rc;
2467                 }
2468         }
2469         up_read(&ltds->ltd_rw_sem);
2470
2471         rc = ptlrpc_set_wait(set);
2472         ptlrpc_set_destroy(set);
2473
2474         if (rc == 0)
2475                 rc = laia->laia_result;
2476
2477         if (rc == -EALREADY)
2478                 rc = 0;
2479
2480         if (rc != 0)
2481                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2482                        lfsck_lfsck2name(lfsck), rc);
2483
2484         RETURN(rc != 0 ? rc : rc1);
2485 }
2486
2487 static int lfsck_start_all(const struct lu_env *env,
2488                            struct lfsck_instance *lfsck,
2489                            struct lfsck_start *start)
2490 {
2491         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2492         struct lfsck_request              *lr     = &info->lti_lr;
2493         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2494         struct ptlrpc_request_set         *set;
2495         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2496         struct lfsck_tgt_desc             *ltd;
2497         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2498         __u32                              idx;
2499         int                                rc     = 0;
2500         ENTRY;
2501
2502         LASSERT(start->ls_flags & LPF_BROADCAST);
2503
2504         set = ptlrpc_prep_set();
2505         if (unlikely(set == NULL))
2506                 RETURN(-ENOMEM);
2507
2508         memset(lr, 0, sizeof(*lr));
2509         lr->lr_event = LE_START;
2510         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2511         lr->lr_speed = bk->lb_speed_limit;
2512         lr->lr_version = bk->lb_version;
2513         lr->lr_active = start->ls_active;
2514         lr->lr_param = start->ls_flags;
2515         lr->lr_async_windows = bk->lb_async_windows;
2516         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2517                        LSV_ASYNC_WINDOWS;
2518
2519         laia->laia_com = NULL;
2520         laia->laia_ltds = ltds;
2521         laia->laia_lr = lr;
2522         laia->laia_result = 0;
2523         laia->laia_shared = 1;
2524
2525         down_read(&ltds->ltd_rw_sem);
2526         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2527                 ltd = lfsck_tgt_get(ltds, idx);
2528                 LASSERT(ltd != NULL);
2529
2530                 laia->laia_ltd = ltd;
2531                 ltd->ltd_layout_done = 0;
2532                 ltd->ltd_namespace_done = 0;
2533                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2534                                          lfsck_async_interpret, laia,
2535                                          LFSCK_NOTIFY);
2536                 if (rc != 0) {
2537                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2538                         lfsck_tgt_put(ltd);
2539                         CERROR("%s: cannot notify MDT %x for LFSCK "
2540                                "start, failout: rc = %d\n",
2541                                lfsck_lfsck2name(lfsck), idx, rc);
2542                         break;
2543                 }
2544         }
2545         up_read(&ltds->ltd_rw_sem);
2546
2547         if (rc != 0) {
2548                 ptlrpc_set_destroy(set);
2549
2550                 RETURN(rc);
2551         }
2552
2553         rc = ptlrpc_set_wait(set);
2554         ptlrpc_set_destroy(set);
2555
2556         if (rc == 0)
2557                 rc = laia->laia_result;
2558
2559         if (rc != 0) {
2560                 struct lfsck_stop *stop = &info->lti_stop;
2561
2562                 CERROR("%s: cannot start LFSCK on some MDTs, "
2563                        "stop all: rc = %d\n",
2564                        lfsck_lfsck2name(lfsck), rc);
2565                 if (rc != -EALREADY) {
2566                         stop->ls_status = LS_FAILED;
2567                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
2568                         lfsck_stop_all(env, lfsck, stop);
2569                 }
2570         }
2571
2572         RETURN(rc);
2573 }
2574
/**
 * Start LFSCK on the instance registered for \a key.
 *
 * If the LFSCK thread is already active (neither in init nor in stopped
 * state), the requested components are only asked to join through their
 * lfsck_join() method and -EALREADY is reported unless a join fails with
 * another error. Otherwise the scan list is prepared according to
 * \a lsp->lsp_start, the "lfsck" master engine thread is created and, when
 * LPF_BROADCAST is set, the start request is sent out by lfsck_start_all().
 *
 * \param[in] env       execution environment
 * \param[in] key       device used to look up the LFSCK instance
 * \param[in] lsp       start parameters; lsp->lsp_start may be NULL, which
 *                      means auto trigger of a paused LFSCK
 *
 * \retval              0 on success (non-negative internal results are
 *                      reported as 0)
 * \retval              negative error number on failure, for example
 *                      -ENXIO (no instance), -EAGAIN (no namespace yet),
 *                      -EALREADY, -EPERM, -ENOTSUPP
 */
2575 int lfsck_start(const struct lu_env *env, struct dt_device *key,
2576                 struct lfsck_start_param *lsp)
2577 {
2578         struct lfsck_start              *start  = lsp->lsp_start;
2579         struct lfsck_instance           *lfsck;
2580         struct lfsck_bookmark           *bk;
2581         struct ptlrpc_thread            *thread;
2582         struct lfsck_component          *com;
2583         struct l_wait_info               lwi    = { 0 };
2584         struct lfsck_thread_args        *lta;
2585         struct task_struct              *task;
2586         int                              rc     = 0;
2587         __u16                            valid  = 0;
2588         __u16                            flags  = 0;
        /* Single-bit mask walked across start->ls_active, lowest bit first. */
2589         __u16                            type   = 1;
2590         ENTRY;
2591
2592         lfsck = lfsck_instance_find(key, true, false);
2593         if (unlikely(lfsck == NULL))
2594                 RETURN(-ENXIO);
2595
2596         /* System is not ready, try again later. */
2597         if (unlikely(lfsck->li_namespace == NULL))
2598                 GOTO(put, rc = -EAGAIN);
2599
2600         /* start == NULL means auto trigger paused LFSCK. */
2601         if ((start == NULL) &&
2602             (list_empty(&lfsck->li_list_scan) ||
2603              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2604                 GOTO(put, rc = 0);
2605
2606         bk = &lfsck->li_bookmark_ram;
2607         thread = &lfsck->li_thread;
2608         mutex_lock(&lfsck->li_mutex);
2609         spin_lock(&lfsck->li_lock);
        /* The LFSCK thread is already active: do not start a second one,
         * only let the requested components join the running scan. */
2610         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2611                 rc = -EALREADY;
2612                 if (unlikely(start == NULL)) {
2613                         spin_unlock(&lfsck->li_lock);
2614                         GOTO(out, rc);
2615                 }
2616
                /* Handle the requested types one bit at a time; each
                 * handled bit is cleared from start->ls_active. */
2617                 while (start->ls_active != 0) {
2618                         if (!(type & start->ls_active)) {
2619                                 type <<= 1;
2620                                 continue;
2621                         }
2622
                        /* The component must be on the scan list or on
                         * the double scan list, otherwise it cannot join. */
2623                         com = __lfsck_component_find(lfsck, type,
2624                                                      &lfsck->li_list_scan);
2625                         if (com == NULL)
2626                                 com = __lfsck_component_find(lfsck, type,
2627                                                 &lfsck->li_list_double_scan);
2628                         if (com == NULL) {
2629                                 rc = -EOPNOTSUPP;
2630                                 break;
2631                         }
2632
2633                         if (com->lc_ops->lfsck_join != NULL) {
2634                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2635                                 if (rc != 0 && rc != -EALREADY)
2636                                         break;
2637                         }
2638                         start->ls_active &= ~type;
2639                         type <<= 1;
2640                 }
2641                 spin_unlock(&lfsck->li_lock);
2642                 GOTO(out, rc);
2643         }
2644         spin_unlock(&lfsck->li_lock);
2645
        /* Reset the per-run state before a new run. */
2646         lfsck->li_status = 0;
2647         lfsck->li_oit_over = 0;
2648         lfsck->li_start_unplug = 0;
2649         lfsck->li_drop_dryrun = 0;
2650         lfsck->li_new_scanned = 0;
2651
2652         /* For auto trigger. */
2653         if (start == NULL)
2654                 goto trigger;
2655
2656         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2657                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2658                        lfsck_lfsck2name(lfsck));
2659
2660                 GOTO(out, rc = -EPERM);
2661         }
2662
2663         start->ls_version = bk->lb_version;
2664
2665         if (start->ls_active != 0) {
2666                 struct lfsck_component *next;
2667
2668                 if (start->ls_active == LFSCK_TYPES_ALL)
2669                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2670
                /* On unsupported types, leave only the offending bits in
                 * start->ls_active and fail. */
2671                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2672                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2673                         GOTO(out, rc = -ENOTSUPP);
2674                 }
2675
                /* Call lfsck_post() on the components that are on the scan
                 * list but are not requested this time. */
2676                 list_for_each_entry_safe(com, next,
2677                                          &lfsck->li_list_scan, lc_link) {
2678                         if (!(com->lc_type & start->ls_active)) {
2679                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2680                                                              false);
2681                                 if (rc != 0)
2682                                         GOTO(out, rc);
2683                         }
2684                 }
2685
                /* Move the requested components found on the idle list to
                 * the tail of the scan list. */
2686                 while (start->ls_active != 0) {
2687                         if (type & start->ls_active) {
2688                                 com = __lfsck_component_find(lfsck, type,
2689                                                         &lfsck->li_list_idle);
2690                                 if (com != NULL)
2691                                         /* The component status will be updated
2692                                          * when its prep() is called later by
2693                                          * the LFSCK main engine. */
2694                                         list_move_tail(&com->lc_link,
2695                                                        &lfsck->li_list_scan);
2696                                 start->ls_active &= ~type;
2697                         }
2698                         type <<= 1;
2699                 }
2700         }
2701
2702         if (list_empty(&lfsck->li_list_scan)) {
2703                 /* The speed limit will be used to control both the LFSCK and
2704                  * low layer scrub (if applied), need to be handled firstly. */
2705                 if (start->ls_valid & LSV_SPEED_LIMIT) {
2706                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
2707                                 rc = lfsck_bookmark_store(env, lfsck);
2708                                 if (rc != 0)
2709                                         GOTO(out, rc);
2710                         }
2711                 }
2712
2713                 goto trigger;
2714         }
2715
2716         if (start->ls_flags & LPF_RESET)
2717                 flags |= DOIF_RESET;
2718
2719         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
2720         if (rc != 0)
2721                 GOTO(out, rc);
2722
        /* Report the types that will really be scanned back through
         * start->ls_active and reset each of them if LPF_RESET was given. */
2723         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2724                 start->ls_active |= com->lc_type;
2725                 if (flags & DOIF_RESET) {
2726                         rc = com->lc_ops->lfsck_reset(env, com, false);
2727                         if (rc != 0)
2728                                 GOTO(out, rc);
2729                 }
2730         }
2731
2732 trigger:
        /* Build the directory and object-table iteration arguments from
         * the bookmark and from the start request (if there is one). */
2733         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
2734         if (bk->lb_param & LPF_DRYRUN)
2735                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
2736
2737         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
2738                 valid |= DOIV_ERROR_HANDLE;
2739                 if (start->ls_flags & LPF_FAILOUT)
2740                         flags |= DOIF_FAILOUT;
2741         }
2742
2743         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
2744                 valid |= DOIV_DRYRUN;
2745                 if (start->ls_flags & LPF_DRYRUN)
2746                         flags |= DOIF_DRYRUN;
2747         }
2748
2749         if (!list_empty(&lfsck->li_list_scan))
2750                 flags |= DOIF_OUTUSED;
2751
2752         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2753         thread_set_flags(thread, 0);
2754         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
2755         if (IS_ERR(lta))
2756                 GOTO(out, rc = PTR_ERR(lta));
2757
2758         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
2759         task = kthread_run(lfsck_master_engine, lta, "lfsck");
2760         if (IS_ERR(task)) {
2761                 rc = PTR_ERR(task);
2762                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
2763                        lfsck_lfsck2name(lfsck), rc);
2764                 lfsck_thread_args_fini(lta);
2765
2766                 GOTO(out, rc);
2767         }
2768
        /* Wait until the engine thread is either running or stopped. */
2769         l_wait_event(thread->t_ctl_waitq,
2770                      thread_is_running(thread) ||
2771                      thread_is_stopped(thread),
2772                      &lwi);
        /* No broadcast requested: set li_start_unplug, wake up the waiters
         * on the thread control queue and finish. */
2773         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
2774                 lfsck->li_start_unplug = 1;
2775                 wake_up_all(&thread->t_ctl_waitq);
2776
2777                 GOTO(out, rc = 0);
2778         }
2779
2780         /* release lfsck::li_mutex to avoid deadlock. */
2781         mutex_unlock(&lfsck->li_mutex);
2782         rc = lfsck_start_all(env, lfsck, start);
        /* The broadcast failed: if the local thread has not stopped yet,
         * mark it as failed, ask it to stop and wait until it does. */
2783         if (rc != 0) {
2784                 spin_lock(&lfsck->li_lock);
2785                 if (thread_is_stopped(thread)) {
2786                         spin_unlock(&lfsck->li_lock);
2787                 } else {
2788                         lfsck->li_status = LS_FAILED;
2789                         lfsck->li_flags = 0;
2790                         thread_set_flags(thread, SVC_STOPPING);
2791                         spin_unlock(&lfsck->li_lock);
2792
2793                         lfsck->li_start_unplug = 1;
2794                         wake_up_all(&thread->t_ctl_waitq);
2795                         l_wait_event(thread->t_ctl_waitq,
2796                                      thread_is_stopped(thread),
2797                                      &lwi);
2798                 }
2799         } else {
2800                 lfsck->li_start_unplug = 1;
2801                 wake_up_all(&thread->t_ctl_waitq);
2802         }
2803
        /* li_mutex was already released above, so skip the "out" label. */
2804         GOTO(put, rc);
2805
2806 out:
2807         mutex_unlock(&lfsck->li_mutex);
2808
2809 put:
2810         lfsck_instance_put(env, lfsck);
2811
        /* Only negative values are reported to the caller as errors. */
2812         return rc < 0 ? rc : 0;
2813 }
2814 EXPORT_SYMBOL(lfsck_start);
2815
2816 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2817                struct lfsck_stop *stop)
2818 {
2819         struct lfsck_instance   *lfsck;
2820         struct ptlrpc_thread    *thread;
2821         struct l_wait_info       lwi    = { 0 };
2822         int                      rc     = 0;
2823         int                      rc1    = 0;
2824         ENTRY;
2825
2826         lfsck = lfsck_instance_find(key, true, false);
2827         if (unlikely(lfsck == NULL))
2828                 RETURN(-ENXIO);
2829
2830         thread = &lfsck->li_thread;
2831         /* release lfsck::li_mutex to avoid deadlock. */
2832         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2833                 if (!lfsck->li_master) {
2834                         CERROR("%s: only allow to specify '-A' via MDS\n",
2835                                lfsck_lfsck2name(lfsck));
2836
2837                         GOTO(out, rc = -EPERM);
2838                 }
2839
2840                 rc1 = lfsck_stop_all(env, lfsck, stop);
2841         }
2842
2843         mutex_lock(&lfsck->li_mutex);
2844         spin_lock(&lfsck->li_lock);
2845         /* no error if LFSCK is already stopped, or was never started */
2846         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2847                 spin_unlock(&lfsck->li_lock);
2848                 GOTO(out, rc = 0);
2849         }
2850
2851         if (stop != NULL) {
2852                 lfsck->li_status = stop->ls_status;
2853                 lfsck->li_flags = stop->ls_flags;
2854         } else {
2855                 lfsck->li_status = LS_STOPPED;
2856                 lfsck->li_flags = 0;
2857         }
2858
2859         thread_set_flags(thread, SVC_STOPPING);
2860         spin_unlock(&lfsck->li_lock);
2861
2862         wake_up_all(&thread->t_ctl_waitq);
2863         l_wait_event(thread->t_ctl_waitq,
2864                      thread_is_stopped(thread),
2865                      &lwi);
2866
2867         GOTO(out, rc = 0);
2868
2869 out:
2870         mutex_unlock(&lfsck->li_mutex);
2871         lfsck_instance_put(env, lfsck);
2872
2873         return rc != 0 ? rc : rc1;
2874 }
2875 EXPORT_SYMBOL(lfsck_stop);
2876
2877 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2878                     struct lfsck_request *lr, struct thandle *th)
2879 {
2880         int rc = -EOPNOTSUPP;
2881         ENTRY;
2882
2883         switch (lr->lr_event) {
2884         case LE_START: {
2885                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
2886                 struct lfsck_start_param  lsp;
2887
2888                 memset(start, 0, sizeof(*start));
2889                 start->ls_valid = lr->lr_valid;
2890                 start->ls_speed_limit = lr->lr_speed;
2891                 start->ls_version = lr->lr_version;
2892                 start->ls_active = lr->lr_active;
2893                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2894                 start->ls_async_windows = lr->lr_async_windows;
2895
2896                 lsp.lsp_start = start;
2897                 lsp.lsp_index = lr->lr_index;
2898                 lsp.lsp_index_valid = 1;
2899                 rc = lfsck_start(env, key, &lsp);
2900                 break;
2901         }
2902         case LE_STOP: {
2903                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2904
2905                 memset(stop, 0, sizeof(*stop));
2906                 stop->ls_status = lr->lr_status;
2907                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2908                 rc = lfsck_stop(env, key, stop);
2909                 break;
2910         }
2911         case LE_PHASE1_DONE:
2912         case LE_PHASE2_DONE:
2913         case LE_FID_ACCESSED:
2914         case LE_PEER_EXIT:
2915         case LE_CONDITIONAL_DESTROY:
2916         case LE_CREATE_ORPHAN:
2917         case LE_SKIP_NLINK_DECLARE:
2918         case LE_SKIP_NLINK:
2919         case LE_PAIRS_VERIFY: {
2920                 struct lfsck_instance  *lfsck;
2921                 struct lfsck_component *com;
2922
2923                 lfsck = lfsck_instance_find(key, true, false);
2924                 if (unlikely(lfsck == NULL))
2925                         RETURN(-ENXIO);
2926
2927                 com = lfsck_component_find(lfsck, lr->lr_active);
2928                 if (likely(com != NULL)) {
2929                         rc = com->lc_ops->lfsck_in_notify(env, com, lr, th);
2930                         lfsck_component_put(env, com);
2931                 }
2932
2933                 lfsck_instance_put(env, lfsck);
2934                 break;
2935         }
2936         default:
2937                 break;
2938         }
2939
2940         RETURN(rc);
2941 }
2942 EXPORT_SYMBOL(lfsck_in_notify);
2943
2944 int lfsck_query(const struct lu_env *env, struct dt_device *key,
2945                 struct lfsck_request *lr)
2946 {
2947         struct lfsck_instance  *lfsck;
2948         struct lfsck_component *com;
2949         int                     rc;
2950         ENTRY;
2951
2952         lfsck = lfsck_instance_find(key, true, false);
2953         if (unlikely(lfsck == NULL))
2954                 RETURN(-ENXIO);
2955
2956         com = lfsck_component_find(lfsck, lr->lr_active);
2957         if (likely(com != NULL)) {
2958                 rc = com->lc_ops->lfsck_query(env, com);
2959                 lfsck_component_put(env, com);
2960         } else {
2961                 rc = -ENOTSUPP;
2962         }
2963
2964         lfsck_instance_put(env, lfsck);
2965
2966         RETURN(rc);
2967 }
2968 EXPORT_SYMBOL(lfsck_query);
2969
2970 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
2971                              struct ldlm_namespace *ns)
2972 {
2973         struct lfsck_instance  *lfsck;
2974         int                     rc      = -ENXIO;
2975
2976         lfsck = lfsck_instance_find(key, true, false);
2977         if (likely(lfsck != NULL)) {
2978                 lfsck->li_namespace = ns;
2979                 lfsck_instance_put(env, lfsck);
2980                 rc = 0;
2981         }
2982
2983         return rc;
2984 }
2985 EXPORT_SYMBOL(lfsck_register_namespace);
2986
/**
 * Allocate, initialize and register a new LFSCK instance for \a key.
 *
 * \param[in] env               execution environment
 * \param[in] key               bottom device, also the lookup key of the
 *                              instance (stored in li_bottom)
 * \param[in] next              stored in li_next
 * \param[in] obd               stored in li_obd
 * \param[in] notify            callback stored in li_out_notify
 * \param[in] notify_data       stored in li_out_notify_data
 * \param[in] master            if true the instance is marked as master
 *                              and the namespace component is set up too
 *
 * \retval                      0 on success
 * \retval                      -EEXIST if an instance already exists for
 *                              \a key
 * \retval                      -ENOMEM if the instance cannot be allocated
 * \retval                      other negative error number on failure; the
 *                              partially built instance is cleaned up
 */
2987 int lfsck_register(const struct lu_env *env, struct dt_device *key,
2988                    struct dt_device *next, struct obd_device *obd,
2989                    lfsck_out_notify notify, void *notify_data, bool master)
2990 {
2991         struct lfsck_instance   *lfsck;
2992         struct dt_object        *root  = NULL;
2993         struct dt_object        *obj   = NULL;
2994         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
2995         int                      rc;
2996         ENTRY;
2997
2998         lfsck = lfsck_instance_find(key, false, false);
2999         if (unlikely(lfsck != NULL))
3000                 RETURN(-EEXIST);
3001
3002         OBD_ALLOC_PTR(lfsck);
3003         if (lfsck == NULL)
3004                 RETURN(-ENOMEM);
3005
3006         mutex_init(&lfsck->li_mutex);
3007         spin_lock_init(&lfsck->li_lock);
3008         INIT_LIST_HEAD(&lfsck->li_link);
3009         INIT_LIST_HEAD(&lfsck->li_list_scan);
3010         INIT_LIST_HEAD(&lfsck->li_list_dir);
3011         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
3012         INIT_LIST_HEAD(&lfsck->li_list_idle);
3013         atomic_set(&lfsck->li_ref, 1);
3014         atomic_set(&lfsck->li_double_scan_count, 0);
3015         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
3016         lfsck->li_out_notify = notify;
3017         lfsck->li_out_notify_data = notify_data;
3018         lfsck->li_next = next;
3019         lfsck->li_bottom = key;
3020         lfsck->li_obd = obd;
3021
        /* Initialize the OST and the MDT target descriptor tables. */
3022         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
3023         if (rc != 0)
3024                 GOTO(out, rc);
3025
3026         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
3027         if (rc != 0)
3028                 GOTO(out, rc);
3029
        /* Set up the local OID storage, then locate the root object of
         * the device and make sure it can be used as a directory. */
3030         fid->f_seq = FID_SEQ_LOCAL_NAME;
3031         fid->f_oid = 1;
3032         fid->f_ver = 0;
3033         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
3034         if (rc != 0)
3035                 GOTO(out, rc);
3036
3037         rc = dt_root_get(env, key, fid);
3038         if (rc != 0)
3039                 GOTO(out, rc);
3040
3041         root = dt_locate(env, key, fid);
3042         if (IS_ERR(root))
3043                 GOTO(out, rc = PTR_ERR(root));
3044
3045         if (unlikely(!dt_try_as_dir(env, root)))
3046                 GOTO(out, rc = -ENOTDIR);
3047
3048         lfsck->li_local_root_fid = *fid;
3049         if (master) {
3050                 lfsck->li_master = 1;
                /* Only on the device with index 0: look up "ROOT" under the
                 * local root, then dotlustre under it and lostfound under
                 * dotlustre, and verify the linkEA of the latter two
                 * against their parent FID. */
3051                 if (lfsck_dev_idx(key) == 0) {
3052                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
3053                         const struct lu_name *cname;
3054
3055                         rc = dt_lookup(env, root,
3056                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
3057                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
3058                         if (rc != 0)
3059                                 GOTO(out, rc);
3060
3061                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
3062                         if (IS_ERR(obj))
3063                                 GOTO(out, rc = PTR_ERR(obj));
3064
3065                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3066                                 (const struct dt_key *)dotlustre, BYPASS_CAPA);
3067                         if (rc != 0)
3068                                 GOTO(out, rc);
3069
                        /* Release the global root and switch "obj" to the
                         * dotlustre object just looked up. */
3070                         lu_object_put(env, &obj->do_lu);
3071                         obj = dt_locate(env, key, fid);
3072                         if (IS_ERR(obj))
3073                                 GOTO(out, rc = PTR_ERR(obj));
3074
3075                         cname = lfsck_name_get_const(env, dotlustre,
3076                                                      strlen(dotlustre));
3077                         rc = lfsck_verify_linkea(env, key, obj, cname,
3078                                                  &lfsck->li_global_root_fid);
3079                         if (rc != 0)
3080                                 GOTO(out, rc);
3081
                        /* Keep the dotlustre FID as the parent FID for the
                         * lostfound check, since "fid" is overwritten by
                         * the next lookup. */
3082                         *pfid = *fid;
3083                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3084                                        (const struct dt_key *)lostfound,
3085                                        BYPASS_CAPA);
3086                         if (rc != 0)
3087                                 GOTO(out, rc);
3088
3089                         lu_object_put(env, &obj->do_lu);
3090                         obj = dt_locate(env, key, fid);
3091                         if (IS_ERR(obj))
3092                                 GOTO(out, rc = PTR_ERR(obj));
3093
3094                         cname = lfsck_name_get_const(env, lostfound,
3095                                                      strlen(lostfound));
3096                         rc = lfsck_verify_linkea(env, key, obj, cname, pfid);
3097                         if (rc != 0)
3098                                 GOTO(out, rc);
3099
                        /* Reset "obj" so that the "out" label does not put
                         * it a second time. */
3100                         lu_object_put(env, &obj->do_lu);
3101                         obj = NULL;
3102                 }
3103         }
3104
        /* Locate the object used for object-table iteration. An extra
         * reference is taken for li_obj_oit; the local reference held in
         * "obj" is released at the "out" label. */
3105         fid->f_seq = FID_SEQ_LOCAL_FILE;
3106         fid->f_oid = OTABLE_IT_OID;
3107         fid->f_ver = 0;
3108         obj = dt_locate(env, key, fid);
3109         if (IS_ERR(obj))
3110                 GOTO(out, rc = PTR_ERR(obj));
3111
3112         lu_object_get(&obj->do_lu);
3113         lfsck->li_obj_oit = obj;
3114         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
3115         if (rc != 0)
3116                 GOTO(out, rc);
3117
3118         rc = lfsck_bookmark_setup(env, lfsck);
3119         if (rc != 0)
3120                 GOTO(out, rc);
3121
        /* The FID setup and the namespace component are master only; the
         * layout component is set up for every instance. */
3122         if (master) {
3123                 rc = lfsck_fid_init(lfsck);
3124                 if (rc < 0)
3125                         GOTO(out, rc);
3126
3127                 rc = lfsck_namespace_setup(env, lfsck);
3128                 if (rc < 0)
3129                         GOTO(out, rc);
3130         }
3131
3132         rc = lfsck_layout_setup(env, lfsck);
3133         if (rc < 0)
3134                 GOTO(out, rc);
3135
3136         /* XXX: more LFSCK components initialization to be added here. */
3137
3138         rc = lfsck_instance_add(lfsck);
3139         if (rc == 0)
3140                 rc = lfsck_add_target_from_orphan(env, lfsck);
3141 out:
        /* Drop the local object references; on any failure clean up the
         * instance. */
3142         if (obj != NULL && !IS_ERR(obj))
3143                 lu_object_put(env, &obj->do_lu);
3144         if (root != NULL && !IS_ERR(root))
3145                 lu_object_put(env, &root->do_lu);
3146         if (rc != 0)
3147                 lfsck_instance_cleanup(env, lfsck);
3148         return rc;
3149 }
3150 EXPORT_SYMBOL(lfsck_register);
3151
3152 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
3153 {
3154         struct lfsck_instance *lfsck;
3155
3156         lfsck = lfsck_instance_find(key, false, true);
3157         if (lfsck != NULL)
3158                 lfsck_instance_put(env, lfsck);
3159 }
3160 EXPORT_SYMBOL(lfsck_degister);
3161
3162 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
3163                      struct dt_device *tgt, struct obd_export *exp,
3164                      __u32 index, bool for_ost)
3165 {
3166         struct lfsck_instance   *lfsck;
3167         struct lfsck_tgt_desc   *ltd;
3168         int                      rc;
3169         ENTRY;
3170
3171         OBD_ALLOC_PTR(ltd);
3172         if (ltd == NULL)
3173                 RETURN(-ENOMEM);
3174
3175         ltd->ltd_tgt = tgt;
3176         ltd->ltd_key = key;
3177         ltd->ltd_exp = exp;
3178         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
3179         INIT_LIST_HEAD(&ltd->ltd_layout_list);
3180         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
3181         INIT_LIST_HEAD(&ltd->ltd_namespace_list);
3182         INIT_LIST_HEAD(&ltd->ltd_namespace_phase_list);
3183         atomic_set(&ltd->ltd_ref, 1);
3184         ltd->ltd_index = index;
3185
3186         spin_lock(&lfsck_instance_lock);
3187         lfsck = __lfsck_instance_find(key, true, false);
3188         if (lfsck == NULL) {