Whamcloud - gitweb
LU-6068 misc: update Intel copyright messages 2014
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2014, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
/* NOTE(review): the consumer of this constant is outside this chunk; the
 * name suggests a "checkpoint was skipped" indicator -- confirm at the
 * use site. */
45 #define LFSCK_CHECKPOINT_SKIP   1
46
47 /* Define the lfsck thread key: instantiate the key-init hook for the
 * per-thread struct lfsck_thread_info. NOTE(review): presumably this
 * expands to lfsck_key_init(), which allocates the value -- confirm against
 * the LU_KEY_INIT() definition in lu_object.h. */
48 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
49
50 static void lfsck_key_fini(const struct lu_context *ctx,
51                            struct lu_context_key *key, void *data)
52 {
53         struct lfsck_thread_info *info = data;
54
55         lu_buf_free(&info->lti_linkea_buf);
56         lu_buf_free(&info->lti_linkea_buf2);
57         lu_buf_free(&info->lti_big_buf);
58         OBD_FREE_PTR(info);
59 }
60
/* Declare the lfsck context key, tagged for MD and DT threads, and generate
 * its generic helpers. NOTE(review): LU_CONTEXT_KEY_DEFINE() presumably wires
 * lfsck_key_fini() in as the key's destructor -- confirm against
 * lu_object.h. */
61 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
62 LU_KEY_INIT_GENERIC(lfsck);
63
/* File-scope registries.
 *
 * lfsck_ost_orphan_list / lfsck_mdt_orphan_list hold target descriptors
 * (linked through ltd_orphan_list) that lfsck_add_target_from_orphan()
 * later claims for the instance whose li_bottom matches ltd_key; that
 * function walks them under lfsck_instance_lock.
 *
 * NOTE(review): lfsck_instance_list is presumably the list of registered
 * instances, guarded by the same lock -- its users are outside this chunk. */
64 static struct list_head lfsck_instance_list;
65 static struct list_head lfsck_ost_orphan_list;
66 static struct list_head lfsck_mdt_orphan_list;
67 static DEFINE_SPINLOCK(lfsck_instance_lock);
68
69 static const char *lfsck_status_names[] = {
70         [LS_INIT]               = "init",
71         [LS_SCANNING_PHASE1]    = "scanning-phase1",
72         [LS_SCANNING_PHASE2]    = "scanning-phase2",
73         [LS_COMPLETED]          = "completed",
74         [LS_FAILED]             = "failed",
75         [LS_STOPPED]            = "stopped",
76         [LS_PAUSED]             = "paused",
77         [LS_CRASHED]            = "crashed",
78         [LS_PARTIAL]            = "partial",
79         [LS_CO_FAILED]          = "co-failed",
80         [LS_CO_STOPPED]         = "co-stopped",
81         [LS_CO_PAUSED]          = "co-paused"
82 };
83
/* Printable names of the LFSCK flags, terminated by a NULL entry.
 * NOTE(review): presumably entry N names flag bit N; the code that walks
 * this table is outside this chunk -- confirm there. */
84 const char *lfsck_flags_names[] = {
85         "scanned-once",
86         "inconsistent",
87         "upgrade",
88         "incomplete",
89         "crashed_lastid",
90         NULL
91 };
92
/* Printable names of the LFSCK parameters. The first slot is NULL (it has
 * no printable name) and the table is terminated by a second NULL entry.
 * NOTE(review): presumably entry N names parameter bit N, so a walker must
 * not treat the leading NULL as the terminator -- confirm against the code
 * that dumps this table. */
93 const char *lfsck_param_names[] = {
94         NULL,
95         "failout",
96         "dryrun",
97         "all_targets",
98         "broadcast",
99         "orphan",
100         "create_ostobj",
101         "create_mdtobj",
102         NULL
103 };
104
/* How the lost+found directory is located for verification.
 * NOTE(review): the users are outside this chunk; judging by the names,
 * LVLT_BY_BOOKMARK uses the FID saved in the bookmark and LVLT_BY_NAMEENTRY
 * uses the name entry under the parent -- confirm at the use site. */
105 enum lfsck_verify_lpf_types {
106         LVLT_BY_BOOKMARK        = 0,
107         LVLT_BY_NAMEENTRY       = 1,
108 };
109
110 const char *lfsck_status2names(enum lfsck_status status)
111 {
112         if (unlikely(status < 0 || status >= LS_MAX))
113                 return "unknown";
114
115         return lfsck_status_names[status];
116 }
117
118 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
119 {
120         spin_lock_init(&ltds->ltd_lock);
121         init_rwsem(&ltds->ltd_rw_sem);
122         INIT_LIST_HEAD(&ltds->ltd_orphan);
123         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
124         if (ltds->ltd_tgts_bitmap == NULL)
125                 return -ENOMEM;
126
127         return 0;
128 }
129
130 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
131 {
132         struct lfsck_tgt_desc   *ltd;
133         struct lfsck_tgt_desc   *next;
134         int                      idx;
135
136         down_write(&ltds->ltd_rw_sem);
137
138         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
139                                  ltd_orphan_list) {
140                 list_del_init(&ltd->ltd_orphan_list);
141                 lfsck_tgt_put(ltd);
142         }
143
144         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
145                 up_write(&ltds->ltd_rw_sem);
146
147                 return;
148         }
149
150         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
151                 ltd = LTD_TGT(ltds, idx);
152                 if (likely(ltd != NULL)) {
153                         LASSERT(list_empty(&ltd->ltd_layout_list));
154                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
155                         LASSERT(list_empty(&ltd->ltd_namespace_list));
156                         LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
157
158                         ltds->ltd_tgtnr--;
159                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
160                         LTD_TGT(ltds, idx) = NULL;
161                         lfsck_tgt_put(ltd);
162                 }
163         }
164
165         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
166                  ltds->ltd_tgtnr);
167
168         for (idx = 0; idx < TGT_PTRS; idx++) {
169                 if (ltds->ltd_tgts_idx[idx] != NULL) {
170                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
171                         ltds->ltd_tgts_idx[idx] = NULL;
172                 }
173         }
174
175         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
176         ltds->ltd_tgts_bitmap = NULL;
177         up_write(&ltds->ltd_rw_sem);
178 }
179
180 static int __lfsck_add_target(const struct lu_env *env,
181                               struct lfsck_instance *lfsck,
182                               struct lfsck_tgt_desc *ltd,
183                               bool for_ost, bool locked)
184 {
185         struct lfsck_tgt_descs *ltds;
186         __u32                   index = ltd->ltd_index;
187         int                     rc    = 0;
188         ENTRY;
189
190         if (for_ost)
191                 ltds = &lfsck->li_ost_descs;
192         else
193                 ltds = &lfsck->li_mdt_descs;
194
195         if (!locked)
196                 down_write(&ltds->ltd_rw_sem);
197
198         LASSERT(ltds->ltd_tgts_bitmap != NULL);
199
200         if (index >= ltds->ltd_tgts_bitmap->size) {
201                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
202                                     (__u32)BITS_PER_LONG);
203                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
204                 cfs_bitmap_t *new_bitmap;
205
206                 while (newsize < index + 1)
207                         newsize <<= 1;
208
209                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
210                 if (new_bitmap == NULL)
211                         GOTO(unlock, rc = -ENOMEM);
212
213                 if (ltds->ltd_tgtnr > 0)
214                         cfs_bitmap_copy(new_bitmap, old_bitmap);
215                 ltds->ltd_tgts_bitmap = new_bitmap;
216                 CFS_FREE_BITMAP(old_bitmap);
217         }
218
219         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
220                 CERROR("%s: the device %s (%u) is registered already\n",
221                        lfsck_lfsck2name(lfsck),
222                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
223                 GOTO(unlock, rc = -EEXIST);
224         }
225
226         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
227                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
228                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
229                         GOTO(unlock, rc = -ENOMEM);
230         }
231
232         LTD_TGT(ltds, index) = ltd;
233         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
234         ltds->ltd_tgtnr++;
235
236         GOTO(unlock, rc = 0);
237
238 unlock:
239         if (!locked)
240                 up_write(&ltds->ltd_rw_sem);
241
242         return rc;
243 }
244
245 static int lfsck_add_target_from_orphan(const struct lu_env *env,
246                                         struct lfsck_instance *lfsck)
247 {
248         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
249         struct lfsck_tgt_desc   *ltd;
250         struct lfsck_tgt_desc   *next;
251         struct list_head        *head    = &lfsck_ost_orphan_list;
252         int                      rc;
253         bool                     for_ost = true;
254
255 again:
256         spin_lock(&lfsck_instance_lock);
257         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
258                 if (ltd->ltd_key == lfsck->li_bottom)
259                         list_move_tail(&ltd->ltd_orphan_list,
260                                        &ltds->ltd_orphan);
261         }
262         spin_unlock(&lfsck_instance_lock);
263
264         down_write(&ltds->ltd_rw_sem);
265         while (!list_empty(&ltds->ltd_orphan)) {
266                 ltd = list_entry(ltds->ltd_orphan.next,
267                                  struct lfsck_tgt_desc,
268                                  ltd_orphan_list);
269                 list_del_init(&ltd->ltd_orphan_list);
270                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
271                 /* Do not hold the semaphore for too long time. */
272                 up_write(&ltds->ltd_rw_sem);
273                 if (rc != 0)
274                         return rc;
275
276                 down_write(&ltds->ltd_rw_sem);
277         }
278         up_write(&ltds->ltd_rw_sem);
279
280         if (for_ost) {
281                 ltds = &lfsck->li_mdt_descs;
282                 head = &lfsck_mdt_orphan_list;
283                 for_ost = false;
284                 goto again;
285         }
286
287         return 0;
288 }
289
290 static inline struct lfsck_component *
291 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
292                        struct list_head *list)
293 {
294         struct lfsck_component *com;
295
296         list_for_each_entry(com, list, lc_link) {
297                 if (com->lc_type == type)
298                         return com;
299         }
300         return NULL;
301 }
302
303 struct lfsck_component *
304 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
305 {
306         struct lfsck_component *com;
307
308         spin_lock(&lfsck->li_lock);
309         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
310         if (com != NULL)
311                 goto unlock;
312
313         com = __lfsck_component_find(lfsck, type,
314                                      &lfsck->li_list_double_scan);
315         if (com != NULL)
316                 goto unlock;
317
318         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
319
320 unlock:
321         if (com != NULL)
322                 lfsck_component_get(com);
323         spin_unlock(&lfsck->li_lock);
324         return com;
325 }
326
327 void lfsck_component_cleanup(const struct lu_env *env,
328                              struct lfsck_component *com)
329 {
330         if (!list_empty(&com->lc_link))
331                 list_del_init(&com->lc_link);
332         if (!list_empty(&com->lc_link_dir))
333                 list_del_init(&com->lc_link_dir);
334
335         lfsck_component_put(env, com);
336 }
337
338 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
339                     struct lu_fid *fid, bool locked)
340 {
341         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
342         int                      rc = 0;
343         ENTRY;
344
345         if (!locked)
346                 mutex_lock(&lfsck->li_mutex);
347
348         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
349         if (rc >= 0) {
350                 bk->lb_last_fid = *fid;
351                 /* We do not care about whether the subsequent sub-operations
352                  * failed or not. The worst case is that one FID is lost that
353                  * is not a big issue for the LFSCK since it is relative rare
354                  * for LFSCK create. */
355                 rc = lfsck_bookmark_store(env, lfsck);
356         }
357
358         if (!locked)
359                 mutex_unlock(&lfsck->li_mutex);
360
361         RETURN(rc);
362 }
363
364 /**
365  * Request the specified ibits lock for the given object.
366  *
367  * Before the LFSCK modifying on the namespace visible object,
368  * it needs to acquire related ibits ldlm lock.
369  *
370  * \param[in] env       pointer to the thread context
371  * \param[in] lfsck     pointer to the lfsck instance
372  * \param[in] obj       pointer to the dt_object to be locked
373  * \param[out] lh       pointer to the lock handle
374  * \param[in] ibits     the bits for the ldlm lock to be acquired
375  * \param[in] mode      the mode for the ldlm lock to be acquired
376  *
377  * \retval              0 for success
378  * \retval              negative error number on failure
379  */
380 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
381                      struct dt_object *obj, struct lustre_handle *lh,
382                      __u64 bits, ldlm_mode_t mode)
383 {
384         struct lfsck_thread_info        *info   = lfsck_env_info(env);
385         ldlm_policy_data_t              *policy = &info->lti_policy;
386         struct ldlm_res_id              *resid  = &info->lti_resid;
387         __u64                            flags  = LDLM_FL_ATOMIC_CB;
388         int                              rc;
389
390         LASSERT(lfsck->li_namespace != NULL);
391
392         memset(policy, 0, sizeof(*policy));
393         policy->l_inodebits.bits = bits;
394         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
395         if (dt_object_remote(obj)) {
396                 struct ldlm_enqueue_info *einfo = &info->lti_einfo;
397
398                 memset(einfo, 0, sizeof(*einfo));
399                 einfo->ei_type = LDLM_IBITS;
400                 einfo->ei_mode = mode;
401                 einfo->ei_cb_bl = ldlm_blocking_ast;
402                 einfo->ei_cb_cp = ldlm_completion_ast;
403                 einfo->ei_res_id = resid;
404
405                 rc = dt_object_lock(env, obj, lh, einfo, policy);
406         } else {
407                 rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid,
408                                             LDLM_IBITS, policy, mode,
409                                             &flags, ldlm_blocking_ast,
410                                             ldlm_completion_ast, NULL, NULL,
411                                             0, LVB_T_NONE, NULL, lh);
412         }
413
414         if (rc == ELDLM_OK) {
415                 rc = 0;
416         } else {
417                 memset(lh, 0, sizeof(*lh));
418                 rc = -EIO;
419         }
420
421         return rc;
422 }
423
424 /**
425  * Release the the specified ibits lock.
426  *
427  * If the lock has been acquired before, release it
428  * and cleanup the handle. Otherwise, do nothing.
429  *
430  * \param[in] lh        pointer to the lock handle
431  * \param[in] mode      the mode for the ldlm lock to be released
432  */
433 void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
434 {
435         if (lustre_handle_is_used(lh)) {
436                 ldlm_lock_decref(lh, mode);
437                 memset(lh, 0, sizeof(*lh));
438         }
439 }
440
441 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
442                               struct lfsck_instance *lfsck,
443                               const struct lu_fid *fid)
444 {
445         struct seq_server_site  *ss     =
446                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
447         struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
448         int                      rc;
449
450         fld_range_set_mdt(range);
451         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
452         if (rc == 0)
453                 rc = range->lsr_index;
454
455         return rc;
456 }
457
/* Well-known directory entry names. dot and dotdot are exported and are
 * used below as the keys of the "." and ".." entries inserted into a newly
 * created directory. NOTE(review): the users of dotlustre and lostfound are
 * outside this chunk. */
