Whamcloud - gitweb
42143f16e1d6c2bfa54da3ca9e514b148cd1cd1a
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
/* NOTE(review): not referenced in the visible part of this file; presumably
 * a marker value meaning "skip this checkpoint" - confirm at its use sites. */
#define LFSCK_CHECKPOINT_SKIP   1
46
/* Define the lfsck thread key: LU_KEY_INIT() is instantiated with the "lfsck"
 * prefix and struct lfsck_thread_info as the per-context data type, i.e. the
 * same type that lfsck_key_fini() releases. */
LU_KEY_INIT(lfsck, struct lfsck_thread_info);
49
/**
 * Release the per-context LFSCK thread info.
 *
 * Frees the three lu_buf buffers embedded in the thread info
 * (lti_linkea_buf, lti_linkea_buf2, lti_big_buf) and then the thread info
 * structure itself.
 *
 * \param[in] ctx       the context owning the value (not used here)
 * \param[in] key       the context key (not used here)
 * \param[in] data      the struct lfsck_thread_info to be released
 */
static void lfsck_key_fini(const struct lu_context *ctx,
                           struct lu_context_key *key, void *data)
{
        struct lfsck_thread_info *info = data;

        lu_buf_free(&info->lti_linkea_buf);
        lu_buf_free(&info->lti_linkea_buf2);
        lu_buf_free(&info->lti_big_buf);
        OBD_FREE_PTR(info);
}
60
/* Define the "lfsck" context key with the LCT_MD_THREAD and LCT_DT_THREAD
 * tags, and instantiate the generic key helpers for the same prefix. */
LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
LU_KEY_INIT_GENERIC(lfsck);
63
/* NOTE(review): no user of lfsck_instance_list is visible in this part of
 * the file; presumably the list of registered LFSCK instances - confirm. */
static struct list_head lfsck_instance_list;
/* Global lists of target descriptors, linked via ltd_orphan_list.
 * lfsck_add_target_from_orphan() claims the entries whose ltd_key matches
 * the li_bottom device of an instance. */
static struct list_head lfsck_ost_orphan_list;
static struct list_head lfsck_mdt_orphan_list;
/* Held by lfsck_add_target_from_orphan() while it walks the orphan lists. */
static DEFINE_SPINLOCK(lfsck_instance_lock);
68
/* Printable names of the LFSCK statuses, indexed by the LS_* status value;
 * looked up by lfsck_status2names(). */
static const char *lfsck_status_names[] = {
        [LS_INIT]               = "init",
        [LS_SCANNING_PHASE1]    = "scanning-phase1",
        [LS_SCANNING_PHASE2]    = "scanning-phase2",
        [LS_COMPLETED]          = "completed",
        [LS_FAILED]             = "failed",
        [LS_STOPPED]            = "stopped",
        [LS_PAUSED]             = "paused",
        [LS_CRASHED]            = "crashed",
        [LS_PARTIAL]            = "partial",
        [LS_CO_FAILED]          = "co-failed",
        [LS_CO_STOPPED]         = "co-stopped",
        [LS_CO_PAUSED]          = "co-paused"
};
83
/* NULL-terminated table of printable LFSCK flag names.
 * NOTE(review): presumably entry i names flag bit i - confirm against the
 * flag definitions and the users of this table. */
const char *lfsck_flags_names[] = {
        "scanned-once",
        "inconsistent",
        "upgrade",
        "incomplete",
        "crashed_lastid",
        NULL
};
92
/* NULL-terminated table of printable LFSCK parameter names; the first slot
 * is intentionally NULL as well.
 * NOTE(review): presumably entry i names parameter bit i, with bit 0 having
 * no printable name - confirm against the parameter definitions. */
const char *lfsck_param_names[] = {
        NULL,
        "failout",
        "dryrun",
        "all_targets",
        "broadcast",
        "orphan",
        "create_ostobj",
        NULL
};
103
/* NOTE(review): not used in the visible part of this file; the names suggest
 * these select how the lost+found object to be verified was located (from
 * the bookmark or from the name entry) - confirm at the use sites. */
enum lfsck_verify_lpf_types {
        LVLT_BY_BOOKMARK        = 0,
        LVLT_BY_NAMEENTRY       = 1,
};
108
109 const char *lfsck_status2names(enum lfsck_status status)
110 {
111         if (unlikely(status < 0 || status >= LS_MAX))
112                 return "unknown";
113
114         return lfsck_status_names[status];
115 }
116
117 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
118 {
119         spin_lock_init(&ltds->ltd_lock);
120         init_rwsem(&ltds->ltd_rw_sem);
121         INIT_LIST_HEAD(&ltds->ltd_orphan);
122         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
123         if (ltds->ltd_tgts_bitmap == NULL)
124                 return -ENOMEM;
125
126         return 0;
127 }
128
129 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
130 {
131         struct lfsck_tgt_desc   *ltd;
132         struct lfsck_tgt_desc   *next;
133         int                      idx;
134
135         down_write(&ltds->ltd_rw_sem);
136
137         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
138                                  ltd_orphan_list) {
139                 list_del_init(&ltd->ltd_orphan_list);
140                 lfsck_tgt_put(ltd);
141         }
142
143         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
144                 up_write(&ltds->ltd_rw_sem);
145
146                 return;
147         }
148
149         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
150                 ltd = LTD_TGT(ltds, idx);
151                 if (likely(ltd != NULL)) {
152                         LASSERT(list_empty(&ltd->ltd_layout_list));
153                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
154                         LASSERT(list_empty(&ltd->ltd_namespace_list));
155                         LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
156
157                         ltds->ltd_tgtnr--;
158                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
159                         LTD_TGT(ltds, idx) = NULL;
160                         lfsck_tgt_put(ltd);
161                 }
162         }
163
164         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
165                  ltds->ltd_tgtnr);
166
167         for (idx = 0; idx < TGT_PTRS; idx++) {
168                 if (ltds->ltd_tgts_idx[idx] != NULL) {
169                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
170                         ltds->ltd_tgts_idx[idx] = NULL;
171                 }
172         }
173
174         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
175         ltds->ltd_tgts_bitmap = NULL;
176         up_write(&ltds->ltd_rw_sem);
177 }
178
/**
 * Register a target descriptor in the OST or MDT table of an LFSCK instance.
 *
 * The target bitmap is enlarged (to a power-of-two multiple of its current
 * size) when ltd->ltd_index does not fit, the index block covering that
 * index is allocated on demand, and then the descriptor is stored in the
 * table, marked in the bitmap and counted in ltd_tgtnr.
 *
 * \param[in] env       pointer to the thread context (not used here)
 * \param[in] lfsck     pointer to the lfsck instance
 * \param[in] ltd       pointer to the target descriptor to be registered
 * \param[in] for_ost   true to use li_ost_descs, false to use li_mdt_descs
 * \param[in] locked    true if the caller already holds ltd_rw_sem of the
 *                      chosen table for write; otherwise it is taken here
 *
 * \retval              0 for success
 * \retval              -ENOMEM if the bitmap or the index block cannot be
 *                      allocated
 * \retval              -EEXIST if the index is already set in the bitmap
 */
static int __lfsck_add_target(const struct lu_env *env,
                              struct lfsck_instance *lfsck,
                              struct lfsck_tgt_desc *ltd,
                              bool for_ost, bool locked)
{
        struct lfsck_tgt_descs *ltds;
        __u32                   index = ltd->ltd_index;
        int                     rc    = 0;
        ENTRY;

        if (for_ost)
                ltds = &lfsck->li_ost_descs;
        else
                ltds = &lfsck->li_mdt_descs;

        if (!locked)
                down_write(&ltds->ltd_rw_sem);

        LASSERT(ltds->ltd_tgts_bitmap != NULL);

        /* Grow the bitmap by doubling until the new index fits. */
        if (index >= ltds->ltd_tgts_bitmap->size) {
                __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
                                    (__u32)BITS_PER_LONG);
                cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
                cfs_bitmap_t *new_bitmap;

                /* NOTE(review): for an index near the top of the __u32
                 * range both "index + 1" and the doubling can wrap - confirm
                 * that callers only pass small target indices. */
                while (newsize < index + 1)
                        newsize <<= 1;

                new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
                if (new_bitmap == NULL)
                        GOTO(unlock, rc = -ENOMEM);

                /* The old bits are only worth copying if any is set. */
                if (ltds->ltd_tgtnr > 0)
                        cfs_bitmap_copy(new_bitmap, old_bitmap);
                ltds->ltd_tgts_bitmap = new_bitmap;
                CFS_FREE_BITMAP(old_bitmap);
        }

        if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
                CERROR("%s: the device %s (%u) is registered already\n",
                       lfsck_lfsck2name(lfsck),
                       ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
                GOTO(unlock, rc = -EEXIST);
        }

        /* NOTE(review): index / TGT_PTRS_PER_BLOCK is not checked against
         * the number of slots in ltd_tgts_idx[] (lfsck_tgt_descs_fini()
         * iterates TGT_PTRS of them) - confirm the index is bounded by the
         * callers. */
        if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
                OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
                if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
                        GOTO(unlock, rc = -ENOMEM);
        }

        LTD_TGT(ltds, index) = ltd;
        cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
        ltds->ltd_tgtnr++;

        GOTO(unlock, rc = 0);

unlock:
        /* Only drop the semaphore if it has been taken in this function. */
        if (!locked)
                up_write(&ltds->ltd_rw_sem);

        return rc;
}
243
244 static int lfsck_add_target_from_orphan(const struct lu_env *env,
245                                         struct lfsck_instance *lfsck)
246 {
247         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
248         struct lfsck_tgt_desc   *ltd;
249         struct lfsck_tgt_desc   *next;
250         struct list_head        *head    = &lfsck_ost_orphan_list;
251         int                      rc;
252         bool                     for_ost = true;
253
254 again:
255         spin_lock(&lfsck_instance_lock);
256         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
257                 if (ltd->ltd_key == lfsck->li_bottom)
258                         list_move_tail(&ltd->ltd_orphan_list,
259                                        &ltds->ltd_orphan);
260         }
261         spin_unlock(&lfsck_instance_lock);
262
263         down_write(&ltds->ltd_rw_sem);
264         while (!list_empty(&ltds->ltd_orphan)) {
265                 ltd = list_entry(ltds->ltd_orphan.next,
266                                  struct lfsck_tgt_desc,
267                                  ltd_orphan_list);
268                 list_del_init(&ltd->ltd_orphan_list);
269                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
270                 /* Do not hold the semaphore for too long time. */
271                 up_write(&ltds->ltd_rw_sem);
272                 if (rc != 0)
273                         return rc;
274
275                 down_write(&ltds->ltd_rw_sem);
276         }
277         up_write(&ltds->ltd_rw_sem);
278
279         if (for_ost) {
280                 ltds = &lfsck->li_mdt_descs;
281                 head = &lfsck_mdt_orphan_list;
282                 for_ost = false;
283                 goto again;
284         }
285
286         return 0;
287 }
288
289 static inline struct lfsck_component *
290 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
291                        struct list_head *list)
292 {
293         struct lfsck_component *com;
294
295         list_for_each_entry(com, list, lc_link) {
296                 if (com->lc_type == type)
297                         return com;
298         }
299         return NULL;
300 }
301
302 struct lfsck_component *
303 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
304 {
305         struct lfsck_component *com;
306
307         spin_lock(&lfsck->li_lock);
308         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
309         if (com != NULL)
310                 goto unlock;
311
312         com = __lfsck_component_find(lfsck, type,
313                                      &lfsck->li_list_double_scan);
314         if (com != NULL)
315                 goto unlock;
316
317         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
318
319 unlock:
320         if (com != NULL)
321                 lfsck_component_get(com);
322         spin_unlock(&lfsck->li_lock);
323         return com;
324 }
325
326 void lfsck_component_cleanup(const struct lu_env *env,
327                              struct lfsck_component *com)
328 {
329         if (!list_empty(&com->lc_link))
330                 list_del_init(&com->lc_link);
331         if (!list_empty(&com->lc_link_dir))
332                 list_del_init(&com->lc_link_dir);
333
334         lfsck_component_put(env, com);
335 }
336
337 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
338                     struct lu_fid *fid, bool locked)
339 {
340         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
341         int                      rc = 0;
342         ENTRY;
343
344         if (!locked)
345                 mutex_lock(&lfsck->li_mutex);
346
347         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
348         if (rc >= 0) {
349                 bk->lb_last_fid = *fid;
350                 /* We do not care about whether the subsequent sub-operations
351                  * failed or not. The worst case is that one FID is lost that
352                  * is not a big issue for the LFSCK since it is relative rare
353                  * for LFSCK create. */
354                 rc = lfsck_bookmark_store(env, lfsck);
355         }
356
357         if (!locked)
358                 mutex_unlock(&lfsck->li_mutex);
359
360         RETURN(rc);
361 }
362
363 /**
364  * Request the specified ibits lock for the given object.
365  *
366  * Before the LFSCK modifying on the namespace visible object,
367  * it needs to acquire related ibits ldlm lock.
368  *
369  * \param[in] env       pointer to the thread context
370  * \param[in] lfsck     pointer to the lfsck instance
371  * \param[in] obj       pointer to the dt_object to be locked
372  * \param[out] lh       pointer to the lock handle
373  * \param[in] ibits     the bits for the ldlm lock to be acquired
374  * \param[in] mode      the mode for the ldlm lock to be acquired
375  *
376  * \retval              0 for success
377  * \retval              negative error number on failure
378  */
379 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
380                      struct dt_object *obj, struct lustre_handle *lh,
381                      __u64 bits, ldlm_mode_t mode)
382 {
383         struct lfsck_thread_info        *info   = lfsck_env_info(env);
384         ldlm_policy_data_t              *policy = &info->lti_policy;
385         struct ldlm_res_id              *resid  = &info->lti_resid;
386         __u64                            flags  = LDLM_FL_ATOMIC_CB;
387         int                              rc;
388
389         LASSERT(lfsck->li_namespace != NULL);
390
391         memset(policy, 0, sizeof(*policy));
392         policy->l_inodebits.bits = bits;
393         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
394         if (dt_object_remote(obj)) {
395                 struct ldlm_enqueue_info *einfo = &info->lti_einfo;
396
397                 memset(einfo, 0, sizeof(*einfo));
398                 einfo->ei_type = LDLM_IBITS;
399                 einfo->ei_mode = mode;
400                 einfo->ei_cb_bl = ldlm_blocking_ast;
401                 einfo->ei_cb_cp = ldlm_completion_ast;
402                 einfo->ei_res_id = resid;
403
404                 rc = dt_object_lock(env, obj, lh, einfo, policy);
405         } else {
406                 rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid,
407                                             LDLM_IBITS, policy, mode,
408                                             &flags, ldlm_blocking_ast,
409                                             ldlm_completion_ast, NULL, NULL,
410                                             0, LVB_T_NONE, NULL, lh);
411         }
412
413         if (rc == ELDLM_OK) {
414                 rc = 0;
415         } else {
416                 memset(lh, 0, sizeof(*lh));
417                 rc = -EIO;
418         }
419
420         return rc;
421 }
422
423 /**
424  * Release the the specified ibits lock.
425  *
426  * If the lock has been acquired before, release it
427  * and cleanup the handle. Otherwise, do nothing.
428  *
429  * \param[in] lh        pointer to the lock handle
430  * \param[in] mode      the mode for the ldlm lock to be released
431  */
432 void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
433 {
434         if (lustre_handle_is_used(lh)) {
435                 ldlm_lock_decref(lh, mode);
436                 memset(lh, 0, sizeof(*lh));
437         }
438 }
439
440 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
441                               struct lfsck_instance *lfsck,
442                               const struct lu_fid *fid)
443 {
444         struct seq_server_site  *ss     =
445                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
446         struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
447         int                      rc;
448
449         fld_range_set_mdt(range);
450         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
451         if (rc == 0)
452                 rc = range->lsr_index;
453
454         return rc;
455 }
456
/* Names of the "." and ".." directory entries; the lost+found creation
 * helpers insert them into the newly created directory. Both have external
 * linkage. */
