Whamcloud - gitweb
ce8452f646f890d0817ac411b1169c523335d458
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 #define LFSCK_CHECKPOINT_SKIP   1
46
47 /* define lfsck thread key */
48 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
49
50 static void lfsck_key_fini(const struct lu_context *ctx,
51                            struct lu_context_key *key, void *data)
52 {
53         struct lfsck_thread_info *info = data;
54
55         lu_buf_free(&info->lti_linkea_buf);
56         lu_buf_free(&info->lti_linkea_buf2);
57         lu_buf_free(&info->lti_big_buf);
58         OBD_FREE_PTR(info);
59 }
60
61 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
62 LU_KEY_INIT_GENERIC(lfsck);
63
/* List of LFSCK instances.
 * NOTE(review): no user of this list is visible in this part of the file;
 * presumably it is protected by lfsck_instance_lock as well - confirm. */
static struct list_head lfsck_instance_list;
/* OST / MDT target descriptors that are not attached to an instance yet.
 * lfsck_add_target_from_orphan() moves the entries whose ltd_key matches the
 * instance's bottom device onto the instance, under lfsck_instance_lock. */
static struct list_head lfsck_ost_orphan_list;
static struct list_head lfsck_mdt_orphan_list;
/* Held while the orphan lists above are walked or modified. */
static DEFINE_SPINLOCK(lfsck_instance_lock);
68
69 static const char *lfsck_status_names[] = {
70         [LS_INIT]               = "init",
71         [LS_SCANNING_PHASE1]    = "scanning-phase1",
72         [LS_SCANNING_PHASE2]    = "scanning-phase2",
73         [LS_COMPLETED]          = "completed",
74         [LS_FAILED]             = "failed",
75         [LS_STOPPED]            = "stopped",
76         [LS_PAUSED]             = "paused",
77         [LS_CRASHED]            = "crashed",
78         [LS_PARTIAL]            = "partial",
79         [LS_CO_FAILED]          = "co-failed",
80         [LS_CO_STOPPED]         = "co-stopped",
81         [LS_CO_PAUSED]          = "co-paused"
82 };
83
/* NULL-terminated table of printable LFSCK flag names.
 * NOTE(review): presumably entry N names flag bit N - confirm against the
 * flag definitions and the code that dumps these names. */
const char *lfsck_flags_names[] = {
        "scanned-once",
        "inconsistent",
        "upgrade",
        "incomplete",
        "crashed_lastid",
        NULL
};
92
/* Table of printable LFSCK parameter names, terminated by a trailing NULL.
 * The first slot is NULL too, i.e. it has no printable name.
 * NOTE(review): presumably entry N names parameter bit N, and a consumer
 * must not treat the leading NULL as the end of the table - confirm. */
const char *lfsck_param_names[] = {
        NULL,
        "failout",
        "dryrun",
        "all_targets",
        "broadcast",
        "orphan",
        "create_ostobj",
        NULL
};
103
/* Ways of verifying the lost+found (LPF) directory.
 * NOTE(review): judging by the names, either starting from the FID saved in
 * the bookmark or from the name entry; the users are outside this part of
 * the file - confirm. */
enum lfsck_verify_lpf_types {
        LVLT_BY_BOOKMARK        = 0,
        LVLT_BY_NAMEENTRY       = 1,
};
108
109 const char *lfsck_status2names(enum lfsck_status status)
110 {
111         if (unlikely(status < 0 || status >= LS_MAX))
112                 return "unknown";
113
114         return lfsck_status_names[status];
115 }
116
117 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
118 {
119         spin_lock_init(&ltds->ltd_lock);
120         init_rwsem(&ltds->ltd_rw_sem);
121         INIT_LIST_HEAD(&ltds->ltd_orphan);
122         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
123         if (ltds->ltd_tgts_bitmap == NULL)
124                 return -ENOMEM;
125
126         return 0;
127 }
128
129 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
130 {
131         struct lfsck_tgt_desc   *ltd;
132         struct lfsck_tgt_desc   *next;
133         int                      idx;
134
135         down_write(&ltds->ltd_rw_sem);
136
137         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
138                                  ltd_orphan_list) {
139                 list_del_init(&ltd->ltd_orphan_list);
140                 lfsck_tgt_put(ltd);
141         }
142
143         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
144                 up_write(&ltds->ltd_rw_sem);
145
146                 return;
147         }
148
149         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
150                 ltd = LTD_TGT(ltds, idx);
151                 if (likely(ltd != NULL)) {
152                         LASSERT(list_empty(&ltd->ltd_layout_list));
153                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
154                         LASSERT(list_empty(&ltd->ltd_namespace_list));
155                         LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
156
157                         ltds->ltd_tgtnr--;
158                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
159                         LTD_TGT(ltds, idx) = NULL;
160                         lfsck_tgt_put(ltd);
161                 }
162         }
163
164         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
165                  ltds->ltd_tgtnr);
166
167         for (idx = 0; idx < TGT_PTRS; idx++) {
168                 if (ltds->ltd_tgts_idx[idx] != NULL) {
169                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
170                         ltds->ltd_tgts_idx[idx] = NULL;
171                 }
172         }
173
174         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
175         ltds->ltd_tgts_bitmap = NULL;
176         up_write(&ltds->ltd_rw_sem);
177 }
178
179 static int __lfsck_add_target(const struct lu_env *env,
180                               struct lfsck_instance *lfsck,
181                               struct lfsck_tgt_desc *ltd,
182                               bool for_ost, bool locked)
183 {
184         struct lfsck_tgt_descs *ltds;
185         __u32                   index = ltd->ltd_index;
186         int                     rc    = 0;
187         ENTRY;
188
189         if (for_ost)
190                 ltds = &lfsck->li_ost_descs;
191         else
192                 ltds = &lfsck->li_mdt_descs;
193
194         if (!locked)
195                 down_write(&ltds->ltd_rw_sem);
196
197         LASSERT(ltds->ltd_tgts_bitmap != NULL);
198
199         if (index >= ltds->ltd_tgts_bitmap->size) {
200                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
201                                     (__u32)BITS_PER_LONG);
202                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
203                 cfs_bitmap_t *new_bitmap;
204
205                 while (newsize < index + 1)
206                         newsize <<= 1;
207
208                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
209                 if (new_bitmap == NULL)
210                         GOTO(unlock, rc = -ENOMEM);
211
212                 if (ltds->ltd_tgtnr > 0)
213                         cfs_bitmap_copy(new_bitmap, old_bitmap);
214                 ltds->ltd_tgts_bitmap = new_bitmap;
215                 CFS_FREE_BITMAP(old_bitmap);
216         }
217
218         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
219                 CERROR("%s: the device %s (%u) is registered already\n",
220                        lfsck_lfsck2name(lfsck),
221                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
222                 GOTO(unlock, rc = -EEXIST);
223         }
224
225         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
226                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
227                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
228                         GOTO(unlock, rc = -ENOMEM);
229         }
230
231         LTD_TGT(ltds, index) = ltd;
232         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
233         ltds->ltd_tgtnr++;
234
235         GOTO(unlock, rc = 0);
236
237 unlock:
238         if (!locked)
239                 up_write(&ltds->ltd_rw_sem);
240
241         return rc;
242 }
243
244 static int lfsck_add_target_from_orphan(const struct lu_env *env,
245                                         struct lfsck_instance *lfsck)
246 {
247         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
248         struct lfsck_tgt_desc   *ltd;
249         struct lfsck_tgt_desc   *next;
250         struct list_head        *head    = &lfsck_ost_orphan_list;
251         int                      rc;
252         bool                     for_ost = true;
253
254 again:
255         spin_lock(&lfsck_instance_lock);
256         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
257                 if (ltd->ltd_key == lfsck->li_bottom)
258                         list_move_tail(&ltd->ltd_orphan_list,
259                                        &ltds->ltd_orphan);
260         }
261         spin_unlock(&lfsck_instance_lock);
262
263         down_write(&ltds->ltd_rw_sem);
264         while (!list_empty(&ltds->ltd_orphan)) {
265                 ltd = list_entry(ltds->ltd_orphan.next,
266                                  struct lfsck_tgt_desc,
267                                  ltd_orphan_list);
268                 list_del_init(&ltd->ltd_orphan_list);
269                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
270                 /* Do not hold the semaphore for too long time. */
271                 up_write(&ltds->ltd_rw_sem);
272                 if (rc != 0)
273                         return rc;
274
275                 down_write(&ltds->ltd_rw_sem);
276         }
277         up_write(&ltds->ltd_rw_sem);
278
279         if (for_ost) {
280                 ltds = &lfsck->li_mdt_descs;
281                 head = &lfsck_mdt_orphan_list;
282                 for_ost = false;
283                 goto again;
284         }
285
286         return 0;
287 }
288
289 static inline struct lfsck_component *
290 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
291                        struct list_head *list)
292 {
293         struct lfsck_component *com;
294
295         list_for_each_entry(com, list, lc_link) {
296                 if (com->lc_type == type)
297                         return com;
298         }
299         return NULL;
300 }
301
302 struct lfsck_component *
303 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
304 {
305         struct lfsck_component *com;
306
307         spin_lock(&lfsck->li_lock);
308         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
309         if (com != NULL)
310                 goto unlock;
311
312         com = __lfsck_component_find(lfsck, type,
313                                      &lfsck->li_list_double_scan);
314         if (com != NULL)
315                 goto unlock;
316
317         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
318
319 unlock:
320         if (com != NULL)
321                 lfsck_component_get(com);
322         spin_unlock(&lfsck->li_lock);
323         return com;
324 }
325
326 void lfsck_component_cleanup(const struct lu_env *env,
327                              struct lfsck_component *com)
328 {
329         if (!list_empty(&com->lc_link))
330                 list_del_init(&com->lc_link);
331         if (!list_empty(&com->lc_link_dir))
332                 list_del_init(&com->lc_link_dir);
333
334         lfsck_component_put(env, com);
335 }
336
337 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
338                     struct lu_fid *fid, bool locked)
339 {
340         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
341         int                      rc = 0;
342         ENTRY;
343
344         if (!locked)
345                 mutex_lock(&lfsck->li_mutex);
346
347         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
348         if (rc >= 0) {
349                 bk->lb_last_fid = *fid;
350                 /* We do not care about whether the subsequent sub-operations
351                  * failed or not. The worst case is that one FID is lost that
352                  * is not a big issue for the LFSCK since it is relative rare
353                  * for LFSCK create. */
354                 rc = lfsck_bookmark_store(env, lfsck);
355         }
356
357         if (!locked)
358                 mutex_unlock(&lfsck->li_mutex);
359
360         RETURN(rc);
361 }
362
363 /**
364  * Request the specified ibits lock for the given object.
365  *
366  * Before the LFSCK modifying on the namespace visible object,
367  * it needs to acquire related ibits ldlm lock.
368  *
369  * \param[in] env       pointer to the thread context
370  * \param[in] lfsck     pointer to the lfsck instance
371  * \param[in] obj       pointer to the dt_object to be locked
372  * \param[out] lh       pointer to the lock handle
373  * \param[in] ibits     the bits for the ldlm lock to be acquired
374  * \param[in] mode      the mode for the ldlm lock to be acquired
375  *
376  * \retval              0 for success
377  * \retval              negative error number on failure
378  */
379 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
380                      struct dt_object *obj, struct lustre_handle *lh,
381                      __u64 bits, ldlm_mode_t mode)
382 {
383         struct lfsck_thread_info        *info   = lfsck_env_info(env);
384         ldlm_policy_data_t              *policy = &info->lti_policy;
385         struct ldlm_res_id              *resid  = &info->lti_resid;
386         __u64                            flags  = LDLM_FL_ATOMIC_CB;
387         int                              rc;
388
389         LASSERT(lfsck->li_namespace != NULL);
390
391         memset(policy, 0, sizeof(*policy));
392         policy->l_inodebits.bits = bits;
393         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
394         if (dt_object_remote(obj)) {
395                 struct ldlm_enqueue_info *einfo = &info->lti_einfo;
396
397                 memset(einfo, 0, sizeof(*einfo));
398                 einfo->ei_type = LDLM_IBITS;
399                 einfo->ei_mode = mode;
400                 einfo->ei_cb_bl = ldlm_blocking_ast;
401                 einfo->ei_cb_cp = ldlm_completion_ast;
402                 einfo->ei_res_id = resid;
403
404                 rc = dt_object_lock(env, obj, lh, einfo, policy);
405         } else {
406                 rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid,
407                                             LDLM_IBITS, policy, mode,
408                                             &flags, ldlm_blocking_ast,
409                                             ldlm_completion_ast, NULL, NULL,
410                                             0, LVB_T_NONE, NULL, lh);
411         }
412
413         if (rc == ELDLM_OK) {
414                 rc = 0;
415         } else {
416                 memset(lh, 0, sizeof(*lh));
417                 rc = -EIO;
418         }
419
420         return rc;
421 }
422
423 /**
424  * Release the the specified ibits lock.
425  *
426  * If the lock has been acquired before, release it
427  * and cleanup the handle. Otherwise, do nothing.
428  *
429  * \param[in] lh        pointer to the lock handle
430  * \param[in] mode      the mode for the ldlm lock to be released
431  */
432 void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
433 {
434         if (lustre_handle_is_used(lh)) {
435                 ldlm_lock_decref(lh, mode);
436                 memset(lh, 0, sizeof(*lh));
437         }
438 }
439
440 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
441                               struct lfsck_instance *lfsck,
442                               const struct lu_fid *fid)
443 {
444         struct seq_server_site  *ss     =
445                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
446         struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
447         int                      rc;
448
449         fld_range_set_mdt(range);
450         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
451         if (rc == 0)
452                 rc = range->lsr_index;
453
454         return rc;
455 }
456
/* Directory entry names. "dot" and "dotdot" are used as the keys of the
 * "." and ".." entries inserted by lfsck_create_lpf_local() and
 * lfsck_create_lpf_remote().
 * NOTE(review): the users of "dotlustre" and "lostfound" are outside this
 * part of the file. */