458 const char dot[] = ".";
459 const char dotdot[] = "..";
460 static const char dotlustre[] = ".lustre";
461 static const char lostfound[] = "lost+found";
462
463 static int lfsck_create_lpf_local(const struct lu_env *env,
464                                   struct lfsck_instance *lfsck,
465                                   struct dt_object *parent,
466                                   struct dt_object *child,
467                                   struct lu_attr *la,
468                                   struct dt_object_format *dof,
469                                   const char *name)
470 {
471         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
472         struct dt_device        *dev    = lfsck->li_bottom;
473         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
474         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
475         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
476         struct thandle          *th     = NULL;
477         struct linkea_data       ldata  = { 0 };
478         struct lu_buf            linkea_buf;
479         const struct lu_name    *cname;
480         loff_t                   pos    = 0;
481         int                      len    = sizeof(struct lfsck_bookmark);
482         int                      rc;
483         ENTRY;
484
485         rc = linkea_data_new(&ldata,
486                              &lfsck_env_info(env)->lti_linkea_buf2);
487         if (rc != 0)
488                 RETURN(rc);
489
490         cname = lfsck_name_get_const(env, name, strlen(name));
491         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
492         if (rc != 0)
493                 RETURN(rc);
494
495         th = dt_trans_create(env, dev);
496         if (IS_ERR(th))
497                 RETURN(PTR_ERR(th));
498
499         /* 1a. create child */
500         rc = dt_declare_create(env, child, la, NULL, dof, th);
501         if (rc != 0)
502                 GOTO(stop, rc);
503
504         /* 2a. increase child nlink */
505         rc = dt_declare_ref_add(env, child, th);
506         if (rc != 0)
507                 GOTO(stop, rc);
508
509         /* 3a. insert linkEA for child */
510         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
511                        ldata.ld_leh->leh_len);
512         rc = dt_declare_xattr_set(env, child, &linkea_buf,
513                                   XATTR_NAME_LINK, 0, th);
514         if (rc != 0)
515                 GOTO(stop, rc);
516
517         /* 4a. insert name into parent dir */
518         rec->rec_type = S_IFDIR;
519         rec->rec_fid = cfid;
520         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
521                                (const struct dt_key *)name, th);
522         if (rc != 0)
523                 GOTO(stop, rc);
524
525         /* 5a. increase parent nlink */
526         rc = dt_declare_ref_add(env, parent, th);
527         if (rc != 0)
528                 GOTO(stop, rc);
529
530         /* 6a. update bookmark */
531         rc = dt_declare_record_write(env, bk_obj,
532                                      lfsck_buf_get(env, bk, len), 0, th);
533         if (rc != 0)
534                 GOTO(stop, rc);
535
536         rc = dt_trans_start_local(env, dev, th);
537         if (rc != 0)
538                 GOTO(stop, rc);
539
540         dt_write_lock(env, child, 0);
541         /* 1b.1. create child */
542         rc = dt_create(env, child, la, NULL, dof, th);
543         if (rc != 0)
544                 GOTO(unlock, rc);
545
546         if (unlikely(!dt_try_as_dir(env, child)))
547                 GOTO(unlock, rc = -ENOTDIR);
548
549         /* 1b.2. insert dot into child dir */
550         rec->rec_fid = cfid;
551         rc = dt_insert(env, child, (const struct dt_rec *)rec,
552                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
553         if (rc != 0)
554                 GOTO(unlock, rc);
555
556         /* 1b.3. insert dotdot into child dir */
557         rec->rec_fid = &LU_LPF_FID;
558         rc = dt_insert(env, child, (const struct dt_rec *)rec,
559                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
560         if (rc != 0)
561                 GOTO(unlock, rc);
562
563         /* 2b. increase child nlink */
564         rc = dt_ref_add(env, child, th);
565         if (rc != 0)
566                 GOTO(unlock, rc);
567
568         /* 3b. insert linkEA for child. */
569         rc = dt_xattr_set(env, child, &linkea_buf,
570                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
571         dt_write_unlock(env, child);
572         if (rc != 0)
573                 GOTO(stop, rc);
574
575         /* 4b. insert name into parent dir */
576         rec->rec_fid = cfid;
577         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
578                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
579         if (rc != 0)
580                 GOTO(stop, rc);
581
582         dt_write_lock(env, parent, 0);
583         /* 5b. increase parent nlink */
584         rc = dt_ref_add(env, parent, th);
585         dt_write_unlock(env, parent);
586         if (rc != 0)
587                 GOTO(stop, rc);
588
589         bk->lb_lpf_fid = *cfid;
590         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
591
592         /* 6b. update bookmark */
593         rc = dt_record_write(env, bk_obj,
594                              lfsck_buf_get(env, bk, len), &pos, th);
595
596         GOTO(stop, rc);
597
598 unlock:
599         dt_write_unlock(env, child);
600
601 stop:
602         dt_trans_stop(env, dev, th);
603
604         return rc;
605 }
606
607 static int lfsck_create_lpf_remote(const struct lu_env *env,
608                                    struct lfsck_instance *lfsck,
609                                    struct dt_object *parent,
610                                    struct dt_object *child,
611                                    struct lu_attr *la,
612                                    struct dt_object_format *dof,
613                                    const char *name)
614 {
615         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
616         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
617         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
618         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
619         struct thandle          *th     = NULL;
620         struct linkea_data       ldata  = { 0 };
621         struct lu_buf            linkea_buf;
622         const struct lu_name    *cname;
623         struct dt_device        *dev;
624         loff_t                   pos    = 0;
625         int                      len    = sizeof(struct lfsck_bookmark);
626         int                      rc;
627         ENTRY;
628
629         rc = linkea_data_new(&ldata,
630                              &lfsck_env_info(env)->lti_linkea_buf2);
631         if (rc != 0)
632                 RETURN(rc);
633
634         cname = lfsck_name_get_const(env, name, strlen(name));
635         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
636         if (rc != 0)
637                 RETURN(rc);
638
639         /* Create .lustre/lost+found/MDTxxxx. */
640
641         /* XXX: Currently, cross-MDT create operation needs to create the child
642          *      object firstly, then insert name into the parent directory. For
643          *      this case, the child object resides on current MDT (local), but
644          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
645          *      easy to contain all the sub-modifications orderly within single
646          *      transaction.
647          *
648          *      To avoid more inconsistency, we split the create operation into
649          *      two transactions:
650          *
651          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
652          *         locally.
653          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
654          *         remotely.
655          *
656          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
657          *      repair such inconsistency when LFSCK run next time. */
658
659         /* Transaction I: locally */
660
661         dev = lfsck->li_bottom;
662         th = dt_trans_create(env, dev);
663         if (IS_ERR(th))
664                 RETURN(PTR_ERR(th));
665
666         /* 1a. create child */
667         rc = dt_declare_create(env, child, la, NULL, dof, th);
668         if (rc != 0)
669                 GOTO(stop, rc);
670
671         /* 2a. increase child nlink */
672         rc = dt_declare_ref_add(env, child, th);
673         if (rc != 0)
674                 GOTO(stop, rc);
675
676         /* 3a. insert linkEA for child */
677         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
678                        ldata.ld_leh->leh_len);
679         rc = dt_declare_xattr_set(env, child, &linkea_buf,
680                                   XATTR_NAME_LINK, 0, th);
681         if (rc != 0)
682                 GOTO(stop, rc);
683
684         /* 4a. update bookmark */
685         rc = dt_declare_record_write(env, bk_obj,
686                                      lfsck_buf_get(env, bk, len), 0, th);
687         if (rc != 0)
688                 GOTO(stop, rc);
689
690         rc = dt_trans_start_local(env, dev, th);
691         if (rc != 0)
692                 GOTO(stop, rc);
693
694         dt_write_lock(env, child, 0);
695         /* 1b.1. create child */
696         rc = dt_create(env, child, la, NULL, dof, th);
697         if (rc != 0)
698                 GOTO(unlock, rc);
699
700         if (unlikely(!dt_try_as_dir(env, child)))
701                 GOTO(unlock, rc = -ENOTDIR);
702
703         /* 1b.2. insert dot into child dir */
704         rec->rec_type = S_IFDIR;
705         rec->rec_fid = cfid;
706         rc = dt_insert(env, child, (const struct dt_rec *)rec,
707                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
708         if (rc != 0)
709                 GOTO(unlock, rc);
710
711         /* 1b.3. insert dotdot into child dir */
712         rec->rec_fid = &LU_LPF_FID;
713         rc = dt_insert(env, child, (const struct dt_rec *)rec,
714                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
715         if (rc != 0)
716                 GOTO(unlock, rc);
717
718         /* 2b. increase child nlink */
719         rc = dt_ref_add(env, child, th);
720         if (rc != 0)
721                 GOTO(unlock, rc);
722
723         /* 3b. insert linkEA for child */
724         rc = dt_xattr_set(env, child, &linkea_buf,
725                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
726         if (rc != 0)
727                 GOTO(unlock, rc);
728
729         bk->lb_lpf_fid = *cfid;
730         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
731
732         /* 4b. update bookmark */
733         rc = dt_record_write(env, bk_obj,
734                              lfsck_buf_get(env, bk, len), &pos, th);
735
736         dt_write_unlock(env, child);
737         dt_trans_stop(env, dev, th);
738         if (rc != 0)
739                 RETURN(rc);
740
741         /* Transaction II: remotely */
742
743         dev = lfsck->li_next;
744         th = dt_trans_create(env, dev);
745         if (IS_ERR(th))
746                 RETURN(PTR_ERR(th));
747
748         /* 5a. insert name into parent dir */
749         rec->rec_fid = cfid;
750         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
751                                (const struct dt_key *)name, th);
752         if (rc != 0)
753                 GOTO(stop, rc);
754
755         /* 6a. increase parent nlink */
756         rc = dt_declare_ref_add(env, parent, th);
757         if (rc != 0)
758                 GOTO(stop, rc);
759
760         rc = dt_trans_start(env, dev, th);
761         if (rc != 0)
762                 GOTO(stop, rc);
763
764         /* 5b. insert name into parent dir */
765         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
766                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
767         if (rc != 0)
768                 GOTO(stop, rc);
769
770         dt_write_lock(env, parent, 0);
771         /* 6b. increase parent nlink */
772         rc = dt_ref_add(env, parent, th);
773         dt_write_unlock(env, parent);
774
775         GOTO(stop, rc);
776
777 unlock:
778         dt_write_unlock(env, child);
779 stop:
780         dt_trans_stop(env, dev, th);
781
782         if (rc != 0 && dev == lfsck->li_next)
783                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
784                        "for orphans, but failed to insert the name %s "
785                        "to the .lustre/lost+found/. Such inconsistency "
786                        "will be repaired when LFSCK run next time: rc = %d\n",
787                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
788
789         return rc;
790 }
791
792 /* Do NOT create .lustre/lost+found/MDTxxxx when registering the lfsck
793  * instance, because MDT0 may not be ready for sequence allocation yet. We
794  * do that only when it is required, such as orphan OST-objects repairing. */
795 int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
796 {
797         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
798         struct lfsck_thread_info *info  = lfsck_env_info(env);
799         struct lu_fid            *cfid  = &info->lti_fid2;
800         struct lu_attr           *la    = &info->lti_la;
801         struct dt_object_format  *dof   = &info->lti_dof;
802         struct dt_object         *parent = NULL;
803         struct dt_object         *child = NULL;
804         struct lustre_handle      lh    = { 0 };
805         char                      name[8];
806         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
807         int                       rc    = 0;
808         ENTRY;
809
810         LASSERT(lfsck->li_master);
811
812         sprintf(name, "MDT%04x", node);
813         if (node == 0) {
814                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
815                                                   &LU_LPF_FID);
816         } else {
817                 struct lfsck_tgt_desc *ltd;
818
819                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
820                 if (unlikely(ltd == NULL))
821                         RETURN(-ENXIO);
822
823                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
824                                                   &LU_LPF_FID);
825                 lfsck_tgt_put(ltd);
826         }
827         if (IS_ERR(parent))
828                 RETURN(PTR_ERR(parent));
829
830         if (lfsck->li_lpf_obj != NULL)
831                 GOTO(out, rc = 0);
832
833         if (unlikely(!dt_try_as_dir(env, parent)))
834                 GOTO(out, rc = -ENOTDIR);
835
836         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
837                               MDS_INODELOCK_UPDATE, LCK_EX);
838         if (rc != 0)
839                 GOTO(out, rc);
840
841         mutex_lock(&lfsck->li_mutex);
842         if (lfsck->li_lpf_obj != NULL)
843                 GOTO(unlock, rc = 0);
844
845         if (fid_is_zero(&bk->lb_lpf_fid)) {
846                 /* There is corner case that: in former LFSCK scanning we have
847                  * created the .lustre/lost+found/MDTxxxx but failed to update
848                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
849                  * it from MDT0 firstly. */
850                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
851                                (const struct dt_key *)name, BYPASS_CAPA);
852                 if (rc != 0 && rc != -ENOENT)
853                         GOTO(unlock, rc);
854
855                 if (rc == 0) {
856                         bk->lb_lpf_fid = *cfid;
857                         rc = lfsck_bookmark_store(env, lfsck);
858                 } else {
859                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
860                 }
861                 if (rc != 0)
862                         GOTO(unlock, rc);
863         } else {
864                 *cfid = bk->lb_lpf_fid;
865         }
866
867         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
868         if (IS_ERR(child))
869                 GOTO(unlock, rc = PTR_ERR(child));
870
871         if (dt_object_exists(child) != 0) {
872                 if (unlikely(!dt_try_as_dir(env, child)))
873                         rc = -ENOTDIR;
874                 else
875                         lfsck->li_lpf_obj = child;
876
877                 GOTO(unlock, rc);
878         }
879
880         memset(la, 0, sizeof(*la));
881         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
882         la->la_mode = S_IFDIR | S_IRWXU;
883         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
884                        LA_UID | LA_GID;
885         memset(dof, 0, sizeof(*dof));
886         dof->dof_type = dt_mode_to_dft(S_IFDIR);
887
888         if (node == 0)
889                 rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
890                                             dof, name);
891         else
892                 rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
893                                              dof, name);
894         if (rc == 0)
895                 lfsck->li_lpf_obj = child;
896
897         GOTO(unlock, rc);
898
899 unlock:
900         mutex_unlock(&lfsck->li_mutex);
901         lfsck_ibits_unlock(&lh, LCK_EX);
902         if (rc != 0 && child != NULL && !IS_ERR(child))
903                 lu_object_put(env, &child->do_lu);
904 out:
905         if (parent != NULL && !IS_ERR(parent))
906                 lu_object_put(env, &parent->do_lu);
907
908         return rc;
909 }
910
911 /**
912  * Scan .lustre/lost+found for bad name entries and remove them.
913  *
914  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
915  * index in the system. Any other formatted name is invalid and should be
916  * removed.
917  *
918  * \param[in] env       pointer to the thread context
919  * \param[in] lfsck     pointer to the lfsck instance
920  * \param[in] parent    pointer to the lost+found object
921  *
922  * \retval              0 for success
923  * \retval              negative error number on failure
924  */
925 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
926                                       struct lfsck_instance *lfsck,
927                                       struct dt_object *parent)
928 {
929         struct lu_dirent        *ent    =
930                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
931         const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
932         struct dt_it            *it;
933         int                      rc;
934         ENTRY;
935
936         it = iops->init(env, parent, LUDA_64BITHASH, BYPASS_CAPA);
937         if (IS_ERR(it))
938                 RETURN(PTR_ERR(it));
939
940         rc = iops->load(env, it, 0);
941         if (rc == 0)
942                 rc = iops->next(env, it);
943         else if (rc > 0)
944                 rc = 0;
945
946         while (rc == 0) {
947                 int off = 3;
948
949                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
950                 if (rc != 0)
951                         break;
952
953                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
954                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
955                         goto next;
956
957                 /* name length must be strlen("MDTxxxx") */
958                 if (ent->lde_namelen != 7)
959                         goto remove;
960
961                 if (memcmp(ent->lde_name, "MDT", off) != 0)
962                         goto remove;
963
964                 while (off < 7 && isxdigit(ent->lde_name[off]))
965                         off++;
966
967                 if (off != 7) {
968
969 remove:
970                         rc = lfsck_remove_name_entry(env, lfsck, parent,
971                                                      ent->lde_name, S_IFDIR);
972                         if (rc != 0)
973                                 break;
974                 }
975
976 next:
977                 rc = iops->next(env, it);
978         }
979
980         iops->put(env, it);
981         iops->fini(env, it);
982
983         RETURN(rc > 0 ? 0 : rc);
984 }
985
986 static int lfsck_update_lpf_entry(const struct lu_env *env,
987                                   struct lfsck_instance *lfsck,
988                                   struct dt_object *parent,
989                                   struct dt_object *child,
990                                   const char *name,
991                                   enum lfsck_verify_lpf_types type)
992 {
993         int rc;
994
995         if (type == LVLT_BY_BOOKMARK) {
996                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
997                                              lfsck_dto2fid(child), S_IFDIR);
998         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
999                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
1000                 rc = lfsck_bookmark_store(env, lfsck);
1001
1002                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
1003                        " in the bookmark file: rc = %d\n",
1004                        lfsck_lfsck2name(lfsck),
1005                        PFID(lfsck_dto2fid(child)), rc);
1006         }
1007
1008         return rc;
1009 }
1010
1011 /**
1012  * Check whether the @child back references the @parent.
1013  *
1014  * Two cases:
1015  * 1) The child's FID is stored in the bookmark file. If the child back
1016  *    references the parent (LU_LPF_FID object) via its ".." entry, then
1017  *    insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
1018  *    the child back references another parent2, then:
1019  * 1.1) If the parent2 recognizes the child, then update the bookmark file;
1020  * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
1021  *      references the child. So keep them there. As the LFSCK processing,
1022  *      the parent3 may be found, then when the LFSCK run next time, the
1023  *      inconsistency can be repaired.
1024  *
1025  * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
1026  *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
1027  *    via its ".." entry, then update the bookmark file, otherwise, if the child
1028  *    back references another parent2, then:
1029  * 2.1) If the parent2 recognizes the child, then remove the sub-directory
1030  *      from .lustre/lost+found/;
1031  * 2.2) Otherwise, if the parent2 does not recognizes the child, trust the
1032  *      sub-directory name entry and update the child;
1033  * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1034  *      or not, then keep them there.
1035  *
1036  * \param[in] env       pointer to the thread context
1037  * \param[in] lfsck     pointer to the lfsck instance
1038  * \param[in] parent    pointer to the lost+found object
1039  * \param[in] child     pointer to the lost+found sub-directory object
1040  * \param[in] name      the name for lost+found sub-directory object
1041  * \param[out] fid      pointer to the buffer to hold the FID of the object
1042  *                      (called it as parent2) that is referenced via the
1043  *                      child's dotdot entry; it also can be the FID that
1044  *                      is referenced by the name entry under the parent2.
1045  * \param[in] type      to indicate where the child's FID is stored in
1046  *
1047  * \retval              positive number for uncertain inconsistency
1048  * \retval              0 for success
1049  * \retval              negative error number on failure
1050  */
1051 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1052                                   struct lfsck_instance *lfsck,
1053                                   struct dt_object *parent,
1054                                   struct dt_object *child, const char *name,
1055                                   struct lu_fid *fid,
1056                                   enum lfsck_verify_lpf_types type)
1057 {
1058         struct lfsck_thread_info *info    = lfsck_env_info(env);
1059         char                     *name2   = info->lti_key;
1060         struct lu_fid            *fid2    = &info->lti_fid3;
1061         struct dt_object         *parent2 = NULL;
1062         struct lustre_handle      lh      = { 0 };
1063         int                       rc;
1064         ENTRY;
1065
1066         fid_zero(fid);
1067         rc = dt_lookup(env, child, (struct dt_rec *)fid,
1068                        (const struct dt_key *)dotdot, BYPASS_CAPA);
1069         if (rc != 0)
1070                 GOTO(linkea, rc);
1071
1072         if (!fid_is_sane(fid))
1073                 GOTO(linkea, rc = -EINVAL);
1074
1075         if (lu_fid_eq(fid, &LU_LPF_FID)) {
1076                 const struct lu_name *cname;
1077
1078                 if (lfsck->li_lpf_obj == NULL) {
1079                         lu_object_get(&child->do_lu);
1080                         lfsck->li_lpf_obj = child;
1081                 }
1082
1083                 cname = lfsck_name_get_const(env, name, strlen(name));
1084                 rc = lfsck_verify_linkea(env, lfsck->li_bottom, child, cname,
1085                                          &LU_LPF_FID);
1086                 if (rc == 0)
1087                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1088                                                     name, type);
1089
1090                 GOTO(out_done, rc);
1091         }
1092
1093         parent2 = lfsck_object_find_by_dev(env, lfsck->li_next, fid);
1094         if (IS_ERR(parent2))
1095                 GOTO(linkea, parent2);
1096
1097         if (!dt_object_exists(parent2)) {
1098                 lu_object_put(env, &parent2->do_lu);
1099
1100                 GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
1101         }
1102
1103         if (!dt_try_as_dir(env, parent2)) {
1104                 lu_object_put(env, &parent2->do_lu);
1105
1106                 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1107         }
1108
1109 linkea:
1110         /* To prevent rename/unlink race */
1111         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1112                               MDS_INODELOCK_UPDATE, LCK_PR);
1113         if (rc != 0)
1114                 GOTO(out_put, rc);
1115
1116         dt_read_lock(env, child, 0);
1117         rc = lfsck_links_get_first(env, child, name2, fid2);
1118         if (rc != 0) {
1119                 dt_read_unlock(env, child);
1120                 lfsck_ibits_unlock(&lh, LCK_PR);
1121
1122                 GOTO(out_put, rc = 1);
1123         }
1124
1125         /* It is almost impossible that the bookmark file (or the name entry)
1126          * and the linkEA hit the same data corruption. Trust the linkEA. */
1127         if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1128                 dt_read_unlock(env, child);
1129                 lfsck_ibits_unlock(&lh, LCK_PR);
1130
1131                 *fid = *fid2;
1132                 if (lfsck->li_lpf_obj == NULL) {
1133                         lu_object_get(&child->do_lu);
1134                         lfsck->li_lpf_obj = child;
1135                 }
1136
1137                 /* Update the child's dotdot entry */
1138                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1139                                              &LU_LPF_FID, S_IFDIR);
1140                 if (rc == 0)
1141                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1142                                                     name, type);
1143
1144                 GOTO(out_put, rc);
1145         }
1146
1147         if (parent2 == NULL || IS_ERR(parent2)) {
1148                 dt_read_unlock(env, child);
1149                 lfsck_ibits_unlock(&lh, LCK_PR);
1150
1151                 GOTO(out_done, rc = 1);
1152         }
1153
1154         rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
1155                        (const struct dt_key *)name2, BYPASS_CAPA);
1156         dt_read_unlock(env, child);
1157         lfsck_ibits_unlock(&lh, LCK_PR);
1158         if (rc != 0 && rc != -ENOENT)
1159                 GOTO(out_put, rc);
1160
1161         if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1162                 if (type == LVLT_BY_BOOKMARK)
1163                         GOTO(out_put, rc = 1);
1164
1165                 /* Trust the name entry, update the child's dotdot entry. */
1166                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1167                                              &LU_LPF_FID, S_IFDIR);
1168
1169                 GOTO(out_put, rc);
1170         }
1171
1172         if (type == LVLT_BY_BOOKMARK) {
1173                 /* Invalid FID record in the bookmark file, reset it. */
1174                 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1175                 rc = lfsck_bookmark_store(env, lfsck);
1176
1177                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1178                        " in the bookmark file: rc = %d\n",
1179                        lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1180         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1181                 /* The name entry is wrong, remove it. */
1182                 rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
1183         }
1184
1185         GOTO(out_put, rc);
1186
1187 out_put:
1188         if (parent2 != NULL && !IS_ERR(parent2))
1189                 lu_object_put(env, &parent2->do_lu);
1190
1191 out_done:
1192         return rc;
1193 }
1194
1195 /**
1196  * Verify the /ROOT/.lustre/lost+found/ directory.
1197  *
1198  * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1199  * the LFSCK does not exactly know how to handle, such as orphans. So before
1200  * the LFSCK scanning the system, the consistency of such directory needs to
1201  * be verified firstly to allow the users to use it during the LFSCK.
1202  *
1203  * \param[in] env       pointer to the thread context
1204  * \param[in] lfsck     pointer to the lfsck instance
1205  *
1206  * \retval              positive number for uncertain inconsistency
1207  * \retval              0 for success
1208  * \retval              negative error number on failure
1209  */
1210 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1211 {
1212         struct lfsck_thread_info *info   = lfsck_env_info(env);
1213         struct lu_fid            *pfid   = &info->lti_fid;
1214         struct lu_fid            *cfid   = &info->lti_fid2;
1215         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
1216         struct dt_object         *parent = NULL;
1217         /* child1's FID is in the bookmark file. */
1218         struct dt_object         *child1 = NULL;
1219         /* child2's FID is in the name entry MDTxxxx. */
1220         struct dt_object         *child2 = NULL;
1221         struct dt_device         *dev    = lfsck->li_bottom;
1222         const struct lu_name     *cname;
1223         char                      name[8];
1224         int                       node   = lfsck_dev_idx(dev);
1225         int                       rc     = 0;
1226         ENTRY;
1227
1228         LASSERT(lfsck->li_master);
1229
1230         if (node == 0) {
1231                 parent = lfsck_object_find_by_dev(env, dev, &LU_LPF_FID);
1232         } else {
1233                 struct lfsck_tgt_desc *ltd;
1234
1235                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1236                 if (unlikely(ltd == NULL))
1237                         RETURN(-ENXIO);
1238
1239                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1240                                                   &LU_LPF_FID);
1241                 lfsck_tgt_put(ltd);
1242         }
1243
1244         if (IS_ERR(parent))
1245                 RETURN(PTR_ERR(parent));
1246
1247         LASSERT(dt_object_exists(parent));
1248
1249         if (unlikely(!dt_try_as_dir(env, parent)))
1250                 GOTO(put, rc = -ENOTDIR);
1251
1252         if (node == 0) {
1253                 rc = lfsck_scan_lpf_bad_entries(env, lfsck, parent);
1254                 if (rc != 0)
1255                         CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
1256                                "for bad sub-directories: rc = %d\n",
1257                                lfsck_lfsck2name(lfsck), rc);
1258         }
1259
1260         if (!fid_is_zero(&bk->lb_lpf_fid)) {
1261                 if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
1262                         struct lu_fid tfid = bk->lb_lpf_fid;
1263
1264                         /* Invalid FID record in the bookmark file, reset it. */
1265                         fid_zero(&bk->lb_lpf_fid);
1266                         rc = lfsck_bookmark_store(env, lfsck);
1267
1268                         CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1269                                " in the bookmark file: rc = %d\n",
1270                                lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
1271
1272                         if (rc != 0)
1273                                 GOTO(put, rc);
1274                 } else {
1275                         child1 = lfsck_object_find_by_dev(env, dev,
1276                                                           &bk->lb_lpf_fid);
1277                         if (IS_ERR(child1))
1278                                 GOTO(put, rc = PTR_ERR(child1));
1279
1280                         if (unlikely(!dt_object_exists(child1) ||
1281                                      dt_object_remote(child1)) ||
1282                                      !S_ISDIR(lfsck_object_type(child1))) {
1283                                 /* Invalid FID record in the bookmark file,
1284                                  * reset it. */
1285                                 fid_zero(&bk->lb_lpf_fid);
1286                                 rc = lfsck_bookmark_store(env, lfsck);
1287
1288                                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1289                                        " in the bookmark file: rc = %d\n",
1290                                        lfsck_lfsck2name(lfsck),
1291                                        PFID(lfsck_dto2fid(child1)), rc);
1292
1293                                 if (rc != 0)
1294                                         GOTO(put, rc);
1295
1296                                 lu_object_put(env, &child1->do_lu);
1297                                 child1 = NULL;
1298                         } else if (unlikely(!dt_try_as_dir(env, child1))) {
1299                                 GOTO(put, rc = -ENOTDIR);
1300                         }
1301                 }
1302         }
1303
1304         snprintf(name, 8, "MDT%04x", node);
1305         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1306                        (const struct dt_key *)name, BYPASS_CAPA);
1307         if (rc == -ENOENT) {
1308                 if (!fid_is_zero(&bk->lb_lpf_fid))
1309                         goto check_child1;
1310
1311                 GOTO(put, rc = 0);
1312         }
1313
1314         if (rc != 0)
1315                 GOTO(put, rc);
1316
1317         /* Invalid FID in the name entry, remove the name entry. */
1318         if (!fid_is_norm(cfid)) {
1319                 rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
1320                 if (rc != 0)
1321                         GOTO(put, rc);
1322
1323                 goto check_child1;
1324         }
1325
1326         child2 = lfsck_object_find_by_dev(env, dev, cfid);
1327         if (IS_ERR(child2))
1328                 GOTO(put, rc = PTR_ERR(child2));
1329
1330         if (unlikely(!dt_object_exists(child2) ||
1331                      dt_object_remote(child2)) ||
1332                      !S_ISDIR(lfsck_object_type(child2))) {
1333                 rc = lfsck_remove_name_entry(env, lfsck, parent, name,
1334                                              S_IFDIR);
1335                 if (rc != 0)
1336                         GOTO(put, rc);
1337
1338                 goto check_child1;
1339         }
1340
1341         if (unlikely(!dt_try_as_dir(env, child2)))
1342                 GOTO(put, rc = -ENOTDIR);
1343
1344         if (child1 == NULL) {
1345                 rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2, name,
1346                                             pfid, LVLT_BY_NAMEENTRY);
1347         } else if (!lu_fid_eq(cfid, &bk->lb_lpf_fid)) {
1348                 rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
1349                                             pfid, LVLT_BY_BOOKMARK);
1350                 if (!lu_fid_eq(pfid, &LU_LPF_FID))
1351                         rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2,
1352                                                     name, pfid,
1353                                                     LVLT_BY_NAMEENTRY);
1354         } else {
1355                 if (lfsck->li_lpf_obj == NULL) {
1356                         lu_object_get(&child2->do_lu);
1357                         lfsck->li_lpf_obj = child2;
1358                 }
1359
1360                 cname = lfsck_name_get_const(env, name, strlen(name));
1361                 rc = lfsck_verify_linkea(env, dev, child2, cname, &LU_LPF_FID);
1362         }
1363
1364         GOTO(put, rc);
1365
1366 check_child1:
1367         if (child1 != NULL)
1368                 rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
1369                                             pfid, LVLT_BY_BOOKMARK);
1370
1371         GOTO(put, rc);
1372
1373 put:
1374         if (lfsck->li_lpf_obj != NULL &&
1375             unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj)))
1376                 rc = -ENOTDIR;
1377
1378         if (child2 != NULL && !IS_ERR(child2))
1379                 lu_object_put(env, &child2->do_lu);
1380         if (child1 != NULL && !IS_ERR(child1))
1381                 lu_object_put(env, &child1->do_lu);
1382         if (parent != NULL && !IS_ERR(parent))
1383                 lu_object_put(env, &parent->do_lu);
1384
1385         return rc;
1386 }
1387
1388 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1389 {
1390         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1391         struct seq_server_site  *ss;
1392         char                    *prefix;
1393         int                      rc     = 0;
1394         ENTRY;
1395
1396         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
1397         if (unlikely(ss == NULL))
1398                 RETURN(-ENXIO);
1399
1400         OBD_ALLOC_PTR(lfsck->li_seq);
1401         if (lfsck->li_seq == NULL)
1402                 RETURN(-ENOMEM);
1403
1404         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1405         if (prefix == NULL)
1406                 GOTO(out, rc = -ENOMEM);
1407
1408         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1409         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1410                              ss->ss_server_seq);
1411         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1412         if (rc != 0)
1413                 GOTO(out, rc);
1414
1415         if (fid_is_sane(&bk->lb_last_fid))
1416                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1417
1418         RETURN(0);
1419
1420 out:
1421         OBD_FREE_PTR(lfsck->li_seq);
1422         lfsck->li_seq = NULL;
1423
1424         return rc;
1425 }
1426
1427 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1428 {
1429         if (lfsck->li_seq != NULL) {
1430                 seq_client_fini(lfsck->li_seq);
1431                 OBD_FREE_PTR(lfsck->li_seq);
1432                 lfsck->li_seq = NULL;
1433         }
1434 }
1435
1436 void lfsck_instance_cleanup(const struct lu_env *env,
1437                             struct lfsck_instance *lfsck)
1438 {
1439         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1440         struct lfsck_component  *com;
1441         struct lfsck_component  *next;
1442         struct lfsck_lmv_unit   *llu;
1443         struct lfsck_lmv_unit   *llu_next;
1444         struct lfsck_lmv        *llmv;
1445         ENTRY;
1446
1447         LASSERT(list_empty(&lfsck->li_link));
1448         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1449
1450         if (lfsck->li_obj_oit != NULL) {
1451                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
1452                 lfsck->li_obj_oit = NULL;
1453         }
1454
1455         LASSERT(lfsck->li_obj_dir == NULL);
1456         LASSERT(lfsck->li_lmv == NULL);
1457
1458         list_for_each_entry_safe(llu, llu_next, &lfsck->li_list_lmv, llu_link) {
1459                 llmv = &llu->llu_lmv;
1460
1461                 LASSERTF(atomic_read(&llmv->ll_ref) == 1,
1462                          "still in using: %u\n",
1463                          atomic_read(&llmv->ll_ref));
1464
1465                 lfsck_lmv_put(env, llmv);
1466         }
1467
1468         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1469                 lfsck_component_cleanup(env, com);
1470         }
1471
1472         LASSERT(list_empty(&lfsck->li_list_dir));
1473
1474         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1475                                  lc_link) {
1476                 lfsck_component_cleanup(env, com);
1477         }
1478
1479         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1480                 lfsck_component_cleanup(env, com);
1481         }
1482
1483         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1484         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1485
1486         if (lfsck->li_bookmark_obj != NULL) {
1487                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
1488                 lfsck->li_bookmark_obj = NULL;
1489         }
1490
1491         if (lfsck->li_lpf_obj != NULL) {
1492                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
1493                 lfsck->li_lpf_obj = NULL;
1494         }
1495
1496         if (lfsck->li_los != NULL) {
1497                 local_oid_storage_fini(env, lfsck->li_los);
1498                 lfsck->li_los = NULL;
1499         }
1500
1501         lfsck_fid_fini(lfsck);
1502
1503         OBD_FREE_PTR(lfsck);
1504 }
1505
1506 static inline struct lfsck_instance *
1507 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1508 {
1509         struct lfsck_instance *lfsck;
1510
1511         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1512                 if (lfsck->li_bottom == key) {
1513                         if (ref)
1514                                 lfsck_instance_get(lfsck);
1515                         if (unlink)
1516                                 list_del_init(&lfsck->li_link);
1517
1518                         return lfsck;
1519                 }
1520         }
1521
1522         return NULL;
1523 }
1524
1525 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1526                                            bool unlink)
1527 {
1528         struct lfsck_instance *lfsck;
1529
1530         spin_lock(&lfsck_instance_lock);
1531         lfsck = __lfsck_instance_find(key, ref, unlink);
1532         spin_unlock(&lfsck_instance_lock);
1533
1534         return lfsck;
1535 }
1536
1537 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1538 {
1539         struct lfsck_instance *tmp;
1540
1541         spin_lock(&lfsck_instance_lock);
1542         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1543                 if (lfsck->li_bottom == tmp->li_bottom) {
1544                         spin_unlock(&lfsck_instance_lock);
1545                         return -EEXIST;
1546                 }
1547         }
1548
1549         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1550         spin_unlock(&lfsck_instance_lock);
1551         return 0;
1552 }
1553
/*
 * Print "<prefix>: name,name,...\n" for the bits set in @bits.
 *
 * names[i] is the label for bit i; bits whose label is NULL are skipped.
 * When @bits is zero only "<prefix>:\n" is printed. Always returns 0.
 */