const char dot[] = ".";
const char dotdot[] = "..";
/* NOTE(review): dotlustre and lostfound are not referenced in the visible
 * part of this file - confirm their users. */
static const char dotlustre[] = ".lustre";
static const char lostfound[] = "lost+found";
461
/**
 * Create the lost+found sub-directory within one local transaction.
 *
 * The following sub-modifications are declared (steps "Na") and then
 * executed (steps "Nb") in a single transaction on lfsck->li_bottom:
 * create \a child as a directory with "." and ".." entries, increase the
 * child nlink, set the child linkEA (pointing back to \a parent with
 * \a name), insert \a name into \a parent, increase the parent nlink,
 * and write the bookmark with lb_lpf_fid set to the FID of \a child.
 *
 * lfsck_create_lpf() calls this function when the device index is zero.
 *
 * \param[in] env       pointer to the thread context
 * \param[in] lfsck     pointer to the lfsck instance
 * \param[in] parent    pointer to the parent directory object
 * \param[in] child     pointer to the object to be created
 * \param[in] la        the attributes for the new object
 * \param[in] dof       the format for the new object
 * \param[in] name      the name of the new object under \a parent
 *
 * \retval              0 for success
 * \retval              negative error number on failure
 */
static int lfsck_create_lpf_local(const struct lu_env *env,
                                  struct lfsck_instance *lfsck,
                                  struct dt_object *parent,
                                  struct dt_object *child,
                                  struct lu_attr *la,
                                  struct dt_object_format *dof,
                                  const char *name)
{
        struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
        struct dt_device        *dev    = lfsck->li_bottom;
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
        const struct lu_fid     *cfid   = lfsck_dto2fid(child);
        struct thandle          *th     = NULL;
        struct linkea_data       ldata  = { 0 };
        struct lu_buf            linkea_buf;
        const struct lu_name    *cname;
        loff_t                   pos    = 0;
        int                      len    = sizeof(struct lfsck_bookmark);
        int                      rc;
        ENTRY;

        /* Build the linkEA (name + parent FID) in the per-thread buffer
         * before the transaction is created. */
        rc = linkea_data_new(&ldata,
                             &lfsck_env_info(env)->lti_linkea_buf2);
        if (rc != 0)
                RETURN(rc);

        cname = lfsck_name_get_const(env, name, strlen(name));
        rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
        if (rc != 0)
                RETURN(rc);

        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));

        /* 1a. create child */
        rc = dt_declare_create(env, child, la, NULL, dof, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 2a. increase child nlink */
        rc = dt_declare_ref_add(env, child, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 3a. insert linkEA for child */
        lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
                       ldata.ld_leh->leh_len);
        rc = dt_declare_xattr_set(env, child, &linkea_buf,
                                  XATTR_NAME_LINK, 0, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 4a. insert name into parent dir */
        /* rec_type set here is reused by all the inserts below; only
         * rec_fid is changed in between. */
        rec->rec_type = S_IFDIR;
        rec->rec_fid = cfid;
        rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
                               (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 5a. increase parent nlink */
        rc = dt_declare_ref_add(env, parent, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 6a. update bookmark */
        rc = dt_declare_record_write(env, bk_obj,
                                     lfsck_buf_get(env, bk, len), 0, th);
        if (rc != 0)
                GOTO(stop, rc);

        rc = dt_trans_start_local(env, dev, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* From here until step 3b the child is write locked; failures in
         * between must leave via the "unlock" label. */
        dt_write_lock(env, child, 0);
        /* 1b.1. create child */
        rc = dt_create(env, child, la, NULL, dof, th);
        if (rc != 0)
                GOTO(unlock, rc);

        if (unlikely(!dt_try_as_dir(env, child)))
                GOTO(unlock, rc = -ENOTDIR);

        /* 1b.2. insert dot into child dir */
        rec->rec_fid = cfid;
        rc = dt_insert(env, child, (const struct dt_rec *)rec,
                       (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 1b.3. insert dotdot into child dir */
        /* ".." is pointed at LU_LPF_FID, the FID that lfsck_create_lpf()
         * uses to find the parent. */
        rec->rec_fid = &LU_LPF_FID;
        rc = dt_insert(env, child, (const struct dt_rec *)rec,
                       (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 2b. increase child nlink */
        rc = dt_ref_add(env, child, th);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 3b. insert linkEA for child. */
        rc = dt_xattr_set(env, child, &linkea_buf,
                          XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
        /* The child lock is dropped before rc is checked, so from now on
         * failures leave via "stop" instead of "unlock". */
        dt_write_unlock(env, child);
        if (rc != 0)
                GOTO(stop, rc);

        /* 4b. insert name into parent dir */
        rec->rec_fid = cfid;
        rc = dt_insert(env, parent, (const struct dt_rec *)rec,
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(stop, rc);

        dt_write_lock(env, parent, 0);
        /* 5b. increase parent nlink */
        rc = dt_ref_add(env, parent, th);
        dt_write_unlock(env, parent);
        if (rc != 0)
                GOTO(stop, rc);

        /* Record the FID of the new directory in the in-RAM bookmark and
         * refresh the on-disk format copy. */
        bk->lb_lpf_fid = *cfid;
        lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);

        /* 6b. update bookmark */
        rc = dt_record_write(env, bk_obj,
                             lfsck_buf_get(env, bk, len), &pos, th);

        GOTO(stop, rc);

unlock:
        dt_write_unlock(env, child);

stop:
        dt_trans_stop(env, dev, th);

        return rc;
}
605
/**
 * Create the lost+found sub-directory in two separate transactions.
 *
 * Transaction I runs on lfsck->li_bottom: create \a child as a directory
 * with "." and ".." entries, increase its nlink, set its linkEA and write
 * the bookmark with lb_lpf_fid set to the FID of \a child.
 *
 * Transaction II runs on lfsck->li_next and is only started if the first
 * one succeeded: insert \a name into \a parent and increase the parent
 * nlink. If it fails, a debug message is logged and the error is returned;
 * the result of transaction I is left in place.
 *
 * NOTE(review): the caller is not visible in this part of the file;
 * presumably lfsck_create_lpf() calls this function when the device index
 * is non-zero - confirm.
 *
 * \param[in] env       pointer to the thread context
 * \param[in] lfsck     pointer to the lfsck instance
 * \param[in] parent    pointer to the parent directory object
 * \param[in] child     pointer to the object to be created
 * \param[in] la        the attributes for the new object
 * \param[in] dof       the format for the new object
 * \param[in] name      the name of the new object under \a parent
 *
 * \retval              0 for success
 * \retval              negative error number on failure
 */
static int lfsck_create_lpf_remote(const struct lu_env *env,
                                   struct lfsck_instance *lfsck,
                                   struct dt_object *parent,
                                   struct dt_object *child,
                                   struct lu_attr *la,
                                   struct dt_object_format *dof,
                                   const char *name)
{
        struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
        const struct lu_fid     *cfid   = lfsck_dto2fid(child);
        struct thandle          *th     = NULL;
        struct linkea_data       ldata  = { 0 };
        struct lu_buf            linkea_buf;
        const struct lu_name    *cname;
        struct dt_device        *dev;
        loff_t                   pos    = 0;
        int                      len    = sizeof(struct lfsck_bookmark);
        int                      rc;
        ENTRY;

        /* Build the linkEA (name + parent FID) in the per-thread buffer
         * before any transaction is created. */
        rc = linkea_data_new(&ldata,
                             &lfsck_env_info(env)->lti_linkea_buf2);
        if (rc != 0)
                RETURN(rc);

        cname = lfsck_name_get_const(env, name, strlen(name));
        rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
        if (rc != 0)
                RETURN(rc);

        /* Create .lustre/lost+found/MDTxxxx. */

        /* XXX: Currently, cross-MDT create operation needs to create the child
         *      object firstly, then insert name into the parent directory. For
         *      this case, the child object resides on current MDT (local), but
         *      the parent ".lustre/lost+found" may be on remote MDT. It is not
         *      easy to contain all the sub-modifications orderly within single
         *      transaction.
         *
         *      To avoid more inconsistency, we split the create operation into
         *      two transactions:
         *
         *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
         *         locally.
         *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
         *         remotely.
         *
         *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
         *      repair such inconsistency when LFSCK run next time. */

        /* Transaction I: locally */

        dev = lfsck->li_bottom;
        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));

        /* 1a. create child */
        rc = dt_declare_create(env, child, la, NULL, dof, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 2a. increase child nlink */
        rc = dt_declare_ref_add(env, child, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 3a. insert linkEA for child */
        lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
                       ldata.ld_leh->leh_len);
        rc = dt_declare_xattr_set(env, child, &linkea_buf,
                                  XATTR_NAME_LINK, 0, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 4a. update bookmark */
        rc = dt_declare_record_write(env, bk_obj,
                                     lfsck_buf_get(env, bk, len), 0, th);
        if (rc != 0)
                GOTO(stop, rc);

        rc = dt_trans_start_local(env, dev, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* The child stays write locked until the end of transaction I;
         * failures in between must leave via the "unlock" label. */
        dt_write_lock(env, child, 0);
        /* 1b.1. create child */
        rc = dt_create(env, child, la, NULL, dof, th);
        if (rc != 0)
                GOTO(unlock, rc);

        if (unlikely(!dt_try_as_dir(env, child)))
                GOTO(unlock, rc = -ENOTDIR);

        /* 1b.2. insert dot into child dir */
        /* rec_type set here is reused by all the inserts below; only
         * rec_fid is changed in between. */
        rec->rec_type = S_IFDIR;
        rec->rec_fid = cfid;
        rc = dt_insert(env, child, (const struct dt_rec *)rec,
                       (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 1b.3. insert dotdot into child dir */
        rec->rec_fid = &LU_LPF_FID;
        rc = dt_insert(env, child, (const struct dt_rec *)rec,
                       (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 2b. increase child nlink */
        rc = dt_ref_add(env, child, th);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 3b. insert linkEA for child */
        rc = dt_xattr_set(env, child, &linkea_buf,
                          XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
        if (rc != 0)
                GOTO(unlock, rc);

        /* Record the FID of the new directory in the in-RAM bookmark and
         * refresh the on-disk format copy. */
        bk->lb_lpf_fid = *cfid;
        lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);

        /* 4b. update bookmark */
        rc = dt_record_write(env, bk_obj,
                             lfsck_buf_get(env, bk, len), &pos, th);

        /* Transaction I ends here whatever the bookmark write returned;
         * the second transaction is only attempted on success. */
        dt_write_unlock(env, child);
        dt_trans_stop(env, dev, th);
        if (rc != 0)
                RETURN(rc);

        /* Transaction II: remotely */

        /* "dev" and "th" are reused, so that the shared "stop" label stops
         * whichever transaction is current. */
        dev = lfsck->li_next;
        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));

        /* 5a. insert name into parent dir */
        rec->rec_fid = cfid;
        rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
                               (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 6a. increase parent nlink */
        rc = dt_declare_ref_add(env, parent, th);
        if (rc != 0)
                GOTO(stop, rc);

        rc = dt_trans_start(env, dev, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 5b. insert name into parent dir */
        rc = dt_insert(env, parent, (const struct dt_rec *)rec,
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(stop, rc);

        dt_write_lock(env, parent, 0);
        /* 6b. increase parent nlink */
        rc = dt_ref_add(env, parent, th);
        dt_write_unlock(env, parent);

        GOTO(stop, rc);

unlock:
        /* Only reached from transaction I, while the child is locked. */
        dt_write_unlock(env, child);
stop:
        dt_trans_stop(env, dev, th);

        /* dev == li_next means that the failure happened in transaction II,
         * i.e. the child exists but its name has not been inserted. */
        if (rc != 0 && dev == lfsck->li_next)
                CDEBUG(D_LFSCK, "%s: partially created the object "DFID
                       "for orphans, but failed to insert the name %s "
                       "to the .lustre/lost+found/. Such inconsistency "
                       "will be repaired when LFSCK run next time: rc = %d\n",
                       lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);

        return rc;
}
790
791 /* Do NOT create .lustre/lost+found/MDTxxxx when registering the lfsck
792  * instance, because MDT0 may not be ready for sequence allocation yet. We do
793  * that only when it is required, such as when repairing orphan OST-objects. */
794 int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
795 {
796         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
797         struct lfsck_thread_info *info  = lfsck_env_info(env);
798         struct lu_fid            *cfid  = &info->lti_fid2;
799         struct lu_attr           *la    = &info->lti_la;
800         struct dt_object_format  *dof   = &info->lti_dof;
801         struct dt_object         *parent = NULL;
802         struct dt_object         *child = NULL;
803         struct lustre_handle      lh    = { 0 };
804         char                      name[8];
805         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
806         int                       rc    = 0;
807         ENTRY;
808
809         LASSERT(lfsck->li_master);
810
811         sprintf(name, "MDT%04x", node);
812         if (node == 0) {
813                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
814                                                   &LU_LPF_FID);
815         } else {
816                 struct lfsck_tgt_desc *ltd;
817
818                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
819                 if (unlikely(ltd == NULL))
820                         RETURN(-ENXIO);
821
822                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
823                                                   &LU_LPF_FID);
824                 lfsck_tgt_put(ltd);
825         }
826         if (IS_ERR(parent))
827                 RETURN(PTR_ERR(parent));
828
829         if (lfsck->li_lpf_obj != NULL)
830                 GOTO(out, rc = 0);
831
832         if (unlikely(!dt_try_as_dir(env, parent)))
833                 GOTO(out, rc = -ENOTDIR);
834
835         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
836                               MDS_INODELOCK_UPDATE, LCK_EX);
837         if (rc != 0)
838                 GOTO(out, rc);
839
840         mutex_lock(&lfsck->li_mutex);
841         if (lfsck->li_lpf_obj != NULL)
842                 GOTO(unlock, rc = 0);
843
844         if (fid_is_zero(&bk->lb_lpf_fid)) {
845                 /* There is corner case that: in former LFSCK scanning we have
846                  * created the .lustre/lost+found/MDTxxxx but failed to update
847                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
848                  * it from MDT0 firstly. */
849                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
850                                (const struct dt_key *)name, BYPASS_CAPA);
851                 if (rc != 0 && rc != -ENOENT)
852                         GOTO(unlock, rc);
853
854                 if (rc == 0) {
855                         bk->lb_lpf_fid = *cfid;
856                         rc = lfsck_bookmark_store(env, lfsck);
857                 } else {
858                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
859                 }
860                 if (rc != 0)
861                         GOTO(unlock, rc);
862         } else {
863                 *cfid = bk->lb_lpf_fid;
864         }
865
866         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
867         if (IS_ERR(child))
868                 GOTO(unlock, rc = PTR_ERR(child));
869
870         if (dt_object_exists(child) != 0) {
871                 if (unlikely(!dt_try_as_dir(env, child)))
872                         rc = -ENOTDIR;
873                 else
874                         lfsck->li_lpf_obj = child;
875
876                 GOTO(unlock, rc);
877         }
878
879         memset(la, 0, sizeof(*la));
880         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
881         la->la_mode = S_IFDIR | S_IRWXU;
882         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
883                        LA_UID | LA_GID;
884         memset(dof, 0, sizeof(*dof));
885         dof->dof_type = dt_mode_to_dft(S_IFDIR);
886
887         if (node == 0)
888                 rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
889                                             dof, name);
890         else
891                 rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
892                                              dof, name);
893         if (rc == 0)
894                 lfsck->li_lpf_obj = child;
895
896         GOTO(unlock, rc);
897
898 unlock:
899         mutex_unlock(&lfsck->li_mutex);
900         lfsck_ibits_unlock(&lh, LCK_EX);
901         if (rc != 0 && child != NULL && !IS_ERR(child))
902                 lu_object_put(env, &child->do_lu);
903 out:
904         if (parent != NULL && !IS_ERR(parent))
905                 lu_object_put(env, &parent->do_lu);
906
907         return rc;
908 }
909
910 /**
911  * Scan .lustre/lost+found for bad name entries and remove them.
912  *
913  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
914  * index in the system. Any other formatted name is invalid and should be
915  * removed.
916  *
917  * \param[in] env       pointer to the thread context
918  * \param[in] lfsck     pointer to the lfsck instance
919  * \param[in] parent    pointer to the lost+found object
920  *
921  * \retval              0 for success
922  * \retval              negative error number on failure
923  */
924 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
925                                       struct lfsck_instance *lfsck,
926                                       struct dt_object *parent)
927 {
928         struct lu_dirent        *ent    =
929                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
930         const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
931         struct dt_it            *it;
932         int                      rc;
933         ENTRY;
934
935         it = iops->init(env, parent, LUDA_64BITHASH, BYPASS_CAPA);
936         if (IS_ERR(it))
937                 RETURN(PTR_ERR(it));
938
939         rc = iops->load(env, it, 0);
940         if (rc == 0)
941                 rc = iops->next(env, it);
942         else if (rc > 0)
943                 rc = 0;
944
945         while (rc == 0) {
946                 int off = 3;
947
948                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
949                 if (rc != 0)
950                         break;
951
952                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
953                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
954                         goto next;
955
956                 /* name length must be strlen("MDTxxxx") */
957                 if (ent->lde_namelen != 7)
958                         goto remove;
959
960                 if (memcmp(ent->lde_name, "MDT", off) != 0)
961                         goto remove;
962
963                 while (off < 7 && isxdigit(ent->lde_name[off]))
964                         off++;
965
966                 if (off != 7) {
967
968 remove:
969                         rc = lfsck_remove_name_entry(env, lfsck, parent,
970                                                      ent->lde_name, S_IFDIR);
971                         if (rc != 0)
972                                 break;
973                 }
974
975 next:
976                 rc = iops->next(env, it);
977         }
978
979         iops->put(env, it);
980         iops->fini(env, it);
981
982         RETURN(rc > 0 ? 0 : rc);
983 }
984
985 static int lfsck_update_lpf_entry(const struct lu_env *env,
986                                   struct lfsck_instance *lfsck,
987                                   struct dt_object *parent,
988                                   struct dt_object *child,
989                                   const char *name,
990                                   enum lfsck_verify_lpf_types type)
991 {
992         int rc;
993
994         if (type == LVLT_BY_BOOKMARK) {
995                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
996                                              lfsck_dto2fid(child), S_IFDIR);
997         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
998                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
999                 rc = lfsck_bookmark_store(env, lfsck);
1000
1001                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
1002                        " in the bookmark file: rc = %d\n",
1003                        lfsck_lfsck2name(lfsck),
1004                        PFID(lfsck_dto2fid(child)), rc);
1005         }
1006
1007         return rc;
1008 }
1009
1010 /**
1011  * Check whether the @child back references the @parent.
1012  *
1013  * Two cases:
1014  * 1) The child's FID is stored in the bookmark file. If the child back
1015  *    references the parent (LU_LPF_FID object) via its ".." entry, then
1016  *    insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
1017  *    the child back references another parent2, then:
1018  * 1.1) If the parent2 recognizes the child, then update the bookmark file;
1019  * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
1020  *      references the child. So keep them there. As the LFSCK processing,
1021  *      the parent3 may be found, then when the LFSCK run next time, the
1022  *      inconsistency can be repaired.
1023  *
1024  * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
1025  *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
1026  *    via its ".." entry, then update the bookmark file, otherwise, if the child
1027  *    back references another parent2, then:
1028  * 2.1) If the parent2 recognizes the child, then remove the sub-directory
1029  *      from .lustre/lost+found/;
1030  * 2.2) Otherwise, if the parent2 does not recognizes the child, trust the
1031  *      sub-directory name entry and update the child;
1032  * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1033  *      or not, then keep them there.
1034  *
1035  * \param[in] env       pointer to the thread context
1036  * \param[in] lfsck     pointer to the lfsck instance
1037  * \param[in] parent    pointer to the lost+found object
1038  * \param[in] child     pointer to the lost+found sub-directory object
1039  * \param[in] name      the name for lost+found sub-directory object
1040  * \param[out] fid      pointer to the buffer to hold the FID of the object
1041  *                      (called it as parent2) that is referenced via the
1042  *                      child's dotdot entry; it also can be the FID that
1043  *                      is referenced by the name entry under the parent2.
1044  * \param[in] type      to indicate where the child's FID is stored in
1045  *
1046  * \retval              positive number for uncertain inconsistency
1047  * \retval              0 for success
1048  * \retval              negative error number on failure
1049  */
1050 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1051                                   struct lfsck_instance *lfsck,
1052                                   struct dt_object *parent,
1053                                   struct dt_object *child, const char *name,
1054                                   struct lu_fid *fid,
1055                                   enum lfsck_verify_lpf_types type)
1056 {
1057         struct lfsck_thread_info *info    = lfsck_env_info(env);
1058         char                     *name2   = info->lti_key;
1059         struct lu_fid            *fid2    = &info->lti_fid3;
1060         struct dt_object         *parent2 = NULL;
1061         struct lustre_handle      lh      = { 0 };
1062         int                       rc;
1063         ENTRY;
1064
1065         fid_zero(fid);
1066         rc = dt_lookup(env, child, (struct dt_rec *)fid,
1067                        (const struct dt_key *)dotdot, BYPASS_CAPA);
1068         if (rc != 0)
1069                 GOTO(linkea, rc);
1070
1071         if (!fid_is_sane(fid))
1072                 GOTO(linkea, rc = -EINVAL);
1073
1074         if (lu_fid_eq(fid, &LU_LPF_FID)) {
1075                 const struct lu_name *cname;
1076
1077                 if (lfsck->li_lpf_obj == NULL) {
1078                         lu_object_get(&child->do_lu);
1079                         lfsck->li_lpf_obj = child;
1080                 }
1081
1082                 cname = lfsck_name_get_const(env, name, strlen(name));
1083                 rc = lfsck_verify_linkea(env, lfsck->li_bottom, child, cname,
1084                                          &LU_LPF_FID);
1085                 if (rc == 0)
1086                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1087                                                     name, type);
1088
1089                 GOTO(out_done, rc);
1090         }
1091
1092         parent2 = lfsck_object_find_by_dev(env, lfsck->li_next, fid);
1093         if (IS_ERR(parent2))
1094                 GOTO(linkea, parent2);
1095
1096         if (!dt_object_exists(parent2)) {
1097                 lu_object_put(env, &parent2->do_lu);
1098
1099                 GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
1100         }
1101
1102         if (!dt_try_as_dir(env, parent2)) {
1103                 lu_object_put(env, &parent2->do_lu);
1104
1105                 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1106         }
1107
1108 linkea:
1109         /* To prevent rename/unlink race */
1110         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1111                               MDS_INODELOCK_UPDATE, LCK_PR);
1112         if (rc != 0)
1113                 GOTO(out_put, rc);
1114
1115         dt_read_lock(env, child, 0);
1116         rc = lfsck_links_get_first(env, child, name2, fid2);
1117         if (rc != 0) {
1118                 dt_read_unlock(env, child);
1119                 lfsck_ibits_unlock(&lh, LCK_PR);
1120
1121                 GOTO(out_put, rc = 1);
1122         }
1123
1124         /* It is almost impossible that the bookmark file (or the name entry)
1125          * and the linkEA hit the same data corruption. Trust the linkEA. */
1126         if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1127                 dt_read_unlock(env, child);
1128                 lfsck_ibits_unlock(&lh, LCK_PR);
1129
1130                 *fid = *fid2;
1131                 if (lfsck->li_lpf_obj == NULL) {
1132                         lu_object_get(&child->do_lu);
1133                         lfsck->li_lpf_obj = child;
1134                 }
1135
1136                 /* Update the child's dotdot entry */
1137                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1138                                              &LU_LPF_FID, S_IFDIR);
1139                 if (rc == 0)
1140                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1141                                                     name, type);
1142
1143                 GOTO(out_put, rc);
1144         }
1145
1146         if (parent2 == NULL || IS_ERR(parent2)) {
1147                 dt_read_unlock(env, child);
1148                 lfsck_ibits_unlock(&lh, LCK_PR);
1149
1150                 GOTO(out_done, rc = 1);
1151         }
1152
1153         rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
1154                        (const struct dt_key *)name2, BYPASS_CAPA);
1155         dt_read_unlock(env, child);
1156         lfsck_ibits_unlock(&lh, LCK_PR);
1157         if (rc != 0 && rc != -ENOENT)
1158                 GOTO(out_put, rc);
1159
1160         if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1161                 if (type == LVLT_BY_BOOKMARK)
1162                         GOTO(out_put, rc = 1);
1163
1164                 /* Trust the name entry, update the child's dotdot entry. */
1165                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1166                                              &LU_LPF_FID, S_IFDIR);
1167
1168                 GOTO(out_put, rc);
1169         }
1170
1171         if (type == LVLT_BY_BOOKMARK) {
1172                 /* Invalid FID record in the bookmark file, reset it. */
1173                 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1174                 rc = lfsck_bookmark_store(env, lfsck);
1175
1176                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1177                        " in the bookmark file: rc = %d\n",
1178                        lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1179         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1180                 /* The name entry is wrong, remove it. */
1181                 rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
1182         }
1183
1184         GOTO(out_put, rc);
1185
1186 out_put:
1187         if (parent2 != NULL && !IS_ERR(parent2))
1188                 lu_object_put(env, &parent2->do_lu);
1189
1190 out_done:
1191         return rc;
1192 }
1193
1194 /**
1195  * Verify the /ROOT/.lustre/lost+found/ directory.
1196  *
1197  * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1198  * the LFSCK does not exactly know how to handle, such as orphans. So before
1199  * the LFSCK scanning the system, the consistency of such directory needs to
1200  * be verified firstly to allow the users to use it during the LFSCK.
1201  *
1202  * \param[in] env       pointer to the thread context
1203  * \param[in] lfsck     pointer to the lfsck instance
1204  *
1205  * \retval              positive number for uncertain inconsistency
1206  * \retval              0 for success
1207  * \retval              negative error number on failure
1208  */
1209 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1210 {
1211         struct lfsck_thread_info *info   = lfsck_env_info(env);
1212         struct lu_fid            *pfid   = &info->lti_fid;
1213         struct lu_fid            *cfid   = &info->lti_fid2;
1214         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
1215         struct dt_object         *parent = NULL;
1216         /* child1's FID is in the bookmark file. */
1217         struct dt_object         *child1 = NULL;
1218         /* child2's FID is in the name entry MDTxxxx. */
1219         struct dt_object         *child2 = NULL;
1220         struct dt_device         *dev    = lfsck->li_bottom;
1221         const struct lu_name     *cname;
1222         char                      name[8];
1223         int                       node   = lfsck_dev_idx(dev);
1224         int                       rc     = 0;
1225         ENTRY;
1226
1227         LASSERT(lfsck->li_master);
1228
1229         if (node == 0) {
1230                 parent = lfsck_object_find_by_dev(env, dev, &LU_LPF_FID);
1231         } else {
1232                 struct lfsck_tgt_desc *ltd;
1233
1234                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1235                 if (unlikely(ltd == NULL))
1236                         RETURN(-ENXIO);
1237
1238                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1239                                                   &LU_LPF_FID);
1240                 lfsck_tgt_put(ltd);
1241         }
1242
1243         if (IS_ERR(parent))
1244                 RETURN(PTR_ERR(parent));
1245
1246         LASSERT(dt_object_exists(parent));
1247
1248         if (unlikely(!dt_try_as_dir(env, parent)))
1249                 GOTO(put, rc = -ENOTDIR);
1250
1251         if (node == 0) {
1252                 rc = lfsck_scan_lpf_bad_entries(env, lfsck, parent);
1253                 if (rc != 0)
1254                         CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
1255                                "for bad sub-directories: rc = %d\n",
1256                                lfsck_lfsck2name(lfsck), rc);
1257         }
1258
1259         if (!fid_is_zero(&bk->lb_lpf_fid)) {
1260                 if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
1261                         struct lu_fid tfid = bk->lb_lpf_fid;
1262
1263                         /* Invalid FID record in the bookmark file, reset it. */
1264                         fid_zero(&bk->lb_lpf_fid);
1265                         rc = lfsck_bookmark_store(env, lfsck);
1266
1267                         CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1268                                " in the bookmark file: rc = %d\n",
1269                                lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
1270
1271                         if (rc != 0)
1272                                 GOTO(put, rc);
1273                 } else {
1274                         child1 = lfsck_object_find_by_dev(env, dev,
1275                                                           &bk->lb_lpf_fid);
1276                         if (IS_ERR(child1))
1277                                 GOTO(put, rc = PTR_ERR(child1));
1278
1279                         if (unlikely(!dt_object_exists(child1) ||
1280                                      dt_object_remote(child1)) ||
1281                                      !S_ISDIR(lfsck_object_type(child1))) {
1282                                 /* Invalid FID record in the bookmark file,
1283                                  * reset it. */
1284                                 fid_zero(&bk->lb_lpf_fid);
1285                                 rc = lfsck_bookmark_store(env, lfsck);
1286
1287                                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1288                                        " in the bookmark file: rc = %d\n",
1289                                        lfsck_lfsck2name(lfsck),
1290                                        PFID(lfsck_dto2fid(child1)), rc);
1291
1292                                 if (rc != 0)
1293                                         GOTO(put, rc);
1294
1295                                 lu_object_put(env, &child1->do_lu);
1296                                 child1 = NULL;
1297                         } else if (unlikely(!dt_try_as_dir(env, child1))) {
1298                                 GOTO(put, rc = -ENOTDIR);
1299                         }
1300                 }
1301         }
1302
1303         snprintf(name, 8, "MDT%04x", node);
1304         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1305                        (const struct dt_key *)name, BYPASS_CAPA);
1306         if (rc == -ENOENT) {
1307                 if (!fid_is_zero(&bk->lb_lpf_fid))
1308                         goto check_child1;
1309
1310                 GOTO(put, rc = 0);
1311         }
1312
1313         if (rc != 0)
1314                 GOTO(put, rc);
1315
1316         /* Invalid FID in the name entry, remove the name entry. */
1317         if (!fid_is_norm(cfid)) {
1318                 rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
1319                 if (rc != 0)
1320                         GOTO(put, rc);
1321
1322                 goto check_child1;
1323         }
1324
1325         child2 = lfsck_object_find_by_dev(env, dev, cfid);
1326         if (IS_ERR(child2))
1327                 GOTO(put, rc = PTR_ERR(child2));
1328
1329         if (unlikely(!dt_object_exists(child2) ||
1330                      dt_object_remote(child2)) ||
1331                      !S_ISDIR(lfsck_object_type(child2))) {
1332                 rc = lfsck_remove_name_entry(env, lfsck, parent, name,
1333                                              S_IFDIR);
1334                 if (rc != 0)
1335                         GOTO(put, rc);
1336
1337                 goto check_child1;
1338         }
1339
1340         if (unlikely(!dt_try_as_dir(env, child2)))
1341                 GOTO(put, rc = -ENOTDIR);
1342
1343         if (child1 == NULL) {
1344                 rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2, name,
1345                                             pfid, LVLT_BY_NAMEENTRY);
1346         } else if (!lu_fid_eq(cfid, &bk->lb_lpf_fid)) {
1347                 rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
1348                                             pfid, LVLT_BY_BOOKMARK);
1349                 if (!lu_fid_eq(pfid, &LU_LPF_FID))
1350                         rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2,
1351                                                     name, pfid,
1352                                                     LVLT_BY_NAMEENTRY);
1353         } else {
1354                 if (lfsck->li_lpf_obj == NULL) {
1355                         lu_object_get(&child2->do_lu);
1356                         lfsck->li_lpf_obj = child2;
1357                 }
1358
1359                 cname = lfsck_name_get_const(env, name, strlen(name));
1360                 rc = lfsck_verify_linkea(env, dev, child2, cname, &LU_LPF_FID);
1361         }
1362
1363         GOTO(put, rc);
1364
1365 check_child1:
1366         if (child1 != NULL)
1367                 rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
1368                                             pfid, LVLT_BY_BOOKMARK);
1369
1370         GOTO(put, rc);
1371
1372 put:
1373         if (lfsck->li_lpf_obj != NULL &&
1374             unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj)))
1375                 rc = -ENOTDIR;
1376
1377         if (child2 != NULL && !IS_ERR(child2))
1378                 lu_object_put(env, &child2->do_lu);
1379         if (child1 != NULL && !IS_ERR(child1))
1380                 lu_object_put(env, &child1->do_lu);
1381         if (parent != NULL && !IS_ERR(parent))
1382                 lu_object_put(env, &parent->do_lu);
1383
1384         return rc;
1385 }
1386
1387 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1388 {
1389         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1390         struct seq_server_site  *ss;
1391         char                    *prefix;
1392         int                      rc     = 0;
1393         ENTRY;
1394
1395         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
1396         if (unlikely(ss == NULL))
1397                 RETURN(-ENXIO);
1398
1399         OBD_ALLOC_PTR(lfsck->li_seq);
1400         if (lfsck->li_seq == NULL)
1401                 RETURN(-ENOMEM);
1402
1403         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1404         if (prefix == NULL)
1405                 GOTO(out, rc = -ENOMEM);
1406
1407         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1408         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1409                              ss->ss_server_seq);
1410         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1411         if (rc != 0)
1412                 GOTO(out, rc);
1413
1414         if (fid_is_sane(&bk->lb_last_fid))
1415                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1416
1417         RETURN(0);
1418
1419 out:
1420         OBD_FREE_PTR(lfsck->li_seq);
1421         lfsck->li_seq = NULL;
1422
1423         return rc;
1424 }
1425
1426 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1427 {
1428         if (lfsck->li_seq != NULL) {
1429                 seq_client_fini(lfsck->li_seq);
1430                 OBD_FREE_PTR(lfsck->li_seq);
1431                 lfsck->li_seq = NULL;
1432         }
1433 }
1434
1435 void lfsck_instance_cleanup(const struct lu_env *env,
1436                             struct lfsck_instance *lfsck)
1437 {
1438         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1439         struct lfsck_component  *com;
1440         struct lfsck_component  *next;
1441         struct lfsck_lmv_unit   *llu;
1442         struct lfsck_lmv_unit   *llu_next;
1443         struct lfsck_lmv        *llmv;
1444         ENTRY;
1445
1446         LASSERT(list_empty(&lfsck->li_link));
1447         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1448
1449         if (lfsck->li_obj_oit != NULL) {
1450                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
1451                 lfsck->li_obj_oit = NULL;
1452         }
1453
1454         LASSERT(lfsck->li_obj_dir == NULL);
1455         LASSERT(lfsck->li_lmv == NULL);
1456
1457         list_for_each_entry_safe(llu, llu_next, &lfsck->li_list_lmv, llu_link) {
1458                 llmv = &llu->llu_lmv;
1459
1460                 LASSERTF(atomic_read(&llmv->ll_ref) == 1,
1461                          "still in using: %u\n",
1462                          atomic_read(&llmv->ll_ref));
1463
1464                 lfsck_lmv_put(env, llmv);
1465         }
1466
1467         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1468                 lfsck_component_cleanup(env, com);
1469         }
1470
1471         LASSERT(list_empty(&lfsck->li_list_dir));
1472
1473         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1474                                  lc_link) {
1475                 lfsck_component_cleanup(env, com);
1476         }
1477
1478         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1479                 lfsck_component_cleanup(env, com);
1480         }
1481
1482         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1483         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1484
1485         if (lfsck->li_bookmark_obj != NULL) {
1486                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
1487                 lfsck->li_bookmark_obj = NULL;
1488         }
1489
1490         if (lfsck->li_lpf_obj != NULL) {
1491                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
1492                 lfsck->li_lpf_obj = NULL;
1493         }
1494
1495         if (lfsck->li_los != NULL) {
1496                 local_oid_storage_fini(env, lfsck->li_los);
1497                 lfsck->li_los = NULL;
1498         }
1499
1500         lfsck_fid_fini(lfsck);
1501
1502         OBD_FREE_PTR(lfsck);
1503 }
1504
1505 static inline struct lfsck_instance *
1506 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1507 {
1508         struct lfsck_instance *lfsck;
1509
1510         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1511                 if (lfsck->li_bottom == key) {
1512                         if (ref)
1513                                 lfsck_instance_get(lfsck);
1514                         if (unlink)
1515                                 list_del_init(&lfsck->li_link);
1516
1517                         return lfsck;
1518                 }
1519         }
1520
1521         return NULL;
1522 }
1523
1524 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1525                                            bool unlink)
1526 {
1527         struct lfsck_instance *lfsck;
1528
1529         spin_lock(&lfsck_instance_lock);
1530         lfsck = __lfsck_instance_find(key, ref, unlink);
1531         spin_unlock(&lfsck_instance_lock);
1532
1533         return lfsck;
1534 }
1535
1536 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1537 {
1538         struct lfsck_instance *tmp;
1539
1540         spin_lock(&lfsck_instance_lock);
1541         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1542                 if (lfsck->li_bottom == tmp->li_bottom) {
1543                         spin_unlock(&lfsck_instance_lock);
1544                         return -EEXIST;
1545                 }
1546         }
1547
1548         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1549         spin_unlock(&lfsck_instance_lock);
1550         return 0;
1551 }
1552
/*
 * Print "<prefix>: name,name,...\n" for the bits set in @bits.
 *
 * Bit i is printed as names[i]; a set bit whose names[i] is NULL is
 * skipped. The @names array is indexed with the position of every set
 * bit, so it must have an element for each of them. If @bits is 0, only
 * "<prefix>:\n" is printed.
 *
 * The bits are walked via an unsigned mask that is computed only while
 * some bits remain to be handled. So the shift count never reaches the
 * width of the type and no signed value is shifted into the sign bit,
 * even when the topmost bit of @bits is set.
 *
 * Always return 0.
 */