const char dot[] = ".";
const char dotdot[] = "..";
static const char dotlustre[] = ".lustre";
static const char lostfound[] = "lost+found";
461
462 static int lfsck_create_lpf_local(const struct lu_env *env,
463                                   struct lfsck_instance *lfsck,
464                                   struct dt_object *parent,
465                                   struct dt_object *child,
466                                   struct lu_attr *la,
467                                   struct dt_object_format *dof,
468                                   const char *name)
469 {
470         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
471         struct dt_device        *dev    = lfsck->li_bottom;
472         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
473         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
474         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
475         struct thandle          *th     = NULL;
476         struct linkea_data       ldata  = { 0 };
477         struct lu_buf            linkea_buf;
478         const struct lu_name    *cname;
479         loff_t                   pos    = 0;
480         int                      len    = sizeof(struct lfsck_bookmark);
481         int                      rc;
482         ENTRY;
483
484         rc = linkea_data_new(&ldata,
485                              &lfsck_env_info(env)->lti_linkea_buf2);
486         if (rc != 0)
487                 RETURN(rc);
488
489         cname = lfsck_name_get_const(env, name, strlen(name));
490         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
491         if (rc != 0)
492                 RETURN(rc);
493
494         th = dt_trans_create(env, dev);
495         if (IS_ERR(th))
496                 RETURN(PTR_ERR(th));
497
498         /* 1a. create child */
499         rc = dt_declare_create(env, child, la, NULL, dof, th);
500         if (rc != 0)
501                 GOTO(stop, rc);
502
503         /* 2a. increase child nlink */
504         rc = dt_declare_ref_add(env, child, th);
505         if (rc != 0)
506                 GOTO(stop, rc);
507
508         /* 3a. insert linkEA for child */
509         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
510                        ldata.ld_leh->leh_len);
511         rc = dt_declare_xattr_set(env, child, &linkea_buf,
512                                   XATTR_NAME_LINK, 0, th);
513         if (rc != 0)
514                 GOTO(stop, rc);
515
516         /* 4a. insert name into parent dir */
517         rec->rec_type = S_IFDIR;
518         rec->rec_fid = cfid;
519         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
520                                (const struct dt_key *)name, th);
521         if (rc != 0)
522                 GOTO(stop, rc);
523
524         /* 5a. increase parent nlink */
525         rc = dt_declare_ref_add(env, parent, th);
526         if (rc != 0)
527                 GOTO(stop, rc);
528
529         /* 6a. update bookmark */
530         rc = dt_declare_record_write(env, bk_obj,
531                                      lfsck_buf_get(env, bk, len), 0, th);
532         if (rc != 0)
533                 GOTO(stop, rc);
534
535         rc = dt_trans_start_local(env, dev, th);
536         if (rc != 0)
537                 GOTO(stop, rc);
538
539         dt_write_lock(env, child, 0);
540         /* 1b.1. create child */
541         rc = dt_create(env, child, la, NULL, dof, th);
542         if (rc != 0)
543                 GOTO(unlock, rc);
544
545         if (unlikely(!dt_try_as_dir(env, child)))
546                 GOTO(unlock, rc = -ENOTDIR);
547
548         /* 1b.2. insert dot into child dir */
549         rec->rec_fid = cfid;
550         rc = dt_insert(env, child, (const struct dt_rec *)rec,
551                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
552         if (rc != 0)
553                 GOTO(unlock, rc);
554
555         /* 1b.3. insert dotdot into child dir */
556         rec->rec_fid = &LU_LPF_FID;
557         rc = dt_insert(env, child, (const struct dt_rec *)rec,
558                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
559         if (rc != 0)
560                 GOTO(unlock, rc);
561
562         /* 2b. increase child nlink */
563         rc = dt_ref_add(env, child, th);
564         if (rc != 0)
565                 GOTO(unlock, rc);
566
567         /* 3b. insert linkEA for child. */
568         rc = dt_xattr_set(env, child, &linkea_buf,
569                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
570         dt_write_unlock(env, child);
571         if (rc != 0)
572                 GOTO(stop, rc);
573
574         /* 4b. insert name into parent dir */
575         rec->rec_fid = cfid;
576         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
577                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
578         if (rc != 0)
579                 GOTO(stop, rc);
580
581         dt_write_lock(env, parent, 0);
582         /* 5b. increase parent nlink */
583         rc = dt_ref_add(env, parent, th);
584         dt_write_unlock(env, parent);
585         if (rc != 0)
586                 GOTO(stop, rc);
587
588         bk->lb_lpf_fid = *cfid;
589         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
590
591         /* 6b. update bookmark */
592         rc = dt_record_write(env, bk_obj,
593                              lfsck_buf_get(env, bk, len), &pos, th);
594
595         GOTO(stop, rc);
596
597 unlock:
598         dt_write_unlock(env, child);
599
600 stop:
601         dt_trans_stop(env, dev, th);
602
603         return rc;
604 }
605
606 static int lfsck_create_lpf_remote(const struct lu_env *env,
607                                    struct lfsck_instance *lfsck,
608                                    struct dt_object *parent,
609                                    struct dt_object *child,
610                                    struct lu_attr *la,
611                                    struct dt_object_format *dof,
612                                    const char *name)
613 {
614         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
615         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
616         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
617         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
618         struct thandle          *th     = NULL;
619         struct linkea_data       ldata  = { 0 };
620         struct lu_buf            linkea_buf;
621         const struct lu_name    *cname;
622         struct dt_device        *dev;
623         loff_t                   pos    = 0;
624         int                      len    = sizeof(struct lfsck_bookmark);
625         int                      rc;
626         ENTRY;
627
628         rc = linkea_data_new(&ldata,
629                              &lfsck_env_info(env)->lti_linkea_buf2);
630         if (rc != 0)
631                 RETURN(rc);
632
633         cname = lfsck_name_get_const(env, name, strlen(name));
634         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
635         if (rc != 0)
636                 RETURN(rc);
637
638         /* Create .lustre/lost+found/MDTxxxx. */
639
640         /* XXX: Currently, cross-MDT create operation needs to create the child
641          *      object firstly, then insert name into the parent directory. For
642          *      this case, the child object resides on current MDT (local), but
643          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
644          *      easy to contain all the sub-modifications orderly within single
645          *      transaction.
646          *
647          *      To avoid more inconsistency, we split the create operation into
648          *      two transactions:
649          *
650          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
651          *         locally.
652          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
653          *         remotely.
654          *
655          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
656          *      repair such inconsistency when LFSCK run next time. */
657
658         /* Transaction I: locally */
659
660         dev = lfsck->li_bottom;
661         th = dt_trans_create(env, dev);
662         if (IS_ERR(th))
663                 RETURN(PTR_ERR(th));
664
665         /* 1a. create child */
666         rc = dt_declare_create(env, child, la, NULL, dof, th);
667         if (rc != 0)
668                 GOTO(stop, rc);
669
670         /* 2a. increase child nlink */
671         rc = dt_declare_ref_add(env, child, th);
672         if (rc != 0)
673                 GOTO(stop, rc);
674
675         /* 3a. insert linkEA for child */
676         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
677                        ldata.ld_leh->leh_len);
678         rc = dt_declare_xattr_set(env, child, &linkea_buf,
679                                   XATTR_NAME_LINK, 0, th);
680         if (rc != 0)
681                 GOTO(stop, rc);
682
683         /* 4a. update bookmark */
684         rc = dt_declare_record_write(env, bk_obj,
685                                      lfsck_buf_get(env, bk, len), 0, th);
686         if (rc != 0)
687                 GOTO(stop, rc);
688
689         rc = dt_trans_start_local(env, dev, th);
690         if (rc != 0)
691                 GOTO(stop, rc);
692
693         dt_write_lock(env, child, 0);
694         /* 1b.1. create child */
695         rc = dt_create(env, child, la, NULL, dof, th);
696         if (rc != 0)
697                 GOTO(unlock, rc);
698
699         if (unlikely(!dt_try_as_dir(env, child)))
700                 GOTO(unlock, rc = -ENOTDIR);
701
702         /* 1b.2. insert dot into child dir */
703         rec->rec_type = S_IFDIR;
704         rec->rec_fid = cfid;
705         rc = dt_insert(env, child, (const struct dt_rec *)rec,
706                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
707         if (rc != 0)
708                 GOTO(unlock, rc);
709
710         /* 1b.3. insert dotdot into child dir */
711         rec->rec_fid = &LU_LPF_FID;
712         rc = dt_insert(env, child, (const struct dt_rec *)rec,
713                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
714         if (rc != 0)
715                 GOTO(unlock, rc);
716
717         /* 2b. increase child nlink */
718         rc = dt_ref_add(env, child, th);
719         if (rc != 0)
720                 GOTO(unlock, rc);
721
722         /* 3b. insert linkEA for child */
723         rc = dt_xattr_set(env, child, &linkea_buf,
724                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
725         if (rc != 0)
726                 GOTO(unlock, rc);
727
728         bk->lb_lpf_fid = *cfid;
729         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
730
731         /* 4b. update bookmark */
732         rc = dt_record_write(env, bk_obj,
733                              lfsck_buf_get(env, bk, len), &pos, th);
734
735         dt_write_unlock(env, child);
736         dt_trans_stop(env, dev, th);
737         if (rc != 0)
738                 RETURN(rc);
739
740         /* Transaction II: remotely */
741
742         dev = lfsck->li_next;
743         th = dt_trans_create(env, dev);
744         if (IS_ERR(th))
745                 RETURN(PTR_ERR(th));
746
747         /* 5a. insert name into parent dir */
748         rec->rec_fid = cfid;
749         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
750                                (const struct dt_key *)name, th);
751         if (rc != 0)
752                 GOTO(stop, rc);
753
754         /* 6a. increase parent nlink */
755         rc = dt_declare_ref_add(env, parent, th);
756         if (rc != 0)
757                 GOTO(stop, rc);
758
759         rc = dt_trans_start(env, dev, th);
760         if (rc != 0)
761                 GOTO(stop, rc);
762
763         /* 5b. insert name into parent dir */
764         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
765                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
766         if (rc != 0)
767                 GOTO(stop, rc);
768
769         dt_write_lock(env, parent, 0);
770         /* 6b. increase parent nlink */
771         rc = dt_ref_add(env, parent, th);
772         dt_write_unlock(env, parent);
773
774         GOTO(stop, rc);
775
776 unlock:
777         dt_write_unlock(env, child);
778 stop:
779         dt_trans_stop(env, dev, th);
780
781         if (rc != 0 && dev == lfsck->li_next)
782                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
783                        "for orphans, but failed to insert the name %s "
784                        "to the .lustre/lost+found/. Such inconsistency "
785                        "will be repaired when LFSCK run next time: rc = %d\n",
786                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
787
788         return rc;
789 }
790
791 /* Do NOT create .lustre/lost+found/MDTxxxx when registering the lfsck
792  * instance, because MDT0 may not be ready for sequence allocation yet. We
793  * do that only when it is required, such as orphan OST-objects repairing. */
794 int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
795 {
796         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
797         struct lfsck_thread_info *info  = lfsck_env_info(env);
798         struct lu_fid            *cfid  = &info->lti_fid2;
799         struct lu_attr           *la    = &info->lti_la;
800         struct dt_object_format  *dof   = &info->lti_dof;
801         struct dt_object         *parent = NULL;
802         struct dt_object         *child = NULL;
803         struct lustre_handle      lh    = { 0 };
804         char                      name[8];
805         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
806         int                       rc    = 0;
807         ENTRY;
808
809         LASSERT(lfsck->li_master);
810
811         sprintf(name, "MDT%04x", node);
812         if (node == 0) {
813                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
814                                                   &LU_LPF_FID);
815         } else {
816                 struct lfsck_tgt_desc *ltd;
817
818                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
819                 if (unlikely(ltd == NULL))
820                         RETURN(-ENXIO);
821
822                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
823                                                   &LU_LPF_FID);
824                 lfsck_tgt_put(ltd);
825         }
826         if (IS_ERR(parent))
827                 RETURN(PTR_ERR(parent));
828
829         if (lfsck->li_lpf_obj != NULL)
830                 GOTO(out, rc = 0);
831
832         if (unlikely(!dt_try_as_dir(env, parent)))
833                 GOTO(out, rc = -ENOTDIR);
834
835         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
836                               MDS_INODELOCK_UPDATE, LCK_EX);
837         if (rc != 0)
838                 GOTO(out, rc);
839
840         mutex_lock(&lfsck->li_mutex);
841         if (lfsck->li_lpf_obj != NULL)
842                 GOTO(unlock, rc = 0);
843
844         if (fid_is_zero(&bk->lb_lpf_fid)) {
845                 /* There is corner case that: in former LFSCK scanning we have
846                  * created the .lustre/lost+found/MDTxxxx but failed to update
847                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
848                  * it from MDT0 firstly. */
849                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
850                                (const struct dt_key *)name, BYPASS_CAPA);
851                 if (rc != 0 && rc != -ENOENT)
852                         GOTO(unlock, rc);
853
854                 if (rc == 0) {
855                         bk->lb_lpf_fid = *cfid;
856                         rc = lfsck_bookmark_store(env, lfsck);
857                 } else {
858                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
859                 }
860                 if (rc != 0)
861                         GOTO(unlock, rc);
862         } else {
863                 *cfid = bk->lb_lpf_fid;
864         }
865
866         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
867         if (IS_ERR(child))
868                 GOTO(unlock, rc = PTR_ERR(child));
869
870         if (dt_object_exists(child) != 0) {
871                 if (unlikely(!dt_try_as_dir(env, child)))
872                         rc = -ENOTDIR;
873                 else
874                         lfsck->li_lpf_obj = child;
875
876                 GOTO(unlock, rc);
877         }
878
879         memset(la, 0, sizeof(*la));
880         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
881         la->la_mode = S_IFDIR | S_IRWXU;
882         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
883                        LA_UID | LA_GID;
884         memset(dof, 0, sizeof(*dof));
885         dof->dof_type = dt_mode_to_dft(S_IFDIR);
886
887         if (node == 0)
888                 rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
889                                             dof, name);
890         else
891                 rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
892                                              dof, name);
893         if (rc == 0)
894                 lfsck->li_lpf_obj = child;
895
896         GOTO(unlock, rc);
897
898 unlock:
899         mutex_unlock(&lfsck->li_mutex);
900         lfsck_ibits_unlock(&lh, LCK_EX);
901         if (rc != 0 && child != NULL && !IS_ERR(child))
902                 lu_object_put(env, &child->do_lu);
903 out:
904         if (parent != NULL && !IS_ERR(parent))
905                 lu_object_put(env, &parent->do_lu);
906
907         return rc;
908 }
909
910 /**
911  * Scan .lustre/lost+found for bad name entries and remove them.
912  *
913  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
914  * index in the system. Any other formatted name is invalid and should be
915  * removed.
916  *
917  * \param[in] env       pointer to the thread context
918  * \param[in] lfsck     pointer to the lfsck instance
919  * \param[in] parent    pointer to the lost+found object
920  *
921  * \retval              0 for success
922  * \retval              negative error number on failure
923  */
924 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
925                                       struct lfsck_instance *lfsck,
926                                       struct dt_object *parent)
927 {
928         struct lu_dirent        *ent    =
929                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
930         const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
931         struct dt_it            *it;
932         int                      rc;
933         ENTRY;
934
935         it = iops->init(env, parent, LUDA_64BITHASH, BYPASS_CAPA);
936         if (IS_ERR(it))
937                 RETURN(PTR_ERR(it));
938
939         rc = iops->load(env, it, 0);
940         if (rc == 0)
941                 rc = iops->next(env, it);
942         else if (rc > 0)
943                 rc = 0;
944
945         while (rc == 0) {
946                 int off = 3;
947
948                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
949                 if (rc != 0)
950                         break;
951
952                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
953                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
954                         goto next;
955
956                 /* name length must be strlen("MDTxxxx") */
957                 if (ent->lde_namelen != 7)
958                         goto remove;
959
960                 if (memcmp(ent->lde_name, "MDT", off) != 0)
961                         goto remove;
962
963                 while (off < 7 && isxdigit(ent->lde_name[off]))
964                         off++;
965
966                 if (off != 7) {
967
968 remove:
969                         rc = lfsck_remove_name_entry(env, lfsck, parent,
970                                                      ent->lde_name, S_IFDIR);
971                         if (rc != 0)
972                                 break;
973                 }
974
975 next:
976                 rc = iops->next(env, it);
977         }
978
979         iops->put(env, it);
980         iops->fini(env, it);
981
982         RETURN(rc > 0 ? 0 : rc);
983 }
984
985 static int lfsck_update_lpf_entry(const struct lu_env *env,
986                                   struct lfsck_instance *lfsck,
987                                   struct dt_object *parent,
988                                   struct dt_object *child,
989                                   const char *name,
990                                   enum lfsck_verify_lpf_types type)
991 {
992         int rc;
993
994         if (type == LVLT_BY_BOOKMARK) {
995                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
996                                              lfsck_dto2fid(child), S_IFDIR);
997         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
998                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
999                 rc = lfsck_bookmark_store(env, lfsck);
1000
1001                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
1002                        " in the bookmark file: rc = %d\n",
1003                        lfsck_lfsck2name(lfsck),
1004                        PFID(lfsck_dto2fid(child)), rc);
1005         }
1006
1007         return rc;
1008 }
1009
/**
 * Check whether the @child back references the @parent.
 *
 * Two cases:
 * 1) The child's FID is stored in the bookmark file. If the child back
 *    references the parent (LU_LPF_FID object) via its ".." entry, then
 *    insert the name (MDTxxxx) into .lustre/lost+found; otherwise, if
 *    the child back references another parent2, then:
 * 1.1) If the parent2 recognizes the child, then update the bookmark file;
 * 1.2) Otherwise, the LFSCK cannot know whether there will be a parent3 that
 *      references the child. So keep them there. As the LFSCK proceeds,
 *      the parent3 may be found; then, when the LFSCK runs next time, the
 *      inconsistency can be repaired.
 *
 * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
 *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
 *    via its ".." entry, then update the bookmark file; otherwise, if the child
 *    back references another parent2, then:
 * 2.1) If the parent2 recognizes the child, then remove the sub-directory
 *      from .lustre/lost+found/;
 * 2.2) Otherwise, if the parent2 does not recognize the child, trust the
 *      sub-directory name entry and update the child;
 * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
 *      or not, then keep them there.
 *
 * \param[in] env       pointer to the thread context
 * \param[in] lfsck     pointer to the lfsck instance
 * \param[in] parent    pointer to the lost+found object
 * \param[in] child     pointer to the lost+found sub-directory object
 * \param[in] name      the name for lost+found sub-directory object
 * \param[out] fid      pointer to the buffer to hold the FID of the object
 *                      (called parent2) that is referenced via the
 *                      child's dotdot entry; it can also be the FID that
 *                      is referenced by the name entry under the parent2.
 * \param[in] type      to indicate where the child's FID is stored
 *
 * \retval              positive number for uncertain inconsistency
 * \retval              0 for success
 * \retval              negative error number on failure
 */