int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
		    const char *prefix)
{
	/* Walk the bits as unsigned: shifting 1 into the sign bit of a
	 * signed int (bit 31) is undefined behavior. */
	unsigned int	remaining = bits;
	bool		newline	  = (remaining == 0);
	int		i;

	seq_printf(m, "%s:%c", prefix, newline ? '\n' : ' ');

	/* remaining != 0 guarantees a set bit at index >= i, so i stays
	 * below the width of unsigned int. */
	for (i = 0; remaining != 0; i++) {
		unsigned int flag = 1U << i;

		if ((remaining & flag) == 0)
			continue;

		remaining &= ~flag;
		if (names[i] == NULL)
			continue;

		/* The last set bit ends the line instead of adding ','. */
		if (remaining == 0)
			newline = true;

		seq_printf(m, "%s%c", names[i], newline ? '\n' : ',');
	}

	/* The last set bit had no label, so the line is still open. */
	if (!newline)
		seq_printf(m, "\n");
	return 0;
}
1580
1581 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *prefix)
1582 {
1583         if (time != 0)
1584                 seq_printf(m, "%s: "LPU64" seconds\n", prefix,
1585                           cfs_time_current_sec() - time);
1586         else
1587                 seq_printf(m, "%s: N/A\n", prefix);
1588         return 0;
1589 }
1590
1591 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1592                    const char *prefix)
1593 {
1594         if (fid_is_zero(&pos->lp_dir_parent)) {
1595                 if (pos->lp_oit_cookie == 0)
1596                         seq_printf(m, "%s: N/A, N/A, N/A\n",
1597                                    prefix);
1598                 else
1599                         seq_printf(m, "%s: "LPU64", N/A, N/A\n",
1600                                    prefix, pos->lp_oit_cookie);
1601         } else {
1602                 seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
1603                            prefix, pos->lp_oit_cookie,
1604                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1605         }
1606         return 0;
1607 }
1608
1609 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1610                     struct lfsck_position *pos, bool init)
1611 {
1612         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1613
1614         if (unlikely(lfsck->li_di_oit == NULL)) {
1615                 memset(pos, 0, sizeof(*pos));
1616                 return;
1617         }
1618
1619         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1620         if (!lfsck->li_current_oit_processed && !init)
1621                 pos->lp_oit_cookie--;
1622
1623         LASSERT(pos->lp_oit_cookie > 0);
1624
1625         if (lfsck->li_di_dir != NULL) {
1626                 struct dt_object *dto = lfsck->li_obj_dir;
1627
1628                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1629                                                         lfsck->li_di_dir);
1630
1631                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1632                         fid_zero(&pos->lp_dir_parent);
1633                         pos->lp_dir_cookie = 0;
1634                 } else {
1635                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1636                 }
1637         } else {
1638                 fid_zero(&pos->lp_dir_parent);
1639                 pos->lp_dir_cookie = 0;
1640         }
1641 }
1642
1643 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1644 {
1645         bool dirty = false;
1646
1647         if (limit != LFSCK_SPEED_NO_LIMIT) {
1648                 if (limit > msecs_to_jiffies(MSEC_PER_SEC)) {
1649                         lfsck->li_sleep_rate = limit /
1650                                                msecs_to_jiffies(MSEC_PER_SEC);
1651                         lfsck->li_sleep_jif = 1;
1652                 } else {
1653                         lfsck->li_sleep_rate = 1;
1654                         lfsck->li_sleep_jif = msecs_to_jiffies(MSEC_PER_SEC) /
1655                                               limit;
1656                 }
1657         } else {
1658                 lfsck->li_sleep_jif = 0;
1659                 lfsck->li_sleep_rate = 0;
1660         }
1661
1662         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1663                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1664                 dirty = true;
1665         }
1666
1667         return dirty;
1668 }
1669
1670 void lfsck_control_speed(struct lfsck_instance *lfsck)
1671 {
1672         struct ptlrpc_thread *thread = &lfsck->li_thread;
1673         struct l_wait_info    lwi;
1674
1675         if (lfsck->li_sleep_jif > 0 &&
1676             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1677                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1678                                        LWI_ON_SIGNAL_NOOP, NULL);
1679
1680                 l_wait_event(thread->t_ctl_waitq,
1681                              !thread_is_running(thread),
1682                              &lwi);
1683                 lfsck->li_new_scanned = 0;
1684         }
1685 }
1686
1687 void lfsck_control_speed_by_self(struct lfsck_component *com)
1688 {
1689         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1690         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1691         struct l_wait_info       lwi;
1692
1693         if (lfsck->li_sleep_jif > 0 &&
1694             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1695                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1696                                        LWI_ON_SIGNAL_NOOP, NULL);
1697
1698                 l_wait_event(thread->t_ctl_waitq,
1699                              !thread_is_running(thread),
1700                              &lwi);
1701                 com->lc_new_scanned = 0;
1702         }
1703 }
1704
1705 static struct lfsck_thread_args *
1706 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1707                        struct lfsck_component *com,
1708                        struct lfsck_start_param *lsp)
1709 {
1710         struct lfsck_thread_args *lta;
1711         int                       rc;
1712
1713         OBD_ALLOC_PTR(lta);
1714         if (lta == NULL)
1715                 return ERR_PTR(-ENOMEM);
1716
1717         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1718         if (rc != 0) {
1719                 OBD_FREE_PTR(lta);
1720                 return ERR_PTR(rc);
1721         }
1722
1723         lta->lta_lfsck = lfsck_instance_get(lfsck);
1724         if (com != NULL)
1725                 lta->lta_com = lfsck_component_get(com);
1726
1727         lta->lta_lsp = lsp;
1728
1729         return lta;
1730 }
1731
1732 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1733 {
1734         if (lta->lta_com != NULL)
1735                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1736         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1737         lu_env_fini(&lta->lta_env);
1738         OBD_FREE_PTR(lta);
1739 }
1740
1741 struct lfsck_assistant_data *
1742 lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
1743                           const char *name)
1744 {
1745         struct lfsck_assistant_data *lad;
1746
1747         OBD_ALLOC_PTR(lad);
1748         if (lad != NULL) {
1749                 lad->lad_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
1750                 if (lad->lad_bitmap == NULL) {
1751                         OBD_FREE_PTR(lad);
1752                         return NULL;
1753                 }
1754
1755                 INIT_LIST_HEAD(&lad->lad_req_list);
1756                 spin_lock_init(&lad->lad_lock);
1757                 INIT_LIST_HEAD(&lad->lad_ost_list);
1758                 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1759                 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1760                 INIT_LIST_HEAD(&lad->lad_mdt_list);
1761                 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1762                 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1763                 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1764                 lad->lad_ops = lao;
1765                 lad->lad_name = name;
1766         }
1767
1768         return lad;
1769 }
1770
1771 /**
1772  * Generic LFSCK asynchronous communication interpretor function.
1773  * The LFSCK RPC reply for both the event notification and status
1774  * querying will be handled here.
1775  *
1776  * \param[in] env       pointer to the thread context
1777  * \param[in] req       pointer to the LFSCK request
1778  * \param[in] args      pointer to the lfsck_async_interpret_args
1779  * \param[in] rc        the result for handling the LFSCK request
1780  *
1781  * \retval              0 for success
1782  * \retval              negative error number on failure
1783  */
1784 int lfsck_async_interpret_common(const struct lu_env *env,
1785                                  struct ptlrpc_request *req,
1786                                  void *args, int rc)
1787 {
1788         struct lfsck_async_interpret_args *laia = args;
1789         struct lfsck_component            *com  = laia->laia_com;
1790         struct lfsck_assistant_data       *lad  = com->lc_data;
1791         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1792         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1793         struct lfsck_request              *lr   = laia->laia_lr;
1794
1795         LASSERT(com->lc_lfsck->li_master);
1796
1797         switch (lr->lr_event) {
1798         case LE_START:
1799                 if (rc != 0) {
1800                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
1801                                "start: rc = %d\n",
1802                                lfsck_lfsck2name(com->lc_lfsck),
1803                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1804                                ltd->ltd_index, lad->lad_name, rc);
1805
1806                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1807                                 struct lfsck_layout *lo = com->lc_file_ram;
1808
1809                                 if (lr->lr_flags & LEF_TO_OST)
1810                                         lfsck_lad_set_bitmap(env, com,
1811                                                              ltd->ltd_index);
1812                                 else
1813                                         lo->ll_flags |= LF_INCOMPLETE;
1814                         } else {
1815                                 struct lfsck_namespace *ns = com->lc_file_ram;
1816
1817                                 /* If some MDT does not join the namespace
1818                                  * LFSCK, then we cannot know whether there
1819                                  * is some name entry on such MDT that with
1820                                  * the referenced MDT-object on this MDT or
1821                                  * not. So the namespace LFSCK on this MDT
1822                                  * cannot handle orphan MDT-objects properly.
1823                                  * So we mark the LFSCK as LF_INCOMPLETE and
1824                                  * skip orphan MDT-objects handling. */
1825                                 ns->ln_flags |= LF_INCOMPLETE;
1826                         }
1827                         break;
1828                 }
1829
1830                 spin_lock(&ltds->ltd_lock);
1831                 if (ltd->ltd_dead) {
1832                         spin_unlock(&ltds->ltd_lock);
1833                         break;
1834                 }
1835
1836                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1837                         struct list_head *list;
1838                         struct list_head *phase_list;
1839
1840                         if (ltd->ltd_layout_done) {
1841                                 spin_unlock(&ltds->ltd_lock);
1842                                 break;
1843                         }
1844
1845                         if (lr->lr_flags & LEF_TO_OST) {
1846                                 list = &lad->lad_ost_list;
1847                                 phase_list = &lad->lad_ost_phase1_list;
1848                         } else {
1849                                 list = &lad->lad_mdt_list;
1850                                 phase_list = &lad->lad_mdt_phase1_list;
1851                         }
1852
1853                         if (list_empty(&ltd->ltd_layout_list))
1854                                 list_add_tail(&ltd->ltd_layout_list, list);
1855                         if (list_empty(&ltd->ltd_layout_phase_list))
1856                                 list_add_tail(&ltd->ltd_layout_phase_list,
1857                                               phase_list);
1858                 } else {
1859                         if (ltd->ltd_namespace_done) {
1860                                 spin_unlock(&ltds->ltd_lock);
1861                                 break;
1862                         }
1863
1864                         if (list_empty(&ltd->ltd_namespace_list))
1865                                 list_add_tail(&ltd->ltd_namespace_list,
1866                                               &lad->lad_mdt_list);
1867                         if (list_empty(&ltd->ltd_namespace_phase_list))
1868                                 list_add_tail(&ltd->ltd_namespace_phase_list,
1869                                               &lad->lad_mdt_phase1_list);
1870                 }
1871                 spin_unlock(&ltds->ltd_lock);
1872                 break;
1873         case LE_STOP:
1874         case LE_PHASE1_DONE:
1875         case LE_PHASE2_DONE:
1876         case LE_PEER_EXIT:
1877                 if (rc != 0 && rc != -EALREADY)
1878                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
1879                               "event = %d, rc = %d\n",
1880                               lfsck_lfsck2name(com->lc_lfsck),
1881                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1882                               ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
1883                 break;
1884         case LE_QUERY: {
1885                 struct lfsck_reply *reply;
1886                 struct list_head *list;
1887                 struct list_head *phase_list;
1888
1889                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1890                         list = &ltd->ltd_layout_list;
1891                         phase_list = &ltd->ltd_layout_phase_list;
1892                 } else {
1893                         list = &ltd->ltd_namespace_list;
1894                         phase_list = &ltd->ltd_namespace_phase_list;
1895                 }
1896
1897                 if (rc != 0) {
1898                         spin_lock(&ltds->ltd_lock);
1899                         list_del_init(phase_list);
1900                         list_del_init(list);
1901                         spin_unlock(&ltds->ltd_lock);
1902                         break;
1903                 }
1904
1905                 reply = req_capsule_server_get(&req->rq_pill,
1906                                                &RMF_LFSCK_REPLY);
1907                 if (reply == NULL) {
1908                         rc = -EPROTO;
1909                         CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
1910                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
1911                                lad->lad_name, rc);
1912                         spin_lock(&ltds->ltd_lock);
1913                         list_del_init(phase_list);
1914                         list_del_init(list);
1915                         spin_unlock(&ltds->ltd_lock);
1916                         break;
1917                 }
1918
1919                 switch (reply->lr_status) {
1920                 case LS_SCANNING_PHASE1:
1921                         break;
1922                 case LS_SCANNING_PHASE2:
1923                         spin_lock(&ltds->ltd_lock);
1924                         list_del_init(phase_list);
1925                         if (ltd->ltd_dead) {
1926                                 spin_unlock(&ltds->ltd_lock);
1927                                 break;
1928                         }
1929
1930                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1931                                 if (ltd->ltd_layout_done) {
1932                                         spin_unlock(&ltds->ltd_lock);
1933                                         break;
1934                                 }
1935
1936                                 if (lr->lr_flags & LEF_TO_OST)
1937                                         list_add_tail(phase_list,
1938                                                 &lad->lad_ost_phase2_list);
1939                                 else
1940                                         list_add_tail(phase_list,
1941                                                 &lad->lad_mdt_phase2_list);
1942                         } else {
1943                                 if (ltd->ltd_namespace_done) {
1944                                         spin_unlock(&ltds->ltd_lock);
1945                                         break;
1946                                 }
1947
1948                                 list_add_tail(phase_list,
1949                                               &lad->lad_mdt_phase2_list);
1950                         }
1951                         spin_unlock(&ltds->ltd_lock);
1952                         break;
1953                 default:
1954                         spin_lock(&ltds->ltd_lock);
1955                         list_del_init(phase_list);
1956                         list_del_init(list);
1957                         spin_unlock(&ltds->ltd_lock);
1958                         break;
1959                 }
1960                 break;
1961         }
1962         default:
1963                 CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
1964                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
1965                 break;
1966         }
1967
1968         if (!laia->laia_shared) {
1969                 lfsck_tgt_put(ltd);
1970                 lfsck_component_put(env, com);
1971         }
1972
1973         return 0;
1974 }
1975
1976 static void lfsck_interpret(const struct lu_env *env,
1977                             struct lfsck_instance *lfsck,
1978                             struct ptlrpc_request *req, void *args, int result)
1979 {
1980         struct lfsck_async_interpret_args *laia = args;
1981         struct lfsck_component            *com;
1982
1983         LASSERT(laia->laia_com == NULL);
1984         LASSERT(laia->laia_shared);
1985
1986         spin_lock(&lfsck->li_lock);
1987         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1988                 laia->laia_com = com;
1989                 lfsck_async_interpret_common(env, req, laia, result);
1990         }
1991
1992         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1993                 laia->laia_com = com;
1994                 lfsck_async_interpret_common(env, req, laia, result);
1995         }
1996         spin_unlock(&lfsck->li_lock);
1997 }
1998
1999 static int lfsck_stop_notify(const struct lu_env *env,
2000                              struct lfsck_instance *lfsck,
2001                              struct lfsck_tgt_descs *ltds,
2002                              struct lfsck_tgt_desc *ltd, __u16 type)
2003 {
2004         struct lfsck_component *com;
2005         int                     rc = 0;
2006         ENTRY;
2007
2008         LASSERT(lfsck->li_master);
2009
2010         spin_lock(&lfsck->li_lock);
2011         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
2012         if (com == NULL)
2013                 com = __lfsck_component_find(lfsck, type,
2014                                              &lfsck->li_list_double_scan);
2015         if (com != NULL)
2016                 lfsck_component_get(com);
2017         spin_unlock(&lfsck->li_lock);
2018
2019         if (com != NULL) {
2020                 struct lfsck_thread_info          *info  = lfsck_env_info(env);
2021                 struct lfsck_async_interpret_args *laia  = &info->lti_laia;
2022                 struct lfsck_request              *lr    = &info->lti_lr;
2023                 struct lfsck_assistant_data       *lad   = com->lc_data;
2024                 struct list_head                  *list;
2025                 struct list_head                  *phase_list;
2026                 struct ptlrpc_request_set         *set;
2027
2028                 set = ptlrpc_prep_set();
2029                 if (set == NULL) {
2030                         lfsck_component_put(env, com);
2031
2032                         RETURN(-ENOMEM);
2033                 }
2034
2035                 if (type == LFSCK_TYPE_LAYOUT) {
2036                         list = &ltd->ltd_layout_list;
2037                         phase_list = &ltd->ltd_layout_phase_list;
2038                 } else {
2039                         list = &ltd->ltd_namespace_list;
2040                         phase_list = &ltd->ltd_namespace_phase_list;
2041                 }
2042
2043                 spin_lock(&ltds->ltd_lock);
2044                 if (list_empty(list)) {
2045                         LASSERT(list_empty(phase_list));
2046                         spin_unlock(&ltds->ltd_lock);
2047                         ptlrpc_set_destroy(set);
2048
2049                         RETURN(0);
2050                 }
2051
2052                 list_del_init(phase_list);
2053                 list_del_init(list);
2054                 spin_unlock(&ltds->ltd_lock);
2055
2056                 memset(lr, 0, sizeof(*lr));
2057                 lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2058                 lr->lr_event = LE_PEER_EXIT;
2059                 lr->lr_active = type;
2060                 lr->lr_status = LS_CO_PAUSED;
2061                 if (ltds == &lfsck->li_ost_descs)
2062                         lr->lr_flags = LEF_TO_OST;
2063
2064                 laia->laia_com = com;
2065                 laia->laia_ltds = ltds;
2066                 atomic_inc(&ltd->ltd_ref);
2067                 laia->laia_ltd = ltd;
2068                 laia->laia_lr = lr;
2069                 laia->laia_shared = 0;
2070
2071                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2072                                          lfsck_async_interpret_common,
2073                                          laia, LFSCK_NOTIFY);
2074                 if (rc != 0) {
2075                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2076                                "co-stop for %s: rc = %d\n",
2077                                lfsck_lfsck2name(lfsck),
2078                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2079                                ltd->ltd_index, lad->lad_name, rc);
2080                         lfsck_tgt_put(ltd);
2081                 } else {
2082                         rc = ptlrpc_set_wait(set);
2083                 }
2084
2085                 ptlrpc_set_destroy(set);
2086                 lfsck_component_put(env, com);
2087         }
2088
2089         RETURN(rc);
2090 }
2091
2092 static int lfsck_async_interpret(const struct lu_env *env,
2093                                  struct ptlrpc_request *req,
2094                                  void *args, int rc)
2095 {
2096         struct lfsck_async_interpret_args *laia = args;
2097         struct lfsck_instance             *lfsck;
2098
2099         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
2100                               li_mdt_descs);
2101         lfsck_interpret(env, lfsck, req, laia, rc);
2102         lfsck_tgt_put(laia->laia_ltd);
2103         if (rc != 0 && laia->laia_result != -EALREADY)
2104                 laia->laia_result = rc;
2105
2106         return 0;
2107 }
2108
2109 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2110                         struct lfsck_request *lr,
2111                         struct ptlrpc_request_set *set,
2112                         ptlrpc_interpterer_t interpreter,
2113                         void *args, int request)
2114 {
2115         struct lfsck_async_interpret_args *laia;
2116         struct ptlrpc_request             *req;
2117         struct lfsck_request              *tmp;
2118         struct req_format                 *format;
2119         int                                rc;
2120
2121         switch (request) {
2122         case LFSCK_NOTIFY:
2123                 format = &RQF_LFSCK_NOTIFY;
2124                 break;
2125         case LFSCK_QUERY:
2126                 format = &RQF_LFSCK_QUERY;
2127                 break;
2128         default:
2129                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2130                        exp->exp_obd->obd_name, request, -EINVAL);
2131                 return -EINVAL;
2132         }
2133
2134         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2135         if (req == NULL)
2136                 return -ENOMEM;
2137
2138         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2139         if (rc != 0) {
2140                 ptlrpc_request_free(req);
2141
2142                 return rc;
2143         }
2144
2145         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2146         *tmp = *lr;
2147         ptlrpc_request_set_replen(req);
2148
2149         laia = ptlrpc_req_async_args(req);
2150         *laia = *(struct lfsck_async_interpret_args *)args;
2151         if (laia->laia_com != NULL)
2152                 lfsck_component_get(laia->laia_com);
2153         req->rq_interpret_reply = interpreter;
2154         ptlrpc_set_add_req(set, req);
2155
2156         return 0;
2157 }
2158
2159 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2160                           struct lfsck_start_param *lsp)
2161 {
2162         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2163         struct lfsck_assistant_data     *lad     = com->lc_data;
2164         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2165         struct ptlrpc_thread            *athread = &lad->lad_thread;
2166         struct lfsck_thread_args        *lta;
2167         struct task_struct              *task;
2168         int                              rc;
2169         ENTRY;
2170
2171         lad->lad_assistant_status = 0;
2172         lad->lad_post_result = 0;
2173         lad->lad_to_post = 0;
2174         lad->lad_to_double_scan = 0;
2175         lad->lad_in_double_scan = 0;
2176         lad->lad_exit = 0;
2177         thread_set_flags(athread, 0);
2178
2179         lta = lfsck_thread_args_init(lfsck, com, lsp);
2180         if (IS_ERR(lta))
2181                 RETURN(PTR_ERR(lta));
2182
2183         task = kthread_run(lfsck_assistant_engine, lta, lad->lad_name);
2184         if (IS_ERR(task)) {
2185                 rc = PTR_ERR(task);
2186                 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2187                        "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2188                 lfsck_thread_args_fini(lta);
2189         } else {
2190                 struct l_wait_info lwi = { 0 };
2191
2192                 l_wait_event(mthread->t_ctl_waitq,
2193                              thread_is_running(athread) ||
2194                              thread_is_stopped(athread),
2195                              &lwi);
2196                 if (unlikely(!thread_is_running(athread)))
2197                         rc = lad->lad_assistant_status;
2198                 else
2199                         rc = 0;
2200         }
2201
2202         RETURN(rc);
2203 }
2204
2205 int lfsck_checkpoint_generic(const struct lu_env *env,
2206                              struct lfsck_component *com)
2207 {
2208         struct lfsck_assistant_data     *lad     = com->lc_data;
2209         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2210         struct ptlrpc_thread            *athread = &lad->lad_thread;
2211         struct l_wait_info               lwi     = { 0 };
2212
2213         if (com->lc_new_checked == 0)
2214                 return LFSCK_CHECKPOINT_SKIP;
2215
2216         l_wait_event(mthread->t_ctl_waitq,
2217                      list_empty(&lad->lad_req_list) ||
2218                      !thread_is_running(mthread) ||
2219                      thread_is_stopped(athread),
2220                      &lwi);
2221
2222         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2223                 return LFSCK_CHECKPOINT_SKIP;
2224
2225         return 0;
2226 }
2227
2228 void lfsck_post_generic(const struct lu_env *env,
2229                         struct lfsck_component *com, int *result)
2230 {
2231         struct lfsck_assistant_data     *lad     = com->lc_data;
2232         struct ptlrpc_thread            *athread = &lad->lad_thread;
2233         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2234         struct l_wait_info               lwi     = { 0 };
2235
2236         lad->lad_post_result = *result;
2237         if (*result <= 0)
2238                 lad->lad_exit = 1;
2239         lad->lad_to_post = 1;
2240
2241         wake_up_all(&athread->t_ctl_waitq);
2242         l_wait_event(mthread->t_ctl_waitq,
2243                      (*result > 0 && list_empty(&lad->lad_req_list)) ||
2244                      thread_is_stopped(athread),
2245                      &lwi);
2246
2247         if (lad->lad_assistant_status < 0)
2248                 *result = lad->lad_assistant_status;
2249 }
2250
2251 int lfsck_double_scan_generic(const struct lu_env *env,
2252                               struct lfsck_component *com, int status)
2253 {
2254         struct lfsck_assistant_data     *lad     = com->lc_data;
2255         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2256         struct ptlrpc_thread            *athread = &lad->lad_thread;
2257         struct l_wait_info               lwi     = { 0 };
2258
2259         if (status != LS_SCANNING_PHASE2)
2260                 lad->lad_exit = 1;
2261         else
2262                 lad->lad_to_double_scan = 1;
2263
2264         wake_up_all(&athread->t_ctl_waitq);
2265         l_wait_event(mthread->t_ctl_waitq,
2266                      lad->lad_in_double_scan ||
2267                      thread_is_stopped(athread),
2268                      &lwi);
2269
2270         if (lad->lad_assistant_status < 0)
2271                 return lad->lad_assistant_status;
2272
2273         return 0;
2274 }
2275
2276 void lfsck_quit_generic(const struct lu_env *env,
2277                         struct lfsck_component *com)
2278 {
2279         struct lfsck_assistant_data     *lad     = com->lc_data;
2280         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2281         struct ptlrpc_thread            *athread = &lad->lad_thread;
2282         struct l_wait_info               lwi     = { 0 };
2283
2284         lad->lad_exit = 1;
2285         wake_up_all(&athread->t_ctl_waitq);
2286         l_wait_event(mthread->t_ctl_waitq,
2287                      thread_is_init(athread) ||
2288                      thread_is_stopped(athread),
2289                      &lwi);
2290 }
2291
2292 /* external interfaces */
2293
2294 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
2295 {
2296         struct lu_env           env;
2297         struct lfsck_instance  *lfsck;
2298         int                     rc;
2299         ENTRY;
2300
2301         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2302         if (rc != 0)
2303                 RETURN(rc);
2304
2305         lfsck = lfsck_instance_find(key, true, false);
2306         if (likely(lfsck != NULL)) {
2307                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
2308                 lfsck_instance_put(&env, lfsck);
2309         } else {
2310                 rc = -ENXIO;
2311         }
2312
2313         lu_env_fini(&env);
2314
2315         RETURN(rc);
2316 }
2317 EXPORT_SYMBOL(lfsck_get_speed);
2318
2319 int lfsck_set_speed(struct dt_device *key, int val)
2320 {
2321         struct lu_env           env;
2322         struct lfsck_instance  *lfsck;
2323         int                     rc;
2324         ENTRY;
2325
2326         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2327         if (rc != 0)
2328                 RETURN(rc);
2329
2330         lfsck = lfsck_instance_find(key, true, false);
2331         if (likely(lfsck != NULL)) {
2332                 mutex_lock(&lfsck->li_mutex);
2333                 if (__lfsck_set_speed(lfsck, val))
2334                         rc = lfsck_bookmark_store(&env, lfsck);
2335                 mutex_unlock(&lfsck->li_mutex);
2336                 lfsck_instance_put(&env, lfsck);
2337         } else {
2338                 rc = -ENXIO;
2339         }
2340
2341         lu_env_fini(&env);
2342
2343         RETURN(rc);
2344 }
2345 EXPORT_SYMBOL(lfsck_set_speed);
2346
2347 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
2348 {
2349         struct lu_env           env;
2350         struct lfsck_instance  *lfsck;
2351         int                     rc;
2352         ENTRY;
2353
2354         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2355         if (rc != 0)
2356                 RETURN(rc);
2357
2358         lfsck = lfsck_instance_find(key, true, false);
2359         if (likely(lfsck != NULL)) {
2360                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
2361                 lfsck_instance_put(&env, lfsck);
2362         } else {
2363                 rc = -ENXIO;
2364         }
2365
2366         lu_env_fini(&env);
2367
2368         RETURN(rc);
2369 }
2370 EXPORT_SYMBOL(lfsck_get_windows);
2371
2372 int lfsck_set_windows(struct dt_device *key, int val)
2373 {
2374         struct lu_env           env;
2375         struct lfsck_instance  *lfsck;
2376         int                     rc;
2377         ENTRY;
2378
2379         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2380         if (rc != 0)
2381                 RETURN(rc);
2382
2383         lfsck = lfsck_instance_find(key, true, false);
2384         if (likely(lfsck != NULL)) {
2385                 if (val > LFSCK_ASYNC_WIN_MAX) {
2386                         CWARN("%s: Too large async window size, which "
2387                               "may cause memory issues. The valid range "
2388                               "is [0 - %u]. If you do not want to restrict "
2389                               "the window size for async requests pipeline, "
2390                               "just set it as 0.\n",
2391                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2392                         rc = -EINVAL;
2393                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2394                         mutex_lock(&lfsck->li_mutex);
2395                         lfsck->li_bookmark_ram.lb_async_windows = val;
2396                         rc = lfsck_bookmark_store(&env, lfsck);
2397                         mutex_unlock(&lfsck->li_mutex);
2398                 }
2399                 lfsck_instance_put(&env, lfsck);
2400         } else {
2401                 rc = -ENXIO;
2402         }
2403
2404         lu_env_fini(&env);
2405
2406         RETURN(rc);
2407 }
2408 EXPORT_SYMBOL(lfsck_set_windows);
2409
2410 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2411 {
2412         struct lu_env           env;
2413         struct lfsck_instance  *lfsck;
2414         struct lfsck_component *com;
2415         int                     rc;
2416         ENTRY;
2417
2418         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2419         if (rc != 0)
2420                 RETURN(rc);
2421
2422         lfsck = lfsck_instance_find(key, true, false);
2423         if (likely(lfsck != NULL)) {
2424                 com = lfsck_component_find(lfsck, type);
2425                 if (likely(com != NULL)) {
2426                         rc = com->lc_ops->lfsck_dump(&env, com, m);
2427                         lfsck_component_put(&env, com);
2428                 } else {
2429                         rc = -ENOTSUPP;
2430                 }
2431
2432                 lfsck_instance_put(&env, lfsck);
2433         } else {
2434                 rc = -ENXIO;
2435         }
2436
2437         lu_env_fini(&env);
2438
2439         RETURN(rc);
2440 }
2441 EXPORT_SYMBOL(lfsck_dump);
2442
2443 static int lfsck_stop_all(const struct lu_env *env,
2444                           struct lfsck_instance *lfsck,
2445                           struct lfsck_stop *stop)
2446 {
2447         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2448         struct lfsck_request              *lr     = &info->lti_lr;
2449         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2450         struct ptlrpc_request_set         *set;
2451         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2452         struct lfsck_tgt_desc             *ltd;
2453         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2454         __u32                              idx;
2455         int                                rc     = 0;
2456         int                                rc1    = 0;
2457         ENTRY;
2458
2459         LASSERT(stop->ls_flags & LPF_BROADCAST);
2460
2461         set = ptlrpc_prep_set();
2462         if (unlikely(set == NULL))
2463                 RETURN(-ENOMEM);
2464
2465         memset(lr, 0, sizeof(*lr));
2466         lr->lr_event = LE_STOP;
2467         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2468         lr->lr_status = stop->ls_status;
2469         lr->lr_version = bk->lb_version;
2470         lr->lr_active = LFSCK_TYPES_ALL;
2471         lr->lr_param = stop->ls_flags;
2472
2473         laia->laia_com = NULL;
2474         laia->laia_ltds = ltds;
2475         laia->laia_lr = lr;
2476         laia->laia_result = 0;
2477         laia->laia_shared = 1;
2478
2479         down_read(&ltds->ltd_rw_sem);
2480         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2481                 ltd = lfsck_tgt_get(ltds, idx);
2482                 LASSERT(ltd != NULL);
2483
2484                 laia->laia_ltd = ltd;
2485                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2486                                          lfsck_async_interpret, laia,
2487                                          LFSCK_NOTIFY);
2488                 if (rc != 0) {
2489                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2490                         lfsck_tgt_put(ltd);
2491                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2492                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2493                         rc1 = rc;
2494                 }
2495         }
2496         up_read(&ltds->ltd_rw_sem);
2497
2498         rc = ptlrpc_set_wait(set);
2499         ptlrpc_set_destroy(set);
2500
2501         if (rc == 0)
2502                 rc = laia->laia_result;
2503
2504         if (rc == -EALREADY)
2505                 rc = 0;
2506
2507         if (rc != 0)
2508                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2509                        lfsck_lfsck2name(lfsck), rc);
2510
2511         RETURN(rc != 0 ? rc : rc1);
2512 }
2513
2514 static int lfsck_start_all(const struct lu_env *env,
2515                            struct lfsck_instance *lfsck,
2516                            struct lfsck_start *start)
2517 {
2518         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2519         struct lfsck_request              *lr     = &info->lti_lr;
2520         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2521         struct ptlrpc_request_set         *set;
2522         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2523         struct lfsck_tgt_desc             *ltd;
2524         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2525         __u32                              idx;
2526         int                                rc     = 0;
2527         ENTRY;
2528
2529         LASSERT(start->ls_flags & LPF_BROADCAST);
2530
2531         set = ptlrpc_prep_set();
2532         if (unlikely(set == NULL))
2533                 RETURN(-ENOMEM);
2534
2535         memset(lr, 0, sizeof(*lr));
2536         lr->lr_event = LE_START;
2537         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2538         lr->lr_speed = bk->lb_speed_limit;
2539         lr->lr_version = bk->lb_version;
2540         lr->lr_active = start->ls_active;
2541         lr->lr_param = start->ls_flags;
2542         lr->lr_async_windows = bk->lb_async_windows;
2543         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2544                        LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ |
2545                        LSV_CREATE_MDTOBJ;
2546
2547         laia->laia_com = NULL;
2548         laia->laia_ltds = ltds;
2549         laia->laia_lr = lr;
2550         laia->laia_result = 0;
2551         laia->laia_shared = 1;
2552
2553         down_read(&ltds->ltd_rw_sem);
2554         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2555                 ltd = lfsck_tgt_get(ltds, idx);
2556                 LASSERT(ltd != NULL);
2557
2558                 laia->laia_ltd = ltd;
2559                 ltd->ltd_layout_done = 0;
2560                 ltd->ltd_namespace_done = 0;
2561                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2562                                          lfsck_async_interpret, laia,
2563                                          LFSCK_NOTIFY);
2564                 if (rc != 0) {
2565                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2566                         lfsck_tgt_put(ltd);
2567                         CERROR("%s: cannot notify MDT %x for LFSCK "
2568                                "start, failout: rc = %d\n",
2569                                lfsck_lfsck2name(lfsck), idx, rc);
2570                         break;
2571                 }
2572         }
2573         up_read(&ltds->ltd_rw_sem);
2574
2575         if (rc != 0) {
2576                 ptlrpc_set_destroy(set);
2577
2578                 RETURN(rc);
2579         }
2580
2581         rc = ptlrpc_set_wait(set);
2582         ptlrpc_set_destroy(set);
2583
2584         if (rc == 0)
2585                 rc = laia->laia_result;
2586
2587         if (rc != 0) {
2588                 struct lfsck_stop *stop = &info->lti_stop;
2589
2590                 CERROR("%s: cannot start LFSCK on some MDTs, "
2591                        "stop all: rc = %d\n",
2592                        lfsck_lfsck2name(lfsck), rc);
2593                 if (rc != -EALREADY) {
2594                         stop->ls_status = LS_FAILED;
2595                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
2596                         lfsck_stop_all(env, lfsck, stop);
2597                 }
2598         }
2599
2600         RETURN(rc);
2601 }
2602
/**
 * Start the LFSCK on the instance bound to \a key, or join an LFSCK that is
 * already running.
 *
 * If the engine thread is neither in the init state nor stopped, each
 * requested component type is offered to the component's lfsck_join()
 * operation (when it has one). Otherwise the component lists and the
 * parameters are prepared, the "lfsck" master engine thread is spawned, and
 * - when LPF_BROADCAST is set - the start is forwarded to the other targets
 * via lfsck_start_all(). If that broadcast fails, the local engine thread is
 * asked to stop and is waited for.
 *
 * \param[in] env	execution environment
 * \param[in] key	device used to look up the LFSCK instance
 * \param[in] lsp	start parameters; lsp->lsp_start may be NULL, which
 *			means auto-triggering a paused LFSCK
 *
 * \retval		0 on success; positive values of the internal rc are
 *			mapped to 0 as well
 * \retval		-ENXIO if no LFSCK instance is found for \a key
 * \retval		-EAGAIN if lfsck::li_namespace is not registered yet
 * \retval		-EALREADY if the LFSCK is already running (unless a
 *			join changed the result)
 * \retval		-EPERM if LPF_BROADCAST is requested on a non-master
 * \retval		-ENOTSUPP/-EOPNOTSUPP for unsupported component types
 * \retval		other negative error number on failure
 */