int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
                    const char *prefix)
{
        unsigned int left = bits;
        unsigned int flag;
        int i;
        bool newline = (bits == 0);

        seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');

        for (i = 0; left != 0; i++) {
                flag = 1U << i;
                if (!(flag & left))
                        continue;

                left &= ~flag;
                if (names[i] == NULL)
                        continue;

                /* The name of the last set bit ends the line. */
                if (left == 0)
                        newline = true;

                seq_printf(m, "%s%c", names[i],
                           newline ? '\n' : ',');
        }

        /* The last set bit had no name: the line is still open. */
        if (!newline)
                seq_printf(m, "\n");
        return 0;
}
1579
1580 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *prefix)
1581 {
1582         if (time != 0)
1583                 seq_printf(m, "%s: "LPU64" seconds\n", prefix,
1584                           cfs_time_current_sec() - time);
1585         else
1586                 seq_printf(m, "%s: N/A\n", prefix);
1587         return 0;
1588 }
1589
1590 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1591                    const char *prefix)
1592 {
1593         if (fid_is_zero(&pos->lp_dir_parent)) {
1594                 if (pos->lp_oit_cookie == 0)
1595                         seq_printf(m, "%s: N/A, N/A, N/A\n",
1596                                    prefix);
1597                 else
1598                         seq_printf(m, "%s: "LPU64", N/A, N/A\n",
1599                                    prefix, pos->lp_oit_cookie);
1600         } else {
1601                 seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
1602                            prefix, pos->lp_oit_cookie,
1603                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1604         }
1605         return 0;
1606 }
1607
1608 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1609                     struct lfsck_position *pos, bool init)
1610 {
1611         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1612
1613         if (unlikely(lfsck->li_di_oit == NULL)) {
1614                 memset(pos, 0, sizeof(*pos));
1615                 return;
1616         }
1617
1618         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1619         if (!lfsck->li_current_oit_processed && !init)
1620                 pos->lp_oit_cookie--;
1621
1622         LASSERT(pos->lp_oit_cookie > 0);
1623
1624         if (lfsck->li_di_dir != NULL) {
1625                 struct dt_object *dto = lfsck->li_obj_dir;
1626
1627                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1628                                                         lfsck->li_di_dir);
1629
1630                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1631                         fid_zero(&pos->lp_dir_parent);
1632                         pos->lp_dir_cookie = 0;
1633                 } else {
1634                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1635                 }
1636         } else {
1637                 fid_zero(&pos->lp_dir_parent);
1638                 pos->lp_dir_cookie = 0;
1639         }
1640 }
1641
1642 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1643 {
1644         bool dirty = false;
1645
1646         if (limit != LFSCK_SPEED_NO_LIMIT) {
1647                 if (limit > HZ) {
1648                         lfsck->li_sleep_rate = limit / HZ;
1649                         lfsck->li_sleep_jif = 1;
1650                 } else {
1651                         lfsck->li_sleep_rate = 1;
1652                         lfsck->li_sleep_jif = HZ / limit;
1653                 }
1654         } else {
1655                 lfsck->li_sleep_jif = 0;
1656                 lfsck->li_sleep_rate = 0;
1657         }
1658
1659         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1660                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1661                 dirty = true;
1662         }
1663
1664         return dirty;
1665 }
1666
1667 void lfsck_control_speed(struct lfsck_instance *lfsck)
1668 {
1669         struct ptlrpc_thread *thread = &lfsck->li_thread;
1670         struct l_wait_info    lwi;
1671
1672         if (lfsck->li_sleep_jif > 0 &&
1673             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1674                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1675                                        LWI_ON_SIGNAL_NOOP, NULL);
1676
1677                 l_wait_event(thread->t_ctl_waitq,
1678                              !thread_is_running(thread),
1679                              &lwi);
1680                 lfsck->li_new_scanned = 0;
1681         }
1682 }
1683
1684 void lfsck_control_speed_by_self(struct lfsck_component *com)
1685 {
1686         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1687         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1688         struct l_wait_info       lwi;
1689
1690         if (lfsck->li_sleep_jif > 0 &&
1691             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1692                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1693                                        LWI_ON_SIGNAL_NOOP, NULL);
1694
1695                 l_wait_event(thread->t_ctl_waitq,
1696                              !thread_is_running(thread),
1697                              &lwi);
1698                 com->lc_new_scanned = 0;
1699         }
1700 }
1701
1702 static struct lfsck_thread_args *
1703 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1704                        struct lfsck_component *com,
1705                        struct lfsck_start_param *lsp)
1706 {
1707         struct lfsck_thread_args *lta;
1708         int                       rc;
1709
1710         OBD_ALLOC_PTR(lta);
1711         if (lta == NULL)
1712                 return ERR_PTR(-ENOMEM);
1713
1714         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1715         if (rc != 0) {
1716                 OBD_FREE_PTR(lta);
1717                 return ERR_PTR(rc);
1718         }
1719
1720         lta->lta_lfsck = lfsck_instance_get(lfsck);
1721         if (com != NULL)
1722                 lta->lta_com = lfsck_component_get(com);
1723
1724         lta->lta_lsp = lsp;
1725
1726         return lta;
1727 }
1728
1729 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1730 {
1731         if (lta->lta_com != NULL)
1732                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1733         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1734         lu_env_fini(&lta->lta_env);
1735         OBD_FREE_PTR(lta);
1736 }
1737
1738 struct lfsck_assistant_data *
1739 lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
1740                           const char *name)
1741 {
1742         struct lfsck_assistant_data *lad;
1743
1744         OBD_ALLOC_PTR(lad);
1745         if (lad != NULL) {
1746                 lad->lad_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
1747                 if (lad->lad_bitmap == NULL) {
1748                         OBD_FREE_PTR(lad);
1749                         return NULL;
1750                 }
1751
1752                 INIT_LIST_HEAD(&lad->lad_req_list);
1753                 spin_lock_init(&lad->lad_lock);
1754                 INIT_LIST_HEAD(&lad->lad_ost_list);
1755                 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1756                 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1757                 INIT_LIST_HEAD(&lad->lad_mdt_list);
1758                 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1759                 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1760                 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1761                 lad->lad_ops = lao;
1762                 lad->lad_name = name;
1763         }
1764
1765         return lad;
1766 }
1767
1768 /**
1769  * Generic LFSCK asynchronous communication interpretor function.
1770  * The LFSCK RPC reply for both the event notification and status
1771  * querying will be handled here.
1772  *
1773  * \param[in] env       pointer to the thread context
1774  * \param[in] req       pointer to the LFSCK request
1775  * \param[in] args      pointer to the lfsck_async_interpret_args
1776  * \param[in] rc        the result for handling the LFSCK request
1777  *
1778  * \retval              0 for success
1779  * \retval              negative error number on failure
1780  */
1781 int lfsck_async_interpret_common(const struct lu_env *env,
1782                                  struct ptlrpc_request *req,
1783                                  void *args, int rc)
1784 {
1785         struct lfsck_async_interpret_args *laia = args;
1786         struct lfsck_component            *com  = laia->laia_com;
1787         struct lfsck_assistant_data       *lad  = com->lc_data;
1788         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1789         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1790         struct lfsck_request              *lr   = laia->laia_lr;
1791
1792         LASSERT(com->lc_lfsck->li_master);
1793
1794         switch (lr->lr_event) {
1795         case LE_START:
1796                 if (rc != 0) {
1797                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
1798                                "start: rc = %d\n",
1799                                lfsck_lfsck2name(com->lc_lfsck),
1800                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1801                                ltd->ltd_index, lad->lad_name, rc);
1802
1803                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1804                                 struct lfsck_layout *lo = com->lc_file_ram;
1805
1806                                 if (lr->lr_flags & LEF_TO_OST)
1807                                         lfsck_lad_set_bitmap(env, com,
1808                                                              ltd->ltd_index);
1809                                 else
1810                                         lo->ll_flags |= LF_INCOMPLETE;
1811                         } else {
1812                                 struct lfsck_namespace *ns = com->lc_file_ram;
1813
1814                                 /* If some MDT does not join the namespace
1815                                  * LFSCK, then we cannot know whether there
1816                                  * is some name entry on such MDT that with
1817                                  * the referenced MDT-object on this MDT or
1818                                  * not. So the namespace LFSCK on this MDT
1819                                  * cannot handle orphan MDT-objects properly.
1820                                  * So we mark the LFSCK as LF_INCOMPLETE and
1821                                  * skip orphan MDT-objects handling. */
1822                                 ns->ln_flags |= LF_INCOMPLETE;
1823                         }
1824                         break;
1825                 }
1826
1827                 spin_lock(&ltds->ltd_lock);
1828                 if (ltd->ltd_dead) {
1829                         spin_unlock(&ltds->ltd_lock);
1830                         break;
1831                 }
1832
1833                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1834                         struct list_head *list;
1835                         struct list_head *phase_list;
1836
1837                         if (ltd->ltd_layout_done) {
1838                                 spin_unlock(&ltds->ltd_lock);
1839                                 break;
1840                         }
1841
1842                         if (lr->lr_flags & LEF_TO_OST) {
1843                                 list = &lad->lad_ost_list;
1844                                 phase_list = &lad->lad_ost_phase1_list;
1845                         } else {
1846                                 list = &lad->lad_mdt_list;
1847                                 phase_list = &lad->lad_mdt_phase1_list;
1848                         }
1849
1850                         if (list_empty(&ltd->ltd_layout_list))
1851                                 list_add_tail(&ltd->ltd_layout_list, list);
1852                         if (list_empty(&ltd->ltd_layout_phase_list))
1853                                 list_add_tail(&ltd->ltd_layout_phase_list,
1854                                               phase_list);
1855                 } else {
1856                         if (ltd->ltd_namespace_done) {
1857                                 spin_unlock(&ltds->ltd_lock);
1858                                 break;
1859                         }
1860
1861                         if (list_empty(&ltd->ltd_namespace_list))
1862                                 list_add_tail(&ltd->ltd_namespace_list,
1863                                               &lad->lad_mdt_list);
1864                         if (list_empty(&ltd->ltd_namespace_phase_list))
1865                                 list_add_tail(&ltd->ltd_namespace_phase_list,
1866                                               &lad->lad_mdt_phase1_list);
1867                 }
1868                 spin_unlock(&ltds->ltd_lock);
1869                 break;
1870         case LE_STOP:
1871         case LE_PHASE1_DONE:
1872         case LE_PHASE2_DONE:
1873         case LE_PEER_EXIT:
1874                 if (rc != 0 && rc != -EALREADY)
1875                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
1876                               "event = %d, rc = %d\n",
1877                               lfsck_lfsck2name(com->lc_lfsck),
1878                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1879                               ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
1880                 break;
1881         case LE_QUERY: {
1882                 struct lfsck_reply *reply;
1883                 struct list_head *list;
1884                 struct list_head *phase_list;
1885
1886                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1887                         list = &ltd->ltd_layout_list;
1888                         phase_list = &ltd->ltd_layout_phase_list;
1889                 } else {
1890                         list = &ltd->ltd_namespace_list;
1891                         phase_list = &ltd->ltd_namespace_phase_list;
1892                 }
1893
1894                 if (rc != 0) {
1895                         spin_lock(&ltds->ltd_lock);
1896                         list_del_init(phase_list);
1897                         list_del_init(list);
1898                         spin_unlock(&ltds->ltd_lock);
1899                         break;
1900                 }
1901
1902                 reply = req_capsule_server_get(&req->rq_pill,
1903                                                &RMF_LFSCK_REPLY);
1904                 if (reply == NULL) {
1905                         rc = -EPROTO;
1906                         CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
1907                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
1908                                lad->lad_name, rc);
1909                         spin_lock(&ltds->ltd_lock);
1910                         list_del_init(phase_list);
1911                         list_del_init(list);
1912                         spin_unlock(&ltds->ltd_lock);
1913                         break;
1914                 }
1915
1916                 switch (reply->lr_status) {
1917                 case LS_SCANNING_PHASE1:
1918                         break;
1919                 case LS_SCANNING_PHASE2:
1920                         spin_lock(&ltds->ltd_lock);
1921                         list_del_init(phase_list);
1922                         if (ltd->ltd_dead) {
1923                                 spin_unlock(&ltds->ltd_lock);
1924                                 break;
1925                         }
1926
1927                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1928                                 if (ltd->ltd_layout_done) {
1929                                         spin_unlock(&ltds->ltd_lock);
1930                                         break;
1931                                 }
1932
1933                                 if (lr->lr_flags & LEF_TO_OST)
1934                                         list_add_tail(phase_list,
1935                                                 &lad->lad_ost_phase2_list);
1936                                 else
1937                                         list_add_tail(phase_list,
1938                                                 &lad->lad_mdt_phase2_list);
1939                         } else {
1940                                 if (ltd->ltd_namespace_done) {
1941                                         spin_unlock(&ltds->ltd_lock);
1942                                         break;
1943                                 }
1944
1945                                 list_add_tail(phase_list,
1946                                               &lad->lad_mdt_phase2_list);
1947                         }
1948                         spin_unlock(&ltds->ltd_lock);
1949                         break;
1950                 default:
1951                         spin_lock(&ltds->ltd_lock);
1952                         list_del_init(phase_list);
1953                         list_del_init(list);
1954                         spin_unlock(&ltds->ltd_lock);
1955                         break;
1956                 }
1957                 break;
1958         }
1959         default:
1960                 CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
1961                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
1962                 break;
1963         }
1964
1965         if (!laia->laia_shared) {
1966                 lfsck_tgt_put(ltd);
1967                 lfsck_component_put(env, com);
1968         }
1969
1970         return 0;
1971 }
1972
1973 static void lfsck_interpret(const struct lu_env *env,
1974                             struct lfsck_instance *lfsck,
1975                             struct ptlrpc_request *req, void *args, int result)
1976 {
1977         struct lfsck_async_interpret_args *laia = args;
1978         struct lfsck_component            *com;
1979
1980         LASSERT(laia->laia_com == NULL);
1981         LASSERT(laia->laia_shared);
1982
1983         spin_lock(&lfsck->li_lock);
1984         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1985                 laia->laia_com = com;
1986                 lfsck_async_interpret_common(env, req, laia, result);
1987         }
1988
1989         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1990                 laia->laia_com = com;
1991                 lfsck_async_interpret_common(env, req, laia, result);
1992         }
1993         spin_unlock(&lfsck->li_lock);
1994 }
1995
1996 static int lfsck_stop_notify(const struct lu_env *env,
1997                              struct lfsck_instance *lfsck,
1998                              struct lfsck_tgt_descs *ltds,
1999                              struct lfsck_tgt_desc *ltd, __u16 type)
2000 {
2001         struct lfsck_component *com;
2002         int                     rc = 0;
2003         ENTRY;
2004
2005         LASSERT(lfsck->li_master);
2006
2007         spin_lock(&lfsck->li_lock);
2008         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
2009         if (com == NULL)
2010                 com = __lfsck_component_find(lfsck, type,
2011                                              &lfsck->li_list_double_scan);
2012         if (com != NULL)
2013                 lfsck_component_get(com);
2014         spin_unlock(&lfsck->li_lock);
2015
2016         if (com != NULL) {
2017                 struct lfsck_thread_info          *info  = lfsck_env_info(env);
2018                 struct lfsck_async_interpret_args *laia  = &info->lti_laia;
2019                 struct lfsck_request              *lr    = &info->lti_lr;
2020                 struct lfsck_assistant_data       *lad   = com->lc_data;
2021                 struct list_head                  *list;
2022                 struct list_head                  *phase_list;
2023                 struct ptlrpc_request_set         *set;
2024
2025                 set = ptlrpc_prep_set();
2026                 if (set == NULL) {
2027                         lfsck_component_put(env, com);
2028
2029                         RETURN(-ENOMEM);
2030                 }
2031
2032                 if (type == LFSCK_TYPE_LAYOUT) {
2033                         list = &ltd->ltd_layout_list;
2034                         phase_list = &ltd->ltd_layout_phase_list;
2035                 } else {
2036                         list = &ltd->ltd_namespace_list;
2037                         phase_list = &ltd->ltd_namespace_phase_list;
2038                 }
2039
2040                 spin_lock(&ltds->ltd_lock);
2041                 if (list_empty(list)) {
2042                         LASSERT(list_empty(phase_list));
2043                         spin_unlock(&ltds->ltd_lock);
2044                         ptlrpc_set_destroy(set);
2045
2046                         RETURN(0);
2047                 }
2048
2049                 list_del_init(phase_list);
2050                 list_del_init(list);
2051                 spin_unlock(&ltds->ltd_lock);
2052
2053                 memset(lr, 0, sizeof(*lr));
2054                 lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2055                 lr->lr_event = LE_PEER_EXIT;
2056                 lr->lr_active = type;
2057                 lr->lr_status = LS_CO_PAUSED;
2058                 if (ltds == &lfsck->li_ost_descs)
2059                         lr->lr_flags = LEF_TO_OST;
2060
2061                 laia->laia_com = com;
2062                 laia->laia_ltds = ltds;
2063                 atomic_inc(&ltd->ltd_ref);
2064                 laia->laia_ltd = ltd;
2065                 laia->laia_lr = lr;
2066                 laia->laia_shared = 0;
2067
2068                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2069                                          lfsck_async_interpret_common,
2070                                          laia, LFSCK_NOTIFY);
2071                 if (rc != 0) {
2072                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2073                                "co-stop for %s: rc = %d\n",
2074                                lfsck_lfsck2name(lfsck),
2075                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2076                                ltd->ltd_index, lad->lad_name, rc);
2077                         lfsck_tgt_put(ltd);
2078                 } else {
2079                         rc = ptlrpc_set_wait(set);
2080                 }
2081
2082                 ptlrpc_set_destroy(set);
2083                 lfsck_component_put(env, com);
2084         }
2085
2086         RETURN(rc);
2087 }
2088
2089 static int lfsck_async_interpret(const struct lu_env *env,
2090                                  struct ptlrpc_request *req,
2091                                  void *args, int rc)
2092 {
2093         struct lfsck_async_interpret_args *laia = args;
2094         struct lfsck_instance             *lfsck;
2095
2096         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
2097                               li_mdt_descs);
2098         lfsck_interpret(env, lfsck, req, laia, rc);
2099         lfsck_tgt_put(laia->laia_ltd);
2100         if (rc != 0 && laia->laia_result != -EALREADY)
2101                 laia->laia_result = rc;
2102
2103         return 0;
2104 }
2105
2106 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2107                         struct lfsck_request *lr,
2108                         struct ptlrpc_request_set *set,
2109                         ptlrpc_interpterer_t interpreter,
2110                         void *args, int request)
2111 {
2112         struct lfsck_async_interpret_args *laia;
2113         struct ptlrpc_request             *req;
2114         struct lfsck_request              *tmp;
2115         struct req_format                 *format;
2116         int                                rc;
2117
2118         switch (request) {
2119         case LFSCK_NOTIFY:
2120                 format = &RQF_LFSCK_NOTIFY;
2121                 break;
2122         case LFSCK_QUERY:
2123                 format = &RQF_LFSCK_QUERY;
2124                 break;
2125         default:
2126                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2127                        exp->exp_obd->obd_name, request, -EINVAL);
2128                 return -EINVAL;
2129         }
2130
2131         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2132         if (req == NULL)
2133                 return -ENOMEM;
2134
2135         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2136         if (rc != 0) {
2137                 ptlrpc_request_free(req);
2138
2139                 return rc;
2140         }
2141
2142         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2143         *tmp = *lr;
2144         ptlrpc_request_set_replen(req);
2145
2146         laia = ptlrpc_req_async_args(req);
2147         *laia = *(struct lfsck_async_interpret_args *)args;
2148         if (laia->laia_com != NULL)
2149                 lfsck_component_get(laia->laia_com);
2150         req->rq_interpret_reply = interpreter;
2151         ptlrpc_set_add_req(set, req);
2152
2153         return 0;
2154 }
2155
2156 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2157                           struct lfsck_start_param *lsp)
2158 {
2159         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2160         struct lfsck_assistant_data     *lad     = com->lc_data;
2161         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2162         struct ptlrpc_thread            *athread = &lad->lad_thread;
2163         struct lfsck_thread_args        *lta;
2164         struct task_struct              *task;
2165         int                              rc;
2166         ENTRY;
2167
2168         lad->lad_assistant_status = 0;
2169         lad->lad_post_result = 0;
2170         lad->lad_to_post = 0;
2171         lad->lad_to_double_scan = 0;
2172         lad->lad_in_double_scan = 0;
2173         lad->lad_exit = 0;
2174         thread_set_flags(athread, 0);
2175
2176         lta = lfsck_thread_args_init(lfsck, com, lsp);
2177         if (IS_ERR(lta))
2178                 RETURN(PTR_ERR(lta));
2179
2180         task = kthread_run(lfsck_assistant_engine, lta, lad->lad_name);
2181         if (IS_ERR(task)) {
2182                 rc = PTR_ERR(task);
2183                 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2184                        "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2185                 lfsck_thread_args_fini(lta);
2186         } else {
2187                 struct l_wait_info lwi = { 0 };
2188
2189                 l_wait_event(mthread->t_ctl_waitq,
2190                              thread_is_running(athread) ||
2191                              thread_is_stopped(athread),
2192                              &lwi);
2193                 if (unlikely(!thread_is_running(athread)))
2194                         rc = lad->lad_assistant_status;
2195                 else
2196                         rc = 0;
2197         }
2198
2199         RETURN(rc);
2200 }
2201
2202 int lfsck_checkpoint_generic(const struct lu_env *env,
2203                              struct lfsck_component *com)
2204 {
2205         struct lfsck_assistant_data     *lad     = com->lc_data;
2206         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2207         struct ptlrpc_thread            *athread = &lad->lad_thread;
2208         struct l_wait_info               lwi     = { 0 };
2209
2210         if (com->lc_new_checked == 0)
2211                 return LFSCK_CHECKPOINT_SKIP;
2212
2213         l_wait_event(mthread->t_ctl_waitq,
2214                      list_empty(&lad->lad_req_list) ||
2215                      !thread_is_running(mthread) ||
2216                      thread_is_stopped(athread),
2217                      &lwi);
2218
2219         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2220                 return LFSCK_CHECKPOINT_SKIP;
2221
2222         return 0;
2223 }
2224
2225 void lfsck_post_generic(const struct lu_env *env,
2226                         struct lfsck_component *com, int *result)
2227 {
2228         struct lfsck_assistant_data     *lad     = com->lc_data;
2229         struct ptlrpc_thread            *athread = &lad->lad_thread;
2230         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2231         struct l_wait_info               lwi     = { 0 };
2232
2233         lad->lad_post_result = *result;
2234         if (*result <= 0)
2235                 lad->lad_exit = 1;
2236         lad->lad_to_post = 1;
2237
2238         wake_up_all(&athread->t_ctl_waitq);
2239         l_wait_event(mthread->t_ctl_waitq,
2240                      (*result > 0 && list_empty(&lad->lad_req_list)) ||
2241                      thread_is_stopped(athread),
2242                      &lwi);
2243
2244         if (lad->lad_assistant_status < 0)
2245                 *result = lad->lad_assistant_status;
2246 }
2247
2248 int lfsck_double_scan_generic(const struct lu_env *env,
2249                               struct lfsck_component *com, int status)
2250 {
2251         struct lfsck_assistant_data     *lad     = com->lc_data;
2252         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2253         struct ptlrpc_thread            *athread = &lad->lad_thread;
2254         struct l_wait_info               lwi     = { 0 };
2255
2256         if (status != LS_SCANNING_PHASE2)
2257                 lad->lad_exit = 1;
2258         else
2259                 lad->lad_to_double_scan = 1;
2260
2261         wake_up_all(&athread->t_ctl_waitq);
2262         l_wait_event(mthread->t_ctl_waitq,
2263                      lad->lad_in_double_scan ||
2264                      thread_is_stopped(athread),
2265                      &lwi);
2266
2267         if (lad->lad_assistant_status < 0)
2268                 return lad->lad_assistant_status;
2269
2270         return 0;
2271 }
2272
2273 void lfsck_quit_generic(const struct lu_env *env,
2274                         struct lfsck_component *com)
2275 {
2276         struct lfsck_assistant_data     *lad     = com->lc_data;
2277         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2278         struct ptlrpc_thread            *athread = &lad->lad_thread;
2279         struct l_wait_info               lwi     = { 0 };
2280
2281         lad->lad_exit = 1;
2282         wake_up_all(&athread->t_ctl_waitq);
2283         l_wait_event(mthread->t_ctl_waitq,
2284                      thread_is_init(athread) ||
2285                      thread_is_stopped(athread),
2286                      &lwi);
2287 }
2288
2289 /* external interfaces */
2290
2291 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
2292 {
2293         struct lu_env           env;
2294         struct lfsck_instance  *lfsck;
2295         int                     rc;
2296         ENTRY;
2297
2298         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2299         if (rc != 0)
2300                 RETURN(rc);
2301
2302         lfsck = lfsck_instance_find(key, true, false);
2303         if (likely(lfsck != NULL)) {
2304                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
2305                 lfsck_instance_put(&env, lfsck);
2306         } else {
2307                 rc = -ENXIO;
2308         }
2309
2310         lu_env_fini(&env);
2311
2312         RETURN(rc);
2313 }
2314 EXPORT_SYMBOL(lfsck_get_speed);
2315
2316 int lfsck_set_speed(struct dt_device *key, int val)
2317 {
2318         struct lu_env           env;
2319         struct lfsck_instance  *lfsck;
2320         int                     rc;
2321         ENTRY;
2322
2323         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2324         if (rc != 0)
2325                 RETURN(rc);
2326
2327         lfsck = lfsck_instance_find(key, true, false);
2328         if (likely(lfsck != NULL)) {
2329                 mutex_lock(&lfsck->li_mutex);
2330                 if (__lfsck_set_speed(lfsck, val))
2331                         rc = lfsck_bookmark_store(&env, lfsck);
2332                 mutex_unlock(&lfsck->li_mutex);
2333                 lfsck_instance_put(&env, lfsck);
2334         } else {
2335                 rc = -ENXIO;
2336         }
2337
2338         lu_env_fini(&env);
2339
2340         RETURN(rc);
2341 }
2342 EXPORT_SYMBOL(lfsck_set_speed);
2343
2344 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
2345 {
2346         struct lu_env           env;
2347         struct lfsck_instance  *lfsck;
2348         int                     rc;
2349         ENTRY;
2350
2351         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2352         if (rc != 0)
2353                 RETURN(rc);
2354
2355         lfsck = lfsck_instance_find(key, true, false);
2356         if (likely(lfsck != NULL)) {
2357                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
2358                 lfsck_instance_put(&env, lfsck);
2359         } else {
2360                 rc = -ENXIO;
2361         }
2362
2363         lu_env_fini(&env);
2364
2365         RETURN(rc);
2366 }
2367 EXPORT_SYMBOL(lfsck_get_windows);
2368
2369 int lfsck_set_windows(struct dt_device *key, int val)
2370 {
2371         struct lu_env           env;
2372         struct lfsck_instance  *lfsck;
2373         int                     rc;
2374         ENTRY;
2375
2376         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2377         if (rc != 0)
2378                 RETURN(rc);
2379
2380         lfsck = lfsck_instance_find(key, true, false);
2381         if (likely(lfsck != NULL)) {
2382                 if (val > LFSCK_ASYNC_WIN_MAX) {
2383                         CWARN("%s: Too large async window size, which "
2384                               "may cause memory issues. The valid range "
2385                               "is [0 - %u]. If you do not want to restrict "
2386                               "the window size for async requests pipeline, "
2387                               "just set it as 0.\n",
2388                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2389                         rc = -EINVAL;
2390                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2391                         mutex_lock(&lfsck->li_mutex);
2392                         lfsck->li_bookmark_ram.lb_async_windows = val;
2393                         rc = lfsck_bookmark_store(&env, lfsck);
2394                         mutex_unlock(&lfsck->li_mutex);
2395                 }
2396                 lfsck_instance_put(&env, lfsck);
2397         } else {
2398                 rc = -ENXIO;
2399         }
2400
2401         lu_env_fini(&env);
2402
2403         RETURN(rc);
2404 }
2405 EXPORT_SYMBOL(lfsck_set_windows);
2406
2407 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2408 {
2409         struct lu_env           env;
2410         struct lfsck_instance  *lfsck;
2411         struct lfsck_component *com;
2412         int                     rc;
2413         ENTRY;
2414
2415         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2416         if (rc != 0)
2417                 RETURN(rc);
2418
2419         lfsck = lfsck_instance_find(key, true, false);
2420         if (likely(lfsck != NULL)) {
2421                 com = lfsck_component_find(lfsck, type);
2422                 if (likely(com != NULL)) {
2423                         rc = com->lc_ops->lfsck_dump(&env, com, m);
2424                         lfsck_component_put(&env, com);
2425                 } else {
2426                         rc = -ENOTSUPP;
2427                 }
2428
2429                 lfsck_instance_put(&env, lfsck);
2430         } else {
2431                 rc = -ENXIO;
2432         }
2433
2434         lu_env_fini(&env);
2435
2436         RETURN(rc);
2437 }
2438 EXPORT_SYMBOL(lfsck_dump);
2439
2440 static int lfsck_stop_all(const struct lu_env *env,
2441                           struct lfsck_instance *lfsck,
2442                           struct lfsck_stop *stop)
2443 {
2444         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2445         struct lfsck_request              *lr     = &info->lti_lr;
2446         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2447         struct ptlrpc_request_set         *set;
2448         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2449         struct lfsck_tgt_desc             *ltd;
2450         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2451         __u32                              idx;
2452         int                                rc     = 0;
2453         int                                rc1    = 0;
2454         ENTRY;
2455
2456         LASSERT(stop->ls_flags & LPF_BROADCAST);
2457
2458         set = ptlrpc_prep_set();
2459         if (unlikely(set == NULL))
2460                 RETURN(-ENOMEM);
2461
2462         memset(lr, 0, sizeof(*lr));
2463         lr->lr_event = LE_STOP;
2464         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2465         lr->lr_status = stop->ls_status;
2466         lr->lr_version = bk->lb_version;
2467         lr->lr_active = LFSCK_TYPES_ALL;
2468         lr->lr_param = stop->ls_flags;
2469
2470         laia->laia_com = NULL;
2471         laia->laia_ltds = ltds;
2472         laia->laia_lr = lr;
2473         laia->laia_result = 0;
2474         laia->laia_shared = 1;
2475
2476         down_read(&ltds->ltd_rw_sem);
2477         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2478                 ltd = lfsck_tgt_get(ltds, idx);
2479                 LASSERT(ltd != NULL);
2480
2481                 laia->laia_ltd = ltd;
2482                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2483                                          lfsck_async_interpret, laia,
2484                                          LFSCK_NOTIFY);
2485                 if (rc != 0) {
2486                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2487                         lfsck_tgt_put(ltd);
2488                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2489                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2490                         rc1 = rc;
2491                 }
2492         }
2493         up_read(&ltds->ltd_rw_sem);
2494
2495         rc = ptlrpc_set_wait(set);
2496         ptlrpc_set_destroy(set);
2497
2498         if (rc == 0)
2499                 rc = laia->laia_result;
2500
2501         if (rc == -EALREADY)
2502                 rc = 0;
2503
2504         if (rc != 0)
2505                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2506                        lfsck_lfsck2name(lfsck), rc);
2507
2508         RETURN(rc != 0 ? rc : rc1);
2509 }
2510
2511 static int lfsck_start_all(const struct lu_env *env,
2512                            struct lfsck_instance *lfsck,
2513                            struct lfsck_start *start)
2514 {
2515         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2516         struct lfsck_request              *lr     = &info->lti_lr;
2517         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2518         struct ptlrpc_request_set         *set;
2519         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2520         struct lfsck_tgt_desc             *ltd;
2521         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2522         __u32                              idx;
2523         int                                rc     = 0;
2524         ENTRY;
2525
2526         LASSERT(start->ls_flags & LPF_BROADCAST);
2527
2528         set = ptlrpc_prep_set();
2529         if (unlikely(set == NULL))
2530                 RETURN(-ENOMEM);
2531
2532         memset(lr, 0, sizeof(*lr));
2533         lr->lr_event = LE_START;
2534         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2535         lr->lr_speed = bk->lb_speed_limit;
2536         lr->lr_version = bk->lb_version;
2537         lr->lr_active = start->ls_active;
2538         lr->lr_param = start->ls_flags;
2539         lr->lr_async_windows = bk->lb_async_windows;
2540         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2541                        LSV_ASYNC_WINDOWS;
2542
2543         laia->laia_com = NULL;
2544         laia->laia_ltds = ltds;
2545         laia->laia_lr = lr;
2546         laia->laia_result = 0;
2547         laia->laia_shared = 1;
2548
2549         down_read(&ltds->ltd_rw_sem);
2550         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2551                 ltd = lfsck_tgt_get(ltds, idx);
2552                 LASSERT(ltd != NULL);
2553
2554                 laia->laia_ltd = ltd;
2555                 ltd->ltd_layout_done = 0;
2556                 ltd->ltd_namespace_done = 0;
2557                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2558                                          lfsck_async_interpret, laia,
2559                                          LFSCK_NOTIFY);
2560                 if (rc != 0) {
2561                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2562                         lfsck_tgt_put(ltd);
2563                         CERROR("%s: cannot notify MDT %x for LFSCK "
2564                                "start, failout: rc = %d\n",
2565                                lfsck_lfsck2name(lfsck), idx, rc);
2566                         break;
2567                 }
2568         }
2569         up_read(&ltds->ltd_rw_sem);
2570
2571         if (rc != 0) {
2572                 ptlrpc_set_destroy(set);
2573
2574                 RETURN(rc);
2575         }
2576
2577         rc = ptlrpc_set_wait(set);
2578         ptlrpc_set_destroy(set);
2579
2580         if (rc == 0)
2581                 rc = laia->laia_result;
2582
2583         if (rc != 0) {
2584                 struct lfsck_stop *stop = &info->lti_stop;
2585
2586                 CERROR("%s: cannot start LFSCK on some MDTs, "
2587                        "stop all: rc = %d\n",
2588                        lfsck_lfsck2name(lfsck), rc);
2589                 if (rc != -EALREADY) {
2590                         stop->ls_status = LS_FAILED;
2591                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
2592                         lfsck_stop_all(env, lfsck, stop);
2593                 }
2594         }
2595
2596         RETURN(rc);
2597 }
2598
/*
 * Start the LFSCK on the instance registered for \a key, or let the
 * requested components join an LFSCK whose thread is already active.
 *
 * \param[in] env	execution environment
 * \param[in] key	device used to look up the LFSCK instance
 * \param[in] lsp	start parameters; lsp->lsp_start == NULL means an
 *			automatic trigger of a paused LFSCK
 *
 * \retval		0 on success (non-negative results are folded to 0)
 * \retval		-ENXIO if no instance is found for \a key
 * \retval		-EAGAIN if no namespace has been registered yet
 * \retval		-EALREADY if the LFSCK thread is already active,
 *			unless replaced by the result of a component's
 *			lfsck_join() or by -EOPNOTSUPP for a requested
 *			component that is on neither scan list
 * \retval		-EPERM if LPF_BROADCAST is requested on a non-master
 * \retval		-ENOTSUPP if unsupported types are requested; in that
 *			case start->ls_active is left holding only the
 *			unsupported bits
 * \retval		other negative error from the helpers called here
 */
