Whamcloud - gitweb
dda5e1f6db49194210f7d6402881d2ccfdbe7f83
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
/* NOTE(review): the users of this constant are outside this part of the
 * file; presumably it is a "skip this checkpoint" indicator - confirm at
 * the call sites before relying on that reading. */
#define LFSCK_CHECKPOINT_SKIP   1

/* define lfsck thread key */
/* NOTE(review): LU_KEY_INIT presumably generates the key init routine that
 * allocates one struct lfsck_thread_info per context; lfsck_key_fini() below
 * is the matching release routine - confirm against lu_object.h. */
LU_KEY_INIT(lfsck, struct lfsck_thread_info);
49
50 static void lfsck_key_fini(const struct lu_context *ctx,
51                            struct lu_context_key *key, void *data)
52 {
53         struct lfsck_thread_info *info = data;
54
55         lu_buf_free(&info->lti_linkea_buf);
56         lu_buf_free(&info->lti_big_buf);
57         OBD_FREE_PTR(info);
58 }
59
/* Define the "lfsck" context key for MD and DT thread contexts. */
LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
LU_KEY_INIT_GENERIC(lfsck);

/* NOTE(review): presumably the list of registered LFSCK instances; its users
 * are outside this part of the file. */
static struct list_head lfsck_instance_list;
/* Global lists of OST/MDT target descriptors (linked via ltd_orphan_list)
 * that are not attached to an instance yet. lfsck_add_target_from_orphan()
 * moves the entries whose ltd_key matches the instance's li_bottom onto the
 * instance's own orphan list. */
static struct list_head lfsck_ost_orphan_list;
static struct list_head lfsck_mdt_orphan_list;
/* Held by lfsck_add_target_from_orphan() while walking the orphan lists
 * above. */
static DEFINE_SPINLOCK(lfsck_instance_lock);
67
/* Printable names for the LFSCK status values, indexed by the LS_* constants.
 * Looked up through lfsck_status2names(), which bounds the index by LS_MAX. */
static const char *lfsck_status_names[] = {
        [LS_INIT]               = "init",
        [LS_SCANNING_PHASE1]    = "scanning-phase1",
        [LS_SCANNING_PHASE2]    = "scanning-phase2",
        [LS_COMPLETED]          = "completed",
        [LS_FAILED]             = "failed",
        [LS_STOPPED]            = "stopped",
        [LS_PAUSED]             = "paused",
        [LS_CRASHED]            = "crashed",
        [LS_PARTIAL]            = "partial",
        [LS_CO_FAILED]          = "co-failed",
        [LS_CO_STOPPED]         = "co-stopped",
        [LS_CO_PAUSED]          = "co-paused"
};
82
/* NULL-terminated table of printable LFSCK flag names.
 * NOTE(review): presumably entry N names flag bit N - confirm against the
 * code that walks this table (not visible here). */
const char *lfsck_flags_names[] = {
        "scanned-once",
        "inconsistent",
        "upgrade",
        "incomplete",
        "crashed_lastid",
        NULL
};
91
/* NULL-terminated table of printable LFSCK parameter names. The first slot
 * is deliberately NULL.
 * NOTE(review): presumably entry N names parameter bit N and bit 0 has no
 * printable name - confirm against the code that walks this table. */
const char *lfsck_param_names[] = {
        NULL,
        "failout",
        "dryrun",
        "all_targets",
        "broadcast",
        "orphan",
        "create_ostobj",
        NULL
};
102
/* NOTE(review): the users of this enum are outside this part of the file;
 * judging by the names it selects whether the lost+found object is located
 * via the bookmark or via its name entry - confirm at the call sites. */
enum lfsck_verify_lpf_types {
        LVLT_BY_BOOKMARK        = 0,
        LVLT_BY_NAMEENTRY       = 1,
};
107
108 const char *lfsck_status2names(enum lfsck_status status)
109 {
110         if (unlikely(status < 0 || status >= LS_MAX))
111                 return "unknown";
112
113         return lfsck_status_names[status];
114 }
115
116 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
117 {
118         spin_lock_init(&ltds->ltd_lock);
119         init_rwsem(&ltds->ltd_rw_sem);
120         INIT_LIST_HEAD(&ltds->ltd_orphan);
121         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
122         if (ltds->ltd_tgts_bitmap == NULL)
123                 return -ENOMEM;
124
125         return 0;
126 }
127
128 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
129 {
130         struct lfsck_tgt_desc   *ltd;
131         struct lfsck_tgt_desc   *next;
132         int                      idx;
133
134         down_write(&ltds->ltd_rw_sem);
135
136         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
137                                  ltd_orphan_list) {
138                 list_del_init(&ltd->ltd_orphan_list);
139                 lfsck_tgt_put(ltd);
140         }
141
142         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
143                 up_write(&ltds->ltd_rw_sem);
144
145                 return;
146         }
147
148         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
149                 ltd = LTD_TGT(ltds, idx);
150                 if (likely(ltd != NULL)) {
151                         LASSERT(list_empty(&ltd->ltd_layout_list));
152                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
153                         LASSERT(list_empty(&ltd->ltd_namespace_list));
154                         LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
155
156                         ltds->ltd_tgtnr--;
157                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
158                         LTD_TGT(ltds, idx) = NULL;
159                         lfsck_tgt_put(ltd);
160                 }
161         }
162
163         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
164                  ltds->ltd_tgtnr);
165
166         for (idx = 0; idx < TGT_PTRS; idx++) {
167                 if (ltds->ltd_tgts_idx[idx] != NULL) {
168                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
169                         ltds->ltd_tgts_idx[idx] = NULL;
170                 }
171         }
172
173         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
174         ltds->ltd_tgts_bitmap = NULL;
175         up_write(&ltds->ltd_rw_sem);
176 }
177
178 static int __lfsck_add_target(const struct lu_env *env,
179                               struct lfsck_instance *lfsck,
180                               struct lfsck_tgt_desc *ltd,
181                               bool for_ost, bool locked)
182 {
183         struct lfsck_tgt_descs *ltds;
184         __u32                   index = ltd->ltd_index;
185         int                     rc    = 0;
186         ENTRY;
187
188         if (for_ost)
189                 ltds = &lfsck->li_ost_descs;
190         else
191                 ltds = &lfsck->li_mdt_descs;
192
193         if (!locked)
194                 down_write(&ltds->ltd_rw_sem);
195
196         LASSERT(ltds->ltd_tgts_bitmap != NULL);
197
198         if (index >= ltds->ltd_tgts_bitmap->size) {
199                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
200                                     (__u32)BITS_PER_LONG);
201                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
202                 cfs_bitmap_t *new_bitmap;
203
204                 while (newsize < index + 1)
205                         newsize <<= 1;
206
207                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
208                 if (new_bitmap == NULL)
209                         GOTO(unlock, rc = -ENOMEM);
210
211                 if (ltds->ltd_tgtnr > 0)
212                         cfs_bitmap_copy(new_bitmap, old_bitmap);
213                 ltds->ltd_tgts_bitmap = new_bitmap;
214                 CFS_FREE_BITMAP(old_bitmap);
215         }
216
217         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
218                 CERROR("%s: the device %s (%u) is registered already\n",
219                        lfsck_lfsck2name(lfsck),
220                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
221                 GOTO(unlock, rc = -EEXIST);
222         }
223
224         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
225                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
226                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
227                         GOTO(unlock, rc = -ENOMEM);
228         }
229
230         LTD_TGT(ltds, index) = ltd;
231         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
232         ltds->ltd_tgtnr++;
233
234         GOTO(unlock, rc = 0);
235
236 unlock:
237         if (!locked)
238                 up_write(&ltds->ltd_rw_sem);
239
240         return rc;
241 }
242
243 static int lfsck_add_target_from_orphan(const struct lu_env *env,
244                                         struct lfsck_instance *lfsck)
245 {
246         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
247         struct lfsck_tgt_desc   *ltd;
248         struct lfsck_tgt_desc   *next;
249         struct list_head        *head    = &lfsck_ost_orphan_list;
250         int                      rc;
251         bool                     for_ost = true;
252
253 again:
254         spin_lock(&lfsck_instance_lock);
255         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
256                 if (ltd->ltd_key == lfsck->li_bottom)
257                         list_move_tail(&ltd->ltd_orphan_list,
258                                        &ltds->ltd_orphan);
259         }
260         spin_unlock(&lfsck_instance_lock);
261
262         down_write(&ltds->ltd_rw_sem);
263         while (!list_empty(&ltds->ltd_orphan)) {
264                 ltd = list_entry(ltds->ltd_orphan.next,
265                                  struct lfsck_tgt_desc,
266                                  ltd_orphan_list);
267                 list_del_init(&ltd->ltd_orphan_list);
268                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
269                 /* Do not hold the semaphore for too long time. */
270                 up_write(&ltds->ltd_rw_sem);
271                 if (rc != 0)
272                         return rc;
273
274                 down_write(&ltds->ltd_rw_sem);
275         }
276         up_write(&ltds->ltd_rw_sem);
277
278         if (for_ost) {
279                 ltds = &lfsck->li_mdt_descs;
280                 head = &lfsck_mdt_orphan_list;
281                 for_ost = false;
282                 goto again;
283         }
284
285         return 0;
286 }
287
288 static inline struct lfsck_component *
289 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
290                        struct list_head *list)
291 {
292         struct lfsck_component *com;
293
294         list_for_each_entry(com, list, lc_link) {
295                 if (com->lc_type == type)
296                         return com;
297         }
298         return NULL;
299 }
300
301 struct lfsck_component *
302 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
303 {
304         struct lfsck_component *com;
305
306         spin_lock(&lfsck->li_lock);
307         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
308         if (com != NULL)
309                 goto unlock;
310
311         com = __lfsck_component_find(lfsck, type,
312                                      &lfsck->li_list_double_scan);
313         if (com != NULL)
314                 goto unlock;
315
316         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
317
318 unlock:
319         if (com != NULL)
320                 lfsck_component_get(com);
321         spin_unlock(&lfsck->li_lock);
322         return com;
323 }
324
325 void lfsck_component_cleanup(const struct lu_env *env,
326                              struct lfsck_component *com)
327 {
328         if (!list_empty(&com->lc_link))
329                 list_del_init(&com->lc_link);
330         if (!list_empty(&com->lc_link_dir))
331                 list_del_init(&com->lc_link_dir);
332
333         lfsck_component_put(env, com);
334 }
335
336 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
337                     struct lu_fid *fid, bool locked)
338 {
339         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
340         int                      rc = 0;
341         ENTRY;
342
343         if (!locked)
344                 mutex_lock(&lfsck->li_mutex);
345
346         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
347         if (rc >= 0) {
348                 bk->lb_last_fid = *fid;
349                 /* We do not care about whether the subsequent sub-operations
350                  * failed or not. The worst case is that one FID is lost that
351                  * is not a big issue for the LFSCK since it is relative rare
352                  * for LFSCK create. */
353                 rc = lfsck_bookmark_store(env, lfsck);
354         }
355
356         if (!locked)
357                 mutex_unlock(&lfsck->li_mutex);
358
359         RETURN(rc);
360 }
361
362 /**
363  * Request the specified ibits lock for the given object.
364  *
365  * Before the LFSCK modifying on the namespace visible object,
366  * it needs to acquire related ibits ldlm lock.
367  *
368  * \param[in] env       pointer to the thread context
369  * \param[in] lfsck     pointer to the lfsck instance
370  * \param[in] obj       pointer to the dt_object to be locked
371  * \param[out] lh       pointer to the lock handle
372  * \param[in] ibits     the bits for the ldlm lock to be acquired
373  * \param[in] mode      the mode for the ldlm lock to be acquired
374  *
375  * \retval              0 for success
376  * \retval              negative error number on failure
377  */
378 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
379                      struct dt_object *obj, struct lustre_handle *lh,
380                      __u64 bits, ldlm_mode_t mode)
381 {
382         struct lfsck_thread_info        *info   = lfsck_env_info(env);
383         ldlm_policy_data_t              *policy = &info->lti_policy;
384         struct ldlm_res_id              *resid  = &info->lti_resid;
385         __u64                            flags  = LDLM_FL_ATOMIC_CB;
386         int                              rc;
387
388         LASSERT(lfsck->li_namespace != NULL);
389
390         memset(policy, 0, sizeof(*policy));
391         policy->l_inodebits.bits = bits;
392         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
393         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_IBITS,
394                                     policy, mode, &flags, ldlm_blocking_ast,
395                                     ldlm_completion_ast, NULL, NULL, 0,
396                                     LVB_T_NONE, NULL, lh);
397         if (rc == ELDLM_OK) {
398                 rc = 0;
399         } else {
400                 memset(lh, 0, sizeof(*lh));
401                 rc = -EIO;
402         }
403
404         return rc;
405 }
406
407 /**
408  * Release the the specified ibits lock.
409  *
410  * If the lock has been acquired before, release it
411  * and cleanup the handle. Otherwise, do nothing.
412  *
413  * \param[in] lh        pointer to the lock handle
414  * \param[in] mode      the mode for the ldlm lock to be released
415  */
416 void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
417 {
418         if (lustre_handle_is_used(lh)) {
419                 ldlm_lock_decref(lh, mode);
420                 memset(lh, 0, sizeof(*lh));
421         }
422 }
423
/* Well-known directory entry names. "dot" and "dotdot" are used as the
 * insert keys when lfsck_create_lpf_local()/lfsck_create_lpf_remote()
 * populate a newly created directory; the users of "dotlustre" and
 * "lostfound" are outside this part of the file. */