2603 int lfsck_start(const struct lu_env *env, struct dt_device *key,
2604                 struct lfsck_start_param *lsp)
2605 {
2606         struct lfsck_start              *start  = lsp->lsp_start;
2607         struct lfsck_instance           *lfsck;
2608         struct lfsck_bookmark           *bk;
2609         struct ptlrpc_thread            *thread;
2610         struct lfsck_component          *com;
2611         struct l_wait_info               lwi    = { 0 };
2612         struct lfsck_thread_args        *lta;
2613         struct task_struct              *task;
2614         int                              rc     = 0;
2615         __u16                            valid  = 0;
2616         __u16                            flags  = 0;
             /* Single-bit mask walked over start->ls_active, lowest bit first. */
2617         __u16                            type   = 1;
2618         ENTRY;
2619
2620         lfsck = lfsck_instance_find(key, true, false);
2621         if (unlikely(lfsck == NULL))
2622                 RETURN(-ENXIO);
2623
2624         /* System is not ready, try again later. */
2625         if (unlikely(lfsck->li_namespace == NULL))
2626                 GOTO(put, rc = -EAGAIN);
2627
2628         /* start == NULL means auto trigger paused LFSCK. */
2629         if ((start == NULL) &&
2630             (list_empty(&lfsck->li_list_scan) ||
2631              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2632                 GOTO(put, rc = 0);
2633
2634         bk = &lfsck->li_bookmark_ram;
2635         thread = &lfsck->li_thread;
2636         mutex_lock(&lfsck->li_mutex);
2637         spin_lock(&lfsck->li_lock);
             /* The engine thread is neither "init" nor "stopped": the LFSCK
              * is treated as already running, so only joining is possible. */
2638         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2639                 rc = -EALREADY;
2640                 if (unlikely(start == NULL)) {
2641                         spin_unlock(&lfsck->li_lock);
2642                         GOTO(out, rc);
2643                 }
2644
                     /* Walk the requested types bit by bit. Each handled bit
                      * is cleared from ls_active; the loop ends when no bit is
                      * left, or on the first unknown component / join error. */
2645                 while (start->ls_active != 0) {
2646                         if (!(type & start->ls_active)) {
2647                                 type <<= 1;
2648                                 continue;
2649                         }
2650
2651                         com = __lfsck_component_find(lfsck, type,
2652                                                      &lfsck->li_list_scan);
2653                         if (com == NULL)
2654                                 com = __lfsck_component_find(lfsck, type,
2655                                                 &lfsck->li_list_double_scan);
2656                         if (com == NULL) {
2657                                 rc = -EOPNOTSUPP;
2658                                 break;
2659                         }
2660
2661                         if (com->lc_ops->lfsck_join != NULL) {
2662                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2663                                 if (rc != 0 && rc != -EALREADY)
2664                                         break;
2665                         }
2666                         start->ls_active &= ~type;
2667                         type <<= 1;
2668                 }
2669                 spin_unlock(&lfsck->li_lock);
2670                 GOTO(out, rc);
2671         }
2672         spin_unlock(&lfsck->li_lock);
2673
             /* Reset the per-run state of the instance before a new run. */
2674         lfsck->li_status = 0;
2675         lfsck->li_oit_over = 0;
2676         lfsck->li_start_unplug = 0;
2677         lfsck->li_drop_dryrun = 0;
2678         lfsck->li_new_scanned = 0;
2679
2680         /* For auto trigger. */
2681         if (start == NULL)
2682                 goto trigger;
2683
2684         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2685                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2686                        lfsck_lfsck2name(lfsck));
2687
2688                 GOTO(out, rc = -EPERM);
2689         }
2690
2691         start->ls_version = bk->lb_version;
2692
2693         if (start->ls_active != 0) {
2694                 struct lfsck_component *next;
2695
2696                 if (start->ls_active == LFSCK_TYPES_ALL)
2697                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2698
                     /* On unsupported types, leave only the unsupported bits
                      * in ls_active before failing. */
2699                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2700                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2701                         GOTO(out, rc = -ENOTSUPP);
2702                 }
2703
                     /* Call lfsck_post() on every component in the scan list
                      * that is not part of this request. The _safe iterator
                      * is used - presumably because lfsck_post() may move the
                      * component off the list; confirm in the component ops. */