2599 int lfsck_start(const struct lu_env *env, struct dt_device *key,
2600                 struct lfsck_start_param *lsp)
2601 {
2602         struct lfsck_start              *start  = lsp->lsp_start;
2603         struct lfsck_instance           *lfsck;
2604         struct lfsck_bookmark           *bk;
2605         struct ptlrpc_thread            *thread;
2606         struct lfsck_component          *com;
2607         struct l_wait_info               lwi    = { 0 };
2608         struct lfsck_thread_args        *lta;
2609         struct task_struct              *task;
2610         int                              rc     = 0;
2611         __u16                            valid  = 0;
2612         __u16                            flags  = 0;
2613         __u16                            type   = 1;
2614         ENTRY;
2615
2616         lfsck = lfsck_instance_find(key, true, false);
2617         if (unlikely(lfsck == NULL))
2618                 RETURN(-ENXIO);
2619
2620         /* System is not ready, try again later. */
2621         if (unlikely(lfsck->li_namespace == NULL))
2622                 GOTO(put, rc = -EAGAIN);
2623
2624         /* start == NULL means auto trigger paused LFSCK. */
2625         if ((start == NULL) &&
2626             (list_empty(&lfsck->li_list_scan) ||
2627              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2628                 GOTO(put, rc = 0);
2629
2630         bk = &lfsck->li_bookmark_ram;
2631         thread = &lfsck->li_thread;
             /* li_mutex is held from here until the "out" label or until it
              * is dropped before the broadcast below; lfsck_stop() takes the
              * same mutex. The thread state is checked under li_lock. */
2632         mutex_lock(&lfsck->li_mutex);
2633         spin_lock(&lfsck->li_lock);
2634         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
                     /* The thread is neither in its initial state nor
                      * stopped: do not start another one, only try to join
                      * the requested components. */
2635                 rc = -EALREADY;
2636                 if (unlikely(start == NULL)) {
2637                         spin_unlock(&lfsck->li_lock);
2638                         GOTO(out, rc);
2639                 }
2640
                     /* Walk the requested type bits from the lowest one
                      * upward, clearing each bit once it has been handled. */
2641                 while (start->ls_active != 0) {
2642                         if (!(type & start->ls_active)) {
2643                                 type <<= 1;
2644                                 continue;
2645                         }
2646
2647                         com = __lfsck_component_find(lfsck, type,
2648                                                      &lfsck->li_list_scan);
2649                         if (com == NULL)
2650                                 com = __lfsck_component_find(lfsck, type,
2651                                                 &lfsck->li_list_double_scan);
2652                         if (com == NULL) {
2653                                 rc = -EOPNOTSUPP;
2654                                 break;
2655                         }
2656
                             /* lfsck_join() is optional; -EALREADY from it
                              * does not stop the walk. */
2657                         if (com->lc_ops->lfsck_join != NULL) {
2658                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2659                                 if (rc != 0 && rc != -EALREADY)
2660                                         break;
2661                         }
2662                         start->ls_active &= ~type;
2663                         type <<= 1;
2664                 }
2665                 spin_unlock(&lfsck->li_lock);
2666                 GOTO(out, rc);
2667         }
2668         spin_unlock(&lfsck->li_lock);
2669
             /* A new run: reset the per-run state of the instance. */
2670         lfsck->li_status = 0;
2671         lfsck->li_oit_over = 0;
2672         lfsck->li_start_unplug = 0;
2673         lfsck->li_drop_dryrun = 0;
2674         lfsck->li_new_scanned = 0;
2675
2676         /* For auto trigger. */
2677         if (start == NULL)
2678                 goto trigger;
2679
2680         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2681                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2682                        lfsck_lfsck2name(lfsck));
2683
2684                 GOTO(out, rc = -EPERM);
2685         }
2686
2687         start->ls_version = bk->lb_version;
2688
2689         if (start->ls_active != 0) {
2690                 struct lfsck_component *next;
2691
2692                 if (start->ls_active == LFSCK_TYPES_ALL)
2693                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2694
                     /* On failure, ls_active is left holding only the
                      * unsupported bits. */
2695                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2696                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2697                         GOTO(out, rc = -ENOTSUPP);
2698                 }
2699
                     /* Call lfsck_post() on every component in the scan
                      * list that was not requested this time.
                      * NOTE(review): the _safe iterator suggests
                      * lfsck_post() may take the component off the list --
                      * confirm. */