static const char dot[] = ".";
static const char dotdot[] = "..";
static const char dotlustre[] = ".lustre";
static const char lostfound[] = "lost+found";
428
429 static int lfsck_create_lpf_local(const struct lu_env *env,
430                                   struct lfsck_instance *lfsck,
431                                   struct dt_object *parent,
432                                   struct dt_object *child,
433                                   struct lu_attr *la,
434                                   struct dt_object_format *dof,
435                                   const char *name)
436 {
437         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
438         struct dt_device        *dev    = lfsck->li_bottom;
439         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
440         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
441         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
442         struct thandle          *th     = NULL;
443         struct linkea_data       ldata  = { 0 };
444         struct lu_buf            linkea_buf;
445         const struct lu_name    *cname;
446         loff_t                   pos    = 0;
447         int                      len    = sizeof(struct lfsck_bookmark);
448         int                      rc;
449         ENTRY;
450
451         rc = linkea_data_new(&ldata,
452                              &lfsck_env_info(env)->lti_linkea_buf);
453         if (rc != 0)
454                 RETURN(rc);
455
456         cname = lfsck_name_get_const(env, name, strlen(name));
457         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
458         if (rc != 0)
459                 RETURN(rc);
460
461         th = dt_trans_create(env, dev);
462         if (IS_ERR(th))
463                 RETURN(PTR_ERR(th));
464
465         /* 1a. create child */
466         rc = dt_declare_create(env, child, la, NULL, dof, th);
467         if (rc != 0)
468                 GOTO(stop, rc);
469
470         /* 2a. increase child nlink */
471         rc = dt_declare_ref_add(env, child, th);
472         if (rc != 0)
473                 GOTO(stop, rc);
474
475         /* 3a. insert linkEA for child */
476         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
477                        ldata.ld_leh->leh_len);
478         rc = dt_declare_xattr_set(env, child, &linkea_buf,
479                                   XATTR_NAME_LINK, 0, th);
480         if (rc != 0)
481                 GOTO(stop, rc);
482
483         /* 4a. insert name into parent dir */
484         rec->rec_type = S_IFDIR;
485         rec->rec_fid = cfid;
486         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
487                                (const struct dt_key *)name, th);
488         if (rc != 0)
489                 GOTO(stop, rc);
490
491         /* 5a. increase parent nlink */
492         rc = dt_declare_ref_add(env, parent, th);
493         if (rc != 0)
494                 GOTO(stop, rc);
495
496         /* 6a. update bookmark */
497         rc = dt_declare_record_write(env, bk_obj,
498                                      lfsck_buf_get(env, bk, len), 0, th);
499         if (rc != 0)
500                 GOTO(stop, rc);
501
502         rc = dt_trans_start_local(env, dev, th);
503         if (rc != 0)
504                 GOTO(stop, rc);
505
506         dt_write_lock(env, child, 0);
507         /* 1b.1. create child */
508         rc = dt_create(env, child, la, NULL, dof, th);
509         if (rc != 0)
510                 GOTO(unlock, rc);
511
512         if (unlikely(!dt_try_as_dir(env, child)))
513                 GOTO(unlock, rc = -ENOTDIR);
514
515         /* 1b.2. insert dot into child dir */
516         rec->rec_fid = cfid;
517         rc = dt_insert(env, child, (const struct dt_rec *)rec,
518                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
519         if (rc != 0)
520                 GOTO(unlock, rc);
521
522         /* 1b.3. insert dotdot into child dir */
523         rec->rec_fid = &LU_LPF_FID;
524         rc = dt_insert(env, child, (const struct dt_rec *)rec,
525                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
526         if (rc != 0)
527                 GOTO(unlock, rc);
528
529         /* 2b. increase child nlink */
530         rc = dt_ref_add(env, child, th);
531         if (rc != 0)
532                 GOTO(unlock, rc);
533
534         /* 3b. insert linkEA for child. */
535         rc = dt_xattr_set(env, child, &linkea_buf,
536                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
537         dt_write_unlock(env, child);
538         if (rc != 0)
539                 GOTO(stop, rc);
540
541         /* 4b. insert name into parent dir */
542         rec->rec_fid = cfid;
543         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
544                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
545         if (rc != 0)
546                 GOTO(stop, rc);
547
548         dt_write_lock(env, parent, 0);
549         /* 5b. increase parent nlink */
550         rc = dt_ref_add(env, parent, th);
551         dt_write_unlock(env, parent);
552         if (rc != 0)
553                 GOTO(stop, rc);
554
555         bk->lb_lpf_fid = *cfid;
556         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
557
558         /* 6b. update bookmark */
559         rc = dt_record_write(env, bk_obj,
560                              lfsck_buf_get(env, bk, len), &pos, th);
561
562         GOTO(stop, rc);
563
564 unlock:
565         dt_write_unlock(env, child);
566
567 stop:
568         dt_trans_stop(env, dev, th);
569
570         return rc;
571 }
572
573 static int lfsck_create_lpf_remote(const struct lu_env *env,
574                                    struct lfsck_instance *lfsck,
575                                    struct dt_object *parent,
576                                    struct dt_object *child,
577                                    struct lu_attr *la,
578                                    struct dt_object_format *dof,
579                                    const char *name)
580 {
581         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
582         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
583         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
584         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
585         struct thandle          *th     = NULL;
586         struct linkea_data       ldata  = { 0 };
587         struct lu_buf            linkea_buf;
588         const struct lu_name    *cname;
589         struct dt_device        *dev;
590         loff_t                   pos    = 0;
591         int                      len    = sizeof(struct lfsck_bookmark);
592         int                      rc;
593         ENTRY;
594
595         rc = linkea_data_new(&ldata,
596                              &lfsck_env_info(env)->lti_linkea_buf);
597         if (rc != 0)
598                 RETURN(rc);
599
600         cname = lfsck_name_get_const(env, name, strlen(name));
601         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
602         if (rc != 0)
603                 RETURN(rc);
604
605         /* Create .lustre/lost+found/MDTxxxx. */
606
607         /* XXX: Currently, cross-MDT create operation needs to create the child
608          *      object firstly, then insert name into the parent directory. For
609          *      this case, the child object resides on current MDT (local), but
610          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
611          *      easy to contain all the sub-modifications orderly within single
612          *      transaction.
613          *
614          *      To avoid more inconsistency, we split the create operation into
615          *      two transactions:
616          *
617          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
618          *         locally.
619          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
620          *         remotely.
621          *
622          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
623          *      repair such inconsistency when LFSCK run next time. */
624
625         /* Transaction I: locally */
626
627         dev = lfsck->li_bottom;
628         th = dt_trans_create(env, dev);
629         if (IS_ERR(th))
630                 RETURN(PTR_ERR(th));
631
632         /* 1a. create child */
633         rc = dt_declare_create(env, child, la, NULL, dof, th);
634         if (rc != 0)
635                 GOTO(stop, rc);
636
637         /* 2a. increase child nlink */
638         rc = dt_declare_ref_add(env, child, th);
639         if (rc != 0)
640                 GOTO(stop, rc);
641
642         /* 3a. insert linkEA for child */
643         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
644                        ldata.ld_leh->leh_len);
645         rc = dt_declare_xattr_set(env, child, &linkea_buf,
646                                   XATTR_NAME_LINK, 0, th);
647         if (rc != 0)
648                 GOTO(stop, rc);
649
650         /* 4a. update bookmark */
651         rc = dt_declare_record_write(env, bk_obj,
652                                      lfsck_buf_get(env, bk, len), 0, th);
653         if (rc != 0)
654                 GOTO(stop, rc);
655
656         rc = dt_trans_start_local(env, dev, th);
657         if (rc != 0)
658                 GOTO(stop, rc);
659
660         dt_write_lock(env, child, 0);
661         /* 1b.1. create child */
662         rc = dt_create(env, child, la, NULL, dof, th);
663         if (rc != 0)
664                 GOTO(unlock, rc);
665
666         if (unlikely(!dt_try_as_dir(env, child)))
667                 GOTO(unlock, rc = -ENOTDIR);
668
669         /* 1b.2. insert dot into child dir */
670         rec->rec_type = S_IFDIR;
671         rec->rec_fid = cfid;
672         rc = dt_insert(env, child, (const struct dt_rec *)rec,
673                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
674         if (rc != 0)
675                 GOTO(unlock, rc);
676
677         /* 1b.3. insert dotdot into child dir */
678         rec->rec_fid = &LU_LPF_FID;
679         rc = dt_insert(env, child, (const struct dt_rec *)rec,
680                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
681         if (rc != 0)
682                 GOTO(unlock, rc);
683
684         /* 2b. increase child nlink */
685         rc = dt_ref_add(env, child, th);
686         if (rc != 0)
687                 GOTO(unlock, rc);
688
689         /* 3b. insert linkEA for child */
690         rc = dt_xattr_set(env, child, &linkea_buf,
691                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
692         if (rc != 0)
693                 GOTO(unlock, rc);
694
695         bk->lb_lpf_fid = *cfid;
696         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
697
698         /* 4b. update bookmark */
699         rc = dt_record_write(env, bk_obj,
700                              lfsck_buf_get(env, bk, len), &pos, th);
701
702         dt_write_unlock(env, child);
703         dt_trans_stop(env, dev, th);
704         if (rc != 0)
705                 RETURN(rc);
706
707         /* Transaction II: remotely */
708
709         dev = lfsck->li_next;
710         th = dt_trans_create(env, dev);
711         if (IS_ERR(th))
712                 RETURN(PTR_ERR(th));
713
714         /* 5a. insert name into parent dir */
715         rec->rec_fid = cfid;
716         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
717                                (const struct dt_key *)name, th);
718         if (rc != 0)
719                 GOTO(stop, rc);
720
721         /* 6a. increase parent nlink */
722         rc = dt_declare_ref_add(env, parent, th);
723         if (rc != 0)
724                 GOTO(stop, rc);
725
726         rc = dt_trans_start(env, dev, th);
727         if (rc != 0)
728                 GOTO(stop, rc);
729
730         /* 5b. insert name into parent dir */
731         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
732                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
733         if (rc != 0)
734                 GOTO(stop, rc);
735
736         dt_write_lock(env, parent, 0);
737         /* 6b. increase parent nlink */
738         rc = dt_ref_add(env, parent, th);
739         dt_write_unlock(env, parent);
740
741         GOTO(stop, rc);
742
743 unlock:
744         dt_write_unlock(env, child);
745 stop:
746         dt_trans_stop(env, dev, th);
747
748         if (rc != 0 && dev == lfsck->li_next)
749                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
750                        "for orphans, but failed to insert the name %s "
751                        "to the .lustre/lost+found/. Such inconsistency "
752                        "will be repaired when LFSCK run next time: rc = %d\n",
753                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
754
755         return rc;
756 }
757
758 /* Do NOT create .lustre/lost+found/MDTxxxx when register the lfsck instance,
759  * because the MDT0 maybe not reaady for sequence allocation yet. We do that
760  * only when it is required, such as orphan OST-objects repairing. */
761 int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
762 {
763         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
764         struct lfsck_thread_info *info  = lfsck_env_info(env);
765         struct lu_fid            *cfid  = &info->lti_fid2;
766         struct lu_attr           *la    = &info->lti_la;
767         struct dt_object_format  *dof   = &info->lti_dof;
768         struct dt_object         *parent = NULL;
769         struct dt_object         *child = NULL;
770         struct lustre_handle      lh    = { 0 };
771         char                      name[8];
772         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
773         int                       rc    = 0;
774         ENTRY;
775
776         LASSERT(lfsck->li_master);
777
778         sprintf(name, "MDT%04x", node);
779         if (node == 0) {
780                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
781                                                   &LU_LPF_FID);
782         } else {
783                 struct lfsck_tgt_desc *ltd;
784
785                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
786                 if (unlikely(ltd == NULL))
787                         RETURN(-ENXIO);
788
789                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
790                                                   &LU_LPF_FID);
791                 lfsck_tgt_put(ltd);
792         }
793         if (IS_ERR(parent))
794                 RETURN(PTR_ERR(parent));
795
796         if (lfsck->li_lpf_obj != NULL)
797                 GOTO(out, rc = 0);
798
799         if (unlikely(!dt_try_as_dir(env, parent)))
800                 GOTO(out, rc = -ENOTDIR);
801
802         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
803                               MDS_INODELOCK_UPDATE, LCK_EX);
804         if (rc != 0)
805                 GOTO(out, rc);
806
807         mutex_lock(&lfsck->li_mutex);
808         if (lfsck->li_lpf_obj != NULL)
809                 GOTO(unlock, rc = 0);
810
811         if (fid_is_zero(&bk->lb_lpf_fid)) {
812                 /* There is corner case that: in former LFSCK scanning we have
813                  * created the .lustre/lost+found/MDTxxxx but failed to update
814                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
815                  * it from MDT0 firstly. */
816                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
817                                (const struct dt_key *)name, BYPASS_CAPA);
818                 if (rc != 0 && rc != -ENOENT)
819                         GOTO(unlock, rc);
820
821                 if (rc == 0) {
822                         bk->lb_lpf_fid = *cfid;
823                         rc = lfsck_bookmark_store(env, lfsck);
824                 } else {
825                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
826                 }
827                 if (rc != 0)
828                         GOTO(unlock, rc);
829         } else {
830                 *cfid = bk->lb_lpf_fid;
831         }
832
833         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
834         if (IS_ERR(child))
835                 GOTO(unlock, rc = PTR_ERR(child));
836
837         if (dt_object_exists(child) != 0) {
838                 if (unlikely(!dt_try_as_dir(env, child)))
839                         rc = -ENOTDIR;
840                 else
841                         lfsck->li_lpf_obj = child;
842
843                 GOTO(unlock, rc);
844         }
845
846         memset(la, 0, sizeof(*la));
847         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
848         la->la_mode = S_IFDIR | S_IRWXU;
849         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
850                        LA_UID | LA_GID;
851         memset(dof, 0, sizeof(*dof));
852         dof->dof_type = dt_mode_to_dft(S_IFDIR);
853
854         if (node == 0)
855                 rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
856                                             dof, name);
857         else
858                 rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
859                                              dof, name);
860         if (rc == 0)
861                 lfsck->li_lpf_obj = child;
862
863         GOTO(unlock, rc);
864
865 unlock:
866         mutex_unlock(&lfsck->li_mutex);
867         lfsck_ibits_unlock(&lh, LCK_EX);
868         if (rc != 0 && child != NULL && !IS_ERR(child))
869                 lu_object_put(env, &child->do_lu);
870 out:
871         if (parent != NULL && !IS_ERR(parent))
872                 lu_object_put(env, &parent->do_lu);
873
874         return rc;
875 }
876
877 /**
878  * Scan .lustre/lost+found for bad name entries and remove them.
879  *
880  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
881  * index in the system. Any other formatted name is invalid and should be
882  * removed.
883  *
884  * \param[in] env       pointer to the thread context
885  * \param[in] lfsck     pointer to the lfsck instance
886  * \param[in] parent    pointer to the lost+found object
887  *
888  * \retval              0 for success
889  * \retval              negative error number on failure
890  */
891 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
892                                       struct lfsck_instance *lfsck,
893                                       struct dt_object *parent)
894 {
895         struct lu_dirent        *ent    =
896                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
897         const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
898         struct dt_it            *it;
899         int                      rc;
900         ENTRY;
901
902         it = iops->init(env, parent, LUDA_64BITHASH, BYPASS_CAPA);
903         if (IS_ERR(it))
904                 RETURN(PTR_ERR(it));
905
906         rc = iops->load(env, it, 0);
907         if (rc == 0)
908                 rc = iops->next(env, it);
909         else if (rc > 0)
910                 rc = 0;
911
912         while (rc == 0) {
913                 int off = 3;
914
915                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
916                 if (rc != 0)
917                         break;
918
919                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
920                 if (ent->lde_name[0] == '.') {
921                         if (ent->lde_namelen == 1)
922                                 goto next;
923
924                         if (ent->lde_namelen == 2 && ent->lde_name[1] == '.')
925                                 goto next;
926                 }
927
928                 /* name length must be strlen("MDTxxxx") */
929                 if (ent->lde_namelen != 7)
930                         goto remove;
931
932                 if (memcmp(ent->lde_name, "MDT", off) != 0)
933                         goto remove;
934
935                 while (off < 7 && isxdigit(ent->lde_name[off]))
936                         off++;
937
938                 if (off != 7) {
939
940 remove:
941                         rc = lfsck_remove_name_entry(env, lfsck, parent,
942                                                      ent->lde_name, S_IFDIR);
943                         if (rc != 0)
944                                 break;
945                 }
946
947 next:
948                 rc = iops->next(env, it);
949         }
950
951         iops->put(env, it);
952         iops->fini(env, it);
953
954         RETURN(rc > 0 ? 0 : rc);
955 }
956
957 static int lfsck_update_lpf_entry(const struct lu_env *env,
958                                   struct lfsck_instance *lfsck,
959                                   struct dt_object *parent,
960                                   struct dt_object *child,
961                                   const char *name,
962                                   enum lfsck_verify_lpf_types type)
963 {
964         int rc;
965
966         if (type == LVLT_BY_BOOKMARK) {
967                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
968                                              lfsck_dto2fid(child), S_IFDIR);
969         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
970                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
971                 rc = lfsck_bookmark_store(env, lfsck);
972
973                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
974                        " in the bookmark file: rc = %d\n",
975                        lfsck_lfsck2name(lfsck),
976                        PFID(lfsck_dto2fid(child)), rc);
977         }
978
979         return rc;
980 }
981
982 /**
983  * Check whether the @child back references the @parent.
984  *
985  * Two cases:
986  * 1) The child's FID is stored in the bookmark file. If the child back
987  *    references the parent (LU_LPF_FID object) via its ".." entry, then
988  *    insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
989  *    the child back references another parent2, then:
990  * 1.1) If the parent2 recognizes the child, then update the bookmark file;
991  * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
992  *      references the child. So keep them there. As the LFSCK processing,
993  *      the parent3 may be found, then when the LFSCK run next time, the
994  *      inconsistency can be repaired.
995  *
996  * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
997  *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
998  *    via its ".." entry, then update the bookmark file, otherwise, if the child
999  *    back references another parent2, then:
1000  * 2.1) If the parent2 recognizes the child, then remove the sub-directory
1001  *      from .lustre/lost+found/;
1002  * 2.2) Otherwise, if the parent2 does not recognizes the child, trust the
1003  *      sub-directory name entry and update the child;
1004  * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1005  *      or not, then keep them there.
1006  *
1007  * \param[in] env       pointer to the thread context
1008  * \param[in] lfsck     pointer to the lfsck instance
1009  * \param[in] parent    pointer to the lost+found object
1010  * \param[in] child     pointer to the lost+found sub-directory object
1011  * \param[in] name      the name for lost+found sub-directory object
1012  * \param[out] fid      pointer to the buffer to hold the FID of the object
1013  *                      (called it as parent2) that is referenced via the
1014  *                      child's dotdot entry; it also can be the FID that
1015  *                      is referenced by the name entry under the parent2.
1016  * \param[in] type      to indicate where the child's FID is stored in
1017  *
1018  * \retval              positive number for uncertain inconsistency
1019  * \retval              0 for success
1020  * \retval              negative error number on failure
1021  */
1022 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1023                                   struct lfsck_instance *lfsck,
1024                                   struct dt_object *parent,
1025                                   struct dt_object *child, const char *name,
1026                                   struct lu_fid *fid,
1027                                   enum lfsck_verify_lpf_types type)
1028 {
1029         struct lfsck_thread_info *info    = lfsck_env_info(env);
1030         char                     *name2   = info->lti_key;
1031         struct lu_fid            *fid2    = &info->lti_fid3;
1032         struct dt_object         *parent2 = NULL;
1033         struct lustre_handle      lh      = { 0 };
1034         int                       rc;
1035         ENTRY;
1036
1037         fid_zero(fid);
1038         rc = dt_lookup(env, child, (struct dt_rec *)fid,
1039                        (const struct dt_key *)dotdot, BYPASS_CAPA);
1040         if (rc != 0)
1041                 GOTO(linkea, rc);
1042
1043         if (!fid_is_sane(fid))
1044                 GOTO(linkea, rc = -EINVAL);
1045
1046         if (lu_fid_eq(fid, &LU_LPF_FID)) {
1047                 const struct lu_name *cname;
1048
1049                 if (lfsck->li_lpf_obj == NULL) {
1050                         lu_object_get(&child->do_lu);
1051                         lfsck->li_lpf_obj = child;
1052                 }
1053
1054                 cname = lfsck_name_get_const(env, name, strlen(name));
1055                 rc = lfsck_verify_linkea(env, lfsck->li_bottom, child, cname,
1056                                          &LU_LPF_FID);
1057                 if (rc == 0)
1058                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1059                                                     name, type);
1060
1061                 GOTO(out_done, rc);
1062         }
1063
1064         parent2 = lfsck_object_find_by_dev(env, lfsck->li_next, fid);
1065         if (IS_ERR(parent2))
1066                 GOTO(linkea, parent2);
1067
1068         if (!dt_object_exists(parent2)) {
1069                 lu_object_put(env, &parent2->do_lu);
1070
1071                 GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
1072         }
1073
1074         if (!dt_try_as_dir(env, parent2)) {
1075                 lu_object_put(env, &parent2->do_lu);
1076
1077                 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1078         }
1079
1080 linkea:
1081         /* To prevent rename/unlink race */
1082         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1083                               MDS_INODELOCK_UPDATE, LCK_PR);
1084         if (rc != 0)
1085                 GOTO(out_put, rc);
1086
1087         dt_read_lock(env, child, 0);
1088         rc = lfsck_links_get_first(env, child, name2, fid2);
1089         if (rc != 0) {
1090                 dt_read_unlock(env, child);
1091                 lfsck_ibits_unlock(&lh, LCK_PR);
1092
1093                 GOTO(out_put, rc = 1);
1094         }
1095
1096         /* It is almost impossible that the bookmark file (or the name entry)
1097          * and the linkEA hit the same data corruption. Trust the linkEA. */
1098         if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1099                 dt_read_unlock(env, child);
1100                 lfsck_ibits_unlock(&lh, LCK_PR);
1101
1102                 *fid = *fid2;
1103                 if (lfsck->li_lpf_obj == NULL) {
1104                         lu_object_get(&child->do_lu);
1105                         lfsck->li_lpf_obj = child;
1106                 }
1107
1108                 /* Update the child's dotdot entry */
1109                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1110                                              &LU_LPF_FID, S_IFDIR);
1111                 if (rc == 0)
1112                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1113                                                     name, type);
1114
1115                 GOTO(out_put, rc);
1116         }
1117
1118         if (parent2 == NULL || IS_ERR(parent2)) {
1119                 dt_read_unlock(env, child);
1120                 lfsck_ibits_unlock(&lh, LCK_PR);
1121
1122                 GOTO(out_done, rc = 1);
1123         }
1124
1125         rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
1126                        (const struct dt_key *)name2, BYPASS_CAPA);
1127         dt_read_unlock(env, child);
1128         lfsck_ibits_unlock(&lh, LCK_PR);
1129         if (rc != 0 && rc != -ENOENT)
1130                 GOTO(out_put, rc);
1131
1132         if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1133                 if (type == LVLT_BY_BOOKMARK)
1134                         GOTO(out_put, rc = 1);
1135
1136                 /* Trust the name entry, update the child's dotdot entry. */
1137                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1138                                              &LU_LPF_FID, S_IFDIR);
1139
1140                 GOTO(out_put, rc);
1141         }
1142
1143         if (type == LVLT_BY_BOOKMARK) {
1144                 /* Invalid FID record in the bookmark file, reset it. */
1145                 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1146                 rc = lfsck_bookmark_store(env, lfsck);
1147
1148                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1149                        " in the bookmark file: rc = %d\n",
1150                        lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1151         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1152                 /* The name entry is wrong, remove it. */
1153                 rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
1154         }
1155
1156         GOTO(out_put, rc);
1157
1158 out_put:
1159         if (parent2 != NULL && !IS_ERR(parent2))
1160                 lu_object_put(env, &parent2->do_lu);
1161
1162 out_done:
1163         return rc;
1164 }
1165
1166 /**
1167  * Verify the /ROOT/.lustre/lost+found/ directory.
1168  *
1169  * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1170  * the LFSCK does not exactly know how to handle, such as orphans. So before
1171  * the LFSCK scanning the system, the consistency of such directory needs to
1172  * be verified firstly to allow the users to use it during the LFSCK.
1173  *
1174  * \param[in] env       pointer to the thread context
1175  * \param[in] lfsck     pointer to the lfsck instance
1176  *
1177  * \retval              positive number for uncertain inconsistency
1178  * \retval              0 for success
1179  * \retval              negative error number on failure
1180  */
1181 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1182 {
1183         struct lfsck_thread_info *info   = lfsck_env_info(env);
1184         struct lu_fid            *pfid   = &info->lti_fid;
1185         struct lu_fid            *cfid   = &info->lti_fid2;
1186         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
1187         struct dt_object         *parent = NULL;
1188         /* child1's FID is in the bookmark file. */
1189         struct dt_object         *child1 = NULL;
1190         /* child2's FID is in the name entry MDTxxxx. */
1191         struct dt_object         *child2 = NULL;
1192         struct dt_device         *dev    = lfsck->li_bottom;
1193         const struct lu_name     *cname;
1194         char                      name[8];
1195         int                       node   = lfsck_dev_idx(dev);
1196         int                       rc     = 0;
1197         ENTRY;
1198
1199         LASSERT(lfsck->li_master);
1200
1201         if (node == 0) {
1202                 parent = lfsck_object_find_by_dev(env, dev, &LU_LPF_FID);
1203         } else {
1204                 struct lfsck_tgt_desc *ltd;
1205
1206                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1207                 if (unlikely(ltd == NULL))
1208                         RETURN(-ENXIO);
1209
1210                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1211                                                   &LU_LPF_FID);
1212                 lfsck_tgt_put(ltd);
1213         }
1214
1215         if (IS_ERR(parent))
1216                 RETURN(PTR_ERR(parent));
1217
1218         LASSERT(dt_object_exists(parent));
1219
1220         if (unlikely(!dt_try_as_dir(env, parent)))
1221                 GOTO(put, rc = -ENOTDIR);
1222
1223         if (node == 0) {
1224                 rc = lfsck_scan_lpf_bad_entries(env, lfsck, parent);
1225                 if (rc != 0)
1226                         CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
1227                                "for bad sub-directories: rc = %d\n",
1228                                lfsck_lfsck2name(lfsck), rc);
1229         }
1230
1231         if (!fid_is_zero(&bk->lb_lpf_fid)) {
1232                 if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
1233                         struct lu_fid tfid = bk->lb_lpf_fid;
1234
1235                         /* Invalid FID record in the bookmark file, reset it. */
1236                         fid_zero(&bk->lb_lpf_fid);
1237                         rc = lfsck_bookmark_store(env, lfsck);
1238
1239                         CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1240                                " in the bookmark file: rc = %d\n",
1241                                lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
1242
1243                         if (rc != 0)
1244                                 GOTO(put, rc);
1245                 } else {
1246                         child1 = lfsck_object_find_by_dev(env, dev,
1247                                                           &bk->lb_lpf_fid);
1248                         if (IS_ERR(child1))
1249                                 GOTO(put, rc = PTR_ERR(child1));
1250
1251                         if (unlikely(!dt_object_exists(child1) ||
1252                                      dt_object_remote(child1)) ||
1253                                      !S_ISDIR(lfsck_object_type(child1))) {
1254                                 /* Invalid FID record in the bookmark file,
1255                                  * reset it. */
1256                                 fid_zero(&bk->lb_lpf_fid);
1257                                 rc = lfsck_bookmark_store(env, lfsck);
1258
1259                                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1260                                        " in the bookmark file: rc = %d\n",
1261                                        lfsck_lfsck2name(lfsck),
1262                                        PFID(lfsck_dto2fid(child1)), rc);
1263
1264                                 if (rc != 0)
1265                                         GOTO(put, rc);
1266
1267                                 lu_object_put(env, &child1->do_lu);
1268                                 child1 = NULL;
1269                         } else if (unlikely(!dt_try_as_dir(env, child1))) {
1270                                 GOTO(put, rc = -ENOTDIR);
1271                         }
1272                 }
1273         }
1274
1275         snprintf(name, 8, "MDT%04x", node);
1276         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1277                        (const struct dt_key *)name, BYPASS_CAPA);
1278         if (rc == -ENOENT) {
1279                 if (!fid_is_zero(&bk->lb_lpf_fid))
1280                         goto check_child1;
1281
1282                 GOTO(put, rc = 0);
1283         }
1284
1285         if (rc != 0)
1286                 GOTO(put, rc);
1287
1288         /* Invalid FID in the name entry, remove the name entry. */
1289         if (!fid_is_norm(cfid)) {
1290                 rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
1291                 if (rc != 0)
1292                         GOTO(put, rc);
1293
1294                 goto check_child1;
1295         }
1296
1297         child2 = lfsck_object_find_by_dev(env, dev, cfid);
1298         if (IS_ERR(child2))
1299                 GOTO(put, rc = PTR_ERR(child2));
1300
1301         if (unlikely(!dt_object_exists(child2) ||
1302                      dt_object_remote(child2)) ||
1303                      !S_ISDIR(lfsck_object_type(child2))) {
1304                 rc = lfsck_remove_name_entry(env, lfsck, parent, name,
1305                                              S_IFDIR);
1306                 if (rc != 0)
1307                         GOTO(put, rc);
1308
1309                 goto check_child1;
1310         }
1311
1312         if (unlikely(!dt_try_as_dir(env, child2)))
1313                 GOTO(put, rc = -ENOTDIR);
1314
1315         if (child1 == NULL) {
1316                 rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2, name,
1317                                             pfid, LVLT_BY_NAMEENTRY);
1318         } else if (!lu_fid_eq(cfid, &bk->lb_lpf_fid)) {
1319                 rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
1320                                             pfid, LVLT_BY_BOOKMARK);
1321                 if (!lu_fid_eq(pfid, &LU_LPF_FID))
1322                         rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2,
1323                                                     name, pfid,
1324                                                     LVLT_BY_NAMEENTRY);
1325         } else {
1326                 if (lfsck->li_lpf_obj == NULL) {
1327                         lu_object_get(&child2->do_lu);
1328                         lfsck->li_lpf_obj = child2;
1329                 }
1330
1331                 cname = lfsck_name_get_const(env, name, strlen(name));
1332                 rc = lfsck_verify_linkea(env, dev, child2, cname, &LU_LPF_FID);
1333         }
1334
1335         GOTO(put, rc);
1336
1337 check_child1:
1338         if (child1 != NULL)
1339                 rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
1340                                             pfid, LVLT_BY_BOOKMARK);
1341
1342         GOTO(put, rc);
1343
1344 put:
1345         if (lfsck->li_lpf_obj != NULL &&
1346             unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj)))
1347                 rc = -ENOTDIR;
1348
1349         if (child2 != NULL && !IS_ERR(child2))
1350                 lu_object_put(env, &child2->do_lu);
1351         if (child1 != NULL && !IS_ERR(child1))
1352                 lu_object_put(env, &child1->do_lu);
1353         if (parent != NULL && !IS_ERR(parent))
1354                 lu_object_put(env, &parent->do_lu);
1355
1356         return rc;
1357 }
1358
1359 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1360 {
1361         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1362         struct seq_server_site  *ss;
1363         char                    *prefix;
1364         int                      rc     = 0;
1365         ENTRY;
1366
1367         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
1368         if (unlikely(ss == NULL))
1369                 RETURN(-ENXIO);
1370
1371         OBD_ALLOC_PTR(lfsck->li_seq);
1372         if (lfsck->li_seq == NULL)
1373                 RETURN(-ENOMEM);
1374
1375         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1376         if (prefix == NULL)
1377                 GOTO(out, rc = -ENOMEM);
1378
1379         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1380         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1381                              ss->ss_server_seq);
1382         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1383         if (rc != 0)
1384                 GOTO(out, rc);
1385
1386         if (fid_is_sane(&bk->lb_last_fid))
1387                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1388
1389         RETURN(0);
1390
1391 out:
1392         OBD_FREE_PTR(lfsck->li_seq);
1393         lfsck->li_seq = NULL;
1394
1395         return rc;
1396 }
1397
1398 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1399 {
1400         if (lfsck->li_seq != NULL) {
1401                 seq_client_fini(lfsck->li_seq);
1402                 OBD_FREE_PTR(lfsck->li_seq);
1403                 lfsck->li_seq = NULL;
1404         }
1405 }
1406
1407 void lfsck_instance_cleanup(const struct lu_env *env,
1408                             struct lfsck_instance *lfsck)
1409 {
1410         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1411         struct lfsck_component  *com;
1412         struct lfsck_component  *next;
1413         ENTRY;
1414
1415         LASSERT(list_empty(&lfsck->li_link));
1416         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1417
1418         if (lfsck->li_obj_oit != NULL) {
1419                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
1420                 lfsck->li_obj_oit = NULL;
1421         }
1422
1423         LASSERT(lfsck->li_obj_dir == NULL);
1424
1425         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1426                 lfsck_component_cleanup(env, com);
1427         }
1428
1429         LASSERT(list_empty(&lfsck->li_list_dir));
1430
1431         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1432                                  lc_link) {
1433                 lfsck_component_cleanup(env, com);
1434         }
1435
1436         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1437                 lfsck_component_cleanup(env, com);
1438         }
1439
1440         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1441         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1442
1443         if (lfsck->li_bookmark_obj != NULL) {
1444                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
1445                 lfsck->li_bookmark_obj = NULL;
1446         }
1447
1448         if (lfsck->li_lpf_obj != NULL) {
1449                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
1450                 lfsck->li_lpf_obj = NULL;
1451         }
1452
1453         if (lfsck->li_los != NULL) {
1454                 local_oid_storage_fini(env, lfsck->li_los);
1455                 lfsck->li_los = NULL;
1456         }
1457
1458         lfsck_fid_fini(lfsck);
1459
1460         OBD_FREE_PTR(lfsck);
1461 }
1462
1463 static inline struct lfsck_instance *
1464 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1465 {
1466         struct lfsck_instance *lfsck;
1467
1468         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1469                 if (lfsck->li_bottom == key) {
1470                         if (ref)
1471                                 lfsck_instance_get(lfsck);
1472                         if (unlink)
1473                                 list_del_init(&lfsck->li_link);
1474
1475                         return lfsck;
1476                 }
1477         }
1478
1479         return NULL;
1480 }
1481
1482 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1483                                            bool unlink)
1484 {
1485         struct lfsck_instance *lfsck;
1486
1487         spin_lock(&lfsck_instance_lock);
1488         lfsck = __lfsck_instance_find(key, ref, unlink);
1489         spin_unlock(&lfsck_instance_lock);
1490
1491         return lfsck;
1492 }
1493
1494 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1495 {
1496         struct lfsck_instance *tmp;
1497
1498         spin_lock(&lfsck_instance_lock);
1499         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1500                 if (lfsck->li_bottom == tmp->li_bottom) {
1501                         spin_unlock(&lfsck_instance_lock);
1502                         return -EEXIST;
1503                 }
1504         }
1505
1506         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1507         spin_unlock(&lfsck_instance_lock);
1508         return 0;
1509 }
1510
1511 int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
1512                     const char *prefix)
1513 {
1514         int flag;
1515         int i;
1516         bool newline = (bits != 0 ? false : true);
1517
1518         seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
1519
1520         for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
1521                 if (flag & bits) {
1522                         bits &= ~flag;
1523                         if (names[i] != NULL) {
1524                                 if (bits == 0)
1525                                         newline = true;
1526
1527                                 seq_printf(m, "%s%c", names[i],
1528                                            newline ? '\n' : ',');
1529                         }
1530                 }
1531         }
1532
1533         if (!newline)
1534                 seq_printf(m, "\n");
1535         return 0;
1536 }
1537
1538 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *prefix)
1539 {
1540         if (time != 0)
1541                 seq_printf(m, "%s: "LPU64" seconds\n", prefix,
1542                           cfs_time_current_sec() - time);
1543         else
1544                 seq_printf(m, "%s: N/A\n", prefix);
1545         return 0;
1546 }
1547
1548 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1549                    const char *prefix)
1550 {
1551         if (fid_is_zero(&pos->lp_dir_parent)) {
1552                 if (pos->lp_oit_cookie == 0)
1553                         seq_printf(m, "%s: N/A, N/A, N/A\n",
1554                                    prefix);
1555                 else
1556                         seq_printf(m, "%s: "LPU64", N/A, N/A\n",
1557                                    prefix, pos->lp_oit_cookie);
1558         } else {
1559                 seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
1560                            prefix, pos->lp_oit_cookie,
1561                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1562         }
1563         return 0;
1564 }
1565
1566 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1567                     struct lfsck_position *pos, bool init)
1568 {
1569         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1570
1571         if (unlikely(lfsck->li_di_oit == NULL)) {
1572                 memset(pos, 0, sizeof(*pos));
1573                 return;
1574         }
1575
1576         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1577         if (!lfsck->li_current_oit_processed && !init)
1578                 pos->lp_oit_cookie--;
1579
1580         LASSERT(pos->lp_oit_cookie > 0);
1581
1582         if (lfsck->li_di_dir != NULL) {
1583                 struct dt_object *dto = lfsck->li_obj_dir;
1584
1585                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1586                                                         lfsck->li_di_dir);
1587
1588                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1589                         fid_zero(&pos->lp_dir_parent);
1590                         pos->lp_dir_cookie = 0;
1591                 } else {
1592                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1593                 }
1594         } else {
1595                 fid_zero(&pos->lp_dir_parent);
1596                 pos->lp_dir_cookie = 0;
1597         }
1598 }
1599
1600 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1601 {
1602         bool dirty = false;
1603
1604         if (limit != LFSCK_SPEED_NO_LIMIT) {
1605                 if (limit > HZ) {
1606                         lfsck->li_sleep_rate = limit / HZ;
1607                         lfsck->li_sleep_jif = 1;
1608                 } else {
1609                         lfsck->li_sleep_rate = 1;
1610                         lfsck->li_sleep_jif = HZ / limit;
1611                 }
1612         } else {
1613                 lfsck->li_sleep_jif = 0;
1614                 lfsck->li_sleep_rate = 0;
1615         }
1616
1617         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1618                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1619                 dirty = true;
1620         }
1621
1622         return dirty;
1623 }
1624
1625 void lfsck_control_speed(struct lfsck_instance *lfsck)
1626 {
1627         struct ptlrpc_thread *thread = &lfsck->li_thread;
1628         struct l_wait_info    lwi;
1629
1630         if (lfsck->li_sleep_jif > 0 &&
1631             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1632                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1633                                        LWI_ON_SIGNAL_NOOP, NULL);
1634
1635                 l_wait_event(thread->t_ctl_waitq,
1636                              !thread_is_running(thread),
1637                              &lwi);
1638                 lfsck->li_new_scanned = 0;
1639         }
1640 }
1641
1642 void lfsck_control_speed_by_self(struct lfsck_component *com)
1643 {
1644         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1645         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1646         struct l_wait_info       lwi;
1647
1648         if (lfsck->li_sleep_jif > 0 &&
1649             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1650                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1651                                        LWI_ON_SIGNAL_NOOP, NULL);
1652
1653                 l_wait_event(thread->t_ctl_waitq,
1654                              !thread_is_running(thread),
1655                              &lwi);
1656                 com->lc_new_scanned = 0;
1657         }
1658 }
1659
1660 static struct lfsck_thread_args *
1661 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1662                        struct lfsck_component *com,
1663                        struct lfsck_start_param *lsp)
1664 {
1665         struct lfsck_thread_args *lta;
1666         int                       rc;
1667
1668         OBD_ALLOC_PTR(lta);
1669         if (lta == NULL)
1670                 return ERR_PTR(-ENOMEM);
1671
1672         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1673         if (rc != 0) {
1674                 OBD_FREE_PTR(lta);
1675                 return ERR_PTR(rc);
1676         }
1677
1678         lta->lta_lfsck = lfsck_instance_get(lfsck);
1679         if (com != NULL)
1680                 lta->lta_com = lfsck_component_get(com);
1681
1682         lta->lta_lsp = lsp;
1683
1684         return lta;
1685 }
1686
1687 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1688 {
1689         if (lta->lta_com != NULL)
1690                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1691         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1692         lu_env_fini(&lta->lta_env);
1693         OBD_FREE_PTR(lta);
1694 }
1695
1696 struct lfsck_assistant_data *
1697 lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
1698                           const char *name)
1699 {
1700         struct lfsck_assistant_data *lad;
1701
1702         OBD_ALLOC_PTR(lad);
1703         if (lad != NULL) {
1704                 INIT_LIST_HEAD(&lad->lad_req_list);
1705                 spin_lock_init(&lad->lad_lock);
1706                 INIT_LIST_HEAD(&lad->lad_ost_list);
1707                 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1708                 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1709                 INIT_LIST_HEAD(&lad->lad_mdt_list);
1710                 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1711                 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1712                 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1713                 lad->lad_ops = lao;
1714                 lad->lad_name = name;
1715         }
1716
1717         return lad;
1718 }
1719
1720 /**
1721  * Generic LFSCK asynchronous communication interpretor function.
1722  * The LFSCK RPC reply for both the event notification and status
1723  * querying will be handled here.
1724  *
1725  * \param[in] env       pointer to the thread context
1726  * \param[in] req       pointer to the LFSCK request
1727  * \param[in] args      pointer to the lfsck_async_interpret_args
1728  * \param[in] rc        the result for handling the LFSCK request
1729  *
1730  * \retval              0 for success
1731  * \retval              negative error number on failure
1732  */
1733 int lfsck_async_interpret_common(const struct lu_env *env,
1734                                  struct ptlrpc_request *req,
1735                                  void *args, int rc)
1736 {
1737         struct lfsck_async_interpret_args *laia = args;
1738         struct lfsck_component            *com  = laia->laia_com;
1739         struct lfsck_assistant_data       *lad  = com->lc_data;
1740         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1741         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1742         struct lfsck_request              *lr   = laia->laia_lr;
1743
1744         LASSERT(com->lc_lfsck->li_master);
1745
1746         switch (lr->lr_event) {
1747         case LE_START:
1748                 if (rc != 0) {
1749                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
1750                                "start: rc = %d\n",
1751                                lfsck_lfsck2name(com->lc_lfsck),
1752                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1753                                ltd->ltd_index, lad->lad_name, rc);
1754
1755                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1756                                 struct lfsck_layout *lo = com->lc_file_ram;
1757
1758                                 lo->ll_flags |= LF_INCOMPLETE;
1759                         } else {
1760                                 struct lfsck_namespace *ns = com->lc_file_ram;
1761
1762                                 ns->ln_flags |= LF_INCOMPLETE;
1763                         }
1764                         break;
1765                 }
1766
1767                 spin_lock(&ltds->ltd_lock);
1768                 if (ltd->ltd_dead) {
1769                         spin_unlock(&ltds->ltd_lock);
1770                         break;
1771                 }
1772
1773                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1774                         struct list_head *list;
1775                         struct list_head *phase_list;
1776
1777                         if (ltd->ltd_layout_done) {
1778                                 spin_unlock(&ltds->ltd_lock);
1779                                 break;
1780                         }
1781
1782                         if (lr->lr_flags & LEF_TO_OST) {
1783                                 list = &lad->lad_ost_list;
1784                                 phase_list = &lad->lad_ost_phase1_list;
1785                         } else {
1786                                 list = &lad->lad_mdt_list;
1787                                 phase_list = &lad->lad_mdt_phase1_list;
1788                         }
1789
1790                         if (list_empty(&ltd->ltd_layout_list))
1791                                 list_add_tail(&ltd->ltd_layout_list, list);
1792                         if (list_empty(&ltd->ltd_layout_phase_list))
1793                                 list_add_tail(&ltd->ltd_layout_phase_list,
1794                                               phase_list);
1795                 } else {
1796                         if (ltd->ltd_namespace_done) {
1797                                 spin_unlock(&ltds->ltd_lock);
1798                                 break;
1799                         }
1800
1801                         if (list_empty(&ltd->ltd_namespace_list))
1802                                 list_add_tail(&ltd->ltd_namespace_list,
1803                                               &lad->lad_mdt_list);
1804                         if (list_empty(&ltd->ltd_namespace_phase_list))
1805                                 list_add_tail(&ltd->ltd_namespace_phase_list,
1806                                               &lad->lad_mdt_phase1_list);
1807                 }
1808                 spin_unlock(&ltds->ltd_lock);
1809                 break;
1810         case LE_STOP:
1811         case LE_PHASE1_DONE:
1812         case LE_PHASE2_DONE:
1813         case LE_PEER_EXIT:
1814                 if (rc != 0 && rc != -EALREADY)
1815                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
1816                               "event = %d, rc = %d\n",
1817                               lfsck_lfsck2name(com->lc_lfsck),
1818                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1819                               ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
1820                 break;
1821         case LE_QUERY: {
1822                 struct lfsck_reply *reply;
1823                 struct list_head *list;
1824                 struct list_head *phase_list;
1825
1826                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1827                         list = &ltd->ltd_layout_list;
1828                         phase_list = &ltd->ltd_layout_phase_list;
1829                 } else {
1830                         list = &ltd->ltd_namespace_list;
1831                         phase_list = &ltd->ltd_namespace_phase_list;
1832                 }
1833
1834                 if (rc != 0) {
1835                         spin_lock(&ltds->ltd_lock);
1836                         list_del_init(phase_list);
1837                         list_del_init(list);
1838                         spin_unlock(&ltds->ltd_lock);
1839                         break;
1840                 }
1841
1842                 reply = req_capsule_server_get(&req->rq_pill,
1843                                                &RMF_LFSCK_REPLY);
1844                 if (reply == NULL) {
1845                         rc = -EPROTO;
1846                         CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
1847                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
1848                                lad->lad_name, rc);
1849                         spin_lock(&ltds->ltd_lock);
1850                         list_del_init(phase_list);
1851                         list_del_init(list);
1852                         spin_unlock(&ltds->ltd_lock);
1853                         break;
1854                 }
1855
1856                 switch (reply->lr_status) {
1857                 case LS_SCANNING_PHASE1:
1858                         break;
1859                 case LS_SCANNING_PHASE2:
1860                         spin_lock(&ltds->ltd_lock);
1861                         list_del_init(phase_list);
1862                         if (ltd->ltd_dead) {
1863                                 spin_unlock(&ltds->ltd_lock);
1864                                 break;
1865                         }
1866
1867                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1868                                 if (ltd->ltd_layout_done) {
1869                                         spin_unlock(&ltds->ltd_lock);
1870                                         break;
1871                                 }
1872
1873                                 if (lr->lr_flags & LEF_TO_OST)
1874                                         list_add_tail(phase_list,
1875                                                 &lad->lad_ost_phase2_list);
1876                                 else
1877                                         list_add_tail(phase_list,
1878                                                 &lad->lad_mdt_phase2_list);
1879                         } else {
1880                                 if (ltd->ltd_namespace_done) {
1881                                         spin_unlock(&ltds->ltd_lock);
1882                                         break;
1883                                 }
1884
1885                                 list_add_tail(phase_list,
1886                                               &lad->lad_mdt_phase2_list);
1887                         }
1888                         spin_unlock(&ltds->ltd_lock);
1889                         break;
1890                 default:
1891                         spin_lock(&ltds->ltd_lock);
1892                         list_del_init(phase_list);
1893                         list_del_init(list);
1894                         spin_unlock(&ltds->ltd_lock);
1895                         break;
1896                 }
1897                 break;
1898         }
1899         default:
1900                 CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
1901                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
1902                 break;
1903         }
1904
1905         if (!laia->laia_shared) {
1906                 lfsck_tgt_put(ltd);
1907                 lfsck_component_put(env, com);
1908         }
1909
1910         return 0;
1911 }
1912
1913 static void lfsck_interpret(const struct lu_env *env,
1914                             struct lfsck_instance *lfsck,
1915                             struct ptlrpc_request *req, void *args, int result)
1916 {
1917         struct lfsck_async_interpret_args *laia = args;
1918         struct lfsck_component            *com;
1919
1920         LASSERT(laia->laia_com == NULL);
1921         LASSERT(laia->laia_shared);
1922
1923         spin_lock(&lfsck->li_lock);
1924         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1925                 laia->laia_com = com;
1926                 lfsck_async_interpret_common(env, req, laia, result);
1927         }
1928
1929         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1930                 laia->laia_com = com;
1931                 lfsck_async_interpret_common(env, req, laia, result);
1932         }
1933         spin_unlock(&lfsck->li_lock);
1934 }
1935
1936 static int lfsck_stop_notify(const struct lu_env *env,
1937                              struct lfsck_instance *lfsck,
1938                              struct lfsck_tgt_descs *ltds,
1939                              struct lfsck_tgt_desc *ltd, __u16 type)
1940 {
1941         struct lfsck_component *com;
1942         int                     rc = 0;
1943         ENTRY;
1944
1945         LASSERT(lfsck->li_master);
1946
1947         spin_lock(&lfsck->li_lock);
1948         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
1949         if (com == NULL)
1950                 com = __lfsck_component_find(lfsck, type,
1951                                              &lfsck->li_list_double_scan);
1952         if (com != NULL)
1953                 lfsck_component_get(com);
1954         spin_unlock(&lfsck->li_lock);
1955
1956         if (com != NULL) {
1957                 struct lfsck_thread_info          *info  = lfsck_env_info(env);
1958                 struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1959                 struct lfsck_request              *lr    = &info->lti_lr;
1960                 struct lfsck_assistant_data       *lad   = com->lc_data;
1961                 struct list_head                  *list;
1962                 struct list_head                  *phase_list;
1963                 struct ptlrpc_request_set         *set;
1964
1965                 set = ptlrpc_prep_set();
1966                 if (set == NULL) {
1967                         lfsck_component_put(env, com);
1968
1969                         RETURN(-ENOMEM);
1970                 }
1971
1972                 if (type == LFSCK_TYPE_LAYOUT) {
1973                         list = &ltd->ltd_layout_list;
1974                         phase_list = &ltd->ltd_layout_phase_list;
1975                 } else {
1976                         list = &ltd->ltd_namespace_list;
1977                         phase_list = &ltd->ltd_namespace_phase_list;
1978                 }
1979
1980                 spin_lock(&ltds->ltd_lock);
1981                 if (list_empty(list)) {
1982                         LASSERT(list_empty(phase_list));
1983                         spin_unlock(&ltds->ltd_lock);
1984                         ptlrpc_set_destroy(set);
1985
1986                         RETURN(0);
1987                 }
1988
1989                 list_del_init(phase_list);
1990                 list_del_init(list);
1991                 spin_unlock(&ltds->ltd_lock);
1992
1993                 memset(lr, 0, sizeof(*lr));
1994                 lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1995                 lr->lr_event = LE_PEER_EXIT;
1996                 lr->lr_active = type;
1997                 lr->lr_status = LS_CO_PAUSED;
1998                 if (ltds == &lfsck->li_ost_descs)
1999                         lr->lr_flags = LEF_TO_OST;
2000
2001                 laia->laia_com = com;
2002                 laia->laia_ltds = ltds;
2003                 atomic_inc(&ltd->ltd_ref);
2004                 laia->laia_ltd = ltd;
2005                 laia->laia_lr = lr;
2006                 laia->laia_shared = 0;
2007
2008                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2009                                          lfsck_async_interpret_common,
2010                                          laia, LFSCK_NOTIFY);
2011                 if (rc != 0) {
2012                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2013                                "co-stop for %s: rc = %d\n",
2014                                lfsck_lfsck2name(lfsck),
2015                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2016                                ltd->ltd_index, lad->lad_name, rc);
2017                         lfsck_tgt_put(ltd);
2018                 } else {
2019                         rc = ptlrpc_set_wait(set);
2020                 }
2021
2022                 ptlrpc_set_destroy(set);
2023                 lfsck_component_put(env, com);
2024         }
2025
2026         RETURN(rc);
2027 }
2028
2029 static int lfsck_async_interpret(const struct lu_env *env,
2030                                  struct ptlrpc_request *req,
2031                                  void *args, int rc)
2032 {
2033         struct lfsck_async_interpret_args *laia = args;
2034         struct lfsck_instance             *lfsck;
2035
2036         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
2037                               li_mdt_descs);
2038         lfsck_interpret(env, lfsck, req, laia, rc);
2039         lfsck_tgt_put(laia->laia_ltd);
2040         if (rc != 0 && laia->laia_result != -EALREADY)
2041                 laia->laia_result = rc;
2042
2043         return 0;
2044 }
2045
2046 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2047                         struct lfsck_request *lr,
2048                         struct ptlrpc_request_set *set,
2049                         ptlrpc_interpterer_t interpreter,
2050                         void *args, int request)
2051 {
2052         struct lfsck_async_interpret_args *laia;
2053         struct ptlrpc_request             *req;
2054         struct lfsck_request              *tmp;
2055         struct req_format                 *format;
2056         int                                rc;
2057
2058         switch (request) {
2059         case LFSCK_NOTIFY:
2060                 format = &RQF_LFSCK_NOTIFY;
2061                 break;
2062         case LFSCK_QUERY:
2063                 format = &RQF_LFSCK_QUERY;
2064                 break;
2065         default:
2066                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2067                        exp->exp_obd->obd_name, request, -EINVAL);
2068                 return -EINVAL;
2069         }
2070
2071         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2072         if (req == NULL)
2073                 return -ENOMEM;
2074
2075         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2076         if (rc != 0) {
2077                 ptlrpc_request_free(req);
2078
2079                 return rc;
2080         }
2081
2082         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2083         *tmp = *lr;
2084         ptlrpc_request_set_replen(req);
2085
2086         laia = ptlrpc_req_async_args(req);
2087         *laia = *(struct lfsck_async_interpret_args *)args;
2088         if (laia->laia_com != NULL)
2089                 lfsck_component_get(laia->laia_com);
2090         req->rq_interpret_reply = interpreter;
2091         ptlrpc_set_add_req(set, req);
2092
2093         return 0;
2094 }
2095
2096 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2097                           struct lfsck_start_param *lsp)
2098 {
2099         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2100         struct lfsck_assistant_data     *lad     = com->lc_data;
2101         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2102         struct ptlrpc_thread            *athread = &lad->lad_thread;
2103         struct lfsck_thread_args        *lta;
2104         struct task_struct              *task;
2105         int                              rc;
2106         ENTRY;
2107
2108         lad->lad_assistant_status = 0;
2109         lad->lad_post_result = 0;
2110         lad->lad_to_post = 0;
2111         lad->lad_to_double_scan = 0;
2112         lad->lad_in_double_scan = 0;
2113         lad->lad_exit = 0;
2114         thread_set_flags(athread, 0);
2115
2116         lta = lfsck_thread_args_init(lfsck, com, lsp);
2117         if (IS_ERR(lta))
2118                 RETURN(PTR_ERR(lta));
2119
2120         task = kthread_run(lfsck_assistant_engine, lta, lad->lad_name);
2121         if (IS_ERR(task)) {
2122                 rc = PTR_ERR(task);
2123                 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2124                        "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2125                 lfsck_thread_args_fini(lta);
2126         } else {
2127                 struct l_wait_info lwi = { 0 };
2128
2129                 l_wait_event(mthread->t_ctl_waitq,
2130                              thread_is_running(athread) ||
2131                              thread_is_stopped(athread),
2132                              &lwi);
2133                 if (unlikely(!thread_is_running(athread)))
2134                         rc = lad->lad_assistant_status;
2135                 else
2136                         rc = 0;
2137         }
2138
2139         RETURN(rc);
2140 }
2141
2142 int lfsck_checkpoint_generic(const struct lu_env *env,
2143                              struct lfsck_component *com)
2144 {
2145         struct lfsck_assistant_data     *lad     = com->lc_data;
2146         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2147         struct ptlrpc_thread            *athread = &lad->lad_thread;
2148         struct l_wait_info               lwi     = { 0 };
2149
2150         if (com->lc_new_checked == 0)
2151                 return LFSCK_CHECKPOINT_SKIP;
2152
2153         l_wait_event(mthread->t_ctl_waitq,
2154                      list_empty(&lad->lad_req_list) ||
2155                      !thread_is_running(mthread) ||
2156                      thread_is_stopped(athread),
2157                      &lwi);
2158
2159         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2160                 return LFSCK_CHECKPOINT_SKIP;
2161
2162         return 0;
2163 }
2164
2165 void lfsck_post_generic(const struct lu_env *env,
2166                         struct lfsck_component *com, int *result)
2167 {
2168         struct lfsck_assistant_data     *lad     = com->lc_data;
2169         struct ptlrpc_thread            *athread = &lad->lad_thread;
2170         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2171         struct l_wait_info               lwi     = { 0 };
2172
2173         lad->lad_post_result = *result;
2174         if (*result <= 0)
2175                 lad->lad_exit = 1;
2176         lad->lad_to_post = 1;
2177
2178         wake_up_all(&athread->t_ctl_waitq);
2179         l_wait_event(mthread->t_ctl_waitq,
2180                      (*result > 0 && list_empty(&lad->lad_req_list)) ||
2181                      thread_is_stopped(athread),
2182                      &lwi);
2183
2184         if (lad->lad_assistant_status < 0)
2185                 *result = lad->lad_assistant_status;
2186 }
2187
2188 int lfsck_double_scan_generic(const struct lu_env *env,
2189                               struct lfsck_component *com, int status)
2190 {
2191         struct lfsck_assistant_data     *lad     = com->lc_data;
2192         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2193         struct ptlrpc_thread            *athread = &lad->lad_thread;
2194         struct l_wait_info               lwi     = { 0 };
2195
2196         if (status != LS_SCANNING_PHASE2)
2197                 lad->lad_exit = 1;
2198         else
2199                 lad->lad_to_double_scan = 1;
2200
2201         wake_up_all(&athread->t_ctl_waitq);
2202         l_wait_event(mthread->t_ctl_waitq,
2203                      lad->lad_in_double_scan ||
2204                      thread_is_stopped(athread),
2205                      &lwi);
2206
2207         if (lad->lad_assistant_status < 0)
2208                 return lad->lad_assistant_status;
2209
2210         return 0;
2211 }
2212
2213 void lfsck_quit_generic(const struct lu_env *env,
2214                         struct lfsck_component *com)
2215 {
2216         struct lfsck_assistant_data     *lad     = com->lc_data;
2217         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2218         struct ptlrpc_thread            *athread = &lad->lad_thread;
2219         struct l_wait_info               lwi     = { 0 };
2220
2221         lad->lad_exit = 1;
2222         wake_up_all(&athread->t_ctl_waitq);
2223         l_wait_event(mthread->t_ctl_waitq,
2224                      thread_is_init(athread) ||
2225                      thread_is_stopped(athread),
2226                      &lwi);
2227 }
2228
2229 /* external interfaces */
2230
2231 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
2232 {
2233         struct lu_env           env;
2234         struct lfsck_instance  *lfsck;
2235         int                     rc;
2236         ENTRY;
2237
2238         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2239         if (rc != 0)
2240                 RETURN(rc);
2241
2242         lfsck = lfsck_instance_find(key, true, false);
2243         if (likely(lfsck != NULL)) {
2244                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
2245                 lfsck_instance_put(&env, lfsck);
2246         } else {
2247                 rc = -ENXIO;
2248         }
2249
2250         lu_env_fini(&env);
2251
2252         RETURN(rc);
2253 }
2254 EXPORT_SYMBOL(lfsck_get_speed);
2255
2256 int lfsck_set_speed(struct dt_device *key, int val)
2257 {
2258         struct lu_env           env;
2259         struct lfsck_instance  *lfsck;
2260         int                     rc;
2261         ENTRY;
2262
2263         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2264         if (rc != 0)
2265                 RETURN(rc);
2266
2267         lfsck = lfsck_instance_find(key, true, false);
2268         if (likely(lfsck != NULL)) {
2269                 mutex_lock(&lfsck->li_mutex);
2270                 if (__lfsck_set_speed(lfsck, val))
2271                         rc = lfsck_bookmark_store(&env, lfsck);
2272                 mutex_unlock(&lfsck->li_mutex);
2273                 lfsck_instance_put(&env, lfsck);
2274         } else {
2275                 rc = -ENXIO;
2276         }
2277
2278         lu_env_fini(&env);
2279
2280         RETURN(rc);
2281 }
2282 EXPORT_SYMBOL(lfsck_set_speed);
2283
2284 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
2285 {
2286         struct lu_env           env;
2287         struct lfsck_instance  *lfsck;
2288         int                     rc;
2289         ENTRY;
2290
2291         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2292         if (rc != 0)
2293                 RETURN(rc);
2294
2295         lfsck = lfsck_instance_find(key, true, false);
2296         if (likely(lfsck != NULL)) {
2297                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
2298                 lfsck_instance_put(&env, lfsck);
2299         } else {
2300                 rc = -ENXIO;
2301         }
2302
2303         lu_env_fini(&env);
2304
2305         RETURN(rc);
2306 }
2307 EXPORT_SYMBOL(lfsck_get_windows);
2308
2309 int lfsck_set_windows(struct dt_device *key, int val)
2310 {
2311         struct lu_env           env;
2312         struct lfsck_instance  *lfsck;
2313         int                     rc;
2314         ENTRY;
2315
2316         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2317         if (rc != 0)
2318                 RETURN(rc);
2319
2320         lfsck = lfsck_instance_find(key, true, false);
2321         if (likely(lfsck != NULL)) {
2322                 if (val > LFSCK_ASYNC_WIN_MAX) {
2323                         CWARN("%s: Too large async window size, which "
2324                               "may cause memory issues. The valid range "
2325                               "is [0 - %u]. If you do not want to restrict "
2326                               "the window size for async requests pipeline, "
2327                               "just set it as 0.\n",
2328                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2329                         rc = -EINVAL;
2330                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2331                         mutex_lock(&lfsck->li_mutex);
2332                         lfsck->li_bookmark_ram.lb_async_windows = val;
2333                         rc = lfsck_bookmark_store(&env, lfsck);
2334                         mutex_unlock(&lfsck->li_mutex);
2335                 }
2336                 lfsck_instance_put(&env, lfsck);
2337         } else {
2338                 rc = -ENXIO;
2339         }
2340
2341         lu_env_fini(&env);
2342
2343         RETURN(rc);
2344 }
2345 EXPORT_SYMBOL(lfsck_set_windows);
2346
2347 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2348 {
2349         struct lu_env           env;
2350         struct lfsck_instance  *lfsck;
2351         struct lfsck_component *com;
2352         int                     rc;
2353         ENTRY;
2354
2355         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2356         if (rc != 0)
2357                 RETURN(rc);
2358
2359         lfsck = lfsck_instance_find(key, true, false);
2360         if (likely(lfsck != NULL)) {
2361                 com = lfsck_component_find(lfsck, type);
2362                 if (likely(com != NULL)) {
2363                         rc = com->lc_ops->lfsck_dump(&env, com, m);
2364                         lfsck_component_put(&env, com);
2365                 } else {
2366                         rc = -ENOTSUPP;
2367                 }
2368
2369                 lfsck_instance_put(&env, lfsck);
2370         } else {
2371                 rc = -ENXIO;
2372         }
2373
2374         lu_env_fini(&env);
2375
2376         RETURN(rc);
2377 }
2378 EXPORT_SYMBOL(lfsck_dump);
2379
2380 static int lfsck_stop_all(const struct lu_env *env,
2381                           struct lfsck_instance *lfsck,
2382                           struct lfsck_stop *stop)
2383 {
2384         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2385         struct lfsck_request              *lr     = &info->lti_lr;
2386         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2387         struct ptlrpc_request_set         *set;
2388         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2389         struct lfsck_tgt_desc             *ltd;
2390         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2391         __u32                              idx;
2392         int                                rc     = 0;
2393         int                                rc1    = 0;
2394         ENTRY;
2395
2396         LASSERT(stop->ls_flags & LPF_BROADCAST);
2397
2398         set = ptlrpc_prep_set();
2399         if (unlikely(set == NULL))
2400                 RETURN(-ENOMEM);
2401
2402         memset(lr, 0, sizeof(*lr));
2403         lr->lr_event = LE_STOP;
2404         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2405         lr->lr_status = stop->ls_status;
2406         lr->lr_version = bk->lb_version;
2407         lr->lr_active = LFSCK_TYPES_ALL;
2408         lr->lr_param = stop->ls_flags;
2409
2410         laia->laia_com = NULL;
2411         laia->laia_ltds = ltds;
2412         laia->laia_lr = lr;
2413         laia->laia_result = 0;
2414         laia->laia_shared = 1;
2415
2416         down_read(&ltds->ltd_rw_sem);
2417         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2418                 ltd = lfsck_tgt_get(ltds, idx);
2419                 LASSERT(ltd != NULL);
2420
2421                 laia->laia_ltd = ltd;
2422                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2423                                          lfsck_async_interpret, laia,
2424                                          LFSCK_NOTIFY);
2425                 if (rc != 0) {
2426                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2427                         lfsck_tgt_put(ltd);
2428                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2429                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2430                         rc1 = rc;
2431                 }
2432         }
2433         up_read(&ltds->ltd_rw_sem);
2434
2435         rc = ptlrpc_set_wait(set);
2436         ptlrpc_set_destroy(set);
2437
2438         if (rc == 0)
2439                 rc = laia->laia_result;
2440
2441         if (rc == -EALREADY)
2442                 rc = 0;
2443
2444         if (rc != 0)
2445                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2446                        lfsck_lfsck2name(lfsck), rc);
2447
2448         RETURN(rc != 0 ? rc : rc1);
2449 }
2450
2451 static int lfsck_start_all(const struct lu_env *env,
2452                            struct lfsck_instance *lfsck,
2453                            struct lfsck_start *start)
2454 {
2455         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2456         struct lfsck_request              *lr     = &info->lti_lr;
2457         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2458         struct ptlrpc_request_set         *set;
2459         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2460         struct lfsck_tgt_desc             *ltd;
2461         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2462         __u32                              idx;
2463         int                                rc     = 0;
2464         ENTRY;
2465
2466         LASSERT(start->ls_flags & LPF_BROADCAST);
2467
2468         set = ptlrpc_prep_set();
2469         if (unlikely(set == NULL))
2470                 RETURN(-ENOMEM);
2471
2472         memset(lr, 0, sizeof(*lr));
2473         lr->lr_event = LE_START;
2474         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2475         lr->lr_speed = bk->lb_speed_limit;
2476         lr->lr_version = bk->lb_version;
2477         lr->lr_active = start->ls_active;
2478         lr->lr_param = start->ls_flags;
2479         lr->lr_async_windows = bk->lb_async_windows;
2480         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2481                        LSV_ASYNC_WINDOWS;
2482
2483         laia->laia_com = NULL;
2484         laia->laia_ltds = ltds;
2485         laia->laia_lr = lr;
2486         laia->laia_result = 0;
2487         laia->laia_shared = 1;
2488
2489         down_read(&ltds->ltd_rw_sem);
2490         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2491                 ltd = lfsck_tgt_get(ltds, idx);
2492                 LASSERT(ltd != NULL);
2493
2494                 laia->laia_ltd = ltd;
2495                 ltd->ltd_layout_done = 0;
2496                 ltd->ltd_namespace_done = 0;
2497                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2498                                          lfsck_async_interpret, laia,
2499                                          LFSCK_NOTIFY);
2500                 if (rc != 0) {
2501                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2502                         lfsck_tgt_put(ltd);
2503                         CERROR("%s: cannot notify MDT %x for LFSCK "
2504                                "start, failout: rc = %d\n",
2505                                lfsck_lfsck2name(lfsck), idx, rc);
2506                         break;
2507                 }
2508         }
2509         up_read(&ltds->ltd_rw_sem);
2510
2511         if (rc != 0) {
2512                 ptlrpc_set_destroy(set);
2513
2514                 RETURN(rc);
2515         }
2516
2517         rc = ptlrpc_set_wait(set);
2518         ptlrpc_set_destroy(set);
2519
2520         if (rc == 0)
2521                 rc = laia->laia_result;
2522
2523         if (rc != 0) {
2524                 struct lfsck_stop *stop = &info->lti_stop;
2525
2526                 CERROR("%s: cannot start LFSCK on some MDTs, "
2527                        "stop all: rc = %d\n",
2528                        lfsck_lfsck2name(lfsck), rc);
2529                 if (rc != -EALREADY) {
2530                         stop->ls_status = LS_FAILED;
2531                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
2532                         lfsck_stop_all(env, lfsck, stop);
2533                 }
2534         }
2535
2536         RETURN(rc);
2537 }
2538
/*
 * Start the LFSCK engine for the instance registered under @key, or let
 * the requested components join an engine that is already running.
 *
 * env: execution environment passed to component ops and helpers
 * key: device used to look up the LFSCK instance
 * lsp: start parameters; lsp->lsp_start == NULL means an automatic
 *      trigger of a previously paused LFSCK
 *
 * Returns 0 on success (any positive rc is folded to 0) or a negative
 * errno, among them:
 *   -ENXIO     no instance is registered for @key
 *   -EAGAIN    li_namespace has not been set yet
 *   -EALREADY  the engine thread is neither in init nor stopped state
 *   -EPERM     LPF_BROADCAST was requested on a non-master instance
 *   -ENOTSUPP  ls_active contains bits outside LFSCK_TYPES_SUPPORTED
 *
 * Locking: li_mutex is taken after the early checks and is released at
 * "out:". The broadcast path drops it explicitly before calling
 * lfsck_start_all() and therefore leaves through "put:" instead.
 */