2704                 list_for_each_entry_safe(com, next,
2705                                          &lfsck->li_list_scan, lc_link) {
2706                         if (!(com->lc_type & start->ls_active)) {
2707                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2708                                                              false);
2709                                 if (rc != 0)
2710                                         GOTO(out, rc);
2711                         }
2712                 }
2713
                     /* Move each requested component that currently sits in
                      * the idle list to the tail of the scan list, clearing
                      * its bit from ls_active as we go. */
2714                 while (start->ls_active != 0) {
2715                         if (type & start->ls_active) {
2716                                 com = __lfsck_component_find(lfsck, type,
2717                                                         &lfsck->li_list_idle);
2718                                 if (com != NULL)
2719                                         /* The component status will be updated
2720                                          * when its prep() is called later by
2721                                          * the LFSCK main engine. */
2722                                         list_move_tail(&com->lc_link,
2723                                                        &lfsck->li_list_scan);
2724                                 start->ls_active &= ~type;
2725                         }
2726                         type <<= 1;
2727                 }
2728         }
2729
2730         if (list_empty(&lfsck->li_list_scan)) {
2731                 /* The speed limit will be used to control both the LFSCK and
2732                  * low layer scrub (if applied), need to be handled firstly. */
2733                 if (start->ls_valid & LSV_SPEED_LIMIT) {
2734                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
2735                                 rc = lfsck_bookmark_store(env, lfsck);
2736                                 if (rc != 0)
2737                                         GOTO(out, rc);
2738                         }
2739                 }
2740
2741                 goto trigger;
2742         }
2743
2744         if (start->ls_flags & LPF_RESET)
2745                 flags |= DOIF_RESET;
2746
2747         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
2748         if (rc != 0)
2749                 GOTO(out, rc);
2750
             /* Rebuild ls_active from the components that will be scanned and
              * reset each of them when LPF_RESET was requested. */