2700                 list_for_each_entry_safe(com, next,
2701                                          &lfsck->li_list_scan, lc_link) {
2702                         if (!(com->lc_type & start->ls_active)) {
2703                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2704                                                              false);
2705                                 if (rc != 0)
2706                                         GOTO(out, rc);
2707                         }
2708                 }
2709
                     /* Move each requested component found in the idle list
                      * to the scan list; ls_active ends up as zero. */
2710                 while (start->ls_active != 0) {
2711                         if (type & start->ls_active) {
2712                                 com = __lfsck_component_find(lfsck, type,
2713                                                         &lfsck->li_list_idle);
2714                                 if (com != NULL)
2715                                         /* The component status will be updated
2716                                          * when its prep() is called later by
2717                                          * the LFSCK main engine. */
2718                                         list_move_tail(&com->lc_link,
2719                                                        &lfsck->li_list_scan);
2720                                 start->ls_active &= ~type;
2721                         }
2722                         type <<= 1;
2723                 }
2724         }
2725
2726         if (list_empty(&lfsck->li_list_scan)) {
2727                 /* The speed limit will be used to control both the LFSCK and
2728                  * low layer scrub (if applied), need to be handled firstly. */
2729                 if (start->ls_valid & LSV_SPEED_LIMIT) {
2730                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
2731                                 rc = lfsck_bookmark_store(env, lfsck);
2732                                 if (rc != 0)
2733                                         GOTO(out, rc);
2734                         }
2735                 }
2736
2737                 goto trigger;
2738         }
2739
2740         if (start->ls_flags & LPF_RESET)
2741                 flags |= DOIF_RESET;
2742
2743         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
2744         if (rc != 0)
2745                 GOTO(out, rc);
2746
             /* Rebuild ls_active from the components that will be scanned,
              * resetting each of them when LPF_RESET was requested. */