2539 int lfsck_start(const struct lu_env *env, struct dt_device *key,
2540                 struct lfsck_start_param *lsp)
2541 {
2542         struct lfsck_start              *start  = lsp->lsp_start;
2543         struct lfsck_instance           *lfsck;
2544         struct lfsck_bookmark           *bk;
2545         struct ptlrpc_thread            *thread;
2546         struct lfsck_component          *com;
2547         struct l_wait_info               lwi    = { 0 };
2548         struct lfsck_thread_args        *lta;
2549         struct task_struct              *task;
2550         int                              rc     = 0;
             /* valid/flags are combined into li_args_oit further below. */
2551         __u16                            valid  = 0;
2552         __u16                            flags  = 0;
             /* Single-bit cursor over the component types, starting at bit 0. */
2553         __u16                            type   = 1;
2554         ENTRY;
2555
2556         lfsck = lfsck_instance_find(key, true, false);
2557         if (unlikely(lfsck == NULL))
2558                 RETURN(-ENXIO);
2559
2560         /* System is not ready, try again later. */
2561         if (unlikely(lfsck->li_namespace == NULL))
2562                 GOTO(put, rc = -EAGAIN);
2563
2564         /* start == NULL means auto trigger paused LFSCK. */
2565         if ((start == NULL) &&
2566             (list_empty(&lfsck->li_list_scan) ||
2567              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2568                 GOTO(put, rc = 0);
2569
2570         bk = &lfsck->li_bookmark_ram;
2571         thread = &lfsck->li_thread;
2572         mutex_lock(&lfsck->li_mutex);
2573         spin_lock(&lfsck->li_lock);
             /* The engine thread is neither init nor stopped: do not start a
              * second one. For an explicit start, offer every requested
              * component the chance to join through its lfsck_join() op. */
2574         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2575                 rc = -EALREADY;
2576                 if (unlikely(start == NULL)) {
2577                         spin_unlock(&lfsck->li_lock);
2578                         GOTO(out, rc);
2579                 }
2580
                     /* Walk the requested type bits from low to high; each
                      * handled bit is cleared from ls_active. */
2581                 while (start->ls_active != 0) {
2582                         if (!(type & start->ls_active)) {
2583                                 type <<= 1;
2584                                 continue;
2585                         }
2586
                             /* The component must be on the scan list or
                              * on the double-scan list to be joined. */
2587                         com = __lfsck_component_find(lfsck, type,
2588                                                      &lfsck->li_list_scan);
2589                         if (com == NULL)
2590                                 com = __lfsck_component_find(lfsck, type,
2591                                                 &lfsck->li_list_double_scan);
2592                         if (com == NULL) {
2593                                 rc = -EOPNOTSUPP;
2594                                 break;
2595                         }
2596
                             /* lfsck_join is optional; -EALREADY from it
                              * does not abort the walk. */
2597                         if (com->lc_ops->lfsck_join != NULL) {
2598                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2599                                 if (rc != 0 && rc != -EALREADY)
2600                                         break;
2601                         }
2602                         start->ls_active &= ~type;
2603                         type <<= 1;
2604                 }
2605                 spin_unlock(&lfsck->li_lock);
2606                 GOTO(out, rc);
2607         }
2608         spin_unlock(&lfsck->li_lock);
2609
             /* Reset the per-run state before a fresh start. */
2610         lfsck->li_status = 0;
2611         lfsck->li_oit_over = 0;
2612         lfsck->li_start_unplug = 0;
2613         lfsck->li_drop_dryrun = 0;
2614         lfsck->li_new_scanned = 0;
2615
2616         /* For auto trigger. */
2617         if (start == NULL)
2618                 goto trigger;
2619
2620         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2621                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2622                        lfsck_lfsck2name(lfsck));
2623
2624                 GOTO(out, rc = -EPERM);
2625         }
2626
2627         start->ls_version = bk->lb_version;
2628
2629         if (start->ls_active != 0) {
2630                 struct lfsck_component *next;
2631
2632                 if (start->ls_active == LFSCK_TYPES_ALL)
2633                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2634
                     /* On unsupported types, ls_active is reduced to exactly
                      * the unsupported bits before failing - presumably so
                      * the caller can report them; TODO confirm. */
2635                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2636                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2637                         GOTO(out, rc = -ENOTSUPP);
2638                 }
2639
                     /* Call lfsck_post() on every scan-list component that
                      * was not requested. The _safe iterator is used,
                      * presumably because lfsck_post() may take the
                      * component off this list; TODO confirm. */