2751         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2752                 start->ls_active |= com->lc_type;
2753                 if (flags & DOIF_RESET) {
2754                         rc = com->lc_ops->lfsck_reset(env, com, false);
2755                         if (rc != 0)
2756                                 GOTO(out, rc);
2757                 }
2758         }
2759
     /* Common tail for both the explicit start and the auto trigger (in which
      * case start is NULL): compute the iteration arguments and spawn the
      * engine thread. */
2760 trigger:
2761         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
2762         if (bk->lb_param & LPF_DRYRUN)
2763                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
2764
2765         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
2766                 valid |= DOIV_ERROR_HANDLE;
2767                 if (start->ls_flags & LPF_FAILOUT)
2768                         flags |= DOIF_FAILOUT;
2769         }
2770
2771         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
2772                 valid |= DOIV_DRYRUN;
2773                 if (start->ls_flags & LPF_DRYRUN)
2774                         flags |= DOIF_DRYRUN;
2775         }
2776
2777         if (!list_empty(&lfsck->li_list_scan))
2778                 flags |= DOIF_OUTUSED;
2779
             /* li_args_oit packs the DOIF_* flags above the DOIV_* valid bits. */
2780         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2781         thread_set_flags(thread, 0);
2782         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
2783         if (IS_ERR(lta))
2784                 GOTO(out, rc = PTR_ERR(lta));
2785
2786         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
2787         task = kthread_run(lfsck_master_engine, lta, "lfsck");
2788         if (IS_ERR(task)) {
2789                 rc = PTR_ERR(task);
2790                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
2791                        lfsck_lfsck2name(lfsck), rc);
                     /* The thread was not created, so lta is released here. */