2747         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2748                 start->ls_active |= com->lc_type;
2749                 if (flags & DOIF_RESET) {
2750                         rc = com->lc_ops->lfsck_reset(env, com, false);
2751                         if (rc != 0)
2752                                 GOTO(out, rc);
2753                 }
2754         }
2755
2756 trigger:
             /* Prepare the directory and object-table iteration arguments
              * stored in the instance. */
2757         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
2758         if (bk->lb_param & LPF_DRYRUN)
2759                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
2760
2761         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
2762                 valid |= DOIV_ERROR_HANDLE;
2763                 if (start->ls_flags & LPF_FAILOUT)
2764                         flags |= DOIF_FAILOUT;
2765         }
2766
2767         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
2768                 valid |= DOIV_DRYRUN;
2769                 if (start->ls_flags & LPF_DRYRUN)
2770                         flags |= DOIF_DRYRUN;
2771         }
2772
2773         if (!list_empty(&lfsck->li_list_scan))
2774                 flags |= DOIF_OUTUSED;
2775
2776         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2777         thread_set_flags(thread, 0);
2778         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
2779         if (IS_ERR(lta))
2780                 GOTO(out, rc = PTR_ERR(lta));
2781
2782         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
2783         task = kthread_run(lfsck_master_engine, lta, "lfsck");
2784         if (IS_ERR(task)) {
2785                 rc = PTR_ERR(task);
2786                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
2787                        lfsck_lfsck2name(lfsck), rc);
                     /* The thread was never created, so the arguments are
                      * released here. */