2640                 list_for_each_entry_safe(com, next,
2641                                          &lfsck->li_list_scan, lc_link) {
2642                         if (!(com->lc_type & start->ls_active)) {
2643                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2644                                                              false);
2645                                 if (rc != 0)
2646                                         GOTO(out, rc);
2647                         }
2648                 }
2649
                     /* Move each requested component found on the idle list
                      * to the tail of the scan list; ls_active ends up 0. */
2650                 while (start->ls_active != 0) {
2651                         if (type & start->ls_active) {
2652                                 com = __lfsck_component_find(lfsck, type,
2653                                                         &lfsck->li_list_idle);
2654                                 if (com != NULL)
2655                                         /* The component status will be updated
2656                                          * when its prep() is called later by
2657                                          * the LFSCK main engine. */
2658                                         list_move_tail(&com->lc_link,
2659                                                        &lfsck->li_list_scan);
2660                                 start->ls_active &= ~type;
2661                         }
2662                         type <<= 1;
2663                 }
2664         }
2665
2666         if (list_empty(&lfsck->li_list_scan)) {
2667                 /* The speed limit will be used to control both the LFSCK and
2668                  * low layer scrub (if applied), need to be handled firstly. */
2669                 if (start->ls_valid & LSV_SPEED_LIMIT) {
                             /* The bookmark is stored only when
                              * __lfsck_set_speed() returns non-zero. */
2670                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
2671                                 rc = lfsck_bookmark_store(env, lfsck);
2672                                 if (rc != 0)
2673                                         GOTO(out, rc);
2674                         }
2675                 }
2676
2677                 goto trigger;
2678         }
2679
2680         if (start->ls_flags & LPF_RESET)
2681                 flags |= DOIF_RESET;
2682
2683         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
2684         if (rc != 0)
2685                 GOTO(out, rc);
2686
             /* Rebuild ls_active from what is actually on the scan list and,
              * when a reset was requested, reset each of those components. */