2792                 lfsck_thread_args_fini(lta);
2793
2794                 GOTO(out, rc);
2795         }
2796
             /* Wait until the engine thread reports itself as running, or as
              * stopped. */
2797         l_wait_event(thread->t_ctl_waitq,
2798                      thread_is_running(thread) ||
2799                      thread_is_stopped(thread),
2800                      &lwi);
             /* Local-only start: set li_start_unplug and wake up the waiters
              * on t_ctl_waitq (presumably the engine waits for this flag
              * before it proceeds - confirm in lfsck_master_engine()). */
2801         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
2802                 lfsck->li_start_unplug = 1;
2803                 wake_up_all(&thread->t_ctl_waitq);
2804
2805                 GOTO(out, rc = 0);
2806         }
2807
2808         /* release lfsck::li_mutex to avoid deadlock. */
2809         mutex_unlock(&lfsck->li_mutex);
2810         rc = lfsck_start_all(env, lfsck, start);
2811         if (rc != 0) {
                     /* The broadcast failed: unless the local engine thread
                      * has stopped already, mark the run as failed, ask the
                      * thread to stop and wait until it has stopped. */
2812                 spin_lock(&lfsck->li_lock);
2813                 if (thread_is_stopped(thread)) {
2814                         spin_unlock(&lfsck->li_lock);
2815                 } else {
2816                         lfsck->li_status = LS_FAILED;
2817                         lfsck->li_flags = 0;
2818                         thread_set_flags(thread, SVC_STOPPING);
2819                         spin_unlock(&lfsck->li_lock);
2820
2821                         lfsck->li_start_unplug = 1;
2822                         wake_up_all(&thread->t_ctl_waitq);
2823                         l_wait_event(thread->t_ctl_waitq,
2824                                      thread_is_stopped(thread),
2825                                      &lwi);
2826                 }
2827         } else {
2828                 lfsck->li_start_unplug = 1;
2829                 wake_up_all(&thread->t_ctl_waitq);
2830         }
2831
             /* li_mutex was released above, so skip the unlock at "out". */
2832         GOTO(put, rc);
2833
2834 out:
2835         mutex_unlock(&lfsck->li_mutex);
2836
2837 put:
2838         lfsck_instance_put(env, lfsck);
2839
             /* Only negative values are reported to the caller as errors. */
2840         return rc < 0 ? rc : 0;
2841 }
2842 EXPORT_SYMBOL(lfsck_start);
2843
2844 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2845                struct lfsck_stop *stop)
2846 {
2847         struct lfsck_instance   *lfsck;
2848         struct ptlrpc_thread    *thread;
2849         struct l_wait_info       lwi    = { 0 };
2850         int                      rc     = 0;
2851         int                      rc1    = 0;
2852         ENTRY;
2853
2854         lfsck = lfsck_instance_find(key, true, false);
2855         if (unlikely(lfsck == NULL))
2856                 RETURN(-ENXIO);
2857
2858         thread = &lfsck->li_thread;
2859         /* release lfsck::li_mutex to avoid deadlock. */
2860         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2861                 if (!lfsck->li_master) {
2862                         CERROR("%s: only allow to specify '-A' via MDS\n",
2863                                lfsck_lfsck2name(lfsck));
2864
2865                         GOTO(out, rc = -EPERM);
2866                 }
2867
2868                 rc1 = lfsck_stop_all(env, lfsck, stop);
2869         }
2870
2871         mutex_lock(&lfsck->li_mutex);
2872         spin_lock(&lfsck->li_lock);
2873         /* no error if LFSCK is already stopped, or was never started */
2874         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2875                 spin_unlock(&lfsck->li_lock);
2876                 GOTO(out, rc = 0);
2877         }
2878
2879         if (stop != NULL) {
2880                 lfsck->li_status = stop->ls_status;
2881                 lfsck->li_flags = stop->ls_flags;
2882         } else {
2883                 lfsck->li_status = LS_STOPPED;
2884                 lfsck->li_flags = 0;
2885         }
2886
2887         thread_set_flags(thread, SVC_STOPPING);
2888         spin_unlock(&lfsck->li_lock);
2889
2890         wake_up_all(&thread->t_ctl_waitq);
2891         l_wait_event(thread->t_ctl_waitq,
2892                      thread_is_stopped(thread),
2893                      &lwi);
2894
2895         GOTO(out, rc = 0);
2896
2897 out:
2898         mutex_unlock(&lfsck->li_mutex);
2899         lfsck_instance_put(env, lfsck);
2900
2901         return rc != 0 ? rc : rc1;
2902 }
2903 EXPORT_SYMBOL(lfsck_stop);
2904
2905 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2906                     struct lfsck_request *lr, struct thandle *th)
2907 {
2908         int rc = -EOPNOTSUPP;
2909         ENTRY;
2910
2911         switch (lr->lr_event) {
2912         case LE_START: {
2913                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
2914                 struct lfsck_start_param  lsp;
2915
2916                 memset(start, 0, sizeof(*start));
2917                 start->ls_valid = lr->lr_valid;
2918                 start->ls_speed_limit = lr->lr_speed;
2919                 start->ls_version = lr->lr_version;
2920                 start->ls_active = lr->lr_active;
2921                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2922                 start->ls_async_windows = lr->lr_async_windows;
2923
2924                 lsp.lsp_start = start;
2925                 lsp.lsp_index = lr->lr_index;
2926                 lsp.lsp_index_valid = 1;
2927                 rc = lfsck_start(env, key, &lsp);
2928                 break;
2929         }
2930         case LE_STOP: {
2931                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2932
2933                 memset(stop, 0, sizeof(*stop));
2934                 stop->ls_status = lr->lr_status;
2935                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2936                 rc = lfsck_stop(env, key, stop);
2937                 break;
2938         }
2939         case LE_PHASE1_DONE:
2940         case LE_PHASE2_DONE:
2941         case LE_FID_ACCESSED:
2942         case LE_PEER_EXIT:
2943         case LE_CONDITIONAL_DESTROY:
2944         case LE_CREATE_ORPHAN:
2945         case LE_SKIP_NLINK_DECLARE:
2946         case LE_SKIP_NLINK:
2947         case LE_SET_LMV_MASTER:
2948         case LE_SET_LMV_SLAVE:
2949         case LE_PAIRS_VERIFY: {
2950                 struct lfsck_instance  *lfsck;
2951                 struct lfsck_component *com;
2952
2953                 lfsck = lfsck_instance_find(key, true, false);
2954                 if (unlikely(lfsck == NULL))
2955                         RETURN(-ENXIO);
2956
2957                 com = lfsck_component_find(lfsck, lr->lr_active);
2958                 if (likely(com != NULL)) {
2959                         rc = com->lc_ops->lfsck_in_notify(env, com, lr, th);
2960                         lfsck_component_put(env, com);
2961                 }
2962
2963                 lfsck_instance_put(env, lfsck);
2964                 break;
2965         }
2966         default:
2967                 break;
2968         }
2969
2970         RETURN(rc);
2971 }
2972 EXPORT_SYMBOL(lfsck_in_notify);
2973
2974 int lfsck_query(const struct lu_env *env, struct dt_device *key,
2975                 struct lfsck_request *lr)
2976 {
2977         struct lfsck_instance  *lfsck;
2978         struct lfsck_component *com;
2979         int                     rc;
2980         ENTRY;
2981
2982         lfsck = lfsck_instance_find(key, true, false);
2983         if (unlikely(lfsck == NULL))
2984                 RETURN(-ENXIO);
2985
2986         com = lfsck_component_find(lfsck, lr->lr_active);
2987         if (likely(com != NULL)) {
2988                 rc = com->lc_ops->lfsck_query(env, com);
2989                 lfsck_component_put(env, com);
2990         } else {
2991                 rc = -ENOTSUPP;
2992         }
2993
2994         lfsck_instance_put(env, lfsck);
2995
2996         RETURN(rc);
2997 }
2998 EXPORT_SYMBOL(lfsck_query);
2999
3000 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
3001                              struct ldlm_namespace *ns)
3002 {
3003         struct lfsck_instance  *lfsck;
3004         int                     rc      = -ENXIO;
3005
3006         lfsck = lfsck_instance_find(key, true, false);
3007         if (likely(lfsck != NULL)) {
3008                 lfsck->li_namespace = ns;
3009                 lfsck_instance_put(env, lfsck);
3010                 rc = 0;
3011         }
3012
3013         return rc;
3014 }
3015 EXPORT_SYMBOL(lfsck_register_namespace);
3016
/**
 * Allocate, initialize and register the LFSCK instance for the device
 * \a key.
 *
 * The function sets up the target descriptor tables and the local OID
 * storage, locates the local root, and - for a master instance whose device
 * index is 0 - looks up the global root ("ROOT") and verifies the linkEA of
 * the \a dotlustre and \a lostfound objects. It then locates the object
 * table iteration object, sets up the bookmark and the LFSCK components,
 * adds the instance to the instance list and handles the orphan targets.
 *
 * \param[in] env		execution environment
 * \param[in] key		device the instance is bound to (li_bottom)
 * \param[in] next		stored as lfsck::li_next
 * \param[in] obd		stored as lfsck::li_obd
 * \param[in] notify		callback stored as lfsck::li_out_notify
 * \param[in] notify_data	stored as lfsck::li_out_notify_data
 * \param[in] master		true to set up a master instance (FID init and
 *				namespace component are set up as well)
 *
 * \retval			0 on success
 * \retval			-EEXIST if an instance already exists for \a key
 * \retval			-ENOMEM if the instance cannot be allocated
 * \retval			-ENOTDIR if the local root is not a directory
 * \retval			other non-zero value on failure; the instance
 *				is then cleaned up via lfsck_instance_cleanup()
 */