2788                 lfsck_thread_args_fini(lta);
2789
2790                 GOTO(out, rc);
2791         }
2792
             /* Wait until the engine thread reports that it is running or
              * that it has already stopped. */
2793         l_wait_event(thread->t_ctl_waitq,
2794                      thread_is_running(thread) ||
2795                      thread_is_stopped(thread),
2796                      &lwi);
             /* Without a broadcast, set li_start_unplug and wake the waiters
              * right away.
              * NOTE(review): presumably the engine thread waits for
              * li_start_unplug before scanning -- confirm. */
2797         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
2798                 lfsck->li_start_unplug = 1;
2799                 wake_up_all(&thread->t_ctl_waitq);
2800
2801                 GOTO(out, rc = 0);
2802         }
2803
2804         /* release lfsck::li_mutex to avoid deadlock. */
2805         mutex_unlock(&lfsck->li_mutex);
2806         rc = lfsck_start_all(env, lfsck, start);
2807         if (rc != 0) {
                     /* The broadcast failed: unless the local thread has
                      * already stopped, mark the run as failed, ask the
                      * thread to stop and wait for it. */
2808                 spin_lock(&lfsck->li_lock);
2809                 if (thread_is_stopped(thread)) {
2810                         spin_unlock(&lfsck->li_lock);
2811                 } else {
2812                         lfsck->li_status = LS_FAILED;
2813                         lfsck->li_flags = 0;
2814                         thread_set_flags(thread, SVC_STOPPING);
2815                         spin_unlock(&lfsck->li_lock);
2816
2817                         lfsck->li_start_unplug = 1;
2818                         wake_up_all(&thread->t_ctl_waitq);
2819                         l_wait_event(thread->t_ctl_waitq,
2820                                      thread_is_stopped(thread),
2821                                      &lwi);
2822                 }
2823         } else {
2824                 lfsck->li_start_unplug = 1;
2825                 wake_up_all(&thread->t_ctl_waitq);
2826         }
2827
             /* li_mutex was already released above, so skip the unlock. */
2828         GOTO(put, rc);
2829
2830 out:
2831         mutex_unlock(&lfsck->li_mutex);
2832
2833 put:
2834         lfsck_instance_put(env, lfsck);
2835
             /* Positive values are not passed back to the caller. */
2836         return rc < 0 ? rc : 0;
2837 }
2838 EXPORT_SYMBOL(lfsck_start);
2839
2840 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2841                struct lfsck_stop *stop)
2842 {
2843         struct lfsck_instance   *lfsck;
2844         struct ptlrpc_thread    *thread;
2845         struct l_wait_info       lwi    = { 0 };
2846         int                      rc     = 0;
2847         int                      rc1    = 0;
2848         ENTRY;
2849
2850         lfsck = lfsck_instance_find(key, true, false);
2851         if (unlikely(lfsck == NULL))
2852                 RETURN(-ENXIO);
2853
2854         thread = &lfsck->li_thread;
2855         /* release lfsck::li_mutex to avoid deadlock. */
2856         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2857                 if (!lfsck->li_master) {
2858                         CERROR("%s: only allow to specify '-A' via MDS\n",
2859                                lfsck_lfsck2name(lfsck));
2860
2861                         GOTO(out, rc = -EPERM);
2862                 }
2863
2864                 rc1 = lfsck_stop_all(env, lfsck, stop);
2865         }
2866
2867         mutex_lock(&lfsck->li_mutex);
2868         spin_lock(&lfsck->li_lock);
2869         /* no error if LFSCK is already stopped, or was never started */
2870         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2871                 spin_unlock(&lfsck->li_lock);
2872                 GOTO(out, rc = 0);
2873         }
2874
2875         if (stop != NULL) {
2876                 lfsck->li_status = stop->ls_status;
2877                 lfsck->li_flags = stop->ls_flags;
2878         } else {
2879                 lfsck->li_status = LS_STOPPED;
2880                 lfsck->li_flags = 0;
2881         }
2882
2883         thread_set_flags(thread, SVC_STOPPING);
2884         spin_unlock(&lfsck->li_lock);
2885
2886         wake_up_all(&thread->t_ctl_waitq);
2887         l_wait_event(thread->t_ctl_waitq,
2888                      thread_is_stopped(thread),
2889                      &lwi);
2890
2891         GOTO(out, rc = 0);
2892
2893 out:
2894         mutex_unlock(&lfsck->li_mutex);
2895         lfsck_instance_put(env, lfsck);
2896
2897         return rc != 0 ? rc : rc1;
2898 }
2899 EXPORT_SYMBOL(lfsck_stop);
2900
2901 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2902                     struct lfsck_request *lr, struct thandle *th)
2903 {
2904         int rc = -EOPNOTSUPP;
2905         ENTRY;
2906
2907         switch (lr->lr_event) {
2908         case LE_START: {
2909                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
2910                 struct lfsck_start_param  lsp;
2911
2912                 memset(start, 0, sizeof(*start));
2913                 start->ls_valid = lr->lr_valid;
2914                 start->ls_speed_limit = lr->lr_speed;
2915                 start->ls_version = lr->lr_version;
2916                 start->ls_active = lr->lr_active;
2917                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2918                 start->ls_async_windows = lr->lr_async_windows;
2919
2920                 lsp.lsp_start = start;
2921                 lsp.lsp_index = lr->lr_index;
2922                 lsp.lsp_index_valid = 1;
2923                 rc = lfsck_start(env, key, &lsp);
2924                 break;
2925         }
2926         case LE_STOP: {
2927                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2928
2929                 memset(stop, 0, sizeof(*stop));
2930                 stop->ls_status = lr->lr_status;
2931                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2932                 rc = lfsck_stop(env, key, stop);
2933                 break;
2934         }
2935         case LE_PHASE1_DONE:
2936         case LE_PHASE2_DONE:
2937         case LE_FID_ACCESSED:
2938         case LE_PEER_EXIT:
2939         case LE_CONDITIONAL_DESTROY:
2940         case LE_CREATE_ORPHAN:
2941         case LE_SKIP_NLINK_DECLARE:
2942         case LE_SKIP_NLINK:
2943         case LE_SET_LMV_MASTER:
2944         case LE_PAIRS_VERIFY: {
2945                 struct lfsck_instance  *lfsck;
2946                 struct lfsck_component *com;
2947
2948                 lfsck = lfsck_instance_find(key, true, false);
2949                 if (unlikely(lfsck == NULL))
2950                         RETURN(-ENXIO);
2951
2952                 com = lfsck_component_find(lfsck, lr->lr_active);
2953                 if (likely(com != NULL)) {
2954                         rc = com->lc_ops->lfsck_in_notify(env, com, lr, th);
2955                         lfsck_component_put(env, com);
2956                 }
2957
2958                 lfsck_instance_put(env, lfsck);
2959                 break;
2960         }
2961         default:
2962                 break;
2963         }
2964
2965         RETURN(rc);
2966 }
2967 EXPORT_SYMBOL(lfsck_in_notify);
2968
2969 int lfsck_query(const struct lu_env *env, struct dt_device *key,
2970                 struct lfsck_request *lr)
2971 {
2972         struct lfsck_instance  *lfsck;
2973         struct lfsck_component *com;
2974         int                     rc;
2975         ENTRY;
2976
2977         lfsck = lfsck_instance_find(key, true, false);
2978         if (unlikely(lfsck == NULL))
2979                 RETURN(-ENXIO);
2980
2981         com = lfsck_component_find(lfsck, lr->lr_active);
2982         if (likely(com != NULL)) {
2983                 rc = com->lc_ops->lfsck_query(env, com);
2984                 lfsck_component_put(env, com);
2985         } else {
2986                 rc = -ENOTSUPP;
2987         }
2988
2989         lfsck_instance_put(env, lfsck);
2990
2991         RETURN(rc);
2992 }
2993 EXPORT_SYMBOL(lfsck_query);
2994
2995 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
2996                              struct ldlm_namespace *ns)
2997 {
2998         struct lfsck_instance  *lfsck;
2999         int                     rc      = -ENXIO;
3000
3001         lfsck = lfsck_instance_find(key, true, false);
3002         if (likely(lfsck != NULL)) {
3003                 lfsck->li_namespace = ns;
3004                 lfsck_instance_put(env, lfsck);
3005                 rc = 0;
3006         }
3007
3008         return rc;
3009 }
3010 EXPORT_SYMBOL(lfsck_register_namespace);
3011
/*
 * Allocate and set up an LFSCK instance for the device \a key and add it to
 * the set of registered instances.
 *
 * \param[in] env		execution environment
 * \param[in] key		device the instance is bound to; stored as
 *				li_bottom
 * \param[in] next		stored as li_next
 * \param[in] obd		stored as li_obd
 * \param[in] notify		callback stored as li_out_notify
 * \param[in] notify_data	stored as li_out_notify_data
 * \param[in] master		true to mark the instance as master; only
 *				then are the FID and namespace components
 *				set up
 *
 * \retval			0 on success
 * \retval			-EEXIST if an instance already exists for
 *				\a key
 * \retval			-ENOMEM if the instance cannot be allocated
 * \retval			-ENOTDIR if the root object cannot be used
 *				as a directory
 * \retval			other non-zero error from the helpers called
 *				here; the instance is then cleaned up
 */