2687         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2688                 start->ls_active |= com->lc_type;
2689                 if (flags & DOIF_RESET) {
2690                         rc = com->lc_ops->lfsck_reset(env, com, false);
2691                         if (rc != 0)
2692                                 GOTO(out, rc);
2693                 }
2694         }
2695
2696 trigger:
             /* Reached with start == NULL on auto trigger, hence the NULL
              * checks below. */
2697         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
2698         if (bk->lb_param & LPF_DRYRUN)
2699                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
2700
2701         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
2702                 valid |= DOIV_ERROR_HANDLE;
2703                 if (start->ls_flags & LPF_FAILOUT)
2704                         flags |= DOIF_FAILOUT;
2705         }
2706
2707         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
2708                 valid |= DOIV_DRYRUN;
2709                 if (start->ls_flags & LPF_DRYRUN)
2710                         flags |= DOIF_DRYRUN;
2711         }
2712
2713         if (!list_empty(&lfsck->li_list_scan))
2714                 flags |= DOIF_OUTUSED;
2715
             /* Flags go above DT_OTABLE_IT_FLAGS_SHIFT, valid bits below. */
2716         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2717         thread_set_flags(thread, 0);
2718         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
2719         if (IS_ERR(lta))
2720                 GOTO(out, rc = PTR_ERR(lta));
2721
2722         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
2723         task = kthread_run(lfsck_master_engine, lta, "lfsck");
2724         if (IS_ERR(task)) {
2725                 rc = PTR_ERR(task);
2726                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
2727                        lfsck_lfsck2name(lfsck), rc);
                     /* The thread never ran, so the args are freed here. */