static int lfsck_verify_lpf_pairs(const struct lu_env *env,
                                  struct lfsck_instance *lfsck,
                                  struct dt_object *parent,
                                  struct dt_object *child, const char *name,
                                  struct lu_fid *fid,
                                  enum lfsck_verify_lpf_types type)
{
        struct lfsck_thread_info *info    = lfsck_env_info(env);
        char                     *name2   = info->lti_key;
        struct lu_fid            *fid2    = &info->lti_fid3;
        struct dt_object         *parent2 = NULL;
        struct lustre_handle      lh      = { 0 };
        int                       rc;
        ENTRY;

        /* Read the child's ".." entry into @fid. Any failure here is not
         * fatal: fall through to the linkEA based check with parent2
         * still NULL. */
        fid_zero(fid);
        rc = dt_lookup(env, child, (struct dt_rec *)fid,
                       (const struct dt_key *)dotdot, BYPASS_CAPA);
        if (rc != 0)
                GOTO(linkea, rc);

        if (!fid_is_sane(fid))
                GOTO(linkea, rc = -EINVAL);

        if (lu_fid_eq(fid, &LU_LPF_FID)) {
                const struct lu_name *cname;

                /* ".." already points at LU_LPF_FID: cache the child as
                 * the lost+found object, holding an extra reference. */
                if (lfsck->li_lpf_obj == NULL) {
                        lu_object_get(&child->do_lu);
                        lfsck->li_lpf_obj = child;
                }

                cname = lfsck_name_get_const(env, name, strlen(name));
                rc = lfsck_verify_linkea(env, lfsck->li_bottom, child, cname,
                                         &LU_LPF_FID);
                if (rc == 0)
                        rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
                                                    name, type);

                /* parent2 was never looked up on this path. */
                GOTO(out_done, rc);
        }

        /* ".." names some other object: load it as parent2. On any failure
         * parent2 is left as an ERR_PTR (with no reference held), which
         * the code after the linkea label checks for. */
        parent2 = lfsck_object_find_by_dev(env, lfsck->li_next, fid);
        if (IS_ERR(parent2))
                GOTO(linkea, parent2);

        if (!dt_object_exists(parent2)) {
                lu_object_put(env, &parent2->do_lu);

                GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
        }

        if (!dt_try_as_dir(env, parent2)) {
                lu_object_put(env, &parent2->do_lu);

                GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
        }

linkea:
        /* To prevent rename/unlink race */
        rc = lfsck_ibits_lock(env, lfsck, child, &lh,
                              MDS_INODELOCK_UPDATE, LCK_PR);
        if (rc != 0)
                GOTO(out_put, rc);

        /* Fetch the first linkEA record of the child: (name2, fid2). */
        dt_read_lock(env, child, 0);
        rc = lfsck_links_get_first(env, child, name2, fid2);
        if (rc != 0) {
                dt_read_unlock(env, child);
                lfsck_ibits_unlock(&lh, LCK_PR);

                /* No linkEA record to consult: report "uncertain". */
                GOTO(out_put, rc = 1);
        }

        /* It is almost impossible that the bookmark file (or the name entry)
         * and the linkEA hit the same data corruption. Trust the linkEA. */
        if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
                dt_read_unlock(env, child);
                lfsck_ibits_unlock(&lh, LCK_PR);

                /* Report LU_LPF_FID as the parent to the caller. */
                *fid = *fid2;
                if (lfsck->li_lpf_obj == NULL) {
                        lu_object_get(&child->do_lu);
                        lfsck->li_lpf_obj = child;
                }

                /* Update the child's dotdot entry */
                rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
                                             &LU_LPF_FID, S_IFDIR);
                if (rc == 0)
                        rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
                                                    name, type);

                GOTO(out_put, rc);
        }

        /* The linkEA does not name lost+found and there is no usable
         * parent2 to cross-check against: report "uncertain". */
        if (parent2 == NULL || IS_ERR(parent2)) {
                dt_read_unlock(env, child);
                lfsck_ibits_unlock(&lh, LCK_PR);

                GOTO(out_done, rc = 1);
        }

        /* Does parent2 hold an entry, under the linkEA name, that points
         * back at the child? Note that @fid is overwritten here with the
         * FID found in that name entry. */
        rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
                       (const struct dt_key *)name2, BYPASS_CAPA);
        dt_read_unlock(env, child);
        lfsck_ibits_unlock(&lh, LCK_PR);
        if (rc != 0 && rc != -ENOENT)
                GOTO(out_put, rc);

        /* parent2 does not recognize the child. */
        if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
                if (type == LVLT_BY_BOOKMARK)
                        GOTO(out_put, rc = 1);

                /* Trust the name entry, update the child's dotdot entry. */
                rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
                                             &LU_LPF_FID, S_IFDIR);

                GOTO(out_put, rc);
        }

        /* parent2 does recognize the child, so the child is not the
         * lost+found sub-directory: drop the stale reference to it. */
        if (type == LVLT_BY_BOOKMARK) {
                /* Invalid FID record in the bookmark file, reset it. */
                fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
                rc = lfsck_bookmark_store(env, lfsck);

                CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
                       " in the bookmark file: rc = %d\n",
                       lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
        } else /* if (type == LVLT_BY_NAMEENTRY) */ {
                /* The name entry is wrong, remove it. */
                rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
        }

        GOTO(out_put, rc);

out_put:
        /* Only a successfully loaded parent2 holds a reference. */
        if (parent2 != NULL && !IS_ERR(parent2))
                lu_object_put(env, &parent2->do_lu);

out_done:
        return rc;
}
1193
/**
 * Verify the /ROOT/.lustre/lost+found/ directory.
 *
 * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
 * the LFSCK does not exactly know how to handle, such as orphans. So before
 * the LFSCK scans the system, the consistency of this directory needs to
 * be verified first, to allow the users to use it during the LFSCK.
 *
 * Two sources may name this MDT's sub-directory: the FID recorded in the
 * bookmark (child1) and the FID in the "MDTxxxx" name entry under the
 * parent (child2). Each one is validated, and the pair is reconciled via
 * lfsck_verify_lpf_pairs() or, when both agree, lfsck_verify_linkea().
 *
 * \param[in] env       pointer to the thread context
 * \param[in] lfsck     pointer to the lfsck instance
 *
 * \retval              positive number for uncertain inconsistency
 * \retval              0 for success
 * \retval              negative error number on failure
 */