3012 int lfsck_register(const struct lu_env *env, struct dt_device *key,
3013                    struct dt_device *next, struct obd_device *obd,
3014                    lfsck_out_notify notify, void *notify_data, bool master)
3015 {
3016         struct lfsck_instance   *lfsck;
3017         struct dt_object        *root  = NULL;
3018         struct dt_object        *obj   = NULL;
3019         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
3020         int                      rc;
3021         ENTRY;
3022
3023         lfsck = lfsck_instance_find(key, false, false);
3024         if (unlikely(lfsck != NULL))
3025                 RETURN(-EEXIST);
3026
3027         OBD_ALLOC_PTR(lfsck);
3028         if (lfsck == NULL)
3029                 RETURN(-ENOMEM);
3030
3031         mutex_init(&lfsck->li_mutex);
3032         spin_lock_init(&lfsck->li_lock);
3033         INIT_LIST_HEAD(&lfsck->li_link);
3034         INIT_LIST_HEAD(&lfsck->li_list_scan);
3035         INIT_LIST_HEAD(&lfsck->li_list_dir);
3036         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
3037         INIT_LIST_HEAD(&lfsck->li_list_idle);
3038         INIT_LIST_HEAD(&lfsck->li_list_lmv);
             /* The instance starts with a single reference. */
3039         atomic_set(&lfsck->li_ref, 1);
3040         atomic_set(&lfsck->li_double_scan_count, 0);
3041         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
3042         lfsck->li_out_notify = notify;
3043         lfsck->li_out_notify_data = notify_data;
3044         lfsck->li_next = next;
3045         lfsck->li_bottom = key;
3046         lfsck->li_obd = obd;
3047
3048         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
3049         if (rc != 0)
3050                 GOTO(out, rc);
3051
3052         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
3053         if (rc != 0)
3054                 GOTO(out, rc);
3055
3056         fid->f_seq = FID_SEQ_LOCAL_NAME;
3057         fid->f_oid = 1;
3058         fid->f_ver = 0;
3059         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
3060         if (rc != 0)
3061                 GOTO(out, rc);
3062
             /* "fid" is reused from here on: it now receives the FID of the
              * device's root, which is saved as li_local_root_fid below. */
3063         rc = dt_root_get(env, key, fid);
3064         if (rc != 0)
3065                 GOTO(out, rc);
3066
3067         root = dt_locate(env, key, fid);
3068         if (IS_ERR(root))
3069                 GOTO(out, rc = PTR_ERR(root));
3070
3071         if (unlikely(!dt_try_as_dir(env, root)))
3072                 GOTO(out, rc = -ENOTDIR);
3073
3074         lfsck->li_local_root_fid = *fid;
3075         if (master) {
3076                 lfsck->li_master = 1;
                     /* Only the master whose device index is 0 resolves the
                      * global root and verifies, via lfsck_verify_linkea(),
                      * the objects named by "dotlustre" and "lostfound". */
3077                 if (lfsck_dev_idx(key) == 0) {
3078                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
3079                         const struct lu_name *cname;
3080
3081                         rc = dt_lookup(env, root,
3082                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
3083                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
3084                         if (rc != 0)
3085                                 GOTO(out, rc);
3086
3087                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
3088                         if (IS_ERR(obj))
3089                                 GOTO(out, rc = PTR_ERR(obj));
3090
3091                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3092                                 (const struct dt_key *)dotlustre, BYPASS_CAPA);
3093                         if (rc != 0)
3094                                 GOTO(out, rc);
3095
                             /* Drop the global root and switch "obj" to the
                              * object just looked up. */
3096                         lu_object_put(env, &obj->do_lu);
3097                         obj = dt_locate(env, key, fid);
3098                         if (IS_ERR(obj))
3099                                 GOTO(out, rc = PTR_ERR(obj));
3100
3101                         cname = lfsck_name_get_const(env, dotlustre,
3102                                                      strlen(dotlustre));
3103                         rc = lfsck_verify_linkea(env, key, obj, cname,
3104                                                  &lfsck->li_global_root_fid);
3105                         if (rc != 0)
3106                                 GOTO(out, rc);
3107
                             /* Keep this object's FID as the parent FID for
                              * the check of the next level. */
3108                         *pfid = *fid;
3109                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3110                                        (const struct dt_key *)lostfound,
3111                                        BYPASS_CAPA);
3112                         if (rc != 0)
3113                                 GOTO(out, rc);
3114
3115                         lu_object_put(env, &obj->do_lu);
3116                         obj = dt_locate(env, key, fid);
3117                         if (IS_ERR(obj))
3118                                 GOTO(out, rc = PTR_ERR(obj));
3119
3120                         cname = lfsck_name_get_const(env, lostfound,
3121                                                      strlen(lostfound));
3122                         rc = lfsck_verify_linkea(env, key, obj, cname, pfid);
3123                         if (rc != 0)
3124                                 GOTO(out, rc);
3125
                             /* Reset "obj" so that the "out" label does not
                              * put it a second time. */
3126                         lu_object_put(env, &obj->do_lu);
3127                         obj = NULL;
3128                 }
3129         }
3130
             /* Locate the object used for object-table iteration. */
3131         fid->f_seq = FID_SEQ_LOCAL_FILE;
3132         fid->f_oid = OTABLE_IT_OID;
3133         fid->f_ver = 0;
3134         obj = dt_locate(env, key, fid);
3135         if (IS_ERR(obj))
3136                 GOTO(out, rc = PTR_ERR(obj));
3137
             /* Take an extra reference for li_obj_oit: the local reference
              * held through "obj" is dropped at the "out" label. */
3138         lu_object_get(&obj->do_lu);
3139         lfsck->li_obj_oit = obj;
3140         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
3141         if (rc != 0)
3142                 GOTO(out, rc);
3143
3144         rc = lfsck_bookmark_setup(env, lfsck);
3145         if (rc != 0)
3146                 GOTO(out, rc);
3147
3148         if (master) {
3149                 rc = lfsck_fid_init(lfsck);
3150                 if (rc < 0)
3151                         GOTO(out, rc);
3152
3153                 rc = lfsck_namespace_setup(env, lfsck);
3154                 if (rc < 0)
3155                         GOTO(out, rc);
3156         }
3157
3158         rc = lfsck_layout_setup(env, lfsck);
3159         if (rc < 0)
3160                 GOTO(out, rc);
3161
3162         /* XXX: more LFSCK components initialization to be added here. */
3163
             /* NOTE(review): if lfsck_add_target_from_orphan() fails, the
              * instance has already been added by lfsck_instance_add() when
              * lfsck_instance_cleanup() runs below -- confirm that the
              * cleanup copes with that. */
3164         rc = lfsck_instance_add(lfsck);
3165         if (rc == 0)
3166                 rc = lfsck_add_target_from_orphan(env, lfsck);
3167 out:
             /* "obj" and "root" may be NULL or an error pointer here. */
3168         if (obj != NULL && !IS_ERR(obj))
3169                 lu_object_put(env, &obj->do_lu);
3170         if (root != NULL && !IS_ERR(root))
3171                 lu_object_put(env, &root->do_lu);
3172         if (rc != 0)
3173                 lfsck_instance_cleanup(env, lfsck);
3174         return rc;
3175 }
3176 EXPORT_SYMBOL(lfsck_register);
3177
3178 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
3179 {
3180         struct lfsck_instance *lfsck;
3181
3182         lfsck = lfsck_instance_find(key, false, true);
3183         if (lfsck != NULL)
3184                 lfsck_instance_put(env, lfsck);
3185 }
3186 EXPORT_SYMBOL(lfsck_degister);
3187
3188 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
3189                      struct dt_device *tgt, struct obd_export *exp,
3190                      __u32 index, bool for_ost)
3191 {
3192         struct lfsck_instance   *lfsck;
3193         struct lfsck_tgt_desc   *ltd;
3194         int                      rc;
3195         ENTRY;
3196
3197         OBD_ALLOC_PTR(ltd);
3198         if (ltd == NULL)
3199                 RETURN(-ENOMEM);
3200
3201         ltd->ltd_tgt = tgt;
3202         ltd->ltd_key = key;
3203         ltd->ltd_exp = exp;
3204         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
3205         INIT_LIST_HEAD(&ltd->ltd_layout_list);
3206         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
3207         INIT_LIST_HEAD(&ltd->ltd_namespace_list);
3208         INIT_LIST_HEAD(&ltd->ltd_namespace_phase_list);
3209         atomic_set(&ltd->ltd_ref, 1);
3210         ltd->ltd_index = index;
3211
3212         spin_lock(&lfsck_instance_lock);
3213         lfsck = __lfsck_instance_find(key, true, false);
3214         if (lfsck == NULL) {
3215                 if (for_ost)
3216                         list_add_tail(&ltd->ltd_orphan_list,
3217                                       &lfsck_ost_orphan_list);
3218                 else
3219                         list_add_tail(&ltd->ltd_orphan_list,
3220                                       &lfsck_mdt_orphan_list);
3221                 spin_unlock(&lfsck_instance_lock);
3222
3223                 RETURN(0);
3224         }
3225         spin_unlock(&lfsck_instance_lock);
3226
3227         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
3228         if (rc != 0)
3229                 lfsck_tgt_put(ltd);
3230
3231         lfsck_instance_put(env, lfsck);
3232
3233         RETURN(rc);
3234 }
3235 EXPORT_SYMBOL(lfsck_add_target);
3236
3237 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
3238                       struct dt_device *tgt, __u32 index, bool for_ost)
3239 {
3240         struct lfsck_instance   *lfsck;
3241         struct lfsck_tgt_descs  *ltds;
3242         struct lfsck_tgt_desc   *ltd;
3243         struct list_head        *head;
3244
3245         if (for_ost)
3246                 head = &lfsck_ost_orphan_list;
3247         else
3248                 head = &lfsck_mdt_orphan_list;
3249
3250         spin_lock(&lfsck_instance_lock);
3251         list_for_each_entry(ltd, head, ltd_orphan_list) {
3252                 if (ltd->ltd_tgt == tgt) {
3253                         list_del_init(&ltd->ltd_orphan_list);
3254                         spin_unlock(&lfsck_instance_lock);
3255                         lfsck_tgt_put(ltd);
3256
3257                         return;
3258                 }
3259         }
3260
3261         ltd = NULL;
3262         lfsck = __lfsck_instance_find(key, true, false);
3263         spin_unlock(&lfsck_instance_lock);
3264         if (unlikely(lfsck == NULL))
3265                 return;
3266
3267         if (for_ost)
3268                 ltds = &lfsck->li_ost_descs;
3269         else
3270                 ltds = &lfsck->li_mdt_descs;
3271
3272         down_write(&ltds->ltd_rw_sem);
3273         LASSERT(ltds->ltd_tgts_bitmap != NULL);
3274
3275         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
3276                 goto unlock;
3277
3278         ltd = LTD_TGT(ltds, index);
3279         if (unlikely(ltd == NULL))
3280                 goto unlock;
3281
3282         LASSERT(ltds->ltd_tgtnr > 0);
3283
3284         ltds->ltd_tgtnr--;
3285         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
3286         LTD_TGT(ltds, index) = NULL;
3287
3288 unlock:
3289         if (ltd == NULL) {
3290                 if (for_ost)
3291                         head = &lfsck->li_ost_descs.ltd_orphan;
3292                 else
3293                         head = &lfsck->li_mdt_descs.ltd_orphan;
3294
3295                 list_for_each_entry(ltd, head, ltd_orphan_list) {
3296                         if (ltd->ltd_tgt == tgt) {
3297                                 list_del_init(&ltd->ltd_orphan_list);
3298                                 break;
3299                         }
3300                 }
3301         }
3302
3303         up_write(&ltds->ltd_rw_sem);
3304         if (ltd != NULL) {
3305                 spin_lock(&ltds->ltd_lock);
3306                 ltd->ltd_dead = 1;
3307                 spin_unlock(&ltds->ltd_lock);
3308                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_NAMESPACE);
3309                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
3310                 lfsck_tgt_put(ltd);
3311         }
3312
3313         lfsck_instance_put(env, lfsck);
3314 }
3315 EXPORT_SYMBOL(lfsck_del_target);
3316
3317 static int __init lfsck_init(void)
3318 {
3319         int rc;
3320
3321         INIT_LIST_HEAD(&lfsck_instance_list);
3322         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
3323         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
3324         lfsck_key_init_generic(&lfsck_thread_key, NULL);
3325         rc = lu_context_key_register(&lfsck_thread_key);
3326         if (rc == 0) {
3327                 tgt_register_lfsck_in_notify(lfsck_in_notify);
3328                 tgt_register_lfsck_query(lfsck_query);
3329         }
3330
3331         return rc;
3332 }
3333
3334 static void __exit lfsck_exit(void)
3335 {
3336         struct lfsck_tgt_desc *ltd;
3337         struct lfsck_tgt_desc *next;
3338
3339         LASSERT(list_empty(&lfsck_instance_list));
3340
3341         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
3342                                  ltd_orphan_list) {
3343                 list_del_init(&ltd->ltd_orphan_list);
3344                 lfsck_tgt_put(ltd);
3345         }
3346
3347         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
3348                                  ltd_orphan_list) {
3349                 list_del_init(&ltd->ltd_orphan_list);
3350                 lfsck_tgt_put(ltd);
3351         }
3352
3353         lu_context_key_degister(&lfsck_thread_key);
3354 }
3355
/*
 * Module metadata (author, description, license), followed by the
 * cfs_module() declaration that names lfsck_init() and lfsck_exit() as
 * the module's init and exit routines.
 */
3356 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
3357 MODULE_DESCRIPTION("LFSCK");
3358 MODULE_LICENSE("GPL");
3359
3360 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);