2728                 lfsck_thread_args_fini(lta);
2729
2730                 GOTO(out, rc);
2731         }
2732
             /* Wait until the engine thread reports running or stopped. */
2733         l_wait_event(thread->t_ctl_waitq,
2734                      thread_is_running(thread) ||
2735                      thread_is_stopped(thread),
2736                      &lwi);
             /* Non-broadcast start: set li_start_unplug and wake the
              * waiters on t_ctl_waitq - presumably the engine holds off
              * until this flag is set; TODO confirm. */
2737         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
2738                 lfsck->li_start_unplug = 1;
2739                 wake_up_all(&thread->t_ctl_waitq);
2740
2741                 GOTO(out, rc = 0);
2742         }
2743
2744         /* release lfsck::li_mutex to avoid deadlock. */
2745         mutex_unlock(&lfsck->li_mutex);
2746         rc = lfsck_start_all(env, lfsck, start);
2747         if (rc != 0) {
                     /* Broadcast failed: unless the local engine has already
                      * stopped, mark it LS_FAILED, request a stop, unplug
                      * it and wait for it to reach the stopped state. */
2748                 spin_lock(&lfsck->li_lock);
2749                 if (thread_is_stopped(thread)) {
2750                         spin_unlock(&lfsck->li_lock);
2751                 } else {
2752                         lfsck->li_status = LS_FAILED;
2753                         lfsck->li_flags = 0;
2754                         thread_set_flags(thread, SVC_STOPPING);
2755                         spin_unlock(&lfsck->li_lock);
2756
2757                         lfsck->li_start_unplug = 1;
2758                         wake_up_all(&thread->t_ctl_waitq);
2759                         l_wait_event(thread->t_ctl_waitq,
2760                                      thread_is_stopped(thread),
2761                                      &lwi);
2762                 }
2763         } else {
2764                 lfsck->li_start_unplug = 1;
2765                 wake_up_all(&thread->t_ctl_waitq);
2766         }
2767
             /* li_mutex was already released above, so skip "out:". */