int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
{
        struct lfsck_thread_info *info   = lfsck_env_info(env);
        struct lu_fid            *pfid   = &info->lti_fid;
        struct lu_fid            *cfid   = &info->lti_fid2;
        struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
        struct dt_object         *parent = NULL;
        /* child1's FID is in the bookmark file. */
        struct dt_object         *child1 = NULL;
        /* child2's FID is in the name entry MDTxxxx. */
        struct dt_object         *child2 = NULL;
        struct dt_device         *dev    = lfsck->li_bottom;
        const struct lu_name     *cname;
        char                      name[8];
        int                       node   = lfsck_dev_idx(dev);
        int                       rc     = 0;
        ENTRY;

        LASSERT(lfsck->li_master);

        /* The LU_LPF_FID object is looked up on the local device for
         * MDT0, otherwise via the target registered at index 0. */
        if (node == 0) {
                parent = lfsck_object_find_by_dev(env, dev, &LU_LPF_FID);
        } else {
                struct lfsck_tgt_desc *ltd;

                ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
                if (unlikely(ltd == NULL))
                        RETURN(-ENXIO);

                parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
                                                  &LU_LPF_FID);
                lfsck_tgt_put(ltd);
        }

        if (IS_ERR(parent))
                RETURN(PTR_ERR(parent));

        LASSERT(dt_object_exists(parent));

        if (unlikely(!dt_try_as_dir(env, parent)))
                GOTO(put, rc = -ENOTDIR);

        /* Only MDT0 cleans up badly named entries. A failure is only
         * logged: rc is reassigned before it is next examined. */
        if (node == 0) {
                rc = lfsck_scan_lpf_bad_entries(env, lfsck, parent);
                if (rc != 0)
                        CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
                               "for bad sub-directories: rc = %d\n",
                               lfsck_lfsck2name(lfsck), rc);
        }

        /* Step 1: validate the FID recorded in the bookmark. After this
         * block either child1 is a usable local directory, or the
         * bookmark FID has been reset to zero and child1 is NULL. */
        if (!fid_is_zero(&bk->lb_lpf_fid)) {
                if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
                        /* Keep a copy of the bad FID for the log message. */
                        struct lu_fid tfid = bk->lb_lpf_fid;

                        /* Invalid FID record in the bookmark file, reset it. */
                        fid_zero(&bk->lb_lpf_fid);
                        rc = lfsck_bookmark_store(env, lfsck);

                        CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
                               " in the bookmark file: rc = %d\n",
                               lfsck_lfsck2name(lfsck), PFID(&tfid), rc);

                        if (rc != 0)
                                GOTO(put, rc);
                } else {
                        child1 = lfsck_object_find_by_dev(env, dev,
                                                          &bk->lb_lpf_fid);
                        if (IS_ERR(child1))
                                GOTO(put, rc = PTR_ERR(child1));

                        /* The bookmark must name an existing, local
                         * directory object. */
                        if (unlikely(!dt_object_exists(child1) ||
                                     dt_object_remote(child1)) ||
                                     !S_ISDIR(lfsck_object_type(child1))) {
                                /* Invalid FID record in the bookmark file,
                                 * reset it. */
                                fid_zero(&bk->lb_lpf_fid);
                                rc = lfsck_bookmark_store(env, lfsck);

                                CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
                                       " in the bookmark file: rc = %d\n",
                                       lfsck_lfsck2name(lfsck),
                                       PFID(lfsck_dto2fid(child1)), rc);

                                /* child1 is released at the put label. */
                                if (rc != 0)
                                        GOTO(put, rc);

                                lu_object_put(env, &child1->do_lu);
                                child1 = NULL;
                        } else if (unlikely(!dt_try_as_dir(env, child1))) {
                                GOTO(put, rc = -ENOTDIR);
                        }
                }
        }

        /* Step 2: look up the "MDTxxxx" name entry under the parent. */
        snprintf(name, 8, "MDT%04x", node);
        rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
                       (const struct dt_key *)name, BYPASS_CAPA);
        if (rc == -ENOENT) {
                /* No name entry: verify the bookmark's object, if any. */
                if (!fid_is_zero(&bk->lb_lpf_fid))
                        goto check_child1;

                /* Neither a name entry nor a bookmark FID: nothing to do. */
                GOTO(put, rc = 0);
        }

        if (rc != 0)
                GOTO(put, rc);

        /* Invalid FID in the name entry, remove the name entry. */
        if (!fid_is_norm(cfid)) {
                rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
                if (rc != 0)
                        GOTO(put, rc);

                goto check_child1;
        }

        child2 = lfsck_object_find_by_dev(env, dev, cfid);
        if (IS_ERR(child2))
                GOTO(put, rc = PTR_ERR(child2));

        /* The name entry must also name an existing, local directory;
         * otherwise remove the entry and fall back to child1. */
        if (unlikely(!dt_object_exists(child2) ||
                     dt_object_remote(child2)) ||
                     !S_ISDIR(lfsck_object_type(child2))) {
                rc = lfsck_remove_name_entry(env, lfsck, parent, name,
                                             S_IFDIR);
                if (rc != 0)
                        GOTO(put, rc);

                goto check_child1;
        }

        if (unlikely(!dt_try_as_dir(env, child2)))
                GOTO(put, rc = -ENOTDIR);

        /* Step 3: reconcile the two candidates. */
        if (child1 == NULL) {
                /* Only the name entry is available. */
                rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2, name,
                                            pfid, LVLT_BY_NAMEENTRY);
        } else if (!lu_fid_eq(cfid, &bk->lb_lpf_fid)) {
                /* Bookmark and name entry disagree: check the bookmark's
                 * object first, and check the name entry's object too
                 * unless the FID reported in pfid by the first check is
                 * LU_LPF_FID. The second call's rc replaces the first's. */
                rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
                                            pfid, LVLT_BY_BOOKMARK);
                if (!lu_fid_eq(pfid, &LU_LPF_FID))
                        rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2,
                                                    name, pfid,
                                                    LVLT_BY_NAMEENTRY);
        } else {
                /* Bookmark and name entry agree: cache the object (with an
                 * extra reference) and only verify its linkEA. */
                if (lfsck->li_lpf_obj == NULL) {
                        lu_object_get(&child2->do_lu);
                        lfsck->li_lpf_obj = child2;
                }

                cname = lfsck_name_get_const(env, name, strlen(name));
                rc = lfsck_verify_linkea(env, dev, child2, cname, &LU_LPF_FID);
        }

        GOTO(put, rc);

check_child1:
        /* The name entry was missing or has been removed; only the
         * bookmark's object (if any) is left to verify. */
        if (child1 != NULL)
                rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
                                            pfid, LVLT_BY_BOOKMARK);

        GOTO(put, rc);