3017 int lfsck_register(const struct lu_env *env, struct dt_device *key,
3018                    struct dt_device *next, struct obd_device *obd,
3019                    lfsck_out_notify notify, void *notify_data, bool master)
3020 {
3021         struct lfsck_instance   *lfsck;
3022         struct dt_object        *root  = NULL;
3023         struct dt_object        *obj   = NULL;
             /* Scratch FID from the per-thread info; reused for several
              * lookups below. */
3024         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
3025         int                      rc;
3026         ENTRY;
3027
3028         lfsck = lfsck_instance_find(key, false, false);
3029         if (unlikely(lfsck != NULL))
3030                 RETURN(-EEXIST);
3031
3032         OBD_ALLOC_PTR(lfsck);
3033         if (lfsck == NULL)
3034                 RETURN(-ENOMEM);
3035
3036         mutex_init(&lfsck->li_mutex);
3037         spin_lock_init(&lfsck->li_lock);
3038         INIT_LIST_HEAD(&lfsck->li_link);
3039         INIT_LIST_HEAD(&lfsck->li_list_scan);
3040         INIT_LIST_HEAD(&lfsck->li_list_dir);
3041         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
3042         INIT_LIST_HEAD(&lfsck->li_list_idle);
3043         INIT_LIST_HEAD(&lfsck->li_list_lmv);
             /* The instance starts with a single reference. */
3044         atomic_set(&lfsck->li_ref, 1);
3045         atomic_set(&lfsck->li_double_scan_count, 0);
3046         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
3047         lfsck->li_out_notify = notify;
3048         lfsck->li_out_notify_data = notify_data;
3049         lfsck->li_next = next;
3050         lfsck->li_bottom = key;
3051         lfsck->li_obd = obd;
3052
3053         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
3054         if (rc != 0)
3055                 GOTO(out, rc);
3056
3057         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
3058         if (rc != 0)
3059                 GOTO(out, rc);
3060
3061         fid->f_seq = FID_SEQ_LOCAL_NAME;
3062         fid->f_oid = 1;
3063         fid->f_ver = 0;
3064         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
3065         if (rc != 0)
3066                 GOTO(out, rc);
3067
             /* fid is overwritten with the FID of the device's root. */
3068         rc = dt_root_get(env, key, fid);
3069         if (rc != 0)
3070                 GOTO(out, rc);
3071
3072         root = dt_locate(env, key, fid);
3073         if (IS_ERR(root))
3074                 GOTO(out, rc = PTR_ERR(root));
3075
3076         if (unlikely(!dt_try_as_dir(env, root)))
3077                 GOTO(out, rc = -ENOTDIR);
3078
3079         lfsck->li_local_root_fid = *fid;
3080         if (master) {
3081                 lfsck->li_master = 1;
                     /* Only the master whose device index is 0 resolves the
                      * global root and verifies dotlustre / lostfound. */
3082                 if (lfsck_dev_idx(key) == 0) {
3083                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
3084                         const struct lu_name *cname;
3085
3086                         rc = dt_lookup(env, root,
3087                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
3088                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
3089                         if (rc != 0)
3090                                 GOTO(out, rc);
3091
3092                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
3093                         if (IS_ERR(obj))
3094                                 GOTO(out, rc = PTR_ERR(obj));
3095
                             /* Look up dotlustre under the global root; fid
                              * receives the FID of the found entry. */
3096                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3097                                 (const struct dt_key *)dotlustre, BYPASS_CAPA);
3098                         if (rc != 0)
3099                                 GOTO(out, rc);
3100
                             /* Drop the global root object and switch obj to
                              * the dotlustre object. */
3101                         lu_object_put(env, &obj->do_lu);
3102                         obj = dt_locate(env, key, fid);
3103                         if (IS_ERR(obj))
3104                                 GOTO(out, rc = PTR_ERR(obj));
3105
3106                         cname = lfsck_name_get_const(env, dotlustre,
3107                                                      strlen(dotlustre));
3108                         rc = lfsck_verify_linkea(env, key, obj, cname,
3109                                                  &lfsck->li_global_root_fid);
3110                         if (rc != 0)
3111                                 GOTO(out, rc);
3112
                             /* Save the dotlustre FID: it is the parent FID
                              * passed to the linkEA check of lostfound. */
3113                         *pfid = *fid;
3114                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3115                                        (const struct dt_key *)lostfound,
3116                                        BYPASS_CAPA);
3117                         if (rc != 0)
3118                                 GOTO(out, rc);
3119
3120                         lu_object_put(env, &obj->do_lu);
3121                         obj = dt_locate(env, key, fid);
3122                         if (IS_ERR(obj))
3123                                 GOTO(out, rc = PTR_ERR(obj));
3124
3125                         cname = lfsck_name_get_const(env, lostfound,
3126                                                      strlen(lostfound));
3127                         rc = lfsck_verify_linkea(env, key, obj, cname, pfid);
3128                         if (rc != 0)
3129                                 GOTO(out, rc);
3130
                             /* Reset obj so that the cleanup at "out" does
                              * not put the object a second time. */
3131                         lu_object_put(env, &obj->do_lu);
3132                         obj = NULL;
3133                 }
3134         }
3135
3136         fid->f_seq = FID_SEQ_LOCAL_FILE;
3137         fid->f_oid = OTABLE_IT_OID;
3138         fid->f_ver = 0;
3139         obj = dt_locate(env, key, fid);
3140         if (IS_ERR(obj))
3141                 GOTO(out, rc = PTR_ERR(obj));
3142
             /* Take an extra reference for lfsck::li_obj_oit; the local
              * reference held via obj is dropped at "out". */
3143         lu_object_get(&obj->do_lu);
3144         lfsck->li_obj_oit = obj;
3145         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
3146         if (rc != 0)
3147                 GOTO(out, rc);
3148
3149         rc = lfsck_bookmark_setup(env, lfsck);
3150         if (rc != 0)
3151                 GOTO(out, rc);
3152
3153         if (master) {
3154                 rc = lfsck_fid_init(lfsck);
3155                 if (rc < 0)
3156                         GOTO(out, rc);
3157
3158                 rc = lfsck_namespace_setup(env, lfsck);
3159                 if (rc < 0)
3160                         GOTO(out, rc);
3161         }
3162
3163         rc = lfsck_layout_setup(env, lfsck);
3164         if (rc < 0)
3165                 GOTO(out, rc);
3166
3167         /* XXX: more LFSCK components initialization to be added here. */
3168
3169         rc = lfsck_instance_add(lfsck);
3170         if (rc == 0)
3171                 rc = lfsck_add_target_from_orphan(env, lfsck);
     /* Common exit: release the local object references and, on any non-zero
      * rc, clean up the instance. */
3172 out:
3173         if (obj != NULL && !IS_ERR(obj))
3174                 lu_object_put(env, &obj->do_lu);
3175         if (root != NULL && !IS_ERR(root))
3176                 lu_object_put(env, &root->do_lu);
3177         if (rc != 0)
3178                 lfsck_instance_cleanup(env, lfsck);
3179         return rc;
3180 }
3181 EXPORT_SYMBOL(lfsck_register);
3182
3183 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
3184 {
3185         struct lfsck_instance *lfsck;
3186
3187         lfsck = lfsck_instance_find(key, false, true);
3188         if (lfsck != NULL)
3189                 lfsck_instance_put(env, lfsck);
3190 }
3191 EXPORT_SYMBOL(lfsck_degister);
3192
3193 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
3194                      struct dt_device *tgt, struct obd_export *exp,
3195                      __u32 index, bool for_ost)
3196 {
3197         struct lfsck_instance   *lfsck;
3198         struct lfsck_tgt_desc   *ltd;
3199         int                      rc;
3200         ENTRY;
3201
3202         OBD_ALLOC_PTR(ltd);
3203         if (ltd == NULL)
3204                 RETURN(-ENOMEM);
3205
3206         ltd->ltd_tgt = tgt;
3207         ltd->ltd_key = key;
3208         ltd->ltd_exp = exp;
3209         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
3210         INIT_LIST_HEAD(&ltd->ltd_layout_list);
3211         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
3212         INIT_LIST_HEAD(&ltd->ltd_namespace_list);
3213         INIT_LIST_HEAD(&ltd->ltd_namespace_phase_list);
3214         atomic_set(&ltd->ltd_ref, 1);
3215         ltd->ltd_index = index;
3216
3217         spin_lock(&lfsck_instance_lock);
3218         lfsck = __lfsck_instance_find(key, true, false);
3219         if (lfsck == NULL) {
3220                 if (for_ost)
3221                         list_add_tail(&ltd->ltd_orphan_list,
3222                                       &lfsck_ost_orphan_list);
3223                 else
3224                         list_add_tail(&ltd->ltd_orphan_list,
3225                                       &lfsck_mdt_orphan_list);
3226                 spin_unlock(&lfsck_instance_lock);
3227
3228                 RETURN(0);
3229         }
3230         spin_unlock(&lfsck_instance_lock);
3231
3232         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
3233         if (rc != 0)
3234                 lfsck_tgt_put(ltd);
3235
3236         lfsck_instance_put(env, lfsck);
3237
3238         RETURN(rc);
3239 }
3240 EXPORT_SYMBOL(lfsck_add_target);
3241
3242 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
3243                       struct dt_device *tgt, __u32 index, bool for_ost)
3244 {
3245         struct lfsck_instance   *lfsck;
3246         struct lfsck_tgt_descs  *ltds;
3247         struct lfsck_tgt_desc   *ltd;
3248         struct list_head        *head;
3249
3250         if (for_ost)
3251                 head = &lfsck_ost_orphan_list;
3252         else
3253                 head = &lfsck_mdt_orphan_list;
3254
3255         spin_lock(&lfsck_instance_lock);
3256         list_for_each_entry(ltd, head, ltd_orphan_list) {
3257                 if (ltd->ltd_tgt == tgt) {
3258                         list_del_init(&ltd->ltd_orphan_list);
3259                         spin_unlock(&lfsck_instance_lock);
3260                         lfsck_tgt_put(ltd);
3261
3262                         return;
3263                 }
3264         }
3265
3266         ltd = NULL;
3267         lfsck = __lfsck_instance_find(key, true, false);
3268         spin_unlock(&lfsck_instance_lock);
3269         if (unlikely(lfsck == NULL))
3270                 return;
3271
3272         if (for_ost)
3273                 ltds = &lfsck->li_ost_descs;
3274         else
3275                 ltds = &lfsck->li_mdt_descs;
3276
3277         down_write(&ltds->ltd_rw_sem);
3278         LASSERT(ltds->ltd_tgts_bitmap != NULL);
3279
3280         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
3281                 goto unlock;
3282
3283         ltd = LTD_TGT(ltds, index);
3284         if (unlikely(ltd == NULL))
3285                 goto unlock;
3286
3287         LASSERT(ltds->ltd_tgtnr > 0);
3288
3289         ltds->ltd_tgtnr--;
3290         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
3291         LTD_TGT(ltds, index) = NULL;
3292
3293 unlock:
3294         if (ltd == NULL) {
3295                 if (for_ost)
3296                         head = &lfsck->li_ost_descs.ltd_orphan;
3297                 else
3298                         head = &lfsck->li_mdt_descs.ltd_orphan;
3299
3300                 list_for_each_entry(ltd, head, ltd_orphan_list) {
3301                         if (ltd->ltd_tgt == tgt) {
3302                                 list_del_init(&ltd->ltd_orphan_list);
3303                                 break;
3304                         }
3305                 }
3306         }
3307
3308         up_write(&ltds->ltd_rw_sem);
3309         if (ltd != NULL) {
3310                 spin_lock(&ltds->ltd_lock);
3311                 ltd->ltd_dead = 1;
3312                 spin_unlock(&ltds->ltd_lock);
3313                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_NAMESPACE);
3314                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
3315                 lfsck_tgt_put(ltd);
3316         }
3317
3318         lfsck_instance_put(env, lfsck);
3319 }
3320 EXPORT_SYMBOL(lfsck_del_target);
3321
3322 static int __init lfsck_init(void)
3323 {
3324         int rc;
3325
3326         INIT_LIST_HEAD(&lfsck_instance_list);
3327         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
3328         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
3329         lfsck_key_init_generic(&lfsck_thread_key, NULL);
3330         rc = lu_context_key_register(&lfsck_thread_key);
3331         if (rc == 0) {
3332                 tgt_register_lfsck_in_notify(lfsck_in_notify);
3333                 tgt_register_lfsck_query(lfsck_query);
3334         }
3335
3336         return rc;
3337 }
3338
3339 static void __exit lfsck_exit(void)
3340 {
3341         struct lfsck_tgt_desc *ltd;
3342         struct lfsck_tgt_desc *next;
3343
3344         LASSERT(list_empty(&lfsck_instance_list));
3345
3346         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
3347                                  ltd_orphan_list) {
3348                 list_del_init(&ltd->ltd_orphan_list);
3349                 lfsck_tgt_put(ltd);
3350         }
3351
3352         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
3353                                  ltd_orphan_list) {
3354                 list_del_init(&ltd->ltd_orphan_list);
3355                 lfsck_tgt_put(ltd);
3356         }
3357
3358         lu_context_key_degister(&lfsck_thread_key);
3359 }
3360
3361 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
3362 MODULE_DESCRIPTION("LFSCK");
3363 MODULE_LICENSE("GPL");
3364
3365 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);