2768         GOTO(put, rc);
2769
2770 out:
2771         mutex_unlock(&lfsck->li_mutex);
2772
2773 put:
2774         lfsck_instance_put(env, lfsck);
2775
             /* Only negative values are reported as errors. */
2776         return rc < 0 ? rc : 0;
2777 }
2778 EXPORT_SYMBOL(lfsck_start);
2779
2780 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2781                struct lfsck_stop *stop)
2782 {
2783         struct lfsck_instance   *lfsck;
2784         struct ptlrpc_thread    *thread;
2785         struct l_wait_info       lwi    = { 0 };
2786         int                      rc     = 0;
2787         int                      rc1    = 0;
2788         ENTRY;
2789
2790         lfsck = lfsck_instance_find(key, true, false);
2791         if (unlikely(lfsck == NULL))
2792                 RETURN(-ENXIO);
2793
2794         thread = &lfsck->li_thread;
2795         /* release lfsck::li_mutex to avoid deadlock. */
2796         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2797                 if (!lfsck->li_master) {
2798                         CERROR("%s: only allow to specify '-A' via MDS\n",
2799                                lfsck_lfsck2name(lfsck));
2800
2801                         GOTO(out, rc = -EPERM);
2802                 }
2803
2804                 rc1 = lfsck_stop_all(env, lfsck, stop);
2805         }
2806
2807         mutex_lock(&lfsck->li_mutex);
2808         spin_lock(&lfsck->li_lock);
2809         /* no error if LFSCK is already stopped, or was never started */
2810         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2811                 spin_unlock(&lfsck->li_lock);
2812                 GOTO(out, rc = 0);
2813         }
2814
2815         if (stop != NULL) {
2816                 lfsck->li_status = stop->ls_status;
2817                 lfsck->li_flags = stop->ls_flags;
2818         } else {
2819                 lfsck->li_status = LS_STOPPED;
2820                 lfsck->li_flags = 0;
2821         }
2822
2823         thread_set_flags(thread, SVC_STOPPING);
2824         spin_unlock(&lfsck->li_lock);
2825
2826         wake_up_all(&thread->t_ctl_waitq);
2827         l_wait_event(thread->t_ctl_waitq,
2828                      thread_is_stopped(thread),
2829                      &lwi);
2830
2831         GOTO(out, rc = 0);
2832
2833 out:
2834         mutex_unlock(&lfsck->li_mutex);
2835         lfsck_instance_put(env, lfsck);
2836
2837         return rc != 0 ? rc : rc1;
2838 }
2839 EXPORT_SYMBOL(lfsck_stop);
2840
2841 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2842                     struct lfsck_request *lr)
2843 {
2844         int rc = -EOPNOTSUPP;
2845         ENTRY;
2846
2847         switch (lr->lr_event) {
2848         case LE_START: {
2849                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
2850                 struct lfsck_start_param  lsp;
2851
2852                 memset(start, 0, sizeof(*start));
2853                 start->ls_valid = lr->lr_valid;
2854                 start->ls_speed_limit = lr->lr_speed;
2855                 start->ls_version = lr->lr_version;
2856                 start->ls_active = lr->lr_active;
2857                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2858                 start->ls_async_windows = lr->lr_async_windows;
2859
2860                 lsp.lsp_start = start;
2861                 lsp.lsp_index = lr->lr_index;
2862                 lsp.lsp_index_valid = 1;
2863                 rc = lfsck_start(env, key, &lsp);
2864                 break;
2865         }
2866         case LE_STOP: {
2867                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2868
2869                 memset(stop, 0, sizeof(*stop));
2870                 stop->ls_status = lr->lr_status;
2871                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2872                 rc = lfsck_stop(env, key, stop);
2873                 break;
2874         }
2875         case LE_PHASE1_DONE:
2876         case LE_PHASE2_DONE:
2877         case LE_FID_ACCESSED:
2878         case LE_PEER_EXIT:
2879         case LE_CONDITIONAL_DESTROY:
2880         case LE_PAIRS_VERIFY: {
2881                 struct lfsck_instance  *lfsck;
2882                 struct lfsck_component *com;
2883
2884                 lfsck = lfsck_instance_find(key, true, false);
2885                 if (unlikely(lfsck == NULL))
2886                         RETURN(-ENXIO);
2887
2888                 com = lfsck_component_find(lfsck, lr->lr_active);
2889                 if (likely(com != NULL)) {
2890                         rc = com->lc_ops->lfsck_in_notify(env, com, lr);
2891                         lfsck_component_put(env, com);
2892                 }
2893
2894                 lfsck_instance_put(env, lfsck);
2895                 break;
2896         }
2897         default:
2898                 break;
2899         }
2900
2901         RETURN(rc);
2902 }
2903 EXPORT_SYMBOL(lfsck_in_notify);
2904
2905 int lfsck_query(const struct lu_env *env, struct dt_device *key,
2906                 struct lfsck_request *lr)
2907 {
2908         struct lfsck_instance  *lfsck;
2909         struct lfsck_component *com;
2910         int                     rc;
2911         ENTRY;
2912
2913         lfsck = lfsck_instance_find(key, true, false);
2914         if (unlikely(lfsck == NULL))
2915                 RETURN(-ENXIO);
2916
2917         com = lfsck_component_find(lfsck, lr->lr_active);
2918         if (likely(com != NULL)) {
2919                 rc = com->lc_ops->lfsck_query(env, com);
2920                 lfsck_component_put(env, com);
2921         } else {
2922                 rc = -ENOTSUPP;
2923         }
2924
2925         lfsck_instance_put(env, lfsck);
2926
2927         RETURN(rc);
2928 }
2929 EXPORT_SYMBOL(lfsck_query);
2930
2931 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
2932                              struct ldlm_namespace *ns)
2933 {
2934         struct lfsck_instance  *lfsck;
2935         int                     rc      = -ENXIO;
2936
2937         lfsck = lfsck_instance_find(key, true, false);
2938         if (likely(lfsck != NULL)) {
2939                 lfsck->li_namespace = ns;
2940                 lfsck_instance_put(env, lfsck);
2941                 rc = 0;
2942         }
2943
2944         return rc;
2945 }
2946 EXPORT_SYMBOL(lfsck_register_namespace);
2947
2948 int lfsck_register(const struct lu_env *env, struct dt_device *key,
2949                    struct dt_device *next, struct obd_device *obd,
2950                    lfsck_out_notify notify, void *notify_data, bool master)
2951 {
2952         struct lfsck_instance   *lfsck;
2953         struct dt_object        *root  = NULL;
2954         struct dt_object        *obj   = NULL;
2955         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
2956         int                      rc;
2957         ENTRY;
2958
2959         lfsck = lfsck_instance_find(key, false, false);
2960         if (unlikely(lfsck != NULL))
2961                 RETURN(-EEXIST);
2962
2963         OBD_ALLOC_PTR(lfsck);
2964         if (lfsck == NULL)
2965                 RETURN(-ENOMEM);
2966
2967         mutex_init(&lfsck->li_mutex);
2968         spin_lock_init(&lfsck->li_lock);
2969         INIT_LIST_HEAD(&lfsck->li_link);
2970         INIT_LIST_HEAD(&lfsck->li_list_scan);
2971         INIT_LIST_HEAD(&lfsck->li_list_dir);
2972         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
2973         INIT_LIST_HEAD(&lfsck->li_list_idle);
2974         atomic_set(&lfsck->li_ref, 1);
2975         atomic_set(&lfsck->li_double_scan_count, 0);
2976         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
2977         lfsck->li_out_notify = notify;
2978         lfsck->li_out_notify_data = notify_data;
2979         lfsck->li_next = next;
2980         lfsck->li_bottom = key;
2981         lfsck->li_obd = obd;
2982
2983         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
2984         if (rc != 0)
2985                 GOTO(out, rc);
2986
2987         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
2988         if (rc != 0)
2989                 GOTO(out, rc);
2990
2991         fid->f_seq = FID_SEQ_LOCAL_NAME;
2992         fid->f_oid = 1;
2993         fid->f_ver = 0;
2994         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
2995         if (rc != 0)
2996                 GOTO(out, rc);
2997
2998         rc = dt_root_get(env, key, fid);
2999         if (rc != 0)
3000                 GOTO(out, rc);
3001
3002         root = dt_locate(env, key, fid);
3003         if (IS_ERR(root))
3004                 GOTO(out, rc = PTR_ERR(root));
3005
3006         if (unlikely(!dt_try_as_dir(env, root)))
3007                 GOTO(out, rc = -ENOTDIR);
3008
3009         lfsck->li_local_root_fid = *fid;
3010         if (master) {
3011                 lfsck->li_master = 1;
3012                 if (lfsck_dev_idx(key) == 0) {
3013                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
3014                         const struct lu_name *cname;
3015
3016                         rc = dt_lookup(env, root,
3017                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
3018                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
3019                         if (rc != 0)
3020                                 GOTO(out, rc);
3021
3022                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
3023                         if (IS_ERR(obj))
3024                                 GOTO(out, rc = PTR_ERR(obj));
3025
3026                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3027                                 (const struct dt_key *)dotlustre, BYPASS_CAPA);
3028                         if (rc != 0)
3029                                 GOTO(out, rc);
3030
3031                         lu_object_put(env, &obj->do_lu);
3032                         obj = dt_locate(env, key, fid);
3033                         if (IS_ERR(obj))
3034                                 GOTO(out, rc = PTR_ERR(obj));
3035
3036                         cname = lfsck_name_get_const(env, dotlustre,
3037                                                      strlen(dotlustre));
3038                         rc = lfsck_verify_linkea(env, key, obj, cname,
3039                                                  &lfsck->li_global_root_fid);
3040                         if (rc != 0)
3041                                 GOTO(out, rc);
3042
3043                         *pfid = *fid;
3044                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3045                                        (const struct dt_key *)lostfound,
3046                                        BYPASS_CAPA);
3047                         if (rc != 0)
3048                                 GOTO(out, rc);
3049
3050                         lu_object_put(env, &obj->do_lu);
3051                         obj = dt_locate(env, key, fid);
3052                         if (IS_ERR(obj))
3053                                 GOTO(out, rc = PTR_ERR(obj));
3054
3055                         cname = lfsck_name_get_const(env, lostfound,
3056                                                      strlen(lostfound));
3057                         rc = lfsck_verify_linkea(env, key, obj, cname, pfid);
3058                         if (rc != 0)
3059                                 GOTO(out, rc);
3060
3061                         lu_object_put(env, &obj->do_lu);
3062                         obj = NULL;
3063                 }
3064         }
3065
3066         fid->f_seq = FID_SEQ_LOCAL_FILE;
3067         fid->f_oid = OTABLE_IT_OID;
3068         fid->f_ver = 0;
3069         obj = dt_locate(env, key, fid);
3070         if (IS_ERR(obj))
3071                 GOTO(out, rc = PTR_ERR(obj));
3072
3073         lu_object_get(&obj->do_lu);
3074         lfsck->li_obj_oit = obj;
3075         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
3076         if (rc != 0)
3077                 GOTO(out, rc);
3078
3079         rc = lfsck_bookmark_setup(env, lfsck);
3080         if (rc != 0)
3081                 GOTO(out, rc);
3082
3083         if (master) {
3084                 rc = lfsck_fid_init(lfsck);
3085                 if (rc < 0)
3086                         GOTO(out, rc);
3087
3088                 rc = lfsck_namespace_setup(env, lfsck);
3089                 if (rc < 0)
3090                         GOTO(out, rc);
3091         }
3092
3093         rc = lfsck_layout_setup(env, lfsck);
3094         if (rc < 0)
3095                 GOTO(out, rc);
3096
3097         /* XXX: more LFSCK components initialization to be added here. */
3098
3099         rc = lfsck_instance_add(lfsck);
3100         if (rc == 0)
3101                 rc = lfsck_add_target_from_orphan(env, lfsck);
3102 out:
3103         if (obj != NULL && !IS_ERR(obj))
3104                 lu_object_put(env, &obj->do_lu);
3105         if (root != NULL && !IS_ERR(root))
3106                 lu_object_put(env, &root->do_lu);
3107         if (rc != 0)
3108                 lfsck_instance_cleanup(env, lfsck);
3109         return rc;
3110 }
3111 EXPORT_SYMBOL(lfsck_register);
3112
3113 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
3114 {
3115         struct lfsck_instance *lfsck;
3116
3117         lfsck = lfsck_instance_find(key, false, true);
3118         if (lfsck != NULL)
3119                 lfsck_instance_put(env, lfsck);
3120 }
3121 EXPORT_SYMBOL(lfsck_degister);
3122
3123 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
3124                      struct dt_device *tgt, struct obd_export *exp,
3125                      __u32 index, bool for_ost)
3126 {
3127         struct lfsck_instance   *lfsck;
3128         struct lfsck_tgt_desc   *ltd;
3129         int                      rc;
3130         ENTRY;
3131
3132         OBD_ALLOC_PTR(ltd);
3133         if (ltd == NULL)
3134                 RETURN(-ENOMEM);
3135
3136         ltd->ltd_tgt = tgt;
3137         ltd->ltd_key = key;
3138         ltd->ltd_exp = exp;
3139         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
3140         INIT_LIST_HEAD(&ltd->ltd_layout_list);
3141         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
3142         INIT_LIST_HEAD(&ltd->ltd_namespace_list);
3143         INIT_LIST_HEAD(&ltd->ltd_namespace_phase_list);
3144         atomic_set(&ltd->ltd_ref, 1);
3145         ltd->ltd_index = index;
3146
3147         spin_lock(&lfsck_instance_lock);
3148         lfsck = __lfsck_instance_find(key, true, false);
3149         if (lfsck == NULL) {
3150                 if (for_ost)
3151                         list_add_tail(&ltd->ltd_orphan_list,
3152                                       &lfsck_ost_orphan_list);
3153                 else
3154                         list_add_tail(&ltd->ltd_orphan_list,
3155                                       &lfsck_mdt_orphan_list);
3156                 spin_unlock(&lfsck_instance_lock);
3157
3158                 RETURN(0);
3159         }
3160         spin_unlock(&lfsck_instance_lock);
3161
3162         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
3163         if (rc != 0)
3164                 lfsck_tgt_put(ltd);
3165
3166         lfsck_instance_put(env, lfsck);
3167
3168         RETURN(rc);
3169 }
3170 EXPORT_SYMBOL(lfsck_add_target);
3171
3172 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
3173                       struct dt_device *tgt, __u32 index, bool for_ost)
3174 {
3175         struct lfsck_instance   *lfsck;
3176         struct lfsck_tgt_descs  *ltds;
3177         struct lfsck_tgt_desc   *ltd;
3178         struct list_head        *head;
3179
3180         if (for_ost)
3181                 head = &lfsck_ost_orphan_list;
3182         else
3183                 head = &lfsck_mdt_orphan_list;
3184
3185         spin_lock(&lfsck_instance_lock);
3186         list_for_each_entry(ltd, head, ltd_orphan_list) {
3187                 if (ltd->ltd_tgt == tgt) {
3188                         list_del_init(&ltd->ltd_orphan_list);
3189                         spin_unlock(&lfsck_instance_lock);
3190                         lfsck_tgt_put(ltd);
3191
3192                         return;
3193                 }
3194         }
3195
3196         ltd = NULL;
3197         lfsck = __lfsck_instance_find(key, true, false);
3198         spin_unlock(&lfsck_instance_lock);
3199         if (unlikely(lfsck == NULL))
3200                 return;
3201
3202         if (for_ost)
3203                 ltds = &lfsck->li_ost_descs;
3204         else
3205                 ltds = &lfsck->li_mdt_descs;
3206
3207         down_write(&ltds->ltd_rw_sem);
3208         LASSERT(ltds->ltd_tgts_bitmap != NULL);
3209
3210         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
3211                 goto unlock;
3212
3213         ltd = LTD_TGT(ltds, index);
3214         if (unlikely(ltd == NULL))
3215                 goto unlock;
3216
3217         LASSERT(ltds->ltd_tgtnr > 0);
3218
3219         ltds->ltd_tgtnr--;
3220         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
3221         LTD_TGT(ltds, index) = NULL;
3222
3223 unlock:
3224         if (ltd == NULL) {
3225                 if (for_ost)
3226                         head = &lfsck->li_ost_descs.ltd_orphan;
3227                 else
3228                         head = &lfsck->li_mdt_descs.ltd_orphan;
3229
3230                 list_for_each_entry(ltd, head, ltd_orphan_list) {
3231                         if (ltd->ltd_tgt == tgt) {
3232                                 list_del_init(&ltd->ltd_orphan_list);
3233                                 break;
3234                         }
3235                 }
3236         }
3237
3238         up_write(&ltds->ltd_rw_sem);
3239         if (ltd != NULL) {
3240                 spin_lock(&ltds->ltd_lock);
3241                 ltd->ltd_dead = 1;
3242                 spin_unlock(&ltds->ltd_lock);
3243                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_NAMESPACE);
3244                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
3245                 lfsck_tgt_put(ltd);
3246         }
3247
3248         lfsck_instance_put(env, lfsck);
3249 }
3250 EXPORT_SYMBOL(lfsck_del_target);
3251
3252 static int __init lfsck_init(void)
3253 {
3254         int rc;
3255
3256         INIT_LIST_HEAD(&lfsck_instance_list);
3257         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
3258         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
3259         lfsck_key_init_generic(&lfsck_thread_key, NULL);
3260         rc = lu_context_key_register(&lfsck_thread_key);
3261         if (rc == 0) {
3262                 tgt_register_lfsck_in_notify(lfsck_in_notify);
3263                 tgt_register_lfsck_query(lfsck_query);
3264         }
3265
3266         return rc;
3267 }
3268
3269 static void __exit lfsck_exit(void)
3270 {
3271         struct lfsck_tgt_desc *ltd;
3272         struct lfsck_tgt_desc *next;
3273
3274         LASSERT(list_empty(&lfsck_instance_list));
3275
3276         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
3277                                  ltd_orphan_list) {
3278                 list_del_init(&ltd->ltd_orphan_list);
3279                 lfsck_tgt_put(ltd);
3280         }
3281
3282         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
3283                                  ltd_orphan_list) {
3284                 list_del_init(&ltd->ltd_orphan_list);
3285                 lfsck_tgt_put(ltd);
3286         }
3287
3288         lu_context_key_degister(&lfsck_thread_key);
3289 }
3290
/*
 * Module metadata (author, description, license). cfs_module() is handed
 * lfsck_init and lfsck_exit, presumably as the module load/unload hooks -
 * confirm against the libcfs definition of cfs_module().
 */
3291 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
3292 MODULE_DESCRIPTION("LFSCK");
3293 MODULE_LICENSE("GPL");
3294
3295 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);