put:
        /* Whatever object ended up cached must be usable as a directory;
         * if not, this overrides any earlier result. */
        if (lfsck->li_lpf_obj != NULL &&
            unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj)))
                rc = -ENOTDIR;

        if (child2 != NULL && !IS_ERR(child2))
                lu_object_put(env, &child2->do_lu);
        if (child1 != NULL && !IS_ERR(child1))
                lu_object_put(env, &child1->do_lu);
        if (parent != NULL && !IS_ERR(parent))
                lu_object_put(env, &parent->do_lu);

        return rc;
}
1386
1387 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1388 {
1389         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1390         struct seq_server_site  *ss;
1391         char                    *prefix;
1392         int                      rc     = 0;
1393         ENTRY;
1394
1395         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
1396         if (unlikely(ss == NULL))
1397                 RETURN(-ENXIO);
1398
1399         OBD_ALLOC_PTR(lfsck->li_seq);
1400         if (lfsck->li_seq == NULL)
1401                 RETURN(-ENOMEM);
1402
1403         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1404         if (prefix == NULL)
1405                 GOTO(out, rc = -ENOMEM);
1406
1407         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1408         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1409                              ss->ss_server_seq);
1410         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1411         if (rc != 0)
1412                 GOTO(out, rc);
1413
1414         if (fid_is_sane(&bk->lb_last_fid))
1415                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1416
1417         RETURN(0);
1418
1419 out:
1420         OBD_FREE_PTR(lfsck->li_seq);
1421         lfsck->li_seq = NULL;
1422
1423         return rc;
1424 }
1425
1426 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1427 {
1428         if (lfsck->li_seq != NULL) {
1429                 seq_client_fini(lfsck->li_seq);
1430                 OBD_FREE_PTR(lfsck->li_seq);
1431                 lfsck->li_seq = NULL;
1432         }
1433 }
1434
1435 void lfsck_instance_cleanup(const struct lu_env *env,
1436                             struct lfsck_instance *lfsck)
1437 {
1438         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1439         struct lfsck_component  *com;
1440         struct lfsck_component  *next;
1441         struct lfsck_lmv_unit   *llu;
1442         struct lfsck_lmv_unit   *llu_next;
1443         struct lfsck_lmv        *llmv;
1444         ENTRY;
1445
1446         LASSERT(list_empty(&lfsck->li_link));
1447         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1448
1449         if (lfsck->li_obj_oit != NULL) {
1450                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
1451                 lfsck->li_obj_oit = NULL;
1452         }
1453
1454         LASSERT(lfsck->li_obj_dir == NULL);
1455         LASSERT(lfsck->li_lmv == NULL);
1456
1457         list_for_each_entry_safe(llu, llu_next, &lfsck->li_list_lmv, llu_link) {
1458                 llmv = &llu->llu_lmv;
1459
1460                 LASSERTF(atomic_read(&llmv->ll_ref) == 1,
1461                          "still in using: %u\n",
1462                          atomic_read(&llmv->ll_ref));
1463
1464                 lfsck_lmv_put(env, llmv);
1465         }
1466
1467         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1468                 lfsck_component_cleanup(env, com);
1469         }
1470
1471         LASSERT(list_empty(&lfsck->li_list_dir));
1472
1473         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1474                                  lc_link) {
1475                 lfsck_component_cleanup(env, com);
1476         }
1477
1478         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1479                 lfsck_component_cleanup(env, com);
1480         }
1481
1482         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1483         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1484
1485         if (lfsck->li_bookmark_obj != NULL) {
1486                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
1487                 lfsck->li_bookmark_obj = NULL;
1488         }
1489
1490         if (lfsck->li_lpf_obj != NULL) {
1491                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
1492                 lfsck->li_lpf_obj = NULL;
1493         }
1494
1495         if (lfsck->li_los != NULL) {
1496                 local_oid_storage_fini(env, lfsck->li_los);
1497                 lfsck->li_los = NULL;
1498         }
1499
1500         lfsck_fid_fini(lfsck);
1501
1502         OBD_FREE_PTR(lfsck);
1503 }
1504
1505 static inline struct lfsck_instance *
1506 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1507 {
1508         struct lfsck_instance *lfsck;
1509
1510         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1511                 if (lfsck->li_bottom == key) {
1512                         if (ref)
1513                                 lfsck_instance_get(lfsck);
1514                         if (unlink)
1515                                 list_del_init(&lfsck->li_link);
1516
1517                         return lfsck;
1518                 }
1519         }
1520
1521         return NULL;
1522 }
1523
1524 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1525                                            bool unlink)
1526 {
1527         struct lfsck_instance *lfsck;
1528
1529         spin_lock(&lfsck_instance_lock);
1530         lfsck = __lfsck_instance_find(key, ref, unlink);
1531         spin_unlock(&lfsck_instance_lock);
1532
1533         return lfsck;
1534 }
1535
1536 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1537 {
1538         struct lfsck_instance *tmp;
1539
1540         spin_lock(&lfsck_instance_lock);
1541         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1542                 if (lfsck->li_bottom == tmp->li_bottom) {
1543                         spin_unlock(&lfsck_instance_lock);
1544                         return -EEXIST;
1545                 }
1546         }
1547
1548         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1549         spin_unlock(&lfsck_instance_lock);
1550         return 0;
1551 }
1552
1553 int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
1554                     const char *prefix)
1555 {
1556         int flag;
1557         int i;
1558         bool newline = (bits != 0 ? false : true);
1559
1560         seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
1561
1562         for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
1563                 if (flag & bits) {
1564                         bits &= ~flag;
1565                         if (names[i] != NULL) {
1566                                 if (bits == 0)
1567                                         newline = true;
1568
1569                                 seq_printf(m, "%s%c", names[i],
1570                                            newline ? '\n' : ',');
1571                         }
1572                 }
1573         }
1574
1575         if (!newline)
1576                 seq_printf(m, "\n");
1577         return 0;
1578 }
1579
1580 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *prefix)
1581 {
1582         if (time != 0)
1583                 seq_printf(m, "%s: "LPU64" seconds\n", prefix,
1584                           cfs_time_current_sec() - time);
1585         else
1586                 seq_printf(m, "%s: N/A\n", prefix);
1587         return 0;
1588 }
1589
1590 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1591                    const char *prefix)
1592 {
1593         if (fid_is_zero(&pos->lp_dir_parent)) {
1594                 if (pos->lp_oit_cookie == 0)
1595                         seq_printf(m, "%s: N/A, N/A, N/A\n",
1596                                    prefix);
1597                 else
1598                         seq_printf(m, "%s: "LPU64", N/A, N/A\n",
1599                                    prefix, pos->lp_oit_cookie);
1600         } else {
1601                 seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
1602                            prefix, pos->lp_oit_cookie,
1603                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1604         }
1605         return 0;
1606 }
1607
1608 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1609                     struct lfsck_position *pos, bool init)
1610 {
1611         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1612
1613         if (unlikely(lfsck->li_di_oit == NULL)) {
1614                 memset(pos, 0, sizeof(*pos));
1615                 return;
1616         }
1617
1618         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1619         if (!lfsck->li_current_oit_processed && !init)
1620                 pos->lp_oit_cookie--;
1621
1622         LASSERT(pos->lp_oit_cookie > 0);
1623
1624         if (lfsck->li_di_dir != NULL) {
1625                 struct dt_object *dto = lfsck->li_obj_dir;
1626
1627                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1628                                                         lfsck->li_di_dir);
1629
1630                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1631                         fid_zero(&pos->lp_dir_parent);
1632                         pos->lp_dir_cookie = 0;
1633                 } else {
1634                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1635                 }
1636         } else {
1637                 fid_zero(&pos->lp_dir_parent);
1638                 pos->lp_dir_cookie = 0;
1639         }
1640 }
1641
1642 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1643 {
1644         bool dirty = false;
1645
1646         if (limit != LFSCK_SPEED_NO_LIMIT) {
1647                 if (limit > msecs_to_jiffies(MSEC_PER_SEC)) {
1648                         lfsck->li_sleep_rate = jiffies_to_msecs(limit) /
1649                                                MSEC_PER_SEC;
1650                         lfsck->li_sleep_jif = 1;
1651                 } else {
1652                         lfsck->li_sleep_rate = 1;
1653                         lfsck->li_sleep_jif = msecs_to_jiffies(MSEC_PER_SEC /
1654                                                                limit);
1655                 }
1656         } else {
1657                 lfsck->li_sleep_jif = 0;
1658                 lfsck->li_sleep_rate = 0;
1659         }
1660
1661         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1662                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1663                 dirty = true;
1664         }
1665
1666         return dirty;
1667 }
1668
1669 void lfsck_control_speed(struct lfsck_instance *lfsck)
1670 {
1671         struct ptlrpc_thread *thread = &lfsck->li_thread;
1672         struct l_wait_info    lwi;
1673
1674         if (lfsck->li_sleep_jif > 0 &&
1675             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1676                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1677                                        LWI_ON_SIGNAL_NOOP, NULL);
1678
1679                 l_wait_event(thread->t_ctl_waitq,
1680                              !thread_is_running(thread),
1681                              &lwi);
1682                 lfsck->li_new_scanned = 0;
1683         }
1684 }
1685
1686 void lfsck_control_speed_by_self(struct lfsck_component *com)
1687 {
1688         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1689         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1690         struct l_wait_info       lwi;
1691
1692         if (lfsck->li_sleep_jif > 0 &&
1693             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1694                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1695                                        LWI_ON_SIGNAL_NOOP, NULL);
1696
1697                 l_wait_event(thread->t_ctl_waitq,
1698                              !thread_is_running(thread),
1699                              &lwi);
1700                 com->lc_new_scanned = 0;
1701         }
1702 }
1703
1704 static struct lfsck_thread_args *
1705 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1706                        struct lfsck_component *com,
1707                        struct lfsck_start_param *lsp)
1708 {
1709         struct lfsck_thread_args *lta;
1710         int                       rc;
1711
1712         OBD_ALLOC_PTR(lta);
1713         if (lta == NULL)
1714                 return ERR_PTR(-ENOMEM);
1715
1716         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1717         if (rc != 0) {
1718                 OBD_FREE_PTR(lta);
1719                 return ERR_PTR(rc);
1720         }
1721
1722         lta->lta_lfsck = lfsck_instance_get(lfsck);
1723         if (com != NULL)
1724                 lta->lta_com = lfsck_component_get(com);
1725
1726         lta->lta_lsp = lsp;
1727
1728         return lta;
1729 }
1730
1731 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1732 {
1733         if (lta->lta_com != NULL)
1734                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1735         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1736         lu_env_fini(&lta->lta_env);
1737         OBD_FREE_PTR(lta);
1738 }
1739
1740 struct lfsck_assistant_data *
1741 lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
1742                           const char *name)
1743 {
1744         struct lfsck_assistant_data *lad;
1745
1746         OBD_ALLOC_PTR(lad);
1747         if (lad != NULL) {
1748                 lad->lad_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
1749                 if (lad->lad_bitmap == NULL) {
1750                         OBD_FREE_PTR(lad);
1751                         return NULL;
1752                 }
1753
1754                 INIT_LIST_HEAD(&lad->lad_req_list);
1755                 spin_lock_init(&lad->lad_lock);
1756                 INIT_LIST_HEAD(&lad->lad_ost_list);
1757                 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1758                 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1759                 INIT_LIST_HEAD(&lad->lad_mdt_list);
1760                 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1761                 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1762                 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1763                 lad->lad_ops = lao;
1764                 lad->lad_name = name;
1765         }
1766
1767         return lad;
1768 }
1769
1770 /**
1771  * Generic LFSCK asynchronous communication interpretor function.
1772  * The LFSCK RPC reply for both the event notification and status
1773  * querying will be handled here.
1774  *
1775  * \param[in] env       pointer to the thread context
1776  * \param[in] req       pointer to the LFSCK request
1777  * \param[in] args      pointer to the lfsck_async_interpret_args
1778  * \param[in] rc        the result for handling the LFSCK request
1779  *
1780  * \retval              0 for success
1781  * \retval              negative error number on failure
1782  */
1783 int lfsck_async_interpret_common(const struct lu_env *env,
1784                                  struct ptlrpc_request *req,
1785                                  void *args, int rc)
1786 {
1787         struct lfsck_async_interpret_args *laia = args;
1788         struct lfsck_component            *com  = laia->laia_com;
1789         struct lfsck_assistant_data       *lad  = com->lc_data;
1790         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1791         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1792         struct lfsck_request              *lr   = laia->laia_lr;
1793
1794         LASSERT(com->lc_lfsck->li_master);
1795
1796         switch (lr->lr_event) {
1797         case LE_START:
1798                 if (rc != 0) {
1799                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
1800                                "start: rc = %d\n",
1801                                lfsck_lfsck2name(com->lc_lfsck),
1802                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1803                                ltd->ltd_index, lad->lad_name, rc);
1804
1805                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1806                                 struct lfsck_layout *lo = com->lc_file_ram;
1807
1808                                 if (lr->lr_flags & LEF_TO_OST)
1809                                         lfsck_lad_set_bitmap(env, com,
1810                                                              ltd->ltd_index);
1811                                 else
1812                                         lo->ll_flags |= LF_INCOMPLETE;
1813                         } else {
1814                                 struct lfsck_namespace *ns = com->lc_file_ram;
1815
1816                                 /* If some MDT does not join the namespace
1817                                  * LFSCK, then we cannot know whether there
1818                                  * is some name entry on such MDT that with
1819                                  * the referenced MDT-object on this MDT or
1820                                  * not. So the namespace LFSCK on this MDT
1821                                  * cannot handle orphan MDT-objects properly.
1822                                  * So we mark the LFSCK as LF_INCOMPLETE and
1823                                  * skip orphan MDT-objects handling. */
1824                                 ns->ln_flags |= LF_INCOMPLETE;
1825                         }
1826                         break;
1827                 }
1828
1829                 spin_lock(&ltds->ltd_lock);
1830                 if (ltd->ltd_dead) {
1831                         spin_unlock(&ltds->ltd_lock);
1832                         break;
1833                 }
1834
1835                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1836                         struct list_head *list;
1837                         struct list_head *phase_list;
1838
1839                         if (ltd->ltd_layout_done) {
1840                                 spin_unlock(&ltds->ltd_lock);
1841                                 break;
1842                         }
1843
1844                         if (lr->lr_flags & LEF_TO_OST) {
1845                                 list = &lad->lad_ost_list;
1846                                 phase_list = &lad->lad_ost_phase1_list;
1847                         } else {
1848                                 list = &lad->lad_mdt_list;
1849                                 phase_list = &lad->lad_mdt_phase1_list;
1850                         }
1851
1852                         if (list_empty(&ltd->ltd_layout_list))
1853                                 list_add_tail(&ltd->ltd_layout_list, list);
1854                         if (list_empty(&ltd->ltd_layout_phase_list))
1855                                 list_add_tail(&ltd->ltd_layout_phase_list,
1856                                               phase_list);
1857                 } else {
1858                         if (ltd->ltd_namespace_done) {
1859                                 spin_unlock(&ltds->ltd_lock);
1860                                 break;
1861                         }
1862
1863                         if (list_empty(&ltd->ltd_namespace_list))
1864                                 list_add_tail(&ltd->ltd_namespace_list,
1865                                               &lad->lad_mdt_list);
1866                         if (list_empty(&ltd->ltd_namespace_phase_list))
1867                                 list_add_tail(&ltd->ltd_namespace_phase_list,
1868                                               &lad->lad_mdt_phase1_list);
1869                 }
1870                 spin_unlock(&ltds->ltd_lock);
1871                 break;
1872         case LE_STOP:
1873         case LE_PHASE1_DONE:
1874         case LE_PHASE2_DONE:
1875         case LE_PEER_EXIT:
1876                 if (rc != 0 && rc != -EALREADY)
1877                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
1878                               "event = %d, rc = %d\n",
1879                               lfsck_lfsck2name(com->lc_lfsck),
1880                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1881                               ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
1882                 break;
1883         case LE_QUERY: {
1884                 struct lfsck_reply *reply;
1885                 struct list_head *list;
1886                 struct list_head *phase_list;
1887
1888                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1889                         list = &ltd->ltd_layout_list;
1890                         phase_list = &ltd->ltd_layout_phase_list;
1891                 } else {
1892                         list = &ltd->ltd_namespace_list;
1893                         phase_list = &ltd->ltd_namespace_phase_list;
1894                 }
1895
1896                 if (rc != 0) {
1897                         spin_lock(&ltds->ltd_lock);
1898                         list_del_init(phase_list);
1899                         list_del_init(list);
1900                         spin_unlock(&ltds->ltd_lock);
1901                         break;
1902                 }
1903
1904                 reply = req_capsule_server_get(&req->rq_pill,
1905                                                &RMF_LFSCK_REPLY);
1906                 if (reply == NULL) {
1907                         rc = -EPROTO;
1908                         CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
1909                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
1910                                lad->lad_name, rc);
1911                         spin_lock(&ltds->ltd_lock);
1912                         list_del_init(phase_list);
1913                         list_del_init(list);
1914                         spin_unlock(&ltds->ltd_lock);
1915                         break;
1916                 }
1917
1918                 switch (reply->lr_status) {
1919                 case LS_SCANNING_PHASE1:
1920                         break;
1921                 case LS_SCANNING_PHASE2:
1922                         spin_lock(&ltds->ltd_lock);
1923                         list_del_init(phase_list);
1924                         if (ltd->ltd_dead) {
1925                                 spin_unlock(&ltds->ltd_lock);
1926                                 break;
1927                         }
1928
1929                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1930                                 if (ltd->ltd_layout_done) {
1931                                         spin_unlock(&ltds->ltd_lock);
1932                                         break;
1933                                 }
1934
1935                                 if (lr->lr_flags & LEF_TO_OST)
1936                                         list_add_tail(phase_list,
1937                                                 &lad->lad_ost_phase2_list);
1938                                 else
1939                                         list_add_tail(phase_list,
1940                                                 &lad->lad_mdt_phase2_list);
1941                         } else {
1942                                 if (ltd->ltd_namespace_done) {
1943                                         spin_unlock(&ltds->ltd_lock);
1944                                         break;
1945                                 }
1946
1947                                 list_add_tail(phase_list,
1948                                               &lad->lad_mdt_phase2_list);
1949                         }
1950                         spin_unlock(&ltds->ltd_lock);
1951                         break;
1952                 default:
1953                         spin_lock(&ltds->ltd_lock);
1954                         list_del_init(phase_list);
1955                         list_del_init(list);
1956                         spin_unlock(&ltds->ltd_lock);
1957                         break;
1958                 }
1959                 break;
1960         }
1961         default:
1962                 CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
1963                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
1964                 break;
1965         }
1966
1967         if (!laia->laia_shared) {
1968                 lfsck_tgt_put(ltd);
1969                 lfsck_component_put(env, com);
1970         }
1971
1972         return 0;
1973 }
1974
1975 static void lfsck_interpret(const struct lu_env *env,
1976                             struct lfsck_instance *lfsck,
1977                             struct ptlrpc_request *req, void *args, int result)
1978 {
1979         struct lfsck_async_interpret_args *laia = args;
1980         struct lfsck_component            *com;
1981
1982         LASSERT(laia->laia_com == NULL);
1983         LASSERT(laia->laia_shared);
1984
1985         spin_lock(&lfsck->li_lock);
1986         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1987                 laia->laia_com = com;
1988                 lfsck_async_interpret_common(env, req, laia, result);
1989         }
1990
1991         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1992                 laia->laia_com = com;
1993                 lfsck_async_interpret_common(env, req, laia, result);
1994         }
1995         spin_unlock(&lfsck->li_lock);
1996 }
1997
1998 static int lfsck_stop_notify(const struct lu_env *env,
1999                              struct lfsck_instance *lfsck,
2000                              struct lfsck_tgt_descs *ltds,
2001                              struct lfsck_tgt_desc *ltd, __u16 type)
2002 {
2003         struct lfsck_component *com;
2004         int                     rc = 0;
2005         ENTRY;
2006
2007         LASSERT(lfsck->li_master);
2008
2009         spin_lock(&lfsck->li_lock);
2010         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
2011         if (com == NULL)
2012                 com = __lfsck_component_find(lfsck, type,
2013                                              &lfsck->li_list_double_scan);
2014         if (com != NULL)
2015                 lfsck_component_get(com);
2016         spin_unlock(&lfsck->li_lock);
2017
2018         if (com != NULL) {
2019                 struct lfsck_thread_info          *info  = lfsck_env_info(env);
2020                 struct lfsck_async_interpret_args *laia  = &info->lti_laia;
2021                 struct lfsck_request              *lr    = &info->lti_lr;
2022                 struct lfsck_assistant_data       *lad   = com->lc_data;
2023                 struct list_head                  *list;
2024                 struct list_head                  *phase_list;
2025                 struct ptlrpc_request_set         *set;
2026
2027                 set = ptlrpc_prep_set();
2028                 if (set == NULL) {
2029                         lfsck_component_put(env, com);
2030
2031                         RETURN(-ENOMEM);
2032                 }
2033
2034                 if (type == LFSCK_TYPE_LAYOUT) {
2035                         list = &ltd->ltd_layout_list;
2036                         phase_list = &ltd->ltd_layout_phase_list;
2037                 } else {
2038                         list = &ltd->ltd_namespace_list;
2039                         phase_list = &ltd->ltd_namespace_phase_list;
2040                 }
2041
2042                 spin_lock(&ltds->ltd_lock);
2043                 if (list_empty(list)) {
2044                         LASSERT(list_empty(phase_list));
2045                         spin_unlock(&ltds->ltd_lock);
2046                         ptlrpc_set_destroy(set);
2047
2048                         RETURN(0);
2049                 }
2050
2051                 list_del_init(phase_list);
2052                 list_del_init(list);
2053                 spin_unlock(&ltds->ltd_lock);
2054
2055                 memset(lr, 0, sizeof(*lr));
2056                 lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2057                 lr->lr_event = LE_PEER_EXIT;
2058                 lr->lr_active = type;
2059                 lr->lr_status = LS_CO_PAUSED;
2060                 if (ltds == &lfsck->li_ost_descs)
2061                         lr->lr_flags = LEF_TO_OST;
2062
2063                 laia->laia_com = com;
2064                 laia->laia_ltds = ltds;
2065                 atomic_inc(&ltd->ltd_ref);
2066                 laia->laia_ltd = ltd;
2067                 laia->laia_lr = lr;
2068                 laia->laia_shared = 0;
2069
2070                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2071                                          lfsck_async_interpret_common,
2072                                          laia, LFSCK_NOTIFY);
2073                 if (rc != 0) {
2074                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2075                                "co-stop for %s: rc = %d\n",
2076                                lfsck_lfsck2name(lfsck),
2077                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2078                                ltd->ltd_index, lad->lad_name, rc);
2079                         lfsck_tgt_put(ltd);
2080                 } else {
2081                         rc = ptlrpc_set_wait(set);
2082                 }
2083
2084                 ptlrpc_set_destroy(set);
2085                 lfsck_component_put(env, com);
2086         }
2087
2088         RETURN(rc);
2089 }
2090
2091 static int lfsck_async_interpret(const struct lu_env *env,
2092                                  struct ptlrpc_request *req,
2093                                  void *args, int rc)
2094 {
2095         struct lfsck_async_interpret_args *laia = args;
2096         struct lfsck_instance             *lfsck;
2097
2098         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
2099                               li_mdt_descs);
2100         lfsck_interpret(env, lfsck, req, laia, rc);
2101         lfsck_tgt_put(laia->laia_ltd);
2102         if (rc != 0 && laia->laia_result != -EALREADY)
2103                 laia->laia_result = rc;
2104
2105         return 0;
2106 }
2107
2108 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2109                         struct lfsck_request *lr,
2110                         struct ptlrpc_request_set *set,
2111                         ptlrpc_interpterer_t interpreter,
2112                         void *args, int request)
2113 {
2114         struct lfsck_async_interpret_args *laia;
2115         struct ptlrpc_request             *req;
2116         struct lfsck_request              *tmp;
2117         struct req_format                 *format;
2118         int                                rc;
2119
2120         switch (request) {
2121         case LFSCK_NOTIFY:
2122                 format = &RQF_LFSCK_NOTIFY;
2123                 break;
2124         case LFSCK_QUERY:
2125                 format = &RQF_LFSCK_QUERY;
2126                 break;
2127         default:
2128                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2129                        exp->exp_obd->obd_name, request, -EINVAL);
2130                 return -EINVAL;
2131         }
2132
2133         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2134         if (req == NULL)
2135                 return -ENOMEM;
2136
2137         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2138         if (rc != 0) {
2139                 ptlrpc_request_free(req);
2140
2141                 return rc;
2142         }
2143
2144         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2145         *tmp = *lr;
2146         ptlrpc_request_set_replen(req);
2147
2148         laia = ptlrpc_req_async_args(req);
2149         *laia = *(struct lfsck_async_interpret_args *)args;
2150         if (laia->laia_com != NULL)
2151                 lfsck_component_get(laia->laia_com);
2152         req->rq_interpret_reply = interpreter;
2153         ptlrpc_set_add_req(set, req);
2154
2155         return 0;
2156 }
2157
2158 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2159                           struct lfsck_start_param *lsp)
2160 {
2161         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2162         struct lfsck_assistant_data     *lad     = com->lc_data;
2163         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2164         struct ptlrpc_thread            *athread = &lad->lad_thread;
2165         struct lfsck_thread_args        *lta;
2166         struct task_struct              *task;
2167         int                              rc;
2168         ENTRY;
2169
2170         lad->lad_assistant_status = 0;
2171         lad->lad_post_result = 0;
2172         lad->lad_to_post = 0;
2173         lad->lad_to_double_scan = 0;
2174         lad->lad_in_double_scan = 0;
2175         lad->lad_exit = 0;
2176         thread_set_flags(athread, 0);
2177
2178         lta = lfsck_thread_args_init(lfsck, com, lsp);
2179         if (IS_ERR(lta))
2180                 RETURN(PTR_ERR(lta));
2181
2182         task = kthread_run(lfsck_assistant_engine, lta, lad->lad_name);
2183         if (IS_ERR(task)) {
2184                 rc = PTR_ERR(task);
2185                 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2186                        "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2187                 lfsck_thread_args_fini(lta);
2188         } else {
2189                 struct l_wait_info lwi = { 0 };
2190
2191                 l_wait_event(mthread->t_ctl_waitq,
2192                              thread_is_running(athread) ||
2193                              thread_is_stopped(athread),
2194                              &lwi);
2195                 if (unlikely(!thread_is_running(athread)))
2196                         rc = lad->lad_assistant_status;
2197                 else
2198                         rc = 0;
2199         }
2200
2201         RETURN(rc);
2202 }
2203
2204 int lfsck_checkpoint_generic(const struct lu_env *env,
2205                              struct lfsck_component *com)
2206 {
2207         struct lfsck_assistant_data     *lad     = com->lc_data;
2208         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2209         struct ptlrpc_thread            *athread = &lad->lad_thread;
2210         struct l_wait_info               lwi     = { 0 };
2211
2212         if (com->lc_new_checked == 0)
2213                 return LFSCK_CHECKPOINT_SKIP;
2214
2215         l_wait_event(mthread->t_ctl_waitq,
2216                      list_empty(&lad->lad_req_list) ||
2217                      !thread_is_running(mthread) ||
2218                      thread_is_stopped(athread),
2219                      &lwi);
2220
2221         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2222                 return LFSCK_CHECKPOINT_SKIP;
2223
2224         return 0;
2225 }
2226
2227 void lfsck_post_generic(const struct lu_env *env,
2228                         struct lfsck_component *com, int *result)
2229 {
2230         struct lfsck_assistant_data     *lad     = com->lc_data;
2231         struct ptlrpc_thread            *athread = &lad->lad_thread;
2232         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2233         struct l_wait_info               lwi     = { 0 };
2234
2235         lad->lad_post_result = *result;
2236         if (*result <= 0)
2237                 lad->lad_exit = 1;
2238         lad->lad_to_post = 1;
2239
2240         wake_up_all(&athread->t_ctl_waitq);
2241         l_wait_event(mthread->t_ctl_waitq,
2242                      (*result > 0 && list_empty(&lad->lad_req_list)) ||
2243                      thread_is_stopped(athread),
2244                      &lwi);
2245
2246         if (lad->lad_assistant_status < 0)
2247                 *result = lad->lad_assistant_status;
2248 }
2249
2250 int lfsck_double_scan_generic(const struct lu_env *env,
2251                               struct lfsck_component *com, int status)
2252 {
2253         struct lfsck_assistant_data     *lad     = com->lc_data;
2254         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2255         struct ptlrpc_thread            *athread = &lad->lad_thread;
2256         struct l_wait_info               lwi     = { 0 };
2257
2258         if (status != LS_SCANNING_PHASE2)
2259                 lad->lad_exit = 1;
2260         else
2261                 lad->lad_to_double_scan = 1;
2262
2263         wake_up_all(&athread->t_ctl_waitq);
2264         l_wait_event(mthread->t_ctl_waitq,
2265                      lad->lad_in_double_scan ||
2266                      thread_is_stopped(athread),
2267                      &lwi);
2268
2269         if (lad->lad_assistant_status < 0)
2270                 return lad->lad_assistant_status;
2271
2272         return 0;
2273 }
2274
2275 void lfsck_quit_generic(const struct lu_env *env,
2276                         struct lfsck_component *com)
2277 {
2278         struct lfsck_assistant_data     *lad     = com->lc_data;
2279         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2280         struct ptlrpc_thread            *athread = &lad->lad_thread;
2281         struct l_wait_info               lwi     = { 0 };
2282
2283         lad->lad_exit = 1;
2284         wake_up_all(&athread->t_ctl_waitq);
2285         l_wait_event(mthread->t_ctl_waitq,
2286                      thread_is_init(athread) ||
2287                      thread_is_stopped(athread),
2288                      &lwi);
2289 }
2290
2291 /* external interfaces */
2292
2293 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
2294 {
2295         struct lu_env           env;
2296         struct lfsck_instance  *lfsck;
2297         int                     rc;
2298         ENTRY;
2299
2300         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2301         if (rc != 0)
2302                 RETURN(rc);
2303
2304         lfsck = lfsck_instance_find(key, true, false);
2305         if (likely(lfsck != NULL)) {
2306                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
2307                 lfsck_instance_put(&env, lfsck);
2308         } else {
2309                 rc = -ENXIO;
2310         }
2311
2312         lu_env_fini(&env);
2313
2314         RETURN(rc);
2315 }
2316 EXPORT_SYMBOL(lfsck_get_speed);
2317
2318 int lfsck_set_speed(struct dt_device *key, int val)
2319 {
2320         struct lu_env           env;
2321         struct lfsck_instance  *lfsck;
2322         int                     rc;
2323         ENTRY;
2324
2325         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2326         if (rc != 0)
2327                 RETURN(rc);
2328
2329         lfsck = lfsck_instance_find(key, true, false);
2330         if (likely(lfsck != NULL)) {
2331                 mutex_lock(&lfsck->li_mutex);
2332                 if (__lfsck_set_speed(lfsck, val))
2333                         rc = lfsck_bookmark_store(&env, lfsck);
2334                 mutex_unlock(&lfsck->li_mutex);
2335                 lfsck_instance_put(&env, lfsck);
2336         } else {
2337                 rc = -ENXIO;
2338         }
2339
2340         lu_env_fini(&env);
2341
2342         RETURN(rc);
2343 }
2344 EXPORT_SYMBOL(lfsck_set_speed);
2345
2346 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
2347 {
2348         struct lu_env           env;
2349         struct lfsck_instance  *lfsck;
2350         int                     rc;
2351         ENTRY;
2352
2353         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2354         if (rc != 0)
2355                 RETURN(rc);
2356
2357         lfsck = lfsck_instance_find(key, true, false);
2358         if (likely(lfsck != NULL)) {
2359                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
2360                 lfsck_instance_put(&env, lfsck);
2361         } else {
2362                 rc = -ENXIO;
2363         }
2364
2365         lu_env_fini(&env);
2366
2367         RETURN(rc);
2368 }
2369 EXPORT_SYMBOL(lfsck_get_windows);
2370
2371 int lfsck_set_windows(struct dt_device *key, int val)
2372 {
2373         struct lu_env           env;
2374         struct lfsck_instance  *lfsck;
2375         int                     rc;
2376         ENTRY;
2377
2378         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2379         if (rc != 0)
2380                 RETURN(rc);
2381
2382         lfsck = lfsck_instance_find(key, true, false);
2383         if (likely(lfsck != NULL)) {
2384                 if (val > LFSCK_ASYNC_WIN_MAX) {
2385                         CWARN("%s: Too large async window size, which "
2386                               "may cause memory issues. The valid range "
2387                               "is [0 - %u]. If you do not want to restrict "
2388                               "the window size for async requests pipeline, "
2389                               "just set it as 0.\n",
2390                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2391                         rc = -EINVAL;
2392                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2393                         mutex_lock(&lfsck->li_mutex);
2394                         lfsck->li_bookmark_ram.lb_async_windows = val;
2395                         rc = lfsck_bookmark_store(&env, lfsck);
2396                         mutex_unlock(&lfsck->li_mutex);
2397                 }
2398                 lfsck_instance_put(&env, lfsck);
2399         } else {
2400                 rc = -ENXIO;
2401         }
2402
2403         lu_env_fini(&env);
2404
2405         RETURN(rc);
2406 }
2407 EXPORT_SYMBOL(lfsck_set_windows);
2408
2409 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2410 {
2411         struct lu_env           env;
2412         struct lfsck_instance  *lfsck;
2413         struct lfsck_component *com;
2414         int                     rc;
2415         ENTRY;
2416
2417         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2418         if (rc != 0)
2419                 RETURN(rc);
2420
2421         lfsck = lfsck_instance_find(key, true, false);
2422         if (likely(lfsck != NULL)) {
2423                 com = lfsck_component_find(lfsck, type);
2424                 if (likely(com != NULL)) {
2425                         rc = com->lc_ops->lfsck_dump(&env, com, m);
2426                         lfsck_component_put(&env, com);
2427                 } else {
2428                         rc = -ENOTSUPP;
2429                 }
2430
2431                 lfsck_instance_put(&env, lfsck);
2432         } else {
2433                 rc = -ENXIO;
2434         }
2435
2436         lu_env_fini(&env);
2437
2438         RETURN(rc);
2439 }
2440 EXPORT_SYMBOL(lfsck_dump);
2441
2442 static int lfsck_stop_all(const struct lu_env *env,
2443                           struct lfsck_instance *lfsck,
2444                           struct lfsck_stop *stop)
2445 {
2446         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2447         struct lfsck_request              *lr     = &info->lti_lr;
2448         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2449         struct ptlrpc_request_set         *set;
2450         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2451         struct lfsck_tgt_desc             *ltd;
2452         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2453         __u32                              idx;
2454         int                                rc     = 0;
2455         int                                rc1    = 0;
2456         ENTRY;
2457
2458         LASSERT(stop->ls_flags & LPF_BROADCAST);
2459
2460         set = ptlrpc_prep_set();
2461         if (unlikely(set == NULL))
2462                 RETURN(-ENOMEM);
2463
2464         memset(lr, 0, sizeof(*lr));
2465         lr->lr_event = LE_STOP;
2466         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2467         lr->lr_status = stop->ls_status;
2468         lr->lr_version = bk->lb_version;
2469         lr->lr_active = LFSCK_TYPES_ALL;
2470         lr->lr_param = stop->ls_flags;
2471
2472         laia->laia_com = NULL;
2473         laia->laia_ltds = ltds;
2474         laia->laia_lr = lr;
2475         laia->laia_result = 0;
2476         laia->laia_shared = 1;
2477
2478         down_read(&ltds->ltd_rw_sem);
2479         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2480                 ltd = lfsck_tgt_get(ltds, idx);
2481                 LASSERT(ltd != NULL);
2482
2483                 laia->laia_ltd = ltd;
2484                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2485                                          lfsck_async_interpret, laia,
2486                                          LFSCK_NOTIFY);
2487                 if (rc != 0) {
2488                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2489                         lfsck_tgt_put(ltd);
2490                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2491                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2492                         rc1 = rc;
2493                 }
2494         }
2495         up_read(&ltds->ltd_rw_sem);
2496
2497         rc = ptlrpc_set_wait(set);
2498         ptlrpc_set_destroy(set);
2499
2500         if (rc == 0)
2501                 rc = laia->laia_result;
2502
2503         if (rc == -EALREADY)
2504                 rc = 0;
2505
2506         if (rc != 0)
2507                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2508                        lfsck_lfsck2name(lfsck), rc);
2509
2510         RETURN(rc != 0 ? rc : rc1);
2511 }
2512
2513 static int lfsck_start_all(const struct lu_env *env,
2514                            struct lfsck_instance *lfsck,
2515                            struct lfsck_start *start)
2516 {
2517         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2518         struct lfsck_request              *lr     = &info->lti_lr;
2519         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2520         struct ptlrpc_request_set         *set;
2521         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2522         struct lfsck_tgt_desc             *ltd;
2523         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2524         __u32                              idx;
2525         int                                rc     = 0;
2526         ENTRY;
2527
2528         LASSERT(start->ls_flags & LPF_BROADCAST);
2529
2530         set = ptlrpc_prep_set();
2531         if (unlikely(set == NULL))
2532                 RETURN(-ENOMEM);
2533
2534         memset(lr, 0, sizeof(*lr));
2535         lr->lr_event = LE_START;
2536         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2537         lr->lr_speed = bk->lb_speed_limit;
2538         lr->lr_version = bk->lb_version;
2539         lr->lr_active = start->ls_active;
2540         lr->lr_param = start->ls_flags;
2541         lr->lr_async_windows = bk->lb_async_windows;
2542         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2543                        LSV_ASYNC_WINDOWS;
2544
2545         laia->laia_com = NULL;
2546         laia->laia_ltds = ltds;
2547         laia->laia_lr = lr;
2548         laia->laia_result = 0;
2549         laia->laia_shared = 1;
2550
2551         down_read(&ltds->ltd_rw_sem);
2552         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2553                 ltd = lfsck_tgt_get(ltds, idx);
2554                 LASSERT(ltd != NULL);
2555
2556                 laia->laia_ltd = ltd;
2557                 ltd->ltd_layout_done = 0;
2558                 ltd->ltd_namespace_done = 0;
2559                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2560                                          lfsck_async_interpret, laia,
2561                                          LFSCK_NOTIFY);
2562                 if (rc != 0) {
2563                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2564                         lfsck_tgt_put(ltd);
2565                         CERROR("%s: cannot notify MDT %x for LFSCK "
2566                                "start, failout: rc = %d\n",
2567                                lfsck_lfsck2name(lfsck), idx, rc);
2568                         break;
2569                 }
2570         }
2571         up_read(&ltds->ltd_rw_sem);
2572
2573         if (rc != 0) {
2574                 ptlrpc_set_destroy(set);
2575
2576                 RETURN(rc);
2577         }
2578
2579         rc = ptlrpc_set_wait(set);
2580         ptlrpc_set_destroy(set);
2581
2582         if (rc == 0)
2583                 rc = laia->laia_result;
2584
2585         if (rc != 0) {
2586                 struct lfsck_stop *stop = &info->lti_stop;
2587
2588                 CERROR("%s: cannot start LFSCK on some MDTs, "
2589                        "stop all: rc = %d\n",
2590                        lfsck_lfsck2name(lfsck), rc);
2591                 if (rc != -EALREADY) {
2592                         stop->ls_status = LS_FAILED;
2593                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
2594                         lfsck_stop_all(env, lfsck, stop);
2595                 }
2596         }
2597
2598         RETURN(rc);
2599 }
2600
2601 int lfsck_start(const struct lu_env *env, struct dt_device *key,
2602                 struct lfsck_start_param *lsp)
2603 {
2604         struct lfsck_start              *start  = lsp->lsp_start;
2605         struct lfsck_instance           *lfsck;
2606         struct lfsck_bookmark           *bk;
2607         struct ptlrpc_thread            *thread;
2608         struct lfsck_component          *com;
2609         struct l_wait_info               lwi    = { 0 };
2610         struct lfsck_thread_args        *lta;
2611         struct task_struct              *task;
2612         int                              rc     = 0;
2613         __u16                            valid  = 0;
2614         __u16                            flags  = 0;
2615         __u16                            type   = 1;
2616         ENTRY;
2617
2618         lfsck = lfsck_instance_find(key, true, false);
2619         if (unlikely(lfsck == NULL))
2620                 RETURN(-ENXIO);
2621
2622         /* System is not ready, try again later. */
2623         if (unlikely(lfsck->li_namespace == NULL))
2624                 GOTO(put, rc = -EAGAIN);
2625
2626         /* start == NULL means auto trigger paused LFSCK. */
2627         if ((start == NULL) &&
2628             (list_empty(&lfsck->li_list_scan) ||
2629              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2630                 GOTO(put, rc = 0);
2631
2632         bk = &lfsck->li_bookmark_ram;
2633         thread = &lfsck->li_thread;
2634         mutex_lock(&lfsck->li_mutex);
2635         spin_lock(&lfsck->li_lock);
2636         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2637                 rc = -EALREADY;
2638                 if (unlikely(start == NULL)) {
2639                         spin_unlock(&lfsck->li_lock);
2640                         GOTO(out, rc);
2641                 }
2642
2643                 while (start->ls_active != 0) {
2644                         if (!(type & start->ls_active)) {
2645                                 type <<= 1;
2646                                 continue;
2647                         }
2648
2649                         com = __lfsck_component_find(lfsck, type,
2650                                                      &lfsck->li_list_scan);
2651                         if (com == NULL)
2652                                 com = __lfsck_component_find(lfsck, type,
2653                                                 &lfsck->li_list_double_scan);
2654                         if (com == NULL) {
2655                                 rc = -EOPNOTSUPP;
2656                                 break;
2657                         }
2658
2659                         if (com->lc_ops->lfsck_join != NULL) {
2660                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2661                                 if (rc != 0 && rc != -EALREADY)
2662                                         break;
2663                         }
2664                         start->ls_active &= ~type;
2665                         type <<= 1;
2666                 }
2667                 spin_unlock(&lfsck->li_lock);
2668                 GOTO(out, rc);
2669         }
2670         spin_unlock(&lfsck->li_lock);
2671
2672         lfsck->li_status = 0;
2673         lfsck->li_oit_over = 0;
2674         lfsck->li_start_unplug = 0;
2675         lfsck->li_drop_dryrun = 0;
2676         lfsck->li_new_scanned = 0;
2677
2678         /* For auto trigger. */
2679         if (start == NULL)
2680                 goto trigger;
2681
2682         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2683                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2684                        lfsck_lfsck2name(lfsck));
2685
2686                 GOTO(out, rc = -EPERM);
2687         }
2688
2689         start->ls_version = bk->lb_version;
2690
2691         if (start->ls_active != 0) {
2692                 struct lfsck_component *next;
2693
2694                 if (start->ls_active == LFSCK_TYPES_ALL)
2695                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2696
2697                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2698                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2699                         GOTO(out, rc = -ENOTSUPP);
2700                 }
2701
2702                 list_for_each_entry_safe(com, next,
2703                                          &lfsck->li_list_scan, lc_link) {
2704                         if (!(com->lc_type & start->ls_active)) {
2705                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2706                                                              false);
2707                                 if (rc != 0)
2708                                         GOTO(out, rc);
2709                         }
2710                 }
2711
2712                 while (start->ls_active != 0) {
2713                         if (type & start->ls_active) {
2714                                 com = __lfsck_component_find(lfsck, type,
2715                                                         &lfsck->li_list_idle);
2716                                 if (com != NULL)
2717                                         /* The component status will be updated
2718                                          * when its prep() is called later by
2719                                          * the LFSCK main engine. */
2720                                         list_move_tail(&com->lc_link,
2721                                                        &lfsck->li_list_scan);
2722                                 start->ls_active &= ~type;
2723                         }
2724                         type <<= 1;
2725                 }
2726         }
2727
2728         if (list_empty(&lfsck->li_list_scan)) {
2729                 /* The speed limit will be used to control both the LFSCK and
2730                  * low layer scrub (if applied), need to be handled firstly. */
2731                 if (start->ls_valid & LSV_SPEED_LIMIT) {
2732                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
2733                                 rc = lfsck_bookmark_store(env, lfsck);
2734                                 if (rc != 0)
2735                                         GOTO(out, rc);
2736                         }
2737                 }
2738
2739                 goto trigger;
2740         }
2741
2742         if (start->ls_flags & LPF_RESET)
2743                 flags |= DOIF_RESET;
2744
2745         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
2746         if (rc != 0)
2747                 GOTO(out, rc);
2748
2749         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2750                 start->ls_active |= com->lc_type;
2751                 if (flags & DOIF_RESET) {
2752                         rc = com->lc_ops->lfsck_reset(env, com, false);
2753                         if (rc != 0)
2754                                 GOTO(out, rc);
2755                 }
2756         }
2757
2758 trigger:
2759         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
2760         if (bk->lb_param & LPF_DRYRUN)
2761                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
2762
2763         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
2764                 valid |= DOIV_ERROR_HANDLE;
2765                 if (start->ls_flags & LPF_FAILOUT)
2766                         flags |= DOIF_FAILOUT;
2767         }
2768
2769         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
2770                 valid |= DOIV_DRYRUN;
2771                 if (start->ls_flags & LPF_DRYRUN)
2772                         flags |= DOIF_DRYRUN;
2773         }
2774
2775         if (!list_empty(&lfsck->li_list_scan))
2776                 flags |= DOIF_OUTUSED;
2777
2778         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2779         thread_set_flags(thread, 0);
2780         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
2781         if (IS_ERR(lta))
2782                 GOTO(out, rc = PTR_ERR(lta));
2783
2784         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
2785         task = kthread_run(lfsck_master_engine, lta, "lfsck");
2786         if (IS_ERR(task)) {
2787                 rc = PTR_ERR(task);
2788                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
2789                        lfsck_lfsck2name(lfsck), rc);
2790                 lfsck_thread_args_fini(lta);
2791
2792                 GOTO(out, rc);
2793         }
2794
2795         l_wait_event(thread->t_ctl_waitq,
2796                      thread_is_running(thread) ||
2797                      thread_is_stopped(thread),
2798                      &lwi);
2799         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
2800                 lfsck->li_start_unplug = 1;
2801                 wake_up_all(&thread->t_ctl_waitq);
2802
2803                 GOTO(out, rc = 0);
2804         }
2805
2806         /* release lfsck::li_mutex to avoid deadlock. */
2807         mutex_unlock(&lfsck->li_mutex);
2808         rc = lfsck_start_all(env, lfsck, start);
2809         if (rc != 0) {
2810                 spin_lock(&lfsck->li_lock);
2811                 if (thread_is_stopped(thread)) {
2812                         spin_unlock(&lfsck->li_lock);
2813                 } else {
2814                         lfsck->li_status = LS_FAILED;
2815                         lfsck->li_flags = 0;
2816                         thread_set_flags(thread, SVC_STOPPING);
2817                         spin_unlock(&lfsck->li_lock);
2818
2819                         lfsck->li_start_unplug = 1;
2820                         wake_up_all(&thread->t_ctl_waitq);
2821                         l_wait_event(thread->t_ctl_waitq,
2822                                      thread_is_stopped(thread),
2823                                      &lwi);
2824                 }
2825         } else {
2826                 lfsck->li_start_unplug = 1;
2827                 wake_up_all(&thread->t_ctl_waitq);
2828         }
2829
2830         GOTO(put, rc);
2831
2832 out:
2833         mutex_unlock(&lfsck->li_mutex);
2834
2835 put:
2836         lfsck_instance_put(env, lfsck);
2837
2838         return rc < 0 ? rc : 0;
2839 }
2840 EXPORT_SYMBOL(lfsck_start);
2841
2842 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2843                struct lfsck_stop *stop)
2844 {
2845         struct lfsck_instance   *lfsck;
2846         struct ptlrpc_thread    *thread;
2847         struct l_wait_info       lwi    = { 0 };
2848         int                      rc     = 0;
2849         int                      rc1    = 0;
2850         ENTRY;
2851
2852         lfsck = lfsck_instance_find(key, true, false);
2853         if (unlikely(lfsck == NULL))
2854                 RETURN(-ENXIO);
2855
2856         thread = &lfsck->li_thread;
2857         /* release lfsck::li_mutex to avoid deadlock. */
2858         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2859                 if (!lfsck->li_master) {
2860                         CERROR("%s: only allow to specify '-A' via MDS\n",
2861                                lfsck_lfsck2name(lfsck));
2862
2863                         GOTO(out, rc = -EPERM);
2864                 }
2865
2866                 rc1 = lfsck_stop_all(env, lfsck, stop);
2867         }
2868
2869         mutex_lock(&lfsck->li_mutex);
2870         spin_lock(&lfsck->li_lock);
2871         /* no error if LFSCK is already stopped, or was never started */
2872         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2873                 spin_unlock(&lfsck->li_lock);
2874                 GOTO(out, rc = 0);
2875         }
2876
2877         if (stop != NULL) {
2878                 lfsck->li_status = stop->ls_status;
2879                 lfsck->li_flags = stop->ls_flags;
2880         } else {
2881                 lfsck->li_status = LS_STOPPED;
2882                 lfsck->li_flags = 0;
2883         }
2884
2885         thread_set_flags(thread, SVC_STOPPING);
2886         spin_unlock(&lfsck->li_lock);
2887
2888         wake_up_all(&thread->t_ctl_waitq);
2889         l_wait_event(thread->t_ctl_waitq,
2890                      thread_is_stopped(thread),
2891                      &lwi);
2892
2893         GOTO(out, rc = 0);
2894
2895 out:
2896         mutex_unlock(&lfsck->li_mutex);
2897         lfsck_instance_put(env, lfsck);
2898
2899         return rc != 0 ? rc : rc1;
2900 }
2901 EXPORT_SYMBOL(lfsck_stop);
2902
2903 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2904                     struct lfsck_request *lr, struct thandle *th)
2905 {
2906         int rc = -EOPNOTSUPP;
2907         ENTRY;
2908
2909         switch (lr->lr_event) {
2910         case LE_START: {
2911                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
2912                 struct lfsck_start_param  lsp;
2913
2914                 memset(start, 0, sizeof(*start));
2915                 start->ls_valid = lr->lr_valid;
2916                 start->ls_speed_limit = lr->lr_speed;
2917                 start->ls_version = lr->lr_version;
2918                 start->ls_active = lr->lr_active;
2919                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2920                 start->ls_async_windows = lr->lr_async_windows;
2921
2922                 lsp.lsp_start = start;
2923                 lsp.lsp_index = lr->lr_index;
2924                 lsp.lsp_index_valid = 1;
2925                 rc = lfsck_start(env, key, &lsp);
2926                 break;
2927         }
2928         case LE_STOP: {
2929                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2930
2931                 memset(stop, 0, sizeof(*stop));
2932                 stop->ls_status = lr->lr_status;
2933                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2934                 rc = lfsck_stop(env, key, stop);
2935                 break;
2936         }
2937         case LE_PHASE1_DONE:
2938         case LE_PHASE2_DONE:
2939         case LE_FID_ACCESSED:
2940         case LE_PEER_EXIT:
2941         case LE_CONDITIONAL_DESTROY:
2942         case LE_CREATE_ORPHAN:
2943         case LE_SKIP_NLINK_DECLARE:
2944         case LE_SKIP_NLINK:
2945         case LE_SET_LMV_MASTER:
2946         case LE_SET_LMV_SLAVE:
2947         case LE_PAIRS_VERIFY: {
2948                 struct lfsck_instance  *lfsck;
2949                 struct lfsck_component *com;
2950
2951                 lfsck = lfsck_instance_find(key, true, false);
2952                 if (unlikely(lfsck == NULL))
2953                         RETURN(-ENXIO);
2954
2955                 com = lfsck_component_find(lfsck, lr->lr_active);
2956                 if (likely(com != NULL)) {
2957                         rc = com->lc_ops->lfsck_in_notify(env, com, lr, th);
2958                         lfsck_component_put(env, com);
2959                 }
2960
2961                 lfsck_instance_put(env, lfsck);
2962                 break;
2963         }
2964         default:
2965                 break;
2966         }
2967
2968         RETURN(rc);
2969 }
2970 EXPORT_SYMBOL(lfsck_in_notify);
2971
2972 int lfsck_query(const struct lu_env *env, struct dt_device *key,
2973                 struct lfsck_request *lr)
2974 {
2975         struct lfsck_instance  *lfsck;
2976         struct lfsck_component *com;
2977         int                     rc;
2978         ENTRY;
2979
2980         lfsck = lfsck_instance_find(key, true, false);
2981         if (unlikely(lfsck == NULL))
2982                 RETURN(-ENXIO);
2983
2984         com = lfsck_component_find(lfsck, lr->lr_active);
2985         if (likely(com != NULL)) {
2986                 rc = com->lc_ops->lfsck_query(env, com);
2987                 lfsck_component_put(env, com);
2988         } else {
2989                 rc = -ENOTSUPP;
2990         }
2991
2992         lfsck_instance_put(env, lfsck);
2993
2994         RETURN(rc);
2995 }
2996 EXPORT_SYMBOL(lfsck_query);
2997
2998 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
2999                              struct ldlm_namespace *ns)
3000 {
3001         struct lfsck_instance  *lfsck;
3002         int                     rc      = -ENXIO;
3003
3004         lfsck = lfsck_instance_find(key, true, false);
3005         if (likely(lfsck != NULL)) {
3006                 lfsck->li_namespace = ns;
3007                 lfsck_instance_put(env, lfsck);
3008                 rc = 0;
3009         }
3010
3011         return rc;
3012 }
3013 EXPORT_SYMBOL(lfsck_register_namespace);
3014
3015 int lfsck_register(const struct lu_env *env, struct dt_device *key,
3016                    struct dt_device *next, struct obd_device *obd,
3017                    lfsck_out_notify notify, void *notify_data, bool master)
3018 {
3019         struct lfsck_instance   *lfsck;
3020         struct dt_object        *root  = NULL;
3021         struct dt_object        *obj   = NULL;
3022         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
3023         int                      rc;
3024         ENTRY;
3025
3026         lfsck = lfsck_instance_find(key, false, false);
3027         if (unlikely(lfsck != NULL))
3028                 RETURN(-EEXIST);
3029
3030         OBD_ALLOC_PTR(lfsck);
3031         if (lfsck == NULL)
3032                 RETURN(-ENOMEM);
3033
3034         mutex_init(&lfsck->li_mutex);
3035         spin_lock_init(&lfsck->li_lock);
3036         INIT_LIST_HEAD(&lfsck->li_link);
3037         INIT_LIST_HEAD(&lfsck->li_list_scan);
3038         INIT_LIST_HEAD(&lfsck->li_list_dir);
3039         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
3040         INIT_LIST_HEAD(&lfsck->li_list_idle);
3041         INIT_LIST_HEAD(&lfsck->li_list_lmv);
3042         atomic_set(&lfsck->li_ref, 1);
3043         atomic_set(&lfsck->li_double_scan_count, 0);
3044         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
3045         lfsck->li_out_notify = notify;
3046         lfsck->li_out_notify_data = notify_data;
3047         lfsck->li_next = next;
3048         lfsck->li_bottom = key;
3049         lfsck->li_obd = obd;
3050
3051         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
3052         if (rc != 0)
3053                 GOTO(out, rc);
3054
3055         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
3056         if (rc != 0)
3057                 GOTO(out, rc);
3058
3059         fid->f_seq = FID_SEQ_LOCAL_NAME;
3060         fid->f_oid = 1;
3061         fid->f_ver = 0;
3062         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
3063         if (rc != 0)
3064                 GOTO(out, rc);
3065
3066         rc = dt_root_get(env, key, fid);
3067         if (rc != 0)
3068                 GOTO(out, rc);
3069
3070         root = dt_locate(env, key, fid);
3071         if (IS_ERR(root))
3072                 GOTO(out, rc = PTR_ERR(root));
3073
3074         if (unlikely(!dt_try_as_dir(env, root)))
3075                 GOTO(out, rc = -ENOTDIR);
3076
3077         lfsck->li_local_root_fid = *fid;
3078         if (master) {
3079                 lfsck->li_master = 1;
3080                 if (lfsck_dev_idx(key) == 0) {
3081                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
3082                         const struct lu_name *cname;
3083
3084                         rc = dt_lookup(env, root,
3085                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
3086                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
3087                         if (rc != 0)
3088                                 GOTO(out, rc);
3089
3090                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
3091                         if (IS_ERR(obj))
3092                                 GOTO(out, rc = PTR_ERR(obj));
3093
3094                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3095                                 (const struct dt_key *)dotlustre, BYPASS_CAPA);
3096                         if (rc != 0)
3097                                 GOTO(out, rc);
3098
3099                         lu_object_put(env, &obj->do_lu);
3100                         obj = dt_locate(env, key, fid);
3101                         if (IS_ERR(obj))
3102                                 GOTO(out, rc = PTR_ERR(obj));
3103
3104                         cname = lfsck_name_get_const(env, dotlustre,
3105                                                      strlen(dotlustre));
3106                         rc = lfsck_verify_linkea(env, key, obj, cname,
3107                                                  &lfsck->li_global_root_fid);
3108                         if (rc != 0)
3109                                 GOTO(out, rc);
3110
3111                         *pfid = *fid;
3112                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3113                                        (const struct dt_key *)lostfound,
3114                                        BYPASS_CAPA);
3115                         if (rc != 0)
3116                                 GOTO(out, rc);
3117
3118                         lu_object_put(env, &obj->do_lu);
3119                         obj = dt_locate(env, key, fid);
3120                         if (IS_ERR(obj))
3121                                 GOTO(out, rc = PTR_ERR(obj));
3122
3123                         cname = lfsck_name_get_const(env, lostfound,
3124                                                      strlen(lostfound));
3125                         rc = lfsck_verify_linkea(env, key, obj, cname, pfid);
3126                         if (rc != 0)
3127                                 GOTO(out, rc);
3128
3129                         lu_object_put(env, &obj->do_lu);
3130                         obj = NULL;
3131                 }
3132         }
3133
3134         fid->f_seq = FID_SEQ_LOCAL_FILE;
3135         fid->f_oid = OTABLE_IT_OID;
3136         fid->f_ver = 0;
3137         obj = dt_locate(env, key, fid);
3138         if (IS_ERR(obj))
3139                 GOTO(out, rc = PTR_ERR(obj));
3140
3141         lu_object_get(&obj->do_lu);
3142         lfsck->li_obj_oit = obj;
3143         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
3144         if (rc != 0)
3145                 GOTO(out, rc);
3146
3147         rc = lfsck_bookmark_setup(env, lfsck);
3148         if (rc != 0)
3149                 GOTO(out, rc);
3150
3151         if (master) {
3152                 rc = lfsck_fid_init(lfsck);
3153                 if (rc < 0)
3154                         GOTO(out, rc);
3155
3156                 rc = lfsck_namespace_setup(env, lfsck);
3157                 if (rc < 0)
3158                         GOTO(out, rc);
3159         }
3160
3161         rc = lfsck_layout_setup(env, lfsck);
3162         if (rc < 0)
3163                 GOTO(out, rc);
3164
3165         /* XXX: more LFSCK components initialization to be added here. */
3166
3167         rc = lfsck_instance_add(lfsck);
3168         if (rc == 0)
3169                 rc = lfsck_add_target_from_orphan(env, lfsck);
3170 out:
3171         if (obj != NULL && !IS_ERR(obj))
3172                 lu_object_put(env, &obj->do_lu);
3173         if (root != NULL && !IS_ERR(root))
3174                 lu_object_put(env, &root->do_lu);
3175         if (rc != 0)
3176                 lfsck_instance_cleanup(env, lfsck);
3177         return rc;
3178 }
3179 EXPORT_SYMBOL(lfsck_register);
3180
3181 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
3182 {
3183         struct lfsck_instance *lfsck;
3184
3185         lfsck = lfsck_instance_find(key, false, true);
3186         if (lfsck != NULL)
3187                 lfsck_instance_put(env, lfsck);
3188 }
3189 EXPORT_SYMBOL(lfsck_degister);
3190
3191 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
3192                      struct dt_device *tgt, struct obd_export *exp,
3193                      __u32 index, bool for_ost)
3194 {
3195         struct lfsck_instance   *lfsck;
3196         struct lfsck_tgt_desc   *ltd;
3197         int                      rc;
3198         ENTRY;
3199
3200         OBD_ALLOC_PTR(ltd);
3201         if (ltd == NULL)
3202                 RETURN(-ENOMEM);
3203
3204         ltd->ltd_tgt = tgt;
3205         ltd->ltd_key = key;
3206         ltd->ltd_exp = exp;
3207         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
3208         INIT_LIST_HEAD(&ltd->ltd_layout_list);
3209         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
3210         INIT_LIST_HEAD(&ltd->ltd_namespace_list);
3211         INIT_LIST_HEAD(&ltd->ltd_namespace_phase_list);
3212         atomic_set(&ltd->ltd_ref, 1);
3213         ltd->ltd_index = index;
3214
3215         spin_lock(&lfsck_instance_lock);
3216         lfsck = __lfsck_instance_find(key, true, false);
3217         if (lfsck == NULL) {
3218                 if (for_ost)
3219                         list_add_tail(&ltd->ltd_orphan_list,
3220                                       &lfsck_ost_orphan_list);
3221                 else
3222                         list_add_tail(&ltd->ltd_orphan_list,
3223                                       &lfsck_mdt_orphan_list);
3224                 spin_unlock(&lfsck_instance_lock);
3225
3226                 RETURN(0);
3227         }
3228         spin_unlock(&lfsck_instance_lock);
3229
3230         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
3231         if (rc != 0)
3232                 lfsck_tgt_put(ltd);
3233
3234         lfsck_instance_put(env, lfsck);
3235
3236         RETURN(rc);
3237 }
3238 EXPORT_SYMBOL(lfsck_add_target);
3239
3240 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
3241                       struct dt_device *tgt, __u32 index, bool for_ost)
3242 {
3243         struct lfsck_instance   *lfsck;
3244         struct lfsck_tgt_descs  *ltds;
3245         struct lfsck_tgt_desc   *ltd;
3246         struct list_head        *head;
3247
3248         if (for_ost)
3249                 head = &lfsck_ost_orphan_list;
3250         else
3251                 head = &lfsck_mdt_orphan_list;
3252
3253         spin_lock(&lfsck_instance_lock);
3254         list_for_each_entry(ltd, head, ltd_orphan_list) {
3255                 if (ltd->ltd_tgt == tgt) {
3256                         list_del_init(&ltd->ltd_orphan_list);
3257                         spin_unlock(&lfsck_instance_lock);
3258                         lfsck_tgt_put(ltd);
3259
3260                         return;
3261                 }
3262         }
3263
3264         ltd = NULL;
3265         lfsck = __lfsck_instance_find(key, true, false);
3266         spin_unlock(&lfsck_instance_lock);
3267         if (unlikely(lfsck == NULL))
3268                 return;
3269
3270         if (for_ost)
3271                 ltds = &lfsck->li_ost_descs;
3272         else
3273                 ltds = &lfsck->li_mdt_descs;
3274
3275         down_write(&ltds->ltd_rw_sem);
3276         LASSERT(ltds->ltd_tgts_bitmap != NULL);
3277
3278         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
3279                 goto unlock;
3280
3281         ltd = LTD_TGT(ltds, index);
3282         if (unlikely(ltd == NULL))
3283                 goto unlock;
3284
3285         LASSERT(ltds->ltd_tgtnr > 0);
3286
3287         ltds->ltd_tgtnr--;
3288         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
3289         LTD_TGT(ltds, index) = NULL;
3290
3291 unlock:
3292         if (ltd == NULL) {
3293                 if (for_ost)
3294                         head = &lfsck->li_ost_descs.ltd_orphan;
3295                 else
3296                         head = &lfsck->li_mdt_descs.ltd_orphan;
3297
3298                 list_for_each_entry(ltd, head, ltd_orphan_list) {
3299                         if (ltd->ltd_tgt == tgt) {
3300                                 list_del_init(&ltd->ltd_orphan_list);
3301                                 break;
3302                         }
3303                 }
3304         }
3305
3306         up_write(&ltds->ltd_rw_sem);
3307         if (ltd != NULL) {
3308                 spin_lock(&ltds->ltd_lock);
3309                 ltd->ltd_dead = 1;
3310                 spin_unlock(&ltds->ltd_lock);
3311                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_NAMESPACE);
3312                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
3313                 lfsck_tgt_put(ltd);
3314         }
3315
3316         lfsck_instance_put(env, lfsck);
3317 }
3318 EXPORT_SYMBOL(lfsck_del_target);
3319
3320 static int __init lfsck_init(void)
3321 {
3322         int rc;
3323
3324         INIT_LIST_HEAD(&lfsck_instance_list);
3325         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
3326         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
3327         lfsck_key_init_generic(&lfsck_thread_key, NULL);
3328         rc = lu_context_key_register(&lfsck_thread_key);
3329         if (rc == 0) {
3330                 tgt_register_lfsck_in_notify(lfsck_in_notify);
3331                 tgt_register_lfsck_query(lfsck_query);
3332         }
3333
3334         return rc;
3335 }
3336
3337 static void __exit lfsck_exit(void)
3338 {
3339         struct lfsck_tgt_desc *ltd;
3340         struct lfsck_tgt_desc *next;
3341
3342         LASSERT(list_empty(&lfsck_instance_list));
3343
3344         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
3345                                  ltd_orphan_list) {
3346                 list_del_init(&ltd->ltd_orphan_list);
3347                 lfsck_tgt_put(ltd);
3348         }
3349
3350         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
3351                                  ltd_orphan_list) {
3352                 list_del_init(&ltd->ltd_orphan_list);
3353                 lfsck_tgt_put(ltd);
3354         }
3355
3356         lu_context_key_degister(&lfsck_thread_key);
3357 }
3358
3359 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
3360 MODULE_DESCRIPTION("LFSCK");
3361 MODULE_LICENSE("GPL");
3362
3363 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);