Whamcloud - gitweb
LU-5506 lfsck: skip orphan OST-object handling for failed OSTs
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 #define LFSCK_CHECKPOINT_SKIP   1
46
47 /* define lfsck thread key */
48 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
49
50 static void lfsck_key_fini(const struct lu_context *ctx,
51                            struct lu_context_key *key, void *data)
52 {
53         struct lfsck_thread_info *info = data;
54
55         lu_buf_free(&info->lti_linkea_buf);
56         lu_buf_free(&info->lti_big_buf);
57         OBD_FREE_PTR(info);
58 }
59
60 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
61 LU_KEY_INIT_GENERIC(lfsck);
62
63 static struct list_head lfsck_instance_list;
64 static struct list_head lfsck_ost_orphan_list;
65 static struct list_head lfsck_mdt_orphan_list;
66 static DEFINE_SPINLOCK(lfsck_instance_lock);
67
68 static const char *lfsck_status_names[] = {
69         [LS_INIT]               = "init",
70         [LS_SCANNING_PHASE1]    = "scanning-phase1",
71         [LS_SCANNING_PHASE2]    = "scanning-phase2",
72         [LS_COMPLETED]          = "completed",
73         [LS_FAILED]             = "failed",
74         [LS_STOPPED]            = "stopped",
75         [LS_PAUSED]             = "paused",
76         [LS_CRASHED]            = "crashed",
77         [LS_PARTIAL]            = "partial",
78         [LS_CO_FAILED]          = "co-failed",
79         [LS_CO_STOPPED]         = "co-stopped",
80         [LS_CO_PAUSED]          = "co-paused"
81 };
82
83 const char *lfsck_flags_names[] = {
84         "scanned-once",
85         "inconsistent",
86         "upgrade",
87         "incomplete",
88         "crashed_lastid",
89         NULL
90 };
91
92 const char *lfsck_param_names[] = {
93         NULL,
94         "failout",
95         "dryrun",
96         "all_targets",
97         "broadcast",
98         "orphan",
99         "create_ostobj",
100         NULL
101 };
102
103 enum lfsck_verify_lpf_types {
104         LVLT_BY_BOOKMARK        = 0,
105         LVLT_BY_NAMEENTRY       = 1,
106 };
107
108 const char *lfsck_status2names(enum lfsck_status status)
109 {
110         if (unlikely(status < 0 || status >= LS_MAX))
111                 return "unknown";
112
113         return lfsck_status_names[status];
114 }
115
116 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
117 {
118         spin_lock_init(&ltds->ltd_lock);
119         init_rwsem(&ltds->ltd_rw_sem);
120         INIT_LIST_HEAD(&ltds->ltd_orphan);
121         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
122         if (ltds->ltd_tgts_bitmap == NULL)
123                 return -ENOMEM;
124
125         return 0;
126 }
127
128 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
129 {
130         struct lfsck_tgt_desc   *ltd;
131         struct lfsck_tgt_desc   *next;
132         int                      idx;
133
134         down_write(&ltds->ltd_rw_sem);
135
136         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
137                                  ltd_orphan_list) {
138                 list_del_init(&ltd->ltd_orphan_list);
139                 lfsck_tgt_put(ltd);
140         }
141
142         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
143                 up_write(&ltds->ltd_rw_sem);
144
145                 return;
146         }
147
148         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
149                 ltd = LTD_TGT(ltds, idx);
150                 if (likely(ltd != NULL)) {
151                         LASSERT(list_empty(&ltd->ltd_layout_list));
152                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
153                         LASSERT(list_empty(&ltd->ltd_namespace_list));
154                         LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
155
156                         ltds->ltd_tgtnr--;
157                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
158                         LTD_TGT(ltds, idx) = NULL;
159                         lfsck_tgt_put(ltd);
160                 }
161         }
162
163         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
164                  ltds->ltd_tgtnr);
165
166         for (idx = 0; idx < TGT_PTRS; idx++) {
167                 if (ltds->ltd_tgts_idx[idx] != NULL) {
168                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
169                         ltds->ltd_tgts_idx[idx] = NULL;
170                 }
171         }
172
173         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
174         ltds->ltd_tgts_bitmap = NULL;
175         up_write(&ltds->ltd_rw_sem);
176 }
177
178 static int __lfsck_add_target(const struct lu_env *env,
179                               struct lfsck_instance *lfsck,
180                               struct lfsck_tgt_desc *ltd,
181                               bool for_ost, bool locked)
182 {
183         struct lfsck_tgt_descs *ltds;
184         __u32                   index = ltd->ltd_index;
185         int                     rc    = 0;
186         ENTRY;
187
188         if (for_ost)
189                 ltds = &lfsck->li_ost_descs;
190         else
191                 ltds = &lfsck->li_mdt_descs;
192
193         if (!locked)
194                 down_write(&ltds->ltd_rw_sem);
195
196         LASSERT(ltds->ltd_tgts_bitmap != NULL);
197
198         if (index >= ltds->ltd_tgts_bitmap->size) {
199                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
200                                     (__u32)BITS_PER_LONG);
201                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
202                 cfs_bitmap_t *new_bitmap;
203
204                 while (newsize < index + 1)
205                         newsize <<= 1;
206
207                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
208                 if (new_bitmap == NULL)
209                         GOTO(unlock, rc = -ENOMEM);
210
211                 if (ltds->ltd_tgtnr > 0)
212                         cfs_bitmap_copy(new_bitmap, old_bitmap);
213                 ltds->ltd_tgts_bitmap = new_bitmap;
214                 CFS_FREE_BITMAP(old_bitmap);
215         }
216
217         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
218                 CERROR("%s: the device %s (%u) is registered already\n",
219                        lfsck_lfsck2name(lfsck),
220                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
221                 GOTO(unlock, rc = -EEXIST);
222         }
223
224         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
225                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
226                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
227                         GOTO(unlock, rc = -ENOMEM);
228         }
229
230         LTD_TGT(ltds, index) = ltd;
231         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
232         ltds->ltd_tgtnr++;
233
234         GOTO(unlock, rc = 0);
235
236 unlock:
237         if (!locked)
238                 up_write(&ltds->ltd_rw_sem);
239
240         return rc;
241 }
242
243 static int lfsck_add_target_from_orphan(const struct lu_env *env,
244                                         struct lfsck_instance *lfsck)
245 {
246         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
247         struct lfsck_tgt_desc   *ltd;
248         struct lfsck_tgt_desc   *next;
249         struct list_head        *head    = &lfsck_ost_orphan_list;
250         int                      rc;
251         bool                     for_ost = true;
252
253 again:
254         spin_lock(&lfsck_instance_lock);
255         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
256                 if (ltd->ltd_key == lfsck->li_bottom)
257                         list_move_tail(&ltd->ltd_orphan_list,
258                                        &ltds->ltd_orphan);
259         }
260         spin_unlock(&lfsck_instance_lock);
261
262         down_write(&ltds->ltd_rw_sem);
263         while (!list_empty(&ltds->ltd_orphan)) {
264                 ltd = list_entry(ltds->ltd_orphan.next,
265                                  struct lfsck_tgt_desc,
266                                  ltd_orphan_list);
267                 list_del_init(&ltd->ltd_orphan_list);
268                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
269                 /* Do not hold the semaphore for too long time. */
270                 up_write(&ltds->ltd_rw_sem);
271                 if (rc != 0)
272                         return rc;
273
274                 down_write(&ltds->ltd_rw_sem);
275         }
276         up_write(&ltds->ltd_rw_sem);
277
278         if (for_ost) {
279                 ltds = &lfsck->li_mdt_descs;
280                 head = &lfsck_mdt_orphan_list;
281                 for_ost = false;
282                 goto again;
283         }
284
285         return 0;
286 }
287
288 static inline struct lfsck_component *
289 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
290                        struct list_head *list)
291 {
292         struct lfsck_component *com;
293
294         list_for_each_entry(com, list, lc_link) {
295                 if (com->lc_type == type)
296                         return com;
297         }
298         return NULL;
299 }
300
301 struct lfsck_component *
302 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
303 {
304         struct lfsck_component *com;
305
306         spin_lock(&lfsck->li_lock);
307         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
308         if (com != NULL)
309                 goto unlock;
310
311         com = __lfsck_component_find(lfsck, type,
312                                      &lfsck->li_list_double_scan);
313         if (com != NULL)
314                 goto unlock;
315
316         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
317
318 unlock:
319         if (com != NULL)
320                 lfsck_component_get(com);
321         spin_unlock(&lfsck->li_lock);
322         return com;
323 }
324
325 void lfsck_component_cleanup(const struct lu_env *env,
326                              struct lfsck_component *com)
327 {
328         if (!list_empty(&com->lc_link))
329                 list_del_init(&com->lc_link);
330         if (!list_empty(&com->lc_link_dir))
331                 list_del_init(&com->lc_link_dir);
332
333         lfsck_component_put(env, com);
334 }
335
336 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
337                     struct lu_fid *fid, bool locked)
338 {
339         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
340         int                      rc = 0;
341         ENTRY;
342
343         if (!locked)
344                 mutex_lock(&lfsck->li_mutex);
345
346         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
347         if (rc >= 0) {
348                 bk->lb_last_fid = *fid;
349                 /* We do not care about whether the subsequent sub-operations
350                  * failed or not. The worst case is that one FID is lost that
351                  * is not a big issue for the LFSCK since it is relative rare
352                  * for LFSCK create. */
353                 rc = lfsck_bookmark_store(env, lfsck);
354         }
355
356         if (!locked)
357                 mutex_unlock(&lfsck->li_mutex);
358
359         RETURN(rc);
360 }
361
362 /**
363  * Request the specified ibits lock for the given object.
364  *
365  * Before the LFSCK modifying on the namespace visible object,
366  * it needs to acquire related ibits ldlm lock.
367  *
368  * \param[in] env       pointer to the thread context
369  * \param[in] lfsck     pointer to the lfsck instance
370  * \param[in] obj       pointer to the dt_object to be locked
371  * \param[out] lh       pointer to the lock handle
372  * \param[in] ibits     the bits for the ldlm lock to be acquired
373  * \param[in] mode      the mode for the ldlm lock to be acquired
374  *
375  * \retval              0 for success
376  * \retval              negative error number on failure
377  */
378 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
379                      struct dt_object *obj, struct lustre_handle *lh,
380                      __u64 bits, ldlm_mode_t mode)
381 {
382         struct lfsck_thread_info        *info   = lfsck_env_info(env);
383         ldlm_policy_data_t              *policy = &info->lti_policy;
384         struct ldlm_res_id              *resid  = &info->lti_resid;
385         __u64                            flags  = LDLM_FL_ATOMIC_CB;
386         int                              rc;
387
388         LASSERT(lfsck->li_namespace != NULL);
389
390         memset(policy, 0, sizeof(*policy));
391         policy->l_inodebits.bits = bits;
392         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
393         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_IBITS,
394                                     policy, mode, &flags, ldlm_blocking_ast,
395                                     ldlm_completion_ast, NULL, NULL, 0,
396                                     LVB_T_NONE, NULL, lh);
397         if (rc == ELDLM_OK) {
398                 rc = 0;
399         } else {
400                 memset(lh, 0, sizeof(*lh));
401                 rc = -EIO;
402         }
403
404         return rc;
405 }
406
407 /**
408  * Release the the specified ibits lock.
409  *
410  * If the lock has been acquired before, release it
411  * and cleanup the handle. Otherwise, do nothing.
412  *
413  * \param[in] lh        pointer to the lock handle
414  * \param[in] mode      the mode for the ldlm lock to be released
415  */
416 void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
417 {
418         if (lustre_handle_is_used(lh)) {
419                 ldlm_lock_decref(lh, mode);
420                 memset(lh, 0, sizeof(*lh));
421         }
422 }
423
424 static const char dot[] = ".";
425 static const char dotdot[] = "..";
426 static const char dotlustre[] = ".lustre";
427 static const char lostfound[] = "lost+found";
428
429 static int lfsck_create_lpf_local(const struct lu_env *env,
430                                   struct lfsck_instance *lfsck,
431                                   struct dt_object *parent,
432                                   struct dt_object *child,
433                                   struct lu_attr *la,
434                                   struct dt_object_format *dof,
435                                   const char *name)
436 {
437         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
438         struct dt_device        *dev    = lfsck->li_bottom;
439         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
440         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
441         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
442         struct thandle          *th     = NULL;
443         struct linkea_data       ldata  = { 0 };
444         struct lu_buf            linkea_buf;
445         const struct lu_name    *cname;
446         loff_t                   pos    = 0;
447         int                      len    = sizeof(struct lfsck_bookmark);
448         int                      rc;
449         ENTRY;
450
451         rc = linkea_data_new(&ldata,
452                              &lfsck_env_info(env)->lti_linkea_buf);
453         if (rc != 0)
454                 RETURN(rc);
455
456         cname = lfsck_name_get_const(env, name, strlen(name));
457         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
458         if (rc != 0)
459                 RETURN(rc);
460
461         th = dt_trans_create(env, dev);
462         if (IS_ERR(th))
463                 RETURN(PTR_ERR(th));
464
465         /* 1a. create child */
466         rc = dt_declare_create(env, child, la, NULL, dof, th);
467         if (rc != 0)
468                 GOTO(stop, rc);
469
470         /* 2a. increase child nlink */
471         rc = dt_declare_ref_add(env, child, th);
472         if (rc != 0)
473                 GOTO(stop, rc);
474
475         /* 3a. insert linkEA for child */
476         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
477                        ldata.ld_leh->leh_len);
478         rc = dt_declare_xattr_set(env, child, &linkea_buf,
479                                   XATTR_NAME_LINK, 0, th);
480         if (rc != 0)
481                 GOTO(stop, rc);
482
483         /* 4a. insert name into parent dir */
484         rec->rec_type = S_IFDIR;
485         rec->rec_fid = cfid;
486         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
487                                (const struct dt_key *)name, th);
488         if (rc != 0)
489                 GOTO(stop, rc);
490
491         /* 5a. increase parent nlink */
492         rc = dt_declare_ref_add(env, parent, th);
493         if (rc != 0)
494                 GOTO(stop, rc);
495
496         /* 6a. update bookmark */
497         rc = dt_declare_record_write(env, bk_obj,
498                                      lfsck_buf_get(env, bk, len), 0, th);
499         if (rc != 0)
500                 GOTO(stop, rc);
501
502         rc = dt_trans_start_local(env, dev, th);
503         if (rc != 0)
504                 GOTO(stop, rc);
505
506         dt_write_lock(env, child, 0);
507         /* 1b.1. create child */
508         rc = dt_create(env, child, la, NULL, dof, th);
509         if (rc != 0)
510                 GOTO(unlock, rc);
511
512         if (unlikely(!dt_try_as_dir(env, child)))
513                 GOTO(unlock, rc = -ENOTDIR);
514
515         /* 1b.2. insert dot into child dir */
516         rec->rec_fid = cfid;
517         rc = dt_insert(env, child, (const struct dt_rec *)rec,
518                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
519         if (rc != 0)
520                 GOTO(unlock, rc);
521
522         /* 1b.3. insert dotdot into child dir */
523         rec->rec_fid = &LU_LPF_FID;
524         rc = dt_insert(env, child, (const struct dt_rec *)rec,
525                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
526         if (rc != 0)
527                 GOTO(unlock, rc);
528
529         /* 2b. increase child nlink */
530         rc = dt_ref_add(env, child, th);
531         if (rc != 0)
532                 GOTO(unlock, rc);
533
534         /* 3b. insert linkEA for child. */
535         rc = dt_xattr_set(env, child, &linkea_buf,
536                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
537         dt_write_unlock(env, child);
538         if (rc != 0)
539                 GOTO(stop, rc);
540
541         /* 4b. insert name into parent dir */
542         rec->rec_fid = cfid;
543         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
544                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
545         if (rc != 0)
546                 GOTO(stop, rc);
547
548         dt_write_lock(env, parent, 0);
549         /* 5b. increase parent nlink */
550         rc = dt_ref_add(env, parent, th);
551         dt_write_unlock(env, parent);
552         if (rc != 0)
553                 GOTO(stop, rc);
554
555         bk->lb_lpf_fid = *cfid;
556         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
557
558         /* 6b. update bookmark */
559         rc = dt_record_write(env, bk_obj,
560                              lfsck_buf_get(env, bk, len), &pos, th);
561
562         GOTO(stop, rc);
563
564 unlock:
565         dt_write_unlock(env, child);
566
567 stop:
568         dt_trans_stop(env, dev, th);
569
570         return rc;
571 }
572
573 static int lfsck_create_lpf_remote(const struct lu_env *env,
574                                    struct lfsck_instance *lfsck,
575                                    struct dt_object *parent,
576                                    struct dt_object *child,
577                                    struct lu_attr *la,
578                                    struct dt_object_format *dof,
579                                    const char *name)
580 {
581         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
582         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
583         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
584         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
585         struct thandle          *th     = NULL;
586         struct linkea_data       ldata  = { 0 };
587         struct lu_buf            linkea_buf;
588         const struct lu_name    *cname;
589         struct dt_device        *dev;
590         loff_t                   pos    = 0;
591         int                      len    = sizeof(struct lfsck_bookmark);
592         int                      rc;
593         ENTRY;
594
595         rc = linkea_data_new(&ldata,
596                              &lfsck_env_info(env)->lti_linkea_buf);
597         if (rc != 0)
598                 RETURN(rc);
599
600         cname = lfsck_name_get_const(env, name, strlen(name));
601         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
602         if (rc != 0)
603                 RETURN(rc);
604
605         /* Create .lustre/lost+found/MDTxxxx. */
606
607         /* XXX: Currently, cross-MDT create operation needs to create the child
608          *      object firstly, then insert name into the parent directory. For
609          *      this case, the child object resides on current MDT (local), but
610          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
611          *      easy to contain all the sub-modifications orderly within single
612          *      transaction.
613          *
614          *      To avoid more inconsistency, we split the create operation into
615          *      two transactions:
616          *
617          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
618          *         locally.
619          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
620          *         remotely.
621          *
622          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
623          *      repair such inconsistency when LFSCK run next time. */
624
625         /* Transaction I: locally */
626
627         dev = lfsck->li_bottom;
628         th = dt_trans_create(env, dev);
629         if (IS_ERR(th))
630                 RETURN(PTR_ERR(th));
631
632         /* 1a. create child */
633         rc = dt_declare_create(env, child, la, NULL, dof, th);
634         if (rc != 0)
635                 GOTO(stop, rc);
636
637         /* 2a. increase child nlink */
638         rc = dt_declare_ref_add(env, child, th);
639         if (rc != 0)
640                 GOTO(stop, rc);
641
642         /* 3a. insert linkEA for child */
643         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
644                        ldata.ld_leh->leh_len);
645         rc = dt_declare_xattr_set(env, child, &linkea_buf,
646                                   XATTR_NAME_LINK, 0, th);
647         if (rc != 0)
648                 GOTO(stop, rc);
649
650         /* 4a. update bookmark */
651         rc = dt_declare_record_write(env, bk_obj,
652                                      lfsck_buf_get(env, bk, len), 0, th);
653         if (rc != 0)
654                 GOTO(stop, rc);
655
656         rc = dt_trans_start_local(env, dev, th);
657         if (rc != 0)
658                 GOTO(stop, rc);
659
660         dt_write_lock(env, child, 0);
661         /* 1b.1. create child */
662         rc = dt_create(env, child, la, NULL, dof, th);
663         if (rc != 0)
664                 GOTO(unlock, rc);
665
666         if (unlikely(!dt_try_as_dir(env, child)))
667                 GOTO(unlock, rc = -ENOTDIR);
668
669         /* 1b.2. insert dot into child dir */
670         rec->rec_type = S_IFDIR;
671         rec->rec_fid = cfid;
672         rc = dt_insert(env, child, (const struct dt_rec *)rec,
673                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
674         if (rc != 0)
675                 GOTO(unlock, rc);
676
677         /* 1b.3. insert dotdot into child dir */
678         rec->rec_fid = &LU_LPF_FID;
679         rc = dt_insert(env, child, (const struct dt_rec *)rec,
680                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
681         if (rc != 0)
682                 GOTO(unlock, rc);
683
684         /* 2b. increase child nlink */
685         rc = dt_ref_add(env, child, th);
686         if (rc != 0)
687                 GOTO(unlock, rc);
688
689         /* 3b. insert linkEA for child */
690         rc = dt_xattr_set(env, child, &linkea_buf,
691                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
692         if (rc != 0)
693                 GOTO(unlock, rc);
694
695         bk->lb_lpf_fid = *cfid;
696         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
697
698         /* 4b. update bookmark */
699         rc = dt_record_write(env, bk_obj,
700                              lfsck_buf_get(env, bk, len), &pos, th);
701
702         dt_write_unlock(env, child);
703         dt_trans_stop(env, dev, th);
704         if (rc != 0)
705                 RETURN(rc);
706
707         /* Transaction II: remotely */
708
709         dev = lfsck->li_next;
710         th = dt_trans_create(env, dev);
711         if (IS_ERR(th))
712                 RETURN(PTR_ERR(th));
713
714         /* 5a. insert name into parent dir */
715         rec->rec_fid = cfid;
716         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
717                                (const struct dt_key *)name, th);
718         if (rc != 0)
719                 GOTO(stop, rc);
720
721         /* 6a. increase parent nlink */
722         rc = dt_declare_ref_add(env, parent, th);
723         if (rc != 0)
724                 GOTO(stop, rc);
725
726         rc = dt_trans_start(env, dev, th);
727         if (rc != 0)
728                 GOTO(stop, rc);
729
730         /* 5b. insert name into parent dir */
731         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
732                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
733         if (rc != 0)
734                 GOTO(stop, rc);
735
736         dt_write_lock(env, parent, 0);
737         /* 6b. increase parent nlink */
738         rc = dt_ref_add(env, parent, th);
739         dt_write_unlock(env, parent);
740
741         GOTO(stop, rc);
742
743 unlock:
744         dt_write_unlock(env, child);
745 stop:
746         dt_trans_stop(env, dev, th);
747
748         if (rc != 0 && dev == lfsck->li_next)
749                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
750                        "for orphans, but failed to insert the name %s "
751                        "to the .lustre/lost+found/. Such inconsistency "
752                        "will be repaired when LFSCK run next time: rc = %d\n",
753                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
754
755         return rc;
756 }
757
758 /* Do NOT create .lustre/lost+found/MDTxxxx when register the lfsck instance,
759  * because the MDT0 maybe not reaady for sequence allocation yet. We do that
760  * only when it is required, such as orphan OST-objects repairing. */
761 int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
762 {
763         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
764         struct lfsck_thread_info *info  = lfsck_env_info(env);
765         struct lu_fid            *cfid  = &info->lti_fid2;
766         struct lu_attr           *la    = &info->lti_la;
767         struct dt_object_format  *dof   = &info->lti_dof;
768         struct dt_object         *parent = NULL;
769         struct dt_object         *child = NULL;
770         struct lustre_handle      lh    = { 0 };
771         char                      name[8];
772         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
773         int                       rc    = 0;
774         ENTRY;
775
776         LASSERT(lfsck->li_master);
777
778         sprintf(name, "MDT%04x", node);
779         if (node == 0) {
780                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
781                                                   &LU_LPF_FID);
782         } else {
783                 struct lfsck_tgt_desc *ltd;
784
785                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
786                 if (unlikely(ltd == NULL))
787                         RETURN(-ENXIO);
788
789                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
790                                                   &LU_LPF_FID);
791                 lfsck_tgt_put(ltd);
792         }
793         if (IS_ERR(parent))
794                 RETURN(PTR_ERR(parent));
795
796         if (lfsck->li_lpf_obj != NULL)
797                 GOTO(out, rc = 0);
798
799         if (unlikely(!dt_try_as_dir(env, parent)))
800                 GOTO(out, rc = -ENOTDIR);
801
802         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
803                               MDS_INODELOCK_UPDATE, LCK_EX);
804         if (rc != 0)
805                 GOTO(out, rc);
806
807         mutex_lock(&lfsck->li_mutex);
808         if (lfsck->li_lpf_obj != NULL)
809                 GOTO(unlock, rc = 0);
810
811         if (fid_is_zero(&bk->lb_lpf_fid)) {
812                 /* There is corner case that: in former LFSCK scanning we have
813                  * created the .lustre/lost+found/MDTxxxx but failed to update
814                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
815                  * it from MDT0 firstly. */
816                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
817                                (const struct dt_key *)name, BYPASS_CAPA);
818                 if (rc != 0 && rc != -ENOENT)
819                         GOTO(unlock, rc);
820
821                 if (rc == 0) {
822                         bk->lb_lpf_fid = *cfid;
823                         rc = lfsck_bookmark_store(env, lfsck);
824                 } else {
825                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
826                 }
827                 if (rc != 0)
828                         GOTO(unlock, rc);
829         } else {
830                 *cfid = bk->lb_lpf_fid;
831         }
832
833         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
834         if (IS_ERR(child))
835                 GOTO(unlock, rc = PTR_ERR(child));
836
837         if (dt_object_exists(child) != 0) {
838                 if (unlikely(!dt_try_as_dir(env, child)))
839                         rc = -ENOTDIR;
840                 else
841                         lfsck->li_lpf_obj = child;
842
843                 GOTO(unlock, rc);
844         }
845
846         memset(la, 0, sizeof(*la));
847         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
848         la->la_mode = S_IFDIR | S_IRWXU;
849         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
850                        LA_UID | LA_GID;
851         memset(dof, 0, sizeof(*dof));
852         dof->dof_type = dt_mode_to_dft(S_IFDIR);
853
854         if (node == 0)
855                 rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
856                                             dof, name);
857         else
858                 rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
859                                              dof, name);
860         if (rc == 0)
861                 lfsck->li_lpf_obj = child;
862
863         GOTO(unlock, rc);
864
865 unlock:
866         mutex_unlock(&lfsck->li_mutex);
867         lfsck_ibits_unlock(&lh, LCK_EX);
868         if (rc != 0 && child != NULL && !IS_ERR(child))
869                 lu_object_put(env, &child->do_lu);
870 out:
871         if (parent != NULL && !IS_ERR(parent))
872                 lu_object_put(env, &parent->do_lu);
873
874         return rc;
875 }
876
877 /**
878  * Scan .lustre/lost+found for bad name entries and remove them.
879  *
880  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
881  * index in the system. Any other formatted name is invalid and should be
882  * removed.
883  *
884  * \param[in] env       pointer to the thread context
885  * \param[in] lfsck     pointer to the lfsck instance
886  * \param[in] parent    pointer to the lost+found object
887  *
888  * \retval              0 for success
889  * \retval              negative error number on failure
890  */
891 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
892                                       struct lfsck_instance *lfsck,
893                                       struct dt_object *parent)
894 {
895         struct lu_dirent        *ent    =
896                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
897         const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
898         struct dt_it            *it;
899         int                      rc;
900         ENTRY;
901
902         it = iops->init(env, parent, LUDA_64BITHASH, BYPASS_CAPA);
903         if (IS_ERR(it))
904                 RETURN(PTR_ERR(it));
905
906         rc = iops->load(env, it, 0);
907         if (rc == 0)
908                 rc = iops->next(env, it);
909         else if (rc > 0)
910                 rc = 0;
911
912         while (rc == 0) {
913                 int off = 3;
914
915                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
916                 if (rc != 0)
917                         break;
918
919                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
920                 if (ent->lde_name[0] == '.') {
921                         if (ent->lde_namelen == 1)
922                                 goto next;
923
924                         if (ent->lde_namelen == 2 && ent->lde_name[1] == '.')
925                                 goto next;
926                 }
927
928                 /* name length must be strlen("MDTxxxx") */
929                 if (ent->lde_namelen != 7)
930                         goto remove;
931
932                 if (memcmp(ent->lde_name, "MDT", off) != 0)
933                         goto remove;
934
935                 while (off < 7 && isxdigit(ent->lde_name[off]))
936                         off++;
937
938                 if (off != 7) {
939
940 remove:
941                         rc = lfsck_remove_name_entry(env, lfsck, parent,
942                                                      ent->lde_name, S_IFDIR);
943                         if (rc != 0)
944                                 break;
945                 }
946
947 next:
948                 rc = iops->next(env, it);
949         }
950
951         iops->put(env, it);
952         iops->fini(env, it);
953
954         RETURN(rc > 0 ? 0 : rc);
955 }
956
957 static int lfsck_update_lpf_entry(const struct lu_env *env,
958                                   struct lfsck_instance *lfsck,
959                                   struct dt_object *parent,
960                                   struct dt_object *child,
961                                   const char *name,
962                                   enum lfsck_verify_lpf_types type)
963 {
964         int rc;
965
966         if (type == LVLT_BY_BOOKMARK) {
967                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
968                                              lfsck_dto2fid(child), S_IFDIR);
969         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
970                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
971                 rc = lfsck_bookmark_store(env, lfsck);
972
973                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
974                        " in the bookmark file: rc = %d\n",
975                        lfsck_lfsck2name(lfsck),
976                        PFID(lfsck_dto2fid(child)), rc);
977         }
978
979         return rc;
980 }
981
982 /**
983  * Check whether the @child back references the @parent.
984  *
985  * Two cases:
986  * 1) The child's FID is stored in the bookmark file. If the child back
987  *    references the parent (LU_LPF_FID object) via its ".." entry, then
988  *    insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
989  *    the child back references another parent2, then:
990  * 1.1) If the parent2 recognizes the child, then update the bookmark file;
991  * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
992  *      references the child. So keep them there. As the LFSCK processing,
993  *      the parent3 may be found, then when the LFSCK run next time, the
994  *      inconsistency can be repaired.
995  *
996  * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
997  *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
998  *    via its ".." entry, then update the bookmark file, otherwise, if the child
999  *    back references another parent2, then:
1000  * 2.1) If the parent2 recognizes the child, then remove the sub-directory
1001  *      from .lustre/lost+found/;
1002  * 2.2) Otherwise, if the parent2 does not recognizes the child, trust the
1003  *      sub-directory name entry and update the child;
1004  * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1005  *      or not, then keep them there.
1006  *
1007  * \param[in] env       pointer to the thread context
1008  * \param[in] lfsck     pointer to the lfsck instance
1009  * \param[in] parent    pointer to the lost+found object
1010  * \param[in] child     pointer to the lost+found sub-directory object
1011  * \param[in] name      the name for lost+found sub-directory object
1012  * \param[out] fid      pointer to the buffer to hold the FID of the object
1013  *                      (called it as parent2) that is referenced via the
1014  *                      child's dotdot entry; it also can be the FID that
1015  *                      is referenced by the name entry under the parent2.
1016  * \param[in] type      to indicate where the child's FID is stored in
1017  *
1018  * \retval              positive number for uncertain inconsistency
1019  * \retval              0 for success
1020  * \retval              negative error number on failure
1021  */
1022 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1023                                   struct lfsck_instance *lfsck,
1024                                   struct dt_object *parent,
1025                                   struct dt_object *child, const char *name,
1026                                   struct lu_fid *fid,
1027                                   enum lfsck_verify_lpf_types type)
1028 {
1029         struct lfsck_thread_info *info    = lfsck_env_info(env);
1030         char                     *name2   = info->lti_key;
1031         struct lu_fid            *fid2    = &info->lti_fid3;
1032         struct dt_object         *parent2 = NULL;
1033         struct lustre_handle      lh      = { 0 };
1034         int                       rc;
1035         ENTRY;
1036
1037         fid_zero(fid);
1038         rc = dt_lookup(env, child, (struct dt_rec *)fid,
1039                        (const struct dt_key *)dotdot, BYPASS_CAPA);
1040         if (rc != 0)
1041                 GOTO(linkea, rc);
1042
1043         if (!fid_is_sane(fid))
1044                 GOTO(linkea, rc = -EINVAL);
1045
1046         if (lu_fid_eq(fid, &LU_LPF_FID)) {
1047                 const struct lu_name *cname;
1048
1049                 if (lfsck->li_lpf_obj == NULL) {
1050                         lu_object_get(&child->do_lu);
1051                         lfsck->li_lpf_obj = child;
1052                 }
1053
1054                 cname = lfsck_name_get_const(env, name, strlen(name));
1055                 rc = lfsck_verify_linkea(env, lfsck->li_bottom, child, cname,
1056                                          &LU_LPF_FID);
1057                 if (rc == 0)
1058                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1059                                                     name, type);
1060
1061                 GOTO(out_done, rc);
1062         }
1063
1064         parent2 = lfsck_object_find_by_dev(env, lfsck->li_next, fid);
1065         if (IS_ERR(parent2))
1066                 GOTO(linkea, parent2);
1067
1068         if (!dt_object_exists(parent2)) {
1069                 lu_object_put(env, &parent2->do_lu);
1070
1071                 GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
1072         }
1073
1074         if (!dt_try_as_dir(env, parent2)) {
1075                 lu_object_put(env, &parent2->do_lu);
1076
1077                 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1078         }
1079
1080 linkea:
1081         /* To prevent rename/unlink race */
1082         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1083                               MDS_INODELOCK_UPDATE, LCK_PR);
1084         if (rc != 0)
1085                 GOTO(out_put, rc);
1086
1087         dt_read_lock(env, child, 0);
1088         rc = lfsck_links_get_first(env, child, name2, fid2);
1089         if (rc != 0) {
1090                 dt_read_unlock(env, child);
1091                 lfsck_ibits_unlock(&lh, LCK_PR);
1092
1093                 GOTO(out_put, rc = 1);
1094         }
1095
1096         /* It is almost impossible that the bookmark file (or the name entry)
1097          * and the linkEA hit the same data corruption. Trust the linkEA. */
1098         if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1099                 dt_read_unlock(env, child);
1100                 lfsck_ibits_unlock(&lh, LCK_PR);
1101
1102                 *fid = *fid2;
1103                 if (lfsck->li_lpf_obj == NULL) {
1104                         lu_object_get(&child->do_lu);
1105                         lfsck->li_lpf_obj = child;
1106                 }
1107
1108                 /* Update the child's dotdot entry */
1109                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1110                                              &LU_LPF_FID, S_IFDIR);
1111                 if (rc == 0)
1112                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1113                                                     name, type);
1114
1115                 GOTO(out_put, rc);
1116         }
1117
1118         if (parent2 == NULL || IS_ERR(parent2)) {
1119                 dt_read_unlock(env, child);
1120                 lfsck_ibits_unlock(&lh, LCK_PR);
1121
1122                 GOTO(out_done, rc = 1);
1123         }
1124
1125         rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
1126                        (const struct dt_key *)name2, BYPASS_CAPA);
1127         dt_read_unlock(env, child);
1128         lfsck_ibits_unlock(&lh, LCK_PR);
1129         if (rc != 0 && rc != -ENOENT)
1130                 GOTO(out_put, rc);
1131
1132         if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1133                 if (type == LVLT_BY_BOOKMARK)
1134                         GOTO(out_put, rc = 1);
1135
1136                 /* Trust the name entry, update the child's dotdot entry. */
1137                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1138                                              &LU_LPF_FID, S_IFDIR);
1139
1140                 GOTO(out_put, rc);
1141         }
1142
1143         if (type == LVLT_BY_BOOKMARK) {
1144                 /* Invalid FID record in the bookmark file, reset it. */
1145                 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1146                 rc = lfsck_bookmark_store(env, lfsck);
1147
1148                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1149                        " in the bookmark file: rc = %d\n",
1150                        lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1151         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1152                 /* The name entry is wrong, remove it. */
1153                 rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
1154         }
1155
1156         GOTO(out_put, rc);
1157
1158 out_put:
1159         if (parent2 != NULL && !IS_ERR(parent2))
1160                 lu_object_put(env, &parent2->do_lu);
1161
1162 out_done:
1163         return rc;
1164 }
1165
1166 /**
1167  * Verify the /ROOT/.lustre/lost+found/ directory.
1168  *
1169  * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1170  * the LFSCK does not exactly know how to handle, such as orphans. So before
1171  * the LFSCK scanning the system, the consistency of such directory needs to
1172  * be verified firstly to allow the users to use it during the LFSCK.
1173  *
1174  * \param[in] env       pointer to the thread context
1175  * \param[in] lfsck     pointer to the lfsck instance
1176  *
1177  * \retval              positive number for uncertain inconsistency
1178  * \retval              0 for success
1179  * \retval              negative error number on failure
1180  */
1181 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1182 {
1183         struct lfsck_thread_info *info   = lfsck_env_info(env);
1184         struct lu_fid            *pfid   = &info->lti_fid;
1185         struct lu_fid            *cfid   = &info->lti_fid2;
1186         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
1187         struct dt_object         *parent = NULL;
1188         /* child1's FID is in the bookmark file. */
1189         struct dt_object         *child1 = NULL;
1190         /* child2's FID is in the name entry MDTxxxx. */
1191         struct dt_object         *child2 = NULL;
1192         struct dt_device         *dev    = lfsck->li_bottom;
1193         const struct lu_name     *cname;
1194         char                      name[8];
1195         int                       node   = lfsck_dev_idx(dev);
1196         int                       rc     = 0;
1197         ENTRY;
1198
1199         LASSERT(lfsck->li_master);
1200
1201         if (node == 0) {
1202                 parent = lfsck_object_find_by_dev(env, dev, &LU_LPF_FID);
1203         } else {
1204                 struct lfsck_tgt_desc *ltd;
1205
1206                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1207                 if (unlikely(ltd == NULL))
1208                         RETURN(-ENXIO);
1209
1210                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1211                                                   &LU_LPF_FID);
1212                 lfsck_tgt_put(ltd);
1213         }
1214
1215         if (IS_ERR(parent))
1216                 RETURN(PTR_ERR(parent));
1217
1218         LASSERT(dt_object_exists(parent));
1219
1220         if (unlikely(!dt_try_as_dir(env, parent)))
1221                 GOTO(put, rc = -ENOTDIR);
1222
1223         if (node == 0) {
1224                 rc = lfsck_scan_lpf_bad_entries(env, lfsck, parent);
1225                 if (rc != 0)
1226                         CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
1227                                "for bad sub-directories: rc = %d\n",
1228                                lfsck_lfsck2name(lfsck), rc);
1229         }
1230
1231         if (!fid_is_zero(&bk->lb_lpf_fid)) {
1232                 if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
1233                         struct lu_fid tfid = bk->lb_lpf_fid;
1234
1235                         /* Invalid FID record in the bookmark file, reset it. */
1236                         fid_zero(&bk->lb_lpf_fid);
1237                         rc = lfsck_bookmark_store(env, lfsck);
1238
1239                         CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1240                                " in the bookmark file: rc = %d\n",
1241                                lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
1242
1243                         if (rc != 0)
1244                                 GOTO(put, rc);
1245                 } else {
1246                         child1 = lfsck_object_find_by_dev(env, dev,
1247                                                           &bk->lb_lpf_fid);
1248                         if (IS_ERR(child1))
1249                                 GOTO(put, rc = PTR_ERR(child1));
1250
1251                         if (unlikely(!dt_object_exists(child1) ||
1252                                      dt_object_remote(child1)) ||
1253                                      !S_ISDIR(lfsck_object_type(child1))) {
1254                                 /* Invalid FID record in the bookmark file,
1255                                  * reset it. */
1256                                 fid_zero(&bk->lb_lpf_fid);
1257                                 rc = lfsck_bookmark_store(env, lfsck);
1258
1259                                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1260                                        " in the bookmark file: rc = %d\n",
1261                                        lfsck_lfsck2name(lfsck),
1262                                        PFID(lfsck_dto2fid(child1)), rc);
1263
1264                                 if (rc != 0)
1265                                         GOTO(put, rc);
1266
1267                                 lu_object_put(env, &child1->do_lu);
1268                                 child1 = NULL;
1269                         } else if (unlikely(!dt_try_as_dir(env, child1))) {
1270                                 GOTO(put, rc = -ENOTDIR);
1271                         }
1272                 }
1273         }
1274
1275         snprintf(name, 8, "MDT%04x", node);
1276         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1277                        (const struct dt_key *)name, BYPASS_CAPA);
1278         if (rc == -ENOENT) {
1279                 if (!fid_is_zero(&bk->lb_lpf_fid))
1280                         goto check_child1;
1281
1282                 GOTO(put, rc = 0);
1283         }
1284
1285         if (rc != 0)
1286                 GOTO(put, rc);
1287
1288         /* Invalid FID in the name entry, remove the name entry. */
1289         if (!fid_is_norm(cfid)) {
1290                 rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
1291                 if (rc != 0)
1292                         GOTO(put, rc);
1293
1294                 goto check_child1;
1295         }
1296
1297         child2 = lfsck_object_find_by_dev(env, dev, cfid);
1298         if (IS_ERR(child2))
1299                 GOTO(put, rc = PTR_ERR(child2));
1300
1301         if (unlikely(!dt_object_exists(child2) ||
1302                      dt_object_remote(child2)) ||
1303                      !S_ISDIR(lfsck_object_type(child2))) {
1304                 rc = lfsck_remove_name_entry(env, lfsck, parent, name,
1305                                              S_IFDIR);
1306                 if (rc != 0)
1307                         GOTO(put, rc);
1308
1309                 goto check_child1;
1310         }
1311
1312         if (unlikely(!dt_try_as_dir(env, child2)))
1313                 GOTO(put, rc = -ENOTDIR);
1314
1315         if (child1 == NULL) {
1316                 rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2, name,
1317                                             pfid, LVLT_BY_NAMEENTRY);
1318         } else if (!lu_fid_eq(cfid, &bk->lb_lpf_fid)) {
1319                 rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
1320                                             pfid, LVLT_BY_BOOKMARK);
1321                 if (!lu_fid_eq(pfid, &LU_LPF_FID))
1322                         rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2,
1323                                                     name, pfid,
1324                                                     LVLT_BY_NAMEENTRY);
1325         } else {
1326                 if (lfsck->li_lpf_obj == NULL) {
1327                         lu_object_get(&child2->do_lu);
1328                         lfsck->li_lpf_obj = child2;
1329                 }
1330
1331                 cname = lfsck_name_get_const(env, name, strlen(name));
1332                 rc = lfsck_verify_linkea(env, dev, child2, cname, &LU_LPF_FID);
1333         }
1334
1335         GOTO(put, rc);
1336
1337 check_child1:
1338         if (child1 != NULL)
1339                 rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
1340                                             pfid, LVLT_BY_BOOKMARK);
1341
1342         GOTO(put, rc);
1343
1344 put:
1345         if (lfsck->li_lpf_obj != NULL &&
1346             unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj)))
1347                 rc = -ENOTDIR;
1348
1349         if (child2 != NULL && !IS_ERR(child2))
1350                 lu_object_put(env, &child2->do_lu);
1351         if (child1 != NULL && !IS_ERR(child1))
1352                 lu_object_put(env, &child1->do_lu);
1353         if (parent != NULL && !IS_ERR(parent))
1354                 lu_object_put(env, &parent->do_lu);
1355
1356         return rc;
1357 }
1358
1359 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1360 {
1361         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1362         struct seq_server_site  *ss;
1363         char                    *prefix;
1364         int                      rc     = 0;
1365         ENTRY;
1366
1367         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
1368         if (unlikely(ss == NULL))
1369                 RETURN(-ENXIO);
1370
1371         OBD_ALLOC_PTR(lfsck->li_seq);
1372         if (lfsck->li_seq == NULL)
1373                 RETURN(-ENOMEM);
1374
1375         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1376         if (prefix == NULL)
1377                 GOTO(out, rc = -ENOMEM);
1378
1379         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1380         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1381                              ss->ss_server_seq);
1382         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1383         if (rc != 0)
1384                 GOTO(out, rc);
1385
1386         if (fid_is_sane(&bk->lb_last_fid))
1387                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1388
1389         RETURN(0);
1390
1391 out:
1392         OBD_FREE_PTR(lfsck->li_seq);
1393         lfsck->li_seq = NULL;
1394
1395         return rc;
1396 }
1397
1398 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1399 {
1400         if (lfsck->li_seq != NULL) {
1401                 seq_client_fini(lfsck->li_seq);
1402                 OBD_FREE_PTR(lfsck->li_seq);
1403                 lfsck->li_seq = NULL;
1404         }
1405 }
1406
1407 void lfsck_instance_cleanup(const struct lu_env *env,
1408                             struct lfsck_instance *lfsck)
1409 {
1410         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1411         struct lfsck_component  *com;
1412         struct lfsck_component  *next;
1413         ENTRY;
1414
1415         LASSERT(list_empty(&lfsck->li_link));
1416         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1417
1418         if (lfsck->li_obj_oit != NULL) {
1419                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
1420                 lfsck->li_obj_oit = NULL;
1421         }
1422
1423         LASSERT(lfsck->li_obj_dir == NULL);
1424
1425         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1426                 lfsck_component_cleanup(env, com);
1427         }
1428
1429         LASSERT(list_empty(&lfsck->li_list_dir));
1430
1431         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1432                                  lc_link) {
1433                 lfsck_component_cleanup(env, com);
1434         }
1435
1436         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1437                 lfsck_component_cleanup(env, com);
1438         }
1439
1440         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1441         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1442
1443         if (lfsck->li_bookmark_obj != NULL) {
1444                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
1445                 lfsck->li_bookmark_obj = NULL;
1446         }
1447
1448         if (lfsck->li_lpf_obj != NULL) {
1449                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
1450                 lfsck->li_lpf_obj = NULL;
1451         }
1452
1453         if (lfsck->li_los != NULL) {
1454                 local_oid_storage_fini(env, lfsck->li_los);
1455                 lfsck->li_los = NULL;
1456         }
1457
1458         lfsck_fid_fini(lfsck);
1459
1460         OBD_FREE_PTR(lfsck);
1461 }
1462
1463 static inline struct lfsck_instance *
1464 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1465 {
1466         struct lfsck_instance *lfsck;
1467
1468         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1469                 if (lfsck->li_bottom == key) {
1470                         if (ref)
1471                                 lfsck_instance_get(lfsck);
1472                         if (unlink)
1473                                 list_del_init(&lfsck->li_link);
1474
1475                         return lfsck;
1476                 }
1477         }
1478
1479         return NULL;
1480 }
1481
1482 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1483                                            bool unlink)
1484 {
1485         struct lfsck_instance *lfsck;
1486
1487         spin_lock(&lfsck_instance_lock);
1488         lfsck = __lfsck_instance_find(key, ref, unlink);
1489         spin_unlock(&lfsck_instance_lock);
1490
1491         return lfsck;
1492 }
1493
1494 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1495 {
1496         struct lfsck_instance *tmp;
1497
1498         spin_lock(&lfsck_instance_lock);
1499         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1500                 if (lfsck->li_bottom == tmp->li_bottom) {
1501                         spin_unlock(&lfsck_instance_lock);
1502                         return -EEXIST;
1503                 }
1504         }
1505
1506         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1507         spin_unlock(&lfsck_instance_lock);
1508         return 0;
1509 }
1510
1511 int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
1512                     const char *prefix)
1513 {
1514         int flag;
1515         int i;
1516         bool newline = (bits != 0 ? false : true);
1517
1518         seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
1519
1520         for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
1521                 if (flag & bits) {
1522                         bits &= ~flag;
1523                         if (names[i] != NULL) {
1524                                 if (bits == 0)
1525                                         newline = true;
1526
1527                                 seq_printf(m, "%s%c", names[i],
1528                                            newline ? '\n' : ',');
1529                         }
1530                 }
1531         }
1532
1533         if (!newline)
1534                 seq_printf(m, "\n");
1535         return 0;
1536 }
1537
1538 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *prefix)
1539 {
1540         if (time != 0)
1541                 seq_printf(m, "%s: "LPU64" seconds\n", prefix,
1542                           cfs_time_current_sec() - time);
1543         else
1544                 seq_printf(m, "%s: N/A\n", prefix);
1545         return 0;
1546 }
1547
1548 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1549                    const char *prefix)
1550 {
1551         if (fid_is_zero(&pos->lp_dir_parent)) {
1552                 if (pos->lp_oit_cookie == 0)
1553                         seq_printf(m, "%s: N/A, N/A, N/A\n",
1554                                    prefix);
1555                 else
1556                         seq_printf(m, "%s: "LPU64", N/A, N/A\n",
1557                                    prefix, pos->lp_oit_cookie);
1558         } else {
1559                 seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
1560                            prefix, pos->lp_oit_cookie,
1561                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1562         }
1563         return 0;
1564 }
1565
1566 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1567                     struct lfsck_position *pos, bool init)
1568 {
1569         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1570
1571         if (unlikely(lfsck->li_di_oit == NULL)) {
1572                 memset(pos, 0, sizeof(*pos));
1573                 return;
1574         }
1575
1576         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1577         if (!lfsck->li_current_oit_processed && !init)
1578                 pos->lp_oit_cookie--;
1579
1580         LASSERT(pos->lp_oit_cookie > 0);
1581
1582         if (lfsck->li_di_dir != NULL) {
1583                 struct dt_object *dto = lfsck->li_obj_dir;
1584
1585                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1586                                                         lfsck->li_di_dir);
1587
1588                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1589                         fid_zero(&pos->lp_dir_parent);
1590                         pos->lp_dir_cookie = 0;
1591                 } else {
1592                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1593                 }
1594         } else {
1595                 fid_zero(&pos->lp_dir_parent);
1596                 pos->lp_dir_cookie = 0;
1597         }
1598 }
1599
1600 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1601 {
1602         bool dirty = false;
1603
1604         if (limit != LFSCK_SPEED_NO_LIMIT) {
1605                 if (limit > HZ) {
1606                         lfsck->li_sleep_rate = limit / HZ;
1607                         lfsck->li_sleep_jif = 1;
1608                 } else {
1609                         lfsck->li_sleep_rate = 1;
1610                         lfsck->li_sleep_jif = HZ / limit;
1611                 }
1612         } else {
1613                 lfsck->li_sleep_jif = 0;
1614                 lfsck->li_sleep_rate = 0;
1615         }
1616
1617         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1618                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1619                 dirty = true;
1620         }
1621
1622         return dirty;
1623 }
1624
1625 void lfsck_control_speed(struct lfsck_instance *lfsck)
1626 {
1627         struct ptlrpc_thread *thread = &lfsck->li_thread;
1628         struct l_wait_info    lwi;
1629
1630         if (lfsck->li_sleep_jif > 0 &&
1631             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1632                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1633                                        LWI_ON_SIGNAL_NOOP, NULL);
1634
1635                 l_wait_event(thread->t_ctl_waitq,
1636                              !thread_is_running(thread),
1637                              &lwi);
1638                 lfsck->li_new_scanned = 0;
1639         }
1640 }
1641
1642 void lfsck_control_speed_by_self(struct lfsck_component *com)
1643 {
1644         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1645         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1646         struct l_wait_info       lwi;
1647
1648         if (lfsck->li_sleep_jif > 0 &&
1649             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1650                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1651                                        LWI_ON_SIGNAL_NOOP, NULL);
1652
1653                 l_wait_event(thread->t_ctl_waitq,
1654                              !thread_is_running(thread),
1655                              &lwi);
1656                 com->lc_new_scanned = 0;
1657         }
1658 }
1659
1660 static struct lfsck_thread_args *
1661 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1662                        struct lfsck_component *com,
1663                        struct lfsck_start_param *lsp)
1664 {
1665         struct lfsck_thread_args *lta;
1666         int                       rc;
1667
1668         OBD_ALLOC_PTR(lta);
1669         if (lta == NULL)
1670                 return ERR_PTR(-ENOMEM);
1671
1672         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1673         if (rc != 0) {
1674                 OBD_FREE_PTR(lta);
1675                 return ERR_PTR(rc);
1676         }
1677
1678         lta->lta_lfsck = lfsck_instance_get(lfsck);
1679         if (com != NULL)
1680                 lta->lta_com = lfsck_component_get(com);
1681
1682         lta->lta_lsp = lsp;
1683
1684         return lta;
1685 }
1686
1687 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1688 {
1689         if (lta->lta_com != NULL)
1690                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1691         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1692         lu_env_fini(&lta->lta_env);
1693         OBD_FREE_PTR(lta);
1694 }
1695
1696 struct lfsck_assistant_data *
1697 lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
1698                           const char *name)
1699 {
1700         struct lfsck_assistant_data *lad;
1701
1702         OBD_ALLOC_PTR(lad);
1703         if (lad != NULL) {
1704                 lad->lad_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
1705                 if (lad->lad_bitmap == NULL) {
1706                         OBD_FREE_PTR(lad);
1707                         return NULL;
1708                 }
1709
1710                 INIT_LIST_HEAD(&lad->lad_req_list);
1711                 spin_lock_init(&lad->lad_lock);
1712                 INIT_LIST_HEAD(&lad->lad_ost_list);
1713                 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1714                 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1715                 INIT_LIST_HEAD(&lad->lad_mdt_list);
1716                 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1717                 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1718                 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1719                 lad->lad_ops = lao;
1720                 lad->lad_name = name;
1721         }
1722
1723         return lad;
1724 }
1725
1726 /**
1727  * Generic LFSCK asynchronous communication interpretor function.
1728  * The LFSCK RPC reply for both the event notification and status
1729  * querying will be handled here.
1730  *
1731  * \param[in] env       pointer to the thread context
1732  * \param[in] req       pointer to the LFSCK request
1733  * \param[in] args      pointer to the lfsck_async_interpret_args
1734  * \param[in] rc        the result for handling the LFSCK request
1735  *
1736  * \retval              0 for success
1737  * \retval              negative error number on failure
1738  */
1739 int lfsck_async_interpret_common(const struct lu_env *env,
1740                                  struct ptlrpc_request *req,
1741                                  void *args, int rc)
1742 {
1743         struct lfsck_async_interpret_args *laia = args;
1744         struct lfsck_component            *com  = laia->laia_com;
1745         struct lfsck_assistant_data       *lad  = com->lc_data;
1746         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1747         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1748         struct lfsck_request              *lr   = laia->laia_lr;
1749
1750         LASSERT(com->lc_lfsck->li_master);
1751
1752         switch (lr->lr_event) {
1753         case LE_START:
1754                 if (rc != 0) {
1755                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
1756                                "start: rc = %d\n",
1757                                lfsck_lfsck2name(com->lc_lfsck),
1758                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1759                                ltd->ltd_index, lad->lad_name, rc);
1760
1761                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1762                                 struct lfsck_layout *lo = com->lc_file_ram;
1763
1764                                 if (lr->lr_flags & LEF_TO_OST)
1765                                         lfsck_lad_set_bitmap(env, com,
1766                                                              ltd->ltd_index);
1767                                 else
1768                                         lo->ll_flags |= LF_INCOMPLETE;
1769                         } else {
1770                                 struct lfsck_namespace *ns = com->lc_file_ram;
1771
1772                                 /* If some MDT does not join the namespace
1773                                  * LFSCK, then we cannot know whether there
1774                                  * is some name entry on such MDT that with
1775                                  * the referenced MDT-object on this MDT or
1776                                  * not. So the namespace LFSCK on this MDT
1777                                  * cannot handle orphan MDT-objects properly.
1778                                  * So we mark the LFSCK as LF_INCOMPLETE and
1779                                  * skip orphan MDT-objects handling. */
1780                                 ns->ln_flags |= LF_INCOMPLETE;
1781                         }
1782                         break;
1783                 }
1784
1785                 spin_lock(&ltds->ltd_lock);
1786                 if (ltd->ltd_dead) {
1787                         spin_unlock(&ltds->ltd_lock);
1788                         break;
1789                 }
1790
1791                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1792                         struct list_head *list;
1793                         struct list_head *phase_list;
1794
1795                         if (ltd->ltd_layout_done) {
1796                                 spin_unlock(&ltds->ltd_lock);
1797                                 break;
1798                         }
1799
1800                         if (lr->lr_flags & LEF_TO_OST) {
1801                                 list = &lad->lad_ost_list;
1802                                 phase_list = &lad->lad_ost_phase1_list;
1803                         } else {
1804                                 list = &lad->lad_mdt_list;
1805                                 phase_list = &lad->lad_mdt_phase1_list;
1806                         }
1807
1808                         if (list_empty(&ltd->ltd_layout_list))
1809                                 list_add_tail(&ltd->ltd_layout_list, list);
1810                         if (list_empty(&ltd->ltd_layout_phase_list))
1811                                 list_add_tail(&ltd->ltd_layout_phase_list,
1812                                               phase_list);
1813                 } else {
1814                         if (ltd->ltd_namespace_done) {
1815                                 spin_unlock(&ltds->ltd_lock);
1816                                 break;
1817                         }
1818
1819                         if (list_empty(&ltd->ltd_namespace_list))
1820                                 list_add_tail(&ltd->ltd_namespace_list,
1821                                               &lad->lad_mdt_list);
1822                         if (list_empty(&ltd->ltd_namespace_phase_list))
1823                                 list_add_tail(&ltd->ltd_namespace_phase_list,
1824                                               &lad->lad_mdt_phase1_list);
1825                 }
1826                 spin_unlock(&ltds->ltd_lock);
1827                 break;
1828         case LE_STOP:
1829         case LE_PHASE1_DONE:
1830         case LE_PHASE2_DONE:
1831         case LE_PEER_EXIT:
1832                 if (rc != 0 && rc != -EALREADY)
1833                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
1834                               "event = %d, rc = %d\n",
1835                               lfsck_lfsck2name(com->lc_lfsck),
1836                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1837                               ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
1838                 break;
1839         case LE_QUERY: {
1840                 struct lfsck_reply *reply;
1841                 struct list_head *list;
1842                 struct list_head *phase_list;
1843
1844                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1845                         list = &ltd->ltd_layout_list;
1846                         phase_list = &ltd->ltd_layout_phase_list;
1847                 } else {
1848                         list = &ltd->ltd_namespace_list;
1849                         phase_list = &ltd->ltd_namespace_phase_list;
1850                 }
1851
1852                 if (rc != 0) {
1853                         spin_lock(&ltds->ltd_lock);
1854                         list_del_init(phase_list);
1855                         list_del_init(list);
1856                         spin_unlock(&ltds->ltd_lock);
1857                         break;
1858                 }
1859
1860                 reply = req_capsule_server_get(&req->rq_pill,
1861                                                &RMF_LFSCK_REPLY);
1862                 if (reply == NULL) {
1863                         rc = -EPROTO;
1864                         CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
1865                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
1866                                lad->lad_name, rc);
1867                         spin_lock(&ltds->ltd_lock);
1868                         list_del_init(phase_list);
1869                         list_del_init(list);
1870                         spin_unlock(&ltds->ltd_lock);
1871                         break;
1872                 }
1873
1874                 switch (reply->lr_status) {
1875                 case LS_SCANNING_PHASE1:
1876                         break;
1877                 case LS_SCANNING_PHASE2:
1878                         spin_lock(&ltds->ltd_lock);
1879                         list_del_init(phase_list);
1880                         if (ltd->ltd_dead) {
1881                                 spin_unlock(&ltds->ltd_lock);
1882                                 break;
1883                         }
1884
1885                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1886                                 if (ltd->ltd_layout_done) {
1887                                         spin_unlock(&ltds->ltd_lock);
1888                                         break;
1889                                 }
1890
1891                                 if (lr->lr_flags & LEF_TO_OST)
1892                                         list_add_tail(phase_list,
1893                                                 &lad->lad_ost_phase2_list);
1894                                 else
1895                                         list_add_tail(phase_list,
1896                                                 &lad->lad_mdt_phase2_list);
1897                         } else {
1898                                 if (ltd->ltd_namespace_done) {
1899                                         spin_unlock(&ltds->ltd_lock);
1900                                         break;
1901                                 }
1902
1903                                 list_add_tail(phase_list,
1904                                               &lad->lad_mdt_phase2_list);
1905                         }
1906                         spin_unlock(&ltds->ltd_lock);
1907                         break;
1908                 default:
1909                         spin_lock(&ltds->ltd_lock);
1910                         list_del_init(phase_list);
1911                         list_del_init(list);
1912                         spin_unlock(&ltds->ltd_lock);
1913                         break;
1914                 }
1915                 break;
1916         }
1917         default:
1918                 CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
1919                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
1920                 break;
1921         }
1922
1923         if (!laia->laia_shared) {
1924                 lfsck_tgt_put(ltd);
1925                 lfsck_component_put(env, com);
1926         }
1927
1928         return 0;
1929 }
1930
1931 static void lfsck_interpret(const struct lu_env *env,
1932                             struct lfsck_instance *lfsck,
1933                             struct ptlrpc_request *req, void *args, int result)
1934 {
1935         struct lfsck_async_interpret_args *laia = args;
1936         struct lfsck_component            *com;
1937
1938         LASSERT(laia->laia_com == NULL);
1939         LASSERT(laia->laia_shared);
1940
1941         spin_lock(&lfsck->li_lock);
1942         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1943                 laia->laia_com = com;
1944                 lfsck_async_interpret_common(env, req, laia, result);
1945         }
1946
1947         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1948                 laia->laia_com = com;
1949                 lfsck_async_interpret_common(env, req, laia, result);
1950         }
1951         spin_unlock(&lfsck->li_lock);
1952 }
1953
1954 static int lfsck_stop_notify(const struct lu_env *env,
1955                              struct lfsck_instance *lfsck,
1956                              struct lfsck_tgt_descs *ltds,
1957                              struct lfsck_tgt_desc *ltd, __u16 type)
1958 {
1959         struct lfsck_component *com;
1960         int                     rc = 0;
1961         ENTRY;
1962
1963         LASSERT(lfsck->li_master);
1964
1965         spin_lock(&lfsck->li_lock);
1966         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
1967         if (com == NULL)
1968                 com = __lfsck_component_find(lfsck, type,
1969                                              &lfsck->li_list_double_scan);
1970         if (com != NULL)
1971                 lfsck_component_get(com);
1972         spin_unlock(&lfsck->li_lock);
1973
1974         if (com != NULL) {
1975                 struct lfsck_thread_info          *info  = lfsck_env_info(env);
1976                 struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1977                 struct lfsck_request              *lr    = &info->lti_lr;
1978                 struct lfsck_assistant_data       *lad   = com->lc_data;
1979                 struct list_head                  *list;
1980                 struct list_head                  *phase_list;
1981                 struct ptlrpc_request_set         *set;
1982
1983                 set = ptlrpc_prep_set();
1984                 if (set == NULL) {
1985                         lfsck_component_put(env, com);
1986
1987                         RETURN(-ENOMEM);
1988                 }
1989
1990                 if (type == LFSCK_TYPE_LAYOUT) {
1991                         list = &ltd->ltd_layout_list;
1992                         phase_list = &ltd->ltd_layout_phase_list;
1993                 } else {
1994                         list = &ltd->ltd_namespace_list;
1995                         phase_list = &ltd->ltd_namespace_phase_list;
1996                 }
1997
1998                 spin_lock(&ltds->ltd_lock);
1999                 if (list_empty(list)) {
2000                         LASSERT(list_empty(phase_list));
2001                         spin_unlock(&ltds->ltd_lock);
2002                         ptlrpc_set_destroy(set);
2003
2004                         RETURN(0);
2005                 }
2006
2007                 list_del_init(phase_list);
2008                 list_del_init(list);
2009                 spin_unlock(&ltds->ltd_lock);
2010
2011                 memset(lr, 0, sizeof(*lr));
2012                 lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2013                 lr->lr_event = LE_PEER_EXIT;
2014                 lr->lr_active = type;
2015                 lr->lr_status = LS_CO_PAUSED;
2016                 if (ltds == &lfsck->li_ost_descs)
2017                         lr->lr_flags = LEF_TO_OST;
2018
2019                 laia->laia_com = com;
2020                 laia->laia_ltds = ltds;
2021                 atomic_inc(&ltd->ltd_ref);
2022                 laia->laia_ltd = ltd;
2023                 laia->laia_lr = lr;
2024                 laia->laia_shared = 0;
2025
2026                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2027                                          lfsck_async_interpret_common,
2028                                          laia, LFSCK_NOTIFY);
2029                 if (rc != 0) {
2030                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2031                                "co-stop for %s: rc = %d\n",
2032                                lfsck_lfsck2name(lfsck),
2033                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2034                                ltd->ltd_index, lad->lad_name, rc);
2035                         lfsck_tgt_put(ltd);
2036                 } else {
2037                         rc = ptlrpc_set_wait(set);
2038                 }
2039
2040                 ptlrpc_set_destroy(set);
2041                 lfsck_component_put(env, com);
2042         }
2043
2044         RETURN(rc);
2045 }
2046
2047 static int lfsck_async_interpret(const struct lu_env *env,
2048                                  struct ptlrpc_request *req,
2049                                  void *args, int rc)
2050 {
2051         struct lfsck_async_interpret_args *laia = args;
2052         struct lfsck_instance             *lfsck;
2053
2054         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
2055                               li_mdt_descs);
2056         lfsck_interpret(env, lfsck, req, laia, rc);
2057         lfsck_tgt_put(laia->laia_ltd);
2058         if (rc != 0 && laia->laia_result != -EALREADY)
2059                 laia->laia_result = rc;
2060
2061         return 0;
2062 }
2063
2064 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2065                         struct lfsck_request *lr,
2066                         struct ptlrpc_request_set *set,
2067                         ptlrpc_interpterer_t interpreter,
2068                         void *args, int request)
2069 {
2070         struct lfsck_async_interpret_args *laia;
2071         struct ptlrpc_request             *req;
2072         struct lfsck_request              *tmp;
2073         struct req_format                 *format;
2074         int                                rc;
2075
2076         switch (request) {
2077         case LFSCK_NOTIFY:
2078                 format = &RQF_LFSCK_NOTIFY;
2079                 break;
2080         case LFSCK_QUERY:
2081                 format = &RQF_LFSCK_QUERY;
2082                 break;
2083         default:
2084                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2085                        exp->exp_obd->obd_name, request, -EINVAL);
2086                 return -EINVAL;
2087         }
2088
2089         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2090         if (req == NULL)
2091                 return -ENOMEM;
2092
2093         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2094         if (rc != 0) {
2095                 ptlrpc_request_free(req);
2096
2097                 return rc;
2098         }
2099
2100         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2101         *tmp = *lr;
2102         ptlrpc_request_set_replen(req);
2103
2104         laia = ptlrpc_req_async_args(req);
2105         *laia = *(struct lfsck_async_interpret_args *)args;
2106         if (laia->laia_com != NULL)
2107                 lfsck_component_get(laia->laia_com);
2108         req->rq_interpret_reply = interpreter;
2109         ptlrpc_set_add_req(set, req);
2110
2111         return 0;
2112 }
2113
2114 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2115                           struct lfsck_start_param *lsp)
2116 {
2117         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2118         struct lfsck_assistant_data     *lad     = com->lc_data;
2119         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2120         struct ptlrpc_thread            *athread = &lad->lad_thread;
2121         struct lfsck_thread_args        *lta;
2122         struct task_struct              *task;
2123         int                              rc;
2124         ENTRY;
2125
2126         lad->lad_assistant_status = 0;
2127         lad->lad_post_result = 0;
2128         lad->lad_to_post = 0;
2129         lad->lad_to_double_scan = 0;
2130         lad->lad_in_double_scan = 0;
2131         lad->lad_exit = 0;
2132         thread_set_flags(athread, 0);
2133
2134         lta = lfsck_thread_args_init(lfsck, com, lsp);
2135         if (IS_ERR(lta))
2136                 RETURN(PTR_ERR(lta));
2137
2138         task = kthread_run(lfsck_assistant_engine, lta, lad->lad_name);
2139         if (IS_ERR(task)) {
2140                 rc = PTR_ERR(task);
2141                 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2142                        "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2143                 lfsck_thread_args_fini(lta);
2144         } else {
2145                 struct l_wait_info lwi = { 0 };
2146
2147                 l_wait_event(mthread->t_ctl_waitq,
2148                              thread_is_running(athread) ||
2149                              thread_is_stopped(athread),
2150                              &lwi);
2151                 if (unlikely(!thread_is_running(athread)))
2152                         rc = lad->lad_assistant_status;
2153                 else
2154                         rc = 0;
2155         }
2156
2157         RETURN(rc);
2158 }
2159
2160 int lfsck_checkpoint_generic(const struct lu_env *env,
2161                              struct lfsck_component *com)
2162 {
2163         struct lfsck_assistant_data     *lad     = com->lc_data;
2164         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2165         struct ptlrpc_thread            *athread = &lad->lad_thread;
2166         struct l_wait_info               lwi     = { 0 };
2167
2168         if (com->lc_new_checked == 0)
2169                 return LFSCK_CHECKPOINT_SKIP;
2170
2171         l_wait_event(mthread->t_ctl_waitq,
2172                      list_empty(&lad->lad_req_list) ||
2173                      !thread_is_running(mthread) ||
2174                      thread_is_stopped(athread),
2175                      &lwi);
2176
2177         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2178                 return LFSCK_CHECKPOINT_SKIP;
2179
2180         return 0;
2181 }
2182
2183 void lfsck_post_generic(const struct lu_env *env,
2184                         struct lfsck_component *com, int *result)
2185 {
2186         struct lfsck_assistant_data     *lad     = com->lc_data;
2187         struct ptlrpc_thread            *athread = &lad->lad_thread;
2188         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2189         struct l_wait_info               lwi     = { 0 };
2190
2191         lad->lad_post_result = *result;
2192         if (*result <= 0)
2193                 lad->lad_exit = 1;
2194         lad->lad_to_post = 1;
2195
2196         wake_up_all(&athread->t_ctl_waitq);
2197         l_wait_event(mthread->t_ctl_waitq,
2198                      (*result > 0 && list_empty(&lad->lad_req_list)) ||
2199                      thread_is_stopped(athread),
2200                      &lwi);
2201
2202         if (lad->lad_assistant_status < 0)
2203                 *result = lad->lad_assistant_status;
2204 }
2205
2206 int lfsck_double_scan_generic(const struct lu_env *env,
2207                               struct lfsck_component *com, int status)
2208 {
2209         struct lfsck_assistant_data     *lad     = com->lc_data;
2210         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2211         struct ptlrpc_thread            *athread = &lad->lad_thread;
2212         struct l_wait_info               lwi     = { 0 };
2213
2214         if (status != LS_SCANNING_PHASE2)
2215                 lad->lad_exit = 1;
2216         else
2217                 lad->lad_to_double_scan = 1;
2218
2219         wake_up_all(&athread->t_ctl_waitq);
2220         l_wait_event(mthread->t_ctl_waitq,
2221                      lad->lad_in_double_scan ||
2222                      thread_is_stopped(athread),
2223                      &lwi);
2224
2225         if (lad->lad_assistant_status < 0)
2226                 return lad->lad_assistant_status;
2227
2228         return 0;
2229 }
2230
2231 void lfsck_quit_generic(const struct lu_env *env,
2232                         struct lfsck_component *com)
2233 {
2234         struct lfsck_assistant_data     *lad     = com->lc_data;
2235         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2236         struct ptlrpc_thread            *athread = &lad->lad_thread;
2237         struct l_wait_info               lwi     = { 0 };
2238
2239         lad->lad_exit = 1;
2240         wake_up_all(&athread->t_ctl_waitq);
2241         l_wait_event(mthread->t_ctl_waitq,
2242                      thread_is_init(athread) ||
2243                      thread_is_stopped(athread),
2244                      &lwi);
2245 }
2246
2247 /* external interfaces */
2248
2249 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
2250 {
2251         struct lu_env           env;
2252         struct lfsck_instance  *lfsck;
2253         int                     rc;
2254         ENTRY;
2255
2256         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2257         if (rc != 0)
2258                 RETURN(rc);
2259
2260         lfsck = lfsck_instance_find(key, true, false);
2261         if (likely(lfsck != NULL)) {
2262                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
2263                 lfsck_instance_put(&env, lfsck);
2264         } else {
2265                 rc = -ENXIO;
2266         }
2267
2268         lu_env_fini(&env);
2269
2270         RETURN(rc);
2271 }
2272 EXPORT_SYMBOL(lfsck_get_speed);
2273
2274 int lfsck_set_speed(struct dt_device *key, int val)
2275 {
2276         struct lu_env           env;
2277         struct lfsck_instance  *lfsck;
2278         int                     rc;
2279         ENTRY;
2280
2281         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2282         if (rc != 0)
2283                 RETURN(rc);
2284
2285         lfsck = lfsck_instance_find(key, true, false);
2286         if (likely(lfsck != NULL)) {
2287                 mutex_lock(&lfsck->li_mutex);
2288                 if (__lfsck_set_speed(lfsck, val))
2289                         rc = lfsck_bookmark_store(&env, lfsck);
2290                 mutex_unlock(&lfsck->li_mutex);
2291                 lfsck_instance_put(&env, lfsck);
2292         } else {
2293                 rc = -ENXIO;
2294         }
2295
2296         lu_env_fini(&env);
2297
2298         RETURN(rc);
2299 }
2300 EXPORT_SYMBOL(lfsck_set_speed);
2301
2302 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
2303 {
2304         struct lu_env           env;
2305         struct lfsck_instance  *lfsck;
2306         int                     rc;
2307         ENTRY;
2308
2309         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2310         if (rc != 0)
2311                 RETURN(rc);
2312
2313         lfsck = lfsck_instance_find(key, true, false);
2314         if (likely(lfsck != NULL)) {
2315                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
2316                 lfsck_instance_put(&env, lfsck);
2317         } else {
2318                 rc = -ENXIO;
2319         }
2320
2321         lu_env_fini(&env);
2322
2323         RETURN(rc);
2324 }
2325 EXPORT_SYMBOL(lfsck_get_windows);
2326
2327 int lfsck_set_windows(struct dt_device *key, int val)
2328 {
2329         struct lu_env           env;
2330         struct lfsck_instance  *lfsck;
2331         int                     rc;
2332         ENTRY;
2333
2334         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2335         if (rc != 0)
2336                 RETURN(rc);
2337
2338         lfsck = lfsck_instance_find(key, true, false);
2339         if (likely(lfsck != NULL)) {
2340                 if (val > LFSCK_ASYNC_WIN_MAX) {
2341                         CWARN("%s: Too large async window size, which "
2342                               "may cause memory issues. The valid range "
2343                               "is [0 - %u]. If you do not want to restrict "
2344                               "the window size for async requests pipeline, "
2345                               "just set it as 0.\n",
2346                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2347                         rc = -EINVAL;
2348                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2349                         mutex_lock(&lfsck->li_mutex);
2350                         lfsck->li_bookmark_ram.lb_async_windows = val;
2351                         rc = lfsck_bookmark_store(&env, lfsck);
2352                         mutex_unlock(&lfsck->li_mutex);
2353                 }
2354                 lfsck_instance_put(&env, lfsck);
2355         } else {
2356                 rc = -ENXIO;
2357         }
2358
2359         lu_env_fini(&env);
2360
2361         RETURN(rc);
2362 }
2363 EXPORT_SYMBOL(lfsck_set_windows);
2364
2365 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2366 {
2367         struct lu_env           env;
2368         struct lfsck_instance  *lfsck;
2369         struct lfsck_component *com;
2370         int                     rc;
2371         ENTRY;
2372
2373         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2374         if (rc != 0)
2375                 RETURN(rc);
2376
2377         lfsck = lfsck_instance_find(key, true, false);
2378         if (likely(lfsck != NULL)) {
2379                 com = lfsck_component_find(lfsck, type);
2380                 if (likely(com != NULL)) {
2381                         rc = com->lc_ops->lfsck_dump(&env, com, m);
2382                         lfsck_component_put(&env, com);
2383                 } else {
2384                         rc = -ENOTSUPP;
2385                 }
2386
2387                 lfsck_instance_put(&env, lfsck);
2388         } else {
2389                 rc = -ENXIO;
2390         }
2391
2392         lu_env_fini(&env);
2393
2394         RETURN(rc);
2395 }
2396 EXPORT_SYMBOL(lfsck_dump);
2397
2398 static int lfsck_stop_all(const struct lu_env *env,
2399                           struct lfsck_instance *lfsck,
2400                           struct lfsck_stop *stop)
2401 {
2402         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2403         struct lfsck_request              *lr     = &info->lti_lr;
2404         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2405         struct ptlrpc_request_set         *set;
2406         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2407         struct lfsck_tgt_desc             *ltd;
2408         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2409         __u32                              idx;
2410         int                                rc     = 0;
2411         int                                rc1    = 0;
2412         ENTRY;
2413
2414         LASSERT(stop->ls_flags & LPF_BROADCAST);
2415
2416         set = ptlrpc_prep_set();
2417         if (unlikely(set == NULL))
2418                 RETURN(-ENOMEM);
2419
2420         memset(lr, 0, sizeof(*lr));
2421         lr->lr_event = LE_STOP;
2422         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2423         lr->lr_status = stop->ls_status;
2424         lr->lr_version = bk->lb_version;
2425         lr->lr_active = LFSCK_TYPES_ALL;
2426         lr->lr_param = stop->ls_flags;
2427
2428         laia->laia_com = NULL;
2429         laia->laia_ltds = ltds;
2430         laia->laia_lr = lr;
2431         laia->laia_result = 0;
2432         laia->laia_shared = 1;
2433
2434         down_read(&ltds->ltd_rw_sem);
2435         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2436                 ltd = lfsck_tgt_get(ltds, idx);
2437                 LASSERT(ltd != NULL);
2438
2439                 laia->laia_ltd = ltd;
2440                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2441                                          lfsck_async_interpret, laia,
2442                                          LFSCK_NOTIFY);
2443                 if (rc != 0) {
2444                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2445                         lfsck_tgt_put(ltd);
2446                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2447                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2448                         rc1 = rc;
2449                 }
2450         }
2451         up_read(&ltds->ltd_rw_sem);
2452
2453         rc = ptlrpc_set_wait(set);
2454         ptlrpc_set_destroy(set);
2455
2456         if (rc == 0)
2457                 rc = laia->laia_result;
2458
2459         if (rc == -EALREADY)
2460                 rc = 0;
2461
2462         if (rc != 0)
2463                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2464                        lfsck_lfsck2name(lfsck), rc);
2465
2466         RETURN(rc != 0 ? rc : rc1);
2467 }
2468
2469 static int lfsck_start_all(const struct lu_env *env,
2470                            struct lfsck_instance *lfsck,
2471                            struct lfsck_start *start)
2472 {
2473         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2474         struct lfsck_request              *lr     = &info->lti_lr;
2475         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2476         struct ptlrpc_request_set         *set;
2477         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2478         struct lfsck_tgt_desc             *ltd;
2479         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2480         __u32                              idx;
2481         int                                rc     = 0;
2482         ENTRY;
2483
2484         LASSERT(start->ls_flags & LPF_BROADCAST);
2485
2486         set = ptlrpc_prep_set();
2487         if (unlikely(set == NULL))
2488                 RETURN(-ENOMEM);
2489
2490         memset(lr, 0, sizeof(*lr));
2491         lr->lr_event = LE_START;
2492         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2493         lr->lr_speed = bk->lb_speed_limit;
2494         lr->lr_version = bk->lb_version;
2495         lr->lr_active = start->ls_active;
2496         lr->lr_param = start->ls_flags;
2497         lr->lr_async_windows = bk->lb_async_windows;
2498         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2499                        LSV_ASYNC_WINDOWS;
2500
2501         laia->laia_com = NULL;
2502         laia->laia_ltds = ltds;
2503         laia->laia_lr = lr;
2504         laia->laia_result = 0;
2505         laia->laia_shared = 1;
2506
2507         down_read(&ltds->ltd_rw_sem);
2508         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2509                 ltd = lfsck_tgt_get(ltds, idx);
2510                 LASSERT(ltd != NULL);
2511
2512                 laia->laia_ltd = ltd;
2513                 ltd->ltd_layout_done = 0;
2514                 ltd->ltd_namespace_done = 0;
2515                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2516                                          lfsck_async_interpret, laia,
2517                                          LFSCK_NOTIFY);
2518                 if (rc != 0) {
2519                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2520                         lfsck_tgt_put(ltd);
2521                         CERROR("%s: cannot notify MDT %x for LFSCK "
2522                                "start, failout: rc = %d\n",
2523                                lfsck_lfsck2name(lfsck), idx, rc);
2524                         break;
2525                 }
2526         }
2527         up_read(&ltds->ltd_rw_sem);
2528
2529         if (rc != 0) {
2530                 ptlrpc_set_destroy(set);
2531
2532                 RETURN(rc);
2533         }
2534
2535         rc = ptlrpc_set_wait(set);
2536         ptlrpc_set_destroy(set);
2537
2538         if (rc == 0)
2539                 rc = laia->laia_result;
2540
2541         if (rc != 0) {
2542                 struct lfsck_stop *stop = &info->lti_stop;
2543
2544                 CERROR("%s: cannot start LFSCK on some MDTs, "
2545                        "stop all: rc = %d\n",
2546                        lfsck_lfsck2name(lfsck), rc);
2547                 if (rc != -EALREADY) {
2548                         stop->ls_status = LS_FAILED;
2549                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
2550                         lfsck_stop_all(env, lfsck, stop);
2551                 }
2552         }
2553
2554         RETURN(rc);
2555 }
2556
2557 int lfsck_start(const struct lu_env *env, struct dt_device *key,
2558                 struct lfsck_start_param *lsp)
2559 {
2560         struct lfsck_start              *start  = lsp->lsp_start;
2561         struct lfsck_instance           *lfsck;
2562         struct lfsck_bookmark           *bk;
2563         struct ptlrpc_thread            *thread;
2564         struct lfsck_component          *com;
2565         struct l_wait_info               lwi    = { 0 };
2566         struct lfsck_thread_args        *lta;
2567         struct task_struct              *task;
2568         int                              rc     = 0;
2569         __u16                            valid  = 0;
2570         __u16                            flags  = 0;
2571         __u16                            type   = 1;
2572         ENTRY;
2573
2574         lfsck = lfsck_instance_find(key, true, false);
2575         if (unlikely(lfsck == NULL))
2576                 RETURN(-ENXIO);
2577
2578         /* System is not ready, try again later. */
2579         if (unlikely(lfsck->li_namespace == NULL))
2580                 GOTO(put, rc = -EAGAIN);
2581
2582         /* start == NULL means auto trigger paused LFSCK. */
2583         if ((start == NULL) &&
2584             (list_empty(&lfsck->li_list_scan) ||
2585              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2586                 GOTO(put, rc = 0);
2587
2588         bk = &lfsck->li_bookmark_ram;
2589         thread = &lfsck->li_thread;
2590         mutex_lock(&lfsck->li_mutex);
2591         spin_lock(&lfsck->li_lock);
2592         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2593                 rc = -EALREADY;
2594                 if (unlikely(start == NULL)) {
2595                         spin_unlock(&lfsck->li_lock);
2596                         GOTO(out, rc);
2597                 }
2598
2599                 while (start->ls_active != 0) {
2600                         if (!(type & start->ls_active)) {
2601                                 type <<= 1;
2602                                 continue;
2603                         }
2604
2605                         com = __lfsck_component_find(lfsck, type,
2606                                                      &lfsck->li_list_scan);
2607                         if (com == NULL)
2608                                 com = __lfsck_component_find(lfsck, type,
2609                                                 &lfsck->li_list_double_scan);
2610                         if (com == NULL) {
2611                                 rc = -EOPNOTSUPP;
2612                                 break;
2613                         }
2614
2615                         if (com->lc_ops->lfsck_join != NULL) {
2616                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2617                                 if (rc != 0 && rc != -EALREADY)
2618                                         break;
2619                         }
2620                         start->ls_active &= ~type;
2621                         type <<= 1;
2622                 }
2623                 spin_unlock(&lfsck->li_lock);
2624                 GOTO(out, rc);
2625         }
2626         spin_unlock(&lfsck->li_lock);
2627
2628         lfsck->li_status = 0;
2629         lfsck->li_oit_over = 0;
2630         lfsck->li_start_unplug = 0;
2631         lfsck->li_drop_dryrun = 0;
2632         lfsck->li_new_scanned = 0;
2633
2634         /* For auto trigger. */
2635         if (start == NULL)
2636                 goto trigger;
2637
2638         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2639                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2640                        lfsck_lfsck2name(lfsck));
2641
2642                 GOTO(out, rc = -EPERM);
2643         }
2644
2645         start->ls_version = bk->lb_version;
2646
2647         if (start->ls_active != 0) {
2648                 struct lfsck_component *next;
2649
2650                 if (start->ls_active == LFSCK_TYPES_ALL)
2651                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2652
2653                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2654                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2655                         GOTO(out, rc = -ENOTSUPP);
2656                 }
2657
2658                 list_for_each_entry_safe(com, next,
2659                                          &lfsck->li_list_scan, lc_link) {
2660                         if (!(com->lc_type & start->ls_active)) {
2661                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2662                                                              false);
2663                                 if (rc != 0)
2664                                         GOTO(out, rc);
2665                         }
2666                 }
2667
2668                 while (start->ls_active != 0) {
2669                         if (type & start->ls_active) {
2670                                 com = __lfsck_component_find(lfsck, type,
2671                                                         &lfsck->li_list_idle);
2672                                 if (com != NULL)
2673                                         /* The component status will be updated
2674                                          * when its prep() is called later by
2675                                          * the LFSCK main engine. */
2676                                         list_move_tail(&com->lc_link,
2677                                                        &lfsck->li_list_scan);
2678                                 start->ls_active &= ~type;
2679                         }
2680                         type <<= 1;
2681                 }
2682         }
2683
2684         if (list_empty(&lfsck->li_list_scan)) {
2685                 /* The speed limit will be used to control both the LFSCK and
2686                  * low layer scrub (if applied), need to be handled firstly. */
2687                 if (start->ls_valid & LSV_SPEED_LIMIT) {
2688                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
2689                                 rc = lfsck_bookmark_store(env, lfsck);
2690                                 if (rc != 0)
2691                                         GOTO(out, rc);
2692                         }
2693                 }
2694
2695                 goto trigger;
2696         }
2697
2698         if (start->ls_flags & LPF_RESET)
2699                 flags |= DOIF_RESET;
2700
2701         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
2702         if (rc != 0)
2703                 GOTO(out, rc);
2704
2705         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2706                 start->ls_active |= com->lc_type;
2707                 if (flags & DOIF_RESET) {
2708                         rc = com->lc_ops->lfsck_reset(env, com, false);
2709                         if (rc != 0)
2710                                 GOTO(out, rc);
2711                 }
2712         }
2713
2714 trigger:
2715         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
2716         if (bk->lb_param & LPF_DRYRUN)
2717                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
2718
2719         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
2720                 valid |= DOIV_ERROR_HANDLE;
2721                 if (start->ls_flags & LPF_FAILOUT)
2722                         flags |= DOIF_FAILOUT;
2723         }
2724
2725         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
2726                 valid |= DOIV_DRYRUN;
2727                 if (start->ls_flags & LPF_DRYRUN)
2728                         flags |= DOIF_DRYRUN;
2729         }
2730
2731         if (!list_empty(&lfsck->li_list_scan))
2732                 flags |= DOIF_OUTUSED;
2733
2734         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2735         thread_set_flags(thread, 0);
2736         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
2737         if (IS_ERR(lta))
2738                 GOTO(out, rc = PTR_ERR(lta));
2739
2740         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
2741         task = kthread_run(lfsck_master_engine, lta, "lfsck");
2742         if (IS_ERR(task)) {
2743                 rc = PTR_ERR(task);
2744                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
2745                        lfsck_lfsck2name(lfsck), rc);
2746                 lfsck_thread_args_fini(lta);
2747
2748                 GOTO(out, rc);
2749         }
2750
2751         l_wait_event(thread->t_ctl_waitq,
2752                      thread_is_running(thread) ||
2753                      thread_is_stopped(thread),
2754                      &lwi);
2755         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
2756                 lfsck->li_start_unplug = 1;
2757                 wake_up_all(&thread->t_ctl_waitq);
2758
2759                 GOTO(out, rc = 0);
2760         }
2761
2762         /* release lfsck::li_mutex to avoid deadlock. */
2763         mutex_unlock(&lfsck->li_mutex);
2764         rc = lfsck_start_all(env, lfsck, start);
2765         if (rc != 0) {
2766                 spin_lock(&lfsck->li_lock);
2767                 if (thread_is_stopped(thread)) {
2768                         spin_unlock(&lfsck->li_lock);
2769                 } else {
2770                         lfsck->li_status = LS_FAILED;
2771                         lfsck->li_flags = 0;
2772                         thread_set_flags(thread, SVC_STOPPING);
2773                         spin_unlock(&lfsck->li_lock);
2774
2775                         lfsck->li_start_unplug = 1;
2776                         wake_up_all(&thread->t_ctl_waitq);
2777                         l_wait_event(thread->t_ctl_waitq,
2778                                      thread_is_stopped(thread),
2779                                      &lwi);
2780                 }
2781         } else {
2782                 lfsck->li_start_unplug = 1;
2783                 wake_up_all(&thread->t_ctl_waitq);
2784         }
2785
2786         GOTO(put, rc);
2787
2788 out:
2789         mutex_unlock(&lfsck->li_mutex);
2790
2791 put:
2792         lfsck_instance_put(env, lfsck);
2793
2794         return rc < 0 ? rc : 0;
2795 }
2796 EXPORT_SYMBOL(lfsck_start);
2797
2798 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2799                struct lfsck_stop *stop)
2800 {
2801         struct lfsck_instance   *lfsck;
2802         struct ptlrpc_thread    *thread;
2803         struct l_wait_info       lwi    = { 0 };
2804         int                      rc     = 0;
2805         int                      rc1    = 0;
2806         ENTRY;
2807
2808         lfsck = lfsck_instance_find(key, true, false);
2809         if (unlikely(lfsck == NULL))
2810                 RETURN(-ENXIO);
2811
2812         thread = &lfsck->li_thread;
2813         /* release lfsck::li_mutex to avoid deadlock. */
2814         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2815                 if (!lfsck->li_master) {
2816                         CERROR("%s: only allow to specify '-A' via MDS\n",
2817                                lfsck_lfsck2name(lfsck));
2818
2819                         GOTO(out, rc = -EPERM);
2820                 }
2821
2822                 rc1 = lfsck_stop_all(env, lfsck, stop);
2823         }
2824
2825         mutex_lock(&lfsck->li_mutex);
2826         spin_lock(&lfsck->li_lock);
2827         /* no error if LFSCK is already stopped, or was never started */
2828         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2829                 spin_unlock(&lfsck->li_lock);
2830                 GOTO(out, rc = 0);
2831         }
2832
2833         if (stop != NULL) {
2834                 lfsck->li_status = stop->ls_status;
2835                 lfsck->li_flags = stop->ls_flags;
2836         } else {
2837                 lfsck->li_status = LS_STOPPED;
2838                 lfsck->li_flags = 0;
2839         }
2840
2841         thread_set_flags(thread, SVC_STOPPING);
2842         spin_unlock(&lfsck->li_lock);
2843
2844         wake_up_all(&thread->t_ctl_waitq);
2845         l_wait_event(thread->t_ctl_waitq,
2846                      thread_is_stopped(thread),
2847                      &lwi);
2848
2849         GOTO(out, rc = 0);
2850
2851 out:
2852         mutex_unlock(&lfsck->li_mutex);
2853         lfsck_instance_put(env, lfsck);
2854
2855         return rc != 0 ? rc : rc1;
2856 }
2857 EXPORT_SYMBOL(lfsck_stop);
2858
2859 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2860                     struct lfsck_request *lr)
2861 {
2862         int rc = -EOPNOTSUPP;
2863         ENTRY;
2864
2865         switch (lr->lr_event) {
2866         case LE_START: {
2867                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
2868                 struct lfsck_start_param  lsp;
2869
2870                 memset(start, 0, sizeof(*start));
2871                 start->ls_valid = lr->lr_valid;
2872                 start->ls_speed_limit = lr->lr_speed;
2873                 start->ls_version = lr->lr_version;
2874                 start->ls_active = lr->lr_active;
2875                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2876                 start->ls_async_windows = lr->lr_async_windows;
2877
2878                 lsp.lsp_start = start;
2879                 lsp.lsp_index = lr->lr_index;
2880                 lsp.lsp_index_valid = 1;
2881                 rc = lfsck_start(env, key, &lsp);
2882                 break;
2883         }
2884         case LE_STOP: {
2885                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2886
2887                 memset(stop, 0, sizeof(*stop));
2888                 stop->ls_status = lr->lr_status;
2889                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2890                 rc = lfsck_stop(env, key, stop);
2891                 break;
2892         }
2893         case LE_PHASE1_DONE:
2894         case LE_PHASE2_DONE:
2895         case LE_FID_ACCESSED:
2896         case LE_PEER_EXIT:
2897         case LE_CONDITIONAL_DESTROY:
2898         case LE_PAIRS_VERIFY: {
2899                 struct lfsck_instance  *lfsck;
2900                 struct lfsck_component *com;
2901
2902                 lfsck = lfsck_instance_find(key, true, false);
2903                 if (unlikely(lfsck == NULL))
2904                         RETURN(-ENXIO);
2905
2906                 com = lfsck_component_find(lfsck, lr->lr_active);
2907                 if (likely(com != NULL)) {
2908                         rc = com->lc_ops->lfsck_in_notify(env, com, lr);
2909                         lfsck_component_put(env, com);
2910                 }
2911
2912                 lfsck_instance_put(env, lfsck);
2913                 break;
2914         }
2915         default:
2916                 break;
2917         }
2918
2919         RETURN(rc);
2920 }
2921 EXPORT_SYMBOL(lfsck_in_notify);
2922
2923 int lfsck_query(const struct lu_env *env, struct dt_device *key,
2924                 struct lfsck_request *lr)
2925 {
2926         struct lfsck_instance  *lfsck;
2927         struct lfsck_component *com;
2928         int                     rc;
2929         ENTRY;
2930
2931         lfsck = lfsck_instance_find(key, true, false);
2932         if (unlikely(lfsck == NULL))
2933                 RETURN(-ENXIO);
2934
2935         com = lfsck_component_find(lfsck, lr->lr_active);
2936         if (likely(com != NULL)) {
2937                 rc = com->lc_ops->lfsck_query(env, com);
2938                 lfsck_component_put(env, com);
2939         } else {
2940                 rc = -ENOTSUPP;
2941         }
2942
2943         lfsck_instance_put(env, lfsck);
2944
2945         RETURN(rc);
2946 }
2947 EXPORT_SYMBOL(lfsck_query);
2948
2949 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
2950                              struct ldlm_namespace *ns)
2951 {
2952         struct lfsck_instance  *lfsck;
2953         int                     rc      = -ENXIO;
2954
2955         lfsck = lfsck_instance_find(key, true, false);
2956         if (likely(lfsck != NULL)) {
2957                 lfsck->li_namespace = ns;
2958                 lfsck_instance_put(env, lfsck);
2959                 rc = 0;
2960         }
2961
2962         return rc;
2963 }
2964 EXPORT_SYMBOL(lfsck_register_namespace);
2965
2966 int lfsck_register(const struct lu_env *env, struct dt_device *key,
2967                    struct dt_device *next, struct obd_device *obd,
2968                    lfsck_out_notify notify, void *notify_data, bool master)
2969 {
2970         struct lfsck_instance   *lfsck;
2971         struct dt_object        *root  = NULL;
2972         struct dt_object        *obj   = NULL;
2973         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
2974         int                      rc;
2975         ENTRY;
2976
2977         lfsck = lfsck_instance_find(key, false, false);
2978         if (unlikely(lfsck != NULL))
2979                 RETURN(-EEXIST);
2980
2981         OBD_ALLOC_PTR(lfsck);
2982         if (lfsck == NULL)
2983                 RETURN(-ENOMEM);
2984
2985         mutex_init(&lfsck->li_mutex);
2986         spin_lock_init(&lfsck->li_lock);
2987         INIT_LIST_HEAD(&lfsck->li_link);
2988         INIT_LIST_HEAD(&lfsck->li_list_scan);
2989         INIT_LIST_HEAD(&lfsck->li_list_dir);
2990         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
2991         INIT_LIST_HEAD(&lfsck->li_list_idle);
2992         atomic_set(&lfsck->li_ref, 1);
2993         atomic_set(&lfsck->li_double_scan_count, 0);
2994         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
2995         lfsck->li_out_notify = notify;
2996         lfsck->li_out_notify_data = notify_data;
2997         lfsck->li_next = next;
2998         lfsck->li_bottom = key;
2999         lfsck->li_obd = obd;
3000
3001         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
3002         if (rc != 0)
3003                 GOTO(out, rc);
3004
3005         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
3006         if (rc != 0)
3007                 GOTO(out, rc);
3008
3009         fid->f_seq = FID_SEQ_LOCAL_NAME;
3010         fid->f_oid = 1;
3011         fid->f_ver = 0;
3012         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
3013         if (rc != 0)
3014                 GOTO(out, rc);
3015
3016         rc = dt_root_get(env, key, fid);
3017         if (rc != 0)
3018                 GOTO(out, rc);
3019
3020         root = dt_locate(env, key, fid);
3021         if (IS_ERR(root))
3022                 GOTO(out, rc = PTR_ERR(root));
3023
3024         if (unlikely(!dt_try_as_dir(env, root)))
3025                 GOTO(out, rc = -ENOTDIR);
3026
3027         lfsck->li_local_root_fid = *fid;
3028         if (master) {
3029                 lfsck->li_master = 1;
3030                 if (lfsck_dev_idx(key) == 0) {
3031                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
3032                         const struct lu_name *cname;
3033
3034                         rc = dt_lookup(env, root,
3035                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
3036                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
3037                         if (rc != 0)
3038                                 GOTO(out, rc);
3039
3040                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
3041                         if (IS_ERR(obj))
3042                                 GOTO(out, rc = PTR_ERR(obj));
3043
3044                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3045                                 (const struct dt_key *)dotlustre, BYPASS_CAPA);
3046                         if (rc != 0)
3047                                 GOTO(out, rc);
3048
3049                         lu_object_put(env, &obj->do_lu);
3050                         obj = dt_locate(env, key, fid);
3051                         if (IS_ERR(obj))
3052                                 GOTO(out, rc = PTR_ERR(obj));
3053
3054                         cname = lfsck_name_get_const(env, dotlustre,
3055                                                      strlen(dotlustre));
3056                         rc = lfsck_verify_linkea(env, key, obj, cname,
3057                                                  &lfsck->li_global_root_fid);
3058                         if (rc != 0)
3059                                 GOTO(out, rc);
3060
3061                         *pfid = *fid;
3062                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3063                                        (const struct dt_key *)lostfound,
3064                                        BYPASS_CAPA);
3065                         if (rc != 0)
3066                                 GOTO(out, rc);
3067
3068                         lu_object_put(env, &obj->do_lu);
3069                         obj = dt_locate(env, key, fid);
3070                         if (IS_ERR(obj))
3071                                 GOTO(out, rc = PTR_ERR(obj));
3072
3073                         cname = lfsck_name_get_const(env, lostfound,
3074                                                      strlen(lostfound));
3075                         rc = lfsck_verify_linkea(env, key, obj, cname, pfid);
3076                         if (rc != 0)
3077                                 GOTO(out, rc);
3078
3079                         lu_object_put(env, &obj->do_lu);
3080                         obj = NULL;
3081                 }
3082         }
3083
3084         fid->f_seq = FID_SEQ_LOCAL_FILE;
3085         fid->f_oid = OTABLE_IT_OID;
3086         fid->f_ver = 0;
3087         obj = dt_locate(env, key, fid);
3088         if (IS_ERR(obj))
3089                 GOTO(out, rc = PTR_ERR(obj));
3090
3091         lu_object_get(&obj->do_lu);
3092         lfsck->li_obj_oit = obj;
3093         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
3094         if (rc != 0)
3095                 GOTO(out, rc);
3096
3097         rc = lfsck_bookmark_setup(env, lfsck);
3098         if (rc != 0)
3099                 GOTO(out, rc);
3100
3101         if (master) {
3102                 rc = lfsck_fid_init(lfsck);
3103                 if (rc < 0)
3104                         GOTO(out, rc);
3105
3106                 rc = lfsck_namespace_setup(env, lfsck);
3107                 if (rc < 0)
3108                         GOTO(out, rc);
3109         }
3110
3111         rc = lfsck_layout_setup(env, lfsck);
3112         if (rc < 0)
3113                 GOTO(out, rc);
3114
3115         /* XXX: more LFSCK components initialization to be added here. */
3116
3117         rc = lfsck_instance_add(lfsck);
3118         if (rc == 0)
3119                 rc = lfsck_add_target_from_orphan(env, lfsck);
3120 out:
3121         if (obj != NULL && !IS_ERR(obj))
3122                 lu_object_put(env, &obj->do_lu);
3123         if (root != NULL && !IS_ERR(root))
3124                 lu_object_put(env, &root->do_lu);
3125         if (rc != 0)
3126                 lfsck_instance_cleanup(env, lfsck);
3127         return rc;
3128 }
3129 EXPORT_SYMBOL(lfsck_register);
3130
3131 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
3132 {
3133         struct lfsck_instance *lfsck;
3134
3135         lfsck = lfsck_instance_find(key, false, true);
3136         if (lfsck != NULL)
3137                 lfsck_instance_put(env, lfsck);
3138 }
3139 EXPORT_SYMBOL(lfsck_degister);
3140
3141 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
3142                      struct dt_device *tgt, struct obd_export *exp,
3143                      __u32 index, bool for_ost)
3144 {
3145         struct lfsck_instance   *lfsck;
3146         struct lfsck_tgt_desc   *ltd;
3147         int                      rc;
3148         ENTRY;
3149
3150         OBD_ALLOC_PTR(ltd);
3151         if (ltd == NULL)
3152                 RETURN(-ENOMEM);
3153
3154         ltd->ltd_tgt = tgt;
3155         ltd->ltd_key = key;
3156         ltd->ltd_exp = exp;
3157         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
3158         INIT_LIST_HEAD(&ltd->ltd_layout_list);
3159         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
3160         INIT_LIST_HEAD(&ltd->ltd_namespace_list);
3161         INIT_LIST_HEAD(&ltd->ltd_namespace_phase_list);
3162         atomic_set(&ltd->ltd_ref, 1);
3163         ltd->ltd_index = index;
3164
3165         spin_lock(&lfsck_instance_lock);
3166         lfsck = __lfsck_instance_find(key, true, false);
3167         if (lfsck == NULL) {
3168                 if (for_ost)
3169                         list_add_tail(&ltd->ltd_orphan_list,
3170                                       &lfsck_ost_orphan_list);
3171                 else
3172                         list_add_tail(&ltd->ltd_orphan_list,
3173                                       &lfsck_mdt_orphan_list);
3174                 spin_unlock(&lfsck_instance_lock);
3175
3176                 RETURN(0);
3177         }
3178         spin_unlock(&lfsck_instance_lock);
3179
3180         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
3181         if (rc != 0)
3182                 lfsck_tgt_put(ltd);
3183
3184         lfsck_instance_put(env, lfsck);
3185
3186         RETURN(rc);
3187 }
3188 EXPORT_SYMBOL(lfsck_add_target);
3189
3190 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
3191                       struct dt_device *tgt, __u32 index, bool for_ost)
3192 {
3193         struct lfsck_instance   *lfsck;
3194         struct lfsck_tgt_descs  *ltds;
3195         struct lfsck_tgt_desc   *ltd;
3196         struct list_head        *head;
3197
3198         if (for_ost)
3199                 head = &lfsck_ost_orphan_list;
3200         else
3201                 head = &lfsck_mdt_orphan_list;
3202
3203         spin_lock(&lfsck_instance_lock);
3204         list_for_each_entry(ltd, head, ltd_orphan_list) {
3205                 if (ltd->ltd_tgt == tgt) {
3206                         list_del_init(&ltd->ltd_orphan_list);
3207                         spin_unlock(&lfsck_instance_lock);
3208                         lfsck_tgt_put(ltd);
3209
3210                         return;
3211                 }
3212         }
3213
3214         ltd = NULL;
3215         lfsck = __lfsck_instance_find(key, true, false);
3216         spin_unlock(&lfsck_instance_lock);
3217         if (unlikely(lfsck == NULL))
3218                 return;
3219
3220         if (for_ost)
3221                 ltds = &lfsck->li_ost_descs;
3222         else
3223                 ltds = &lfsck->li_mdt_descs;
3224
3225         down_write(&ltds->ltd_rw_sem);
3226         LASSERT(ltds->ltd_tgts_bitmap != NULL);
3227
3228         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
3229                 goto unlock;
3230
3231         ltd = LTD_TGT(ltds, index);
3232         if (unlikely(ltd == NULL))
3233                 goto unlock;
3234
3235         LASSERT(ltds->ltd_tgtnr > 0);
3236
3237         ltds->ltd_tgtnr--;
3238         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
3239         LTD_TGT(ltds, index) = NULL;
3240
3241 unlock:
3242         if (ltd == NULL) {
3243                 if (for_ost)
3244                         head = &lfsck->li_ost_descs.ltd_orphan;
3245                 else
3246                         head = &lfsck->li_mdt_descs.ltd_orphan;
3247
3248                 list_for_each_entry(ltd, head, ltd_orphan_list) {
3249                         if (ltd->ltd_tgt == tgt) {
3250                                 list_del_init(&ltd->ltd_orphan_list);
3251                                 break;
3252                         }
3253                 }
3254         }
3255
3256         up_write(&ltds->ltd_rw_sem);
3257         if (ltd != NULL) {
3258                 spin_lock(&ltds->ltd_lock);
3259                 ltd->ltd_dead = 1;
3260                 spin_unlock(&ltds->ltd_lock);
3261                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_NAMESPACE);
3262                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
3263                 lfsck_tgt_put(ltd);
3264         }
3265
3266         lfsck_instance_put(env, lfsck);
3267 }
3268 EXPORT_SYMBOL(lfsck_del_target);
3269
3270 static int __init lfsck_init(void)
3271 {
3272         int rc;
3273
3274         INIT_LIST_HEAD(&lfsck_instance_list);
3275         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
3276         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
3277         lfsck_key_init_generic(&lfsck_thread_key, NULL);
3278         rc = lu_context_key_register(&lfsck_thread_key);
3279         if (rc == 0) {
3280                 tgt_register_lfsck_in_notify(lfsck_in_notify);
3281                 tgt_register_lfsck_query(lfsck_query);
3282         }
3283
3284         return rc;
3285 }
3286
3287 static void __exit lfsck_exit(void)
3288 {
3289         struct lfsck_tgt_desc *ltd;
3290         struct lfsck_tgt_desc *next;
3291
3292         LASSERT(list_empty(&lfsck_instance_list));
3293
3294         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
3295                                  ltd_orphan_list) {
3296                 list_del_init(&ltd->ltd_orphan_list);
3297                 lfsck_tgt_put(ltd);
3298         }
3299
3300         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
3301                                  ltd_orphan_list) {
3302                 list_del_init(&ltd->ltd_orphan_list);
3303                 lfsck_tgt_put(ltd);
3304         }
3305
3306         lu_context_key_degister(&lfsck_thread_key);
3307 }
3308
3309 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
3310 MODULE_DESCRIPTION("LFSCK");
3311 MODULE_LICENSE("GPL");
3312
3313 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);