Whamcloud - gitweb
LU-5180 lfsck: linkea for orphan
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 /* define lfsck thread key */
46 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
47
48 static void lfsck_key_fini(const struct lu_context *ctx,
49                            struct lu_context_key *key, void *data)
50 {
51         struct lfsck_thread_info *info = data;
52
53         lu_buf_free(&info->lti_linkea_buf);
54         lu_buf_free(&info->lti_big_buf);
55         OBD_FREE_PTR(info);
56 }
57
58 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
59 LU_KEY_INIT_GENERIC(lfsck);
60
61 static CFS_LIST_HEAD(lfsck_instance_list);
62 static struct list_head lfsck_ost_orphan_list;
63 static struct list_head lfsck_mdt_orphan_list;
64 static DEFINE_SPINLOCK(lfsck_instance_lock);
65
66 static const char *lfsck_status_names[] = {
67         [LS_INIT]               = "init",
68         [LS_SCANNING_PHASE1]    = "scanning-phase1",
69         [LS_SCANNING_PHASE2]    = "scanning-phase2",
70         [LS_COMPLETED]          = "completed",
71         [LS_FAILED]             = "failed",
72         [LS_STOPPED]            = "stopped",
73         [LS_PAUSED]             = "paused",
74         [LS_CRASHED]            = "crashed",
75         [LS_PARTIAL]            = "partial",
76         [LS_CO_FAILED]          = "co-failed",
77         [LS_CO_STOPPED]         = "co-stopped",
78         [LS_CO_PAUSED]          = "co-paused"
79 };
80
81 const char *lfsck_flags_names[] = {
82         "scanned-once",
83         "inconsistent",
84         "upgrade",
85         "incomplete",
86         "crashed_lastid",
87         NULL
88 };
89
90 const char *lfsck_param_names[] = {
91         NULL,
92         "failout",
93         "dryrun",
94         "all_targets",
95         "broadcast",
96         "orphan",
97         "create_ostobj",
98         NULL
99 };
100
101 const char *lfsck_status2names(enum lfsck_status status)
102 {
103         if (unlikely(status < 0 || status >= LS_MAX))
104                 return "unknown";
105
106         return lfsck_status_names[status];
107 }
108
109 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
110 {
111         spin_lock_init(&ltds->ltd_lock);
112         init_rwsem(&ltds->ltd_rw_sem);
113         INIT_LIST_HEAD(&ltds->ltd_orphan);
114         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
115         if (ltds->ltd_tgts_bitmap == NULL)
116                 return -ENOMEM;
117
118         return 0;
119 }
120
121 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
122 {
123         struct lfsck_tgt_desc   *ltd;
124         struct lfsck_tgt_desc   *next;
125         int                      idx;
126
127         down_write(&ltds->ltd_rw_sem);
128
129         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
130                                  ltd_orphan_list) {
131                 list_del_init(&ltd->ltd_orphan_list);
132                 lfsck_tgt_put(ltd);
133         }
134
135         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
136                 up_write(&ltds->ltd_rw_sem);
137
138                 return;
139         }
140
141         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
142                 ltd = LTD_TGT(ltds, idx);
143                 if (likely(ltd != NULL)) {
144                         LASSERT(list_empty(&ltd->ltd_layout_list));
145                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
146
147                         ltds->ltd_tgtnr--;
148                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
149                         LTD_TGT(ltds, idx) = NULL;
150                         lfsck_tgt_put(ltd);
151                 }
152         }
153
154         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
155                  ltds->ltd_tgtnr);
156
157         for (idx = 0; idx < TGT_PTRS; idx++) {
158                 if (ltds->ltd_tgts_idx[idx] != NULL) {
159                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
160                         ltds->ltd_tgts_idx[idx] = NULL;
161                 }
162         }
163
164         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
165         ltds->ltd_tgts_bitmap = NULL;
166         up_write(&ltds->ltd_rw_sem);
167 }
168
169 static int __lfsck_add_target(const struct lu_env *env,
170                               struct lfsck_instance *lfsck,
171                               struct lfsck_tgt_desc *ltd,
172                               bool for_ost, bool locked)
173 {
174         struct lfsck_tgt_descs *ltds;
175         __u32                   index = ltd->ltd_index;
176         int                     rc    = 0;
177         ENTRY;
178
179         if (for_ost)
180                 ltds = &lfsck->li_ost_descs;
181         else
182                 ltds = &lfsck->li_mdt_descs;
183
184         if (!locked)
185                 down_write(&ltds->ltd_rw_sem);
186
187         LASSERT(ltds->ltd_tgts_bitmap != NULL);
188
189         if (index >= ltds->ltd_tgts_bitmap->size) {
190                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
191                                     (__u32)BITS_PER_LONG);
192                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
193                 cfs_bitmap_t *new_bitmap;
194
195                 while (newsize < index + 1)
196                         newsize <<= 1;
197
198                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
199                 if (new_bitmap == NULL)
200                         GOTO(unlock, rc = -ENOMEM);
201
202                 if (ltds->ltd_tgtnr > 0)
203                         cfs_bitmap_copy(new_bitmap, old_bitmap);
204                 ltds->ltd_tgts_bitmap = new_bitmap;
205                 CFS_FREE_BITMAP(old_bitmap);
206         }
207
208         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
209                 CERROR("%s: the device %s (%u) is registered already\n",
210                        lfsck_lfsck2name(lfsck),
211                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
212                 GOTO(unlock, rc = -EEXIST);
213         }
214
215         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
216                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
217                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
218                         GOTO(unlock, rc = -ENOMEM);
219         }
220
221         LTD_TGT(ltds, index) = ltd;
222         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
223         ltds->ltd_tgtnr++;
224
225         GOTO(unlock, rc = 0);
226
227 unlock:
228         if (!locked)
229                 up_write(&ltds->ltd_rw_sem);
230
231         return rc;
232 }
233
234 static int lfsck_add_target_from_orphan(const struct lu_env *env,
235                                         struct lfsck_instance *lfsck)
236 {
237         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
238         struct lfsck_tgt_desc   *ltd;
239         struct lfsck_tgt_desc   *next;
240         struct list_head        *head    = &lfsck_ost_orphan_list;
241         int                      rc;
242         bool                     for_ost = true;
243
244 again:
245         spin_lock(&lfsck_instance_lock);
246         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
247                 if (ltd->ltd_key == lfsck->li_bottom) {
248                         list_del_init(&ltd->ltd_orphan_list);
249                         list_add_tail(&ltd->ltd_orphan_list,
250                                       &ltds->ltd_orphan);
251                 }
252         }
253         spin_unlock(&lfsck_instance_lock);
254
255         down_write(&ltds->ltd_rw_sem);
256         while (!list_empty(&ltds->ltd_orphan)) {
257                 ltd = list_entry(ltds->ltd_orphan.next,
258                                  struct lfsck_tgt_desc,
259                                  ltd_orphan_list);
260                 list_del_init(&ltd->ltd_orphan_list);
261                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
262                 /* Do not hold the semaphore for too long time. */
263                 up_write(&ltds->ltd_rw_sem);
264                 if (rc != 0)
265                         return rc;
266
267                 down_write(&ltds->ltd_rw_sem);
268         }
269         up_write(&ltds->ltd_rw_sem);
270
271         if (for_ost) {
272                 ltds = &lfsck->li_mdt_descs;
273                 head = &lfsck_mdt_orphan_list;
274                 for_ost = false;
275                 goto again;
276         }
277
278         return 0;
279 }
280
281 static inline struct lfsck_component *
282 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type, cfs_list_t *list)
283 {
284         struct lfsck_component *com;
285
286         cfs_list_for_each_entry(com, list, lc_link) {
287                 if (com->lc_type == type)
288                         return com;
289         }
290         return NULL;
291 }
292
293 struct lfsck_component *
294 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
295 {
296         struct lfsck_component *com;
297
298         spin_lock(&lfsck->li_lock);
299         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
300         if (com != NULL)
301                 goto unlock;
302
303         com = __lfsck_component_find(lfsck, type,
304                                      &lfsck->li_list_double_scan);
305         if (com != NULL)
306                 goto unlock;
307
308         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
309
310 unlock:
311         if (com != NULL)
312                 lfsck_component_get(com);
313         spin_unlock(&lfsck->li_lock);
314         return com;
315 }
316
317 void lfsck_component_cleanup(const struct lu_env *env,
318                              struct lfsck_component *com)
319 {
320         if (!cfs_list_empty(&com->lc_link))
321                 cfs_list_del_init(&com->lc_link);
322         if (!cfs_list_empty(&com->lc_link_dir))
323                 cfs_list_del_init(&com->lc_link_dir);
324
325         lfsck_component_put(env, com);
326 }
327
328 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
329                     struct lu_fid *fid, bool locked)
330 {
331         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
332         int                      rc = 0;
333         ENTRY;
334
335         if (!locked)
336                 mutex_lock(&lfsck->li_mutex);
337
338         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
339         if (rc >= 0) {
340                 bk->lb_last_fid = *fid;
341                 /* We do not care about whether the subsequent sub-operations
342                  * failed or not. The worst case is that one FID is lost that
343                  * is not a big issue for the LFSCK since it is relative rare
344                  * for LFSCK create. */
345                 rc = lfsck_bookmark_store(env, lfsck);
346         }
347
348         if (!locked)
349                 mutex_unlock(&lfsck->li_mutex);
350
351         RETURN(rc);
352 }
353
354 static const char dot[] = ".";
355 static const char dotdot[] = "..";
356 static const char dotlustre[] = ".lustre";
357 static const char lostfound[] = "lost+found";
358
359 static int lfsck_create_lpf_local(const struct lu_env *env,
360                                   struct lfsck_instance *lfsck,
361                                   struct dt_object *parent,
362                                   struct dt_object *child,
363                                   struct lu_attr *la,
364                                   struct dt_object_format *dof,
365                                   const char *name)
366 {
367         struct dt_device        *dev    = lfsck->li_bottom;
368         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
369         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
370         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
371         struct thandle          *th     = NULL;
372         struct linkea_data       ldata  = { 0 };
373         struct lu_buf            linkea_buf;
374         const struct lu_name    *cname;
375         loff_t                   pos    = 0;
376         int                      len    = sizeof(struct lfsck_bookmark);
377         int                      rc;
378         ENTRY;
379
380         rc = linkea_data_new(&ldata,
381                              &lfsck_env_info(env)->lti_linkea_buf);
382         if (rc != 0)
383                 RETURN(rc);
384
385         cname = lfsck_name_get_const(env, name, strlen(name));
386         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
387         if (rc != 0)
388                 RETURN(rc);
389
390         th = dt_trans_create(env, dev);
391         if (IS_ERR(th))
392                 RETURN(PTR_ERR(th));
393
394         /* 1a. create child */
395         rc = dt_declare_create(env, child, la, NULL, dof, th);
396         if (rc != 0)
397                 GOTO(stop, rc);
398
399         /* 2a. increase child nlink */
400         rc = dt_declare_ref_add(env, child, th);
401         if (rc != 0)
402                 GOTO(stop, rc);
403
404         /* 3a. insert linkEA for child */
405         linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
406         linkea_buf.lb_len = ldata.ld_leh->leh_len;
407         rc = dt_declare_xattr_set(env, child, &linkea_buf,
408                                   XATTR_NAME_LINK, 0, th);
409         if (rc != 0)
410                 GOTO(stop, rc);
411
412         /* 4a. insert name into parent dir */
413         rc = dt_declare_insert(env, parent, (const struct dt_rec *)cfid,
414                                (const struct dt_key *)name, th);
415         if (rc != 0)
416                 GOTO(stop, rc);
417
418         /* 5a. increase parent nlink */
419         rc = dt_declare_ref_add(env, parent, th);
420         if (rc != 0)
421                 GOTO(stop, rc);
422
423         /* 6a. update bookmark */
424         rc = dt_declare_record_write(env, bk_obj,
425                                      lfsck_buf_get(env, bk, len), 0, th);
426         if (rc != 0)
427                 GOTO(stop, rc);
428
429         rc = dt_trans_start_local(env, dev, th);
430         if (rc != 0)
431                 GOTO(stop, rc);
432
433         dt_write_lock(env, child, 0);
434         /* 1b.1. create child */
435         rc = dt_create(env, child, la, NULL, dof, th);
436         if (rc != 0)
437                 GOTO(unlock, rc);
438
439         if (unlikely(!dt_try_as_dir(env, child)))
440                 GOTO(unlock, rc = -ENOTDIR);
441
442         /* 1b.2. insert dot into child dir */
443         rc = dt_insert(env, child, (const struct dt_rec *)cfid,
444                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
445         if (rc != 0)
446                 GOTO(unlock, rc);
447
448         /* 1b.3. insert dotdot into child dir */
449         rc = dt_insert(env, child, (const struct dt_rec *)&LU_LPF_FID,
450                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
451         if (rc != 0)
452                 GOTO(unlock, rc);
453
454         /* 2b. increase child nlink */
455         rc = dt_ref_add(env, child, th);
456         if (rc != 0)
457                 GOTO(unlock, rc);
458
459         /* 3b. insert linkEA for child. */
460         rc = dt_xattr_set(env, child, &linkea_buf,
461                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
462         dt_write_unlock(env, child);
463         if (rc != 0)
464                 GOTO(stop, rc);
465
466         /* 4b. insert name into parent dir */
467         rc = dt_insert(env, parent, (const struct dt_rec *)cfid,
468                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
469         if (rc != 0)
470                 GOTO(stop, rc);
471
472         dt_write_lock(env, parent, 0);
473         /* 5b. increase parent nlink */
474         rc = dt_ref_add(env, parent, th);
475         dt_write_unlock(env, parent);
476         if (rc != 0)
477                 GOTO(stop, rc);
478
479         bk->lb_lpf_fid = *cfid;
480         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
481
482         /* 6b. update bookmark */
483         rc = dt_record_write(env, bk_obj,
484                              lfsck_buf_get(env, bk, len), &pos, th);
485
486         GOTO(stop, rc);
487
488 unlock:
489         dt_write_unlock(env, child);
490
491 stop:
492         dt_trans_stop(env, dev, th);
493
494         return rc;
495 }
496
497 static int lfsck_create_lpf_remote(const struct lu_env *env,
498                                    struct lfsck_instance *lfsck,
499                                    struct dt_object *parent,
500                                    struct dt_object *child,
501                                    struct lu_attr *la,
502                                    struct dt_object_format *dof,
503                                    const char *name)
504 {
505         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
506         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
507         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
508         struct thandle          *th     = NULL;
509         struct linkea_data       ldata  = { 0 };
510         struct lu_buf            linkea_buf;
511         const struct lu_name    *cname;
512         struct dt_device        *dev;
513         loff_t                   pos    = 0;
514         int                      len    = sizeof(struct lfsck_bookmark);
515         int                      rc;
516         ENTRY;
517
518         rc = linkea_data_new(&ldata,
519                              &lfsck_env_info(env)->lti_linkea_buf);
520         if (rc != 0)
521                 RETURN(rc);
522
523         cname = lfsck_name_get_const(env, name, strlen(name));
524         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
525         if (rc != 0)
526                 RETURN(rc);
527
528         /* Create .lustre/lost+found/MDTxxxx. */
529
530         /* XXX: Currently, cross-MDT create operation needs to create the child
531          *      object firstly, then insert name into the parent directory. For
532          *      this case, the child object resides on current MDT (local), but
533          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
534          *      easy to contain all the sub-modifications orderly within single
535          *      transaction.
536          *
537          *      To avoid more inconsistency, we split the create operation into
538          *      two transactions:
539          *
540          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
541          *         locally.
542          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
543          *         remotely.
544          *
545          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
546          *      repair such inconsistency when LFSCK run next time. */
547
548         /* Transaction I: locally */
549
550         dev = lfsck->li_bottom;
551         th = dt_trans_create(env, dev);
552         if (IS_ERR(th))
553                 RETURN(PTR_ERR(th));
554
555         /* 1a. create child */
556         rc = dt_declare_create(env, child, la, NULL, dof, th);
557         if (rc != 0)
558                 GOTO(stop, rc);
559
560         /* 2a. increase child nlink */
561         rc = dt_declare_ref_add(env, child, th);
562         if (rc != 0)
563                 GOTO(stop, rc);
564
565         /* 3a. insert linkEA for child */
566         linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
567         linkea_buf.lb_len = ldata.ld_leh->leh_len;
568         rc = dt_declare_xattr_set(env, child, &linkea_buf,
569                                   XATTR_NAME_LINK, 0, th);
570         if (rc != 0)
571                 GOTO(stop, rc);
572
573         /* 4a. update bookmark */
574         rc = dt_declare_record_write(env, bk_obj,
575                                      lfsck_buf_get(env, bk, len), 0, th);
576         if (rc != 0)
577                 GOTO(stop, rc);
578
579         rc = dt_trans_start_local(env, dev, th);
580         if (rc != 0)
581                 GOTO(stop, rc);
582
583         dt_write_lock(env, child, 0);
584         /* 1b.1. create child */
585         rc = dt_create(env, child, la, NULL, dof, th);
586         if (rc != 0)
587                 GOTO(unlock, rc);
588
589         if (unlikely(!dt_try_as_dir(env, child)))
590                 GOTO(unlock, rc = -ENOTDIR);
591
592         /* 1b.2. insert dot into child dir */
593         rc = dt_insert(env, child, (const struct dt_rec *)cfid,
594                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
595         if (rc != 0)
596                 GOTO(unlock, rc);
597
598         /* 1b.3. insert dotdot into child dir */
599         rc = dt_insert(env, child, (const struct dt_rec *)&LU_LPF_FID,
600                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
601         if (rc != 0)
602                 GOTO(unlock, rc);
603
604         /* 2b. increase child nlink */
605         rc = dt_ref_add(env, child, th);
606         if (rc != 0)
607                 GOTO(unlock, rc);
608
609         /* 3b. insert linkEA for child */
610         rc = dt_xattr_set(env, child, &linkea_buf,
611                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
612         if (rc != 0)
613                 GOTO(unlock, rc);
614
615         bk->lb_lpf_fid = *cfid;
616         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
617
618         /* 4b. update bookmark */
619         rc = dt_record_write(env, bk_obj,
620                              lfsck_buf_get(env, bk, len), &pos, th);
621
622         dt_write_unlock(env, child);
623         dt_trans_stop(env, dev, th);
624         if (rc != 0)
625                 RETURN(rc);
626
627         /* Transaction II: remotely */
628
629         dev = lfsck->li_next;
630         th = dt_trans_create(env, dev);
631         if (IS_ERR(th))
632                 RETURN(PTR_ERR(th));
633
634         /* 5a. insert name into parent dir */
635         rc = dt_declare_insert(env, parent, (const struct dt_rec *)cfid,
636                                (const struct dt_key *)name, th);
637         if (rc != 0)
638                 GOTO(stop, rc);
639
640         /* 6a. increase parent nlink */
641         rc = dt_declare_ref_add(env, parent, th);
642         if (rc != 0)
643                 GOTO(stop, rc);
644
645         rc = dt_trans_start(env, dev, th);
646         if (rc != 0)
647                 GOTO(stop, rc);
648
649         /* 5b. insert name into parent dir */
650         rc = dt_insert(env, parent, (const struct dt_rec *)cfid,
651                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
652         if (rc != 0)
653                 GOTO(stop, rc);
654
655         dt_write_lock(env, parent, 0);
656         /* 6b. increase parent nlink */
657         rc = dt_ref_add(env, parent, th);
658         dt_write_unlock(env, parent);
659
660         GOTO(stop, rc);
661
662 unlock:
663         dt_write_unlock(env, child);
664 stop:
665         dt_trans_stop(env, dev, th);
666
667         if (rc != 0 && dev == lfsck->li_next)
668                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
669                        "for orphans, but failed to insert the name %s "
670                        "to the .lustre/lost+found/. Such inconsistency "
671                        "will be repaired when LFSCK run next time: rc = %d\n",
672                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
673
674         return rc;
675 }
676
677 /* Do NOT create .lustre/lost+found/MDTxxxx when register the lfsck instance,
678  * because the MDT0 maybe not reaady for sequence allocation yet. We do that
679  * only when it is required, such as orphan OST-objects repairing. */
680 int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
681 {
682         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
683         struct lfsck_thread_info *info  = lfsck_env_info(env);
684         struct lu_fid            *cfid  = &info->lti_fid2;
685         struct lu_attr           *la    = &info->lti_la;
686         struct dt_object_format  *dof   = &info->lti_dof;
687         struct dt_object         *parent = NULL;
688         struct dt_object         *child = NULL;
689         char                      name[8];
690         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
691         int                       rc    = 0;
692         ENTRY;
693
694         LASSERT(lfsck->li_master);
695
696         sprintf(name, "MDT%04x", node);
697         if (node == 0) {
698                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
699                                                   &LU_LPF_FID);
700         } else {
701                 struct lfsck_tgt_desc *ltd;
702
703                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
704                 if (unlikely(ltd == NULL))
705                         RETURN(-ENXIO);
706
707                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
708                                                   &LU_LPF_FID);
709                 lfsck_tgt_put(ltd);
710         }
711         if (IS_ERR(parent))
712                 RETURN(PTR_ERR(parent));
713
714         if (unlikely(!dt_try_as_dir(env, parent)))
715                 GOTO(out, rc = -ENOTDIR);
716
717         mutex_lock(&lfsck->li_mutex);
718         if (lfsck->li_lpf_obj != NULL)
719                 GOTO(unlock, rc = 0);
720
721         if (fid_is_zero(&bk->lb_lpf_fid)) {
722                 /* There is corner case that: in former LFSCK scanning we have
723                  * created the .lustre/lost+found/MDTxxxx but failed to update
724                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
725                  * it from MDT0 firstly. */
726                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
727                                (const struct dt_key *)name, BYPASS_CAPA);
728                 if (rc != 0 && rc != -ENOENT)
729                         GOTO(unlock, rc);
730
731                 if (rc == 0) {
732                         bk->lb_lpf_fid = *cfid;
733                         rc = lfsck_bookmark_store(env, lfsck);
734                 } else {
735                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
736                 }
737                 if (rc != 0)
738                         GOTO(unlock, rc);
739         } else {
740                 *cfid = bk->lb_lpf_fid;
741         }
742
743         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
744         if (IS_ERR(child))
745                 GOTO(unlock, rc = PTR_ERR(child));
746
747         if (dt_object_exists(child) != 0) {
748                 if (unlikely(!dt_try_as_dir(env, child)))
749                         rc = -ENOTDIR;
750                 else
751                         lfsck->li_lpf_obj = child;
752
753                 GOTO(unlock, rc);
754         }
755
756         memset(la, 0, sizeof(*la));
757         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
758         la->la_mode = S_IFDIR | S_IRWXU;
759         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
760                        LA_UID | LA_GID;
761         memset(dof, 0, sizeof(*dof));
762         dof->dof_type = dt_mode_to_dft(S_IFDIR);
763
764         if (node == 0)
765                 rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
766                                             dof, name);
767         else
768                 rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
769                                              dof, name);
770         if (rc == 0)
771                 lfsck->li_lpf_obj = child;
772
773         GOTO(unlock, rc);
774
775 unlock:
776         mutex_unlock(&lfsck->li_mutex);
777         if (rc != 0 && child != NULL && !IS_ERR(child))
778                 lu_object_put(env, &child->do_lu);
779 out:
780         if (parent != NULL && !IS_ERR(parent))
781                 lu_object_put(env, &parent->do_lu);
782
783         return rc;
784 }
785
786 static int lfsck_fid_init(struct lfsck_instance *lfsck)
787 {
788         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
789         struct seq_server_site  *ss;
790         char                    *prefix;
791         int                      rc     = 0;
792         ENTRY;
793
794         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
795         if (unlikely(ss == NULL))
796                 RETURN(-ENXIO);
797
798         OBD_ALLOC_PTR(lfsck->li_seq);
799         if (lfsck->li_seq == NULL)
800                 RETURN(-ENOMEM);
801
802         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
803         if (prefix == NULL)
804                 GOTO(out, rc = -ENOMEM);
805
806         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
807         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
808                              ss->ss_server_seq);
809         OBD_FREE(prefix, MAX_OBD_NAME + 7);
810         if (rc != 0)
811                 GOTO(out, rc);
812
813         if (fid_is_sane(&bk->lb_last_fid))
814                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
815
816         RETURN(0);
817
818 out:
819         OBD_FREE_PTR(lfsck->li_seq);
820         lfsck->li_seq = NULL;
821
822         return rc;
823 }
824
825 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
826 {
827         if (lfsck->li_seq != NULL) {
828                 seq_client_fini(lfsck->li_seq);
829                 OBD_FREE_PTR(lfsck->li_seq);
830                 lfsck->li_seq = NULL;
831         }
832 }
833
834 void lfsck_instance_cleanup(const struct lu_env *env,
835                             struct lfsck_instance *lfsck)
836 {
837         struct ptlrpc_thread    *thread = &lfsck->li_thread;
838         struct lfsck_component  *com;
839         ENTRY;
840
841         LASSERT(list_empty(&lfsck->li_link));
842         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
843
844         if (lfsck->li_obj_oit != NULL) {
845                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
846                 lfsck->li_obj_oit = NULL;
847         }
848
849         LASSERT(lfsck->li_obj_dir == NULL);
850
851         while (!cfs_list_empty(&lfsck->li_list_scan)) {
852                 com = cfs_list_entry(lfsck->li_list_scan.next,
853                                      struct lfsck_component,
854                                      lc_link);
855                 lfsck_component_cleanup(env, com);
856         }
857
858         LASSERT(cfs_list_empty(&lfsck->li_list_dir));
859
860         while (!cfs_list_empty(&lfsck->li_list_double_scan)) {
861                 com = cfs_list_entry(lfsck->li_list_double_scan.next,
862                                      struct lfsck_component,
863                                      lc_link);
864                 lfsck_component_cleanup(env, com);
865         }
866
867         while (!cfs_list_empty(&lfsck->li_list_idle)) {
868                 com = cfs_list_entry(lfsck->li_list_idle.next,
869                                      struct lfsck_component,
870                                      lc_link);
871                 lfsck_component_cleanup(env, com);
872         }
873
874         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
875         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
876
877         if (lfsck->li_bookmark_obj != NULL) {
878                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
879                 lfsck->li_bookmark_obj = NULL;
880         }
881
882         if (lfsck->li_lpf_obj != NULL) {
883                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
884                 lfsck->li_lpf_obj = NULL;
885         }
886
887         if (lfsck->li_los != NULL) {
888                 local_oid_storage_fini(env, lfsck->li_los);
889                 lfsck->li_los = NULL;
890         }
891
892         lfsck_fid_fini(lfsck);
893
894         OBD_FREE_PTR(lfsck);
895 }
896
897 static inline struct lfsck_instance *
898 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
899 {
900         struct lfsck_instance *lfsck;
901
902         cfs_list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
903                 if (lfsck->li_bottom == key) {
904                         if (ref)
905                                 lfsck_instance_get(lfsck);
906                         if (unlink)
907                                 list_del_init(&lfsck->li_link);
908
909                         return lfsck;
910                 }
911         }
912
913         return NULL;
914 }
915
916 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
917                                            bool unlink)
918 {
919         struct lfsck_instance *lfsck;
920
921         spin_lock(&lfsck_instance_lock);
922         lfsck = __lfsck_instance_find(key, ref, unlink);
923         spin_unlock(&lfsck_instance_lock);
924
925         return lfsck;
926 }
927
928 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
929 {
930         struct lfsck_instance *tmp;
931
932         spin_lock(&lfsck_instance_lock);
933         cfs_list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
934                 if (lfsck->li_bottom == tmp->li_bottom) {
935                         spin_unlock(&lfsck_instance_lock);
936                         return -EEXIST;
937                 }
938         }
939
940         cfs_list_add_tail(&lfsck->li_link, &lfsck_instance_list);
941         spin_unlock(&lfsck_instance_lock);
942         return 0;
943 }
944
945 int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
946                     const char *prefix)
947 {
948         int flag;
949         int i;
950         bool newline = (bits != 0 ? false : true);
951
952         seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
953
954         for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
955                 if (flag & bits) {
956                         bits &= ~flag;
957                         if (names[i] != NULL) {
958                                 if (bits == 0)
959                                         newline = true;
960
961                                 seq_printf(m, "%s%c", names[i],
962                                            newline ? '\n' : ',');
963                         }
964                 }
965         }
966
967         if (!newline)
968                 seq_printf(m, "\n");
969         return 0;
970 }
971
972 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *prefix)
973 {
974         if (time != 0)
975                 seq_printf(m, "%s: "LPU64" seconds\n", prefix,
976                           cfs_time_current_sec() - time);
977         else
978                 seq_printf(m, "%s: N/A\n", prefix);
979         return 0;
980 }
981
982 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
983                    const char *prefix)
984 {
985         if (fid_is_zero(&pos->lp_dir_parent)) {
986                 if (pos->lp_oit_cookie == 0)
987                         seq_printf(m, "%s: N/A, N/A, N/A\n",
988                                    prefix);
989                 else
990                         seq_printf(m, "%s: "LPU64", N/A, N/A\n",
991                                    prefix, pos->lp_oit_cookie);
992         } else {
993                 seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
994                            prefix, pos->lp_oit_cookie,
995                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
996         }
997         return 0;
998 }
999
1000 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1001                     struct lfsck_position *pos, bool init)
1002 {
1003         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1004
1005         if (unlikely(lfsck->li_di_oit == NULL)) {
1006                 memset(pos, 0, sizeof(*pos));
1007                 return;
1008         }
1009
1010         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1011         if (!lfsck->li_current_oit_processed && !init)
1012                 pos->lp_oit_cookie--;
1013
1014         LASSERT(pos->lp_oit_cookie > 0);
1015
1016         if (lfsck->li_di_dir != NULL) {
1017                 struct dt_object *dto = lfsck->li_obj_dir;
1018
1019                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1020                                                         lfsck->li_di_dir);
1021
1022                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1023                         fid_zero(&pos->lp_dir_parent);
1024                         pos->lp_dir_cookie = 0;
1025                 } else {
1026                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1027                 }
1028         } else {
1029                 fid_zero(&pos->lp_dir_parent);
1030                 pos->lp_dir_cookie = 0;
1031         }
1032 }
1033
1034 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1035 {
1036         bool dirty = false;
1037
1038         if (limit != LFSCK_SPEED_NO_LIMIT) {
1039                 if (limit > HZ) {
1040                         lfsck->li_sleep_rate = limit / HZ;
1041                         lfsck->li_sleep_jif = 1;
1042                 } else {
1043                         lfsck->li_sleep_rate = 1;
1044                         lfsck->li_sleep_jif = HZ / limit;
1045                 }
1046         } else {
1047                 lfsck->li_sleep_jif = 0;
1048                 lfsck->li_sleep_rate = 0;
1049         }
1050
1051         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1052                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1053                 dirty = true;
1054         }
1055
1056         return dirty;
1057 }
1058
1059 void lfsck_control_speed(struct lfsck_instance *lfsck)
1060 {
1061         struct ptlrpc_thread *thread = &lfsck->li_thread;
1062         struct l_wait_info    lwi;
1063
1064         if (lfsck->li_sleep_jif > 0 &&
1065             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1066                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1067                                        LWI_ON_SIGNAL_NOOP, NULL);
1068
1069                 l_wait_event(thread->t_ctl_waitq,
1070                              !thread_is_running(thread),
1071                              &lwi);
1072                 lfsck->li_new_scanned = 0;
1073         }
1074 }
1075
1076 void lfsck_control_speed_by_self(struct lfsck_component *com)
1077 {
1078         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1079         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1080         struct l_wait_info       lwi;
1081
1082         if (lfsck->li_sleep_jif > 0 &&
1083             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1084                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1085                                        LWI_ON_SIGNAL_NOOP, NULL);
1086
1087                 l_wait_event(thread->t_ctl_waitq,
1088                              !thread_is_running(thread),
1089                              &lwi);
1090                 com->lc_new_scanned = 0;
1091         }
1092 }
1093
1094 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
1095                             struct lu_fid *fid)
1096 {
1097         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
1098                      !dt_try_as_dir(env, obj)))
1099                 return -ENOTDIR;
1100
1101         return dt_lookup(env, obj, (struct dt_rec *)fid,
1102                          (const struct dt_key *)"..", BYPASS_CAPA);
1103 }
1104
1105 static int lfsck_needs_scan_dir(const struct lu_env *env,
1106                                 struct lfsck_instance *lfsck,
1107                                 struct dt_object *obj)
1108 {
1109         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
1110         int            depth = 0;
1111         int            rc;
1112
1113         if (!lfsck->li_master || !S_ISDIR(lfsck_object_type(obj)) ||
1114             cfs_list_empty(&lfsck->li_list_dir))
1115                RETURN(0);
1116
1117         while (1) {
1118                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
1119                  *      which is the agent directory to manage the objects
1120                  *      which name entries reside on remote MDTs. Related
1121                  *      consistency verification will be processed in LFSCK
1122                  *      phase III. */
1123                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
1124                         if (depth > 0)
1125                                 lfsck_object_put(env, obj);
1126                         return 1;
1127                 }
1128
1129                 /* No need to check .lustre and its children. */
1130                 if (fid_seq_is_dot_lustre(fid_seq(lfsck_dto2fid(obj)))) {
1131                         if (depth > 0)
1132                                 lfsck_object_put(env, obj);
1133                         return 0;
1134                 }
1135
1136                 dt_read_lock(env, obj, MOR_TGT_CHILD);
1137                 if (unlikely(lfsck_is_dead_obj(obj))) {
1138                         dt_read_unlock(env, obj);
1139                         if (depth > 0)
1140                                 lfsck_object_put(env, obj);
1141                         return 0;
1142                 }
1143
1144                 rc = dt_xattr_get(env, obj,
1145                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
1146                                   BYPASS_CAPA);
1147                 dt_read_unlock(env, obj);
1148                 if (rc >= 0) {
1149                         if (depth > 0)
1150                                 lfsck_object_put(env, obj);
1151                         return 1;
1152                 }
1153
1154                 if (rc < 0 && rc != -ENODATA) {
1155                         if (depth > 0)
1156                                 lfsck_object_put(env, obj);
1157                         return rc;
1158                 }
1159
1160                 rc = lfsck_parent_fid(env, obj, fid);
1161                 if (depth > 0)
1162                         lfsck_object_put(env, obj);
1163                 if (rc != 0)
1164                         return rc;
1165
1166                 if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
1167                         return 0;
1168
1169                 obj = lfsck_object_find(env, lfsck, fid);
1170                 if (obj == NULL)
1171                         return 0;
1172                 else if (IS_ERR(obj))
1173                         return PTR_ERR(obj);
1174
1175                 if (!dt_object_exists(obj)) {
1176                         lfsck_object_put(env, obj);
1177                         return 0;
1178                 }
1179
1180                 if (dt_object_remote(obj)) {
1181                         /* .lustre/lost+found/MDTxxx can be remote directory. */
1182                         if (fid_seq_is_dot_lustre(fid_seq(lfsck_dto2fid(obj))))
1183                                 rc = 0;
1184                         else
1185                                 /* Other remote directory should be client
1186                                  * visible and need to be checked. */
1187                                 rc = 1;
1188                         lfsck_object_put(env, obj);
1189                         return rc;
1190                 }
1191
1192                 depth++;
1193         }
1194         return 0;
1195 }
1196
1197 struct lfsck_thread_args *lfsck_thread_args_init(struct lfsck_instance *lfsck,
1198                                                  struct lfsck_component *com,
1199                                                  struct lfsck_start_param *lsp)
1200 {
1201         struct lfsck_thread_args *lta;
1202         int                       rc;
1203
1204         OBD_ALLOC_PTR(lta);
1205         if (lta == NULL)
1206                 return ERR_PTR(-ENOMEM);
1207
1208         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1209         if (rc != 0) {
1210                 OBD_FREE_PTR(lta);
1211                 return ERR_PTR(rc);
1212         }
1213
1214         lta->lta_lfsck = lfsck_instance_get(lfsck);
1215         if (com != NULL)
1216                 lta->lta_com = lfsck_component_get(com);
1217
1218         lta->lta_lsp = lsp;
1219
1220         return lta;
1221 }
1222
1223 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1224 {
1225         if (lta->lta_com != NULL)
1226                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1227         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1228         lu_env_fini(&lta->lta_env);
1229         OBD_FREE_PTR(lta);
1230 }
1231
1232 /* LFSCK wrap functions */
1233
1234 void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
1235                 bool new_checked)
1236 {
1237         struct lfsck_component *com;
1238
1239         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1240                 com->lc_ops->lfsck_fail(env, com, new_checked);
1241         }
1242 }
1243
1244 int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck)
1245 {
1246         struct lfsck_component *com;
1247         int                     rc  = 0;
1248         int                     rc1 = 0;
1249
1250         if (likely(cfs_time_beforeq(cfs_time_current(),
1251                                     lfsck->li_time_next_checkpoint)))
1252                 return 0;
1253
1254         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
1255         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1256                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
1257                 if (rc != 0)
1258                         rc1 = rc;
1259         }
1260
1261         lfsck->li_time_last_checkpoint = cfs_time_current();
1262         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1263                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1264         return rc1 != 0 ? rc1 : rc;
1265 }
1266
1267 int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck,
1268                struct lfsck_start_param *lsp)
1269 {
1270         struct dt_object       *obj     = NULL;
1271         struct lfsck_component *com;
1272         struct lfsck_component *next;
1273         struct lfsck_position  *pos     = NULL;
1274         const struct dt_it_ops *iops    =
1275                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
1276         struct dt_it           *di;
1277         int                     rc;
1278         ENTRY;
1279
1280         LASSERT(lfsck->li_obj_dir == NULL);
1281         LASSERT(lfsck->li_di_dir == NULL);
1282
1283         lfsck->li_current_oit_processed = 0;
1284         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1285                 com->lc_new_checked = 0;
1286                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1287                         com->lc_journal = 0;
1288
1289                 rc = com->lc_ops->lfsck_prep(env, com, lsp);
1290                 if (rc != 0)
1291                         GOTO(out, rc);
1292
1293                 if ((pos == NULL) ||
1294                     (!lfsck_pos_is_zero(&com->lc_pos_start) &&
1295                      lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
1296                         pos = &com->lc_pos_start;
1297         }
1298
1299         /* Init otable-based iterator. */
1300         if (pos == NULL) {
1301                 rc = iops->load(env, lfsck->li_di_oit, 0);
1302                 if (rc > 0) {
1303                         lfsck->li_oit_over = 1;
1304                         rc = 0;
1305                 }
1306
1307                 GOTO(out, rc);
1308         }
1309
1310         rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
1311         if (rc < 0)
1312                 GOTO(out, rc);
1313         else if (rc > 0)
1314                 lfsck->li_oit_over = 1;
1315
1316         if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
1317                 GOTO(out, rc = 0);
1318
1319         /* Find the directory for namespace-based traverse. */
1320         obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
1321         if (obj == NULL)
1322                 GOTO(out, rc = 0);
1323         else if (IS_ERR(obj))
1324                 RETURN(PTR_ERR(obj));
1325
1326         /* XXX: Currently, skip remote object, the consistency for
1327          *      remote object will be processed in LFSCK phase III. */
1328         if (!dt_object_exists(obj) || dt_object_remote(obj) ||
1329             unlikely(!S_ISDIR(lfsck_object_type(obj))))
1330                 GOTO(out, rc = 0);
1331
1332         if (unlikely(!dt_try_as_dir(env, obj)))
1333                 GOTO(out, rc = -ENOTDIR);
1334
1335         /* Init the namespace-based directory traverse. */
1336         iops = &obj->do_index_ops->dio_it;
1337         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
1338         if (IS_ERR(di))
1339                 GOTO(out, rc = PTR_ERR(di));
1340
1341         LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
1342
1343         rc = iops->load(env, di, pos->lp_dir_cookie);
1344         if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
1345                 rc = iops->next(env, di);
1346         else if (rc > 0)
1347                 rc = 0;
1348
1349         if (rc != 0) {
1350                 iops->put(env, di);
1351                 iops->fini(env, di);
1352                 GOTO(out, rc);
1353         }
1354
1355         lfsck->li_obj_dir = lfsck_object_get(obj);
1356         lfsck->li_cookie_dir = iops->store(env, di);
1357         spin_lock(&lfsck->li_lock);
1358         lfsck->li_di_dir = di;
1359         spin_unlock(&lfsck->li_lock);
1360
1361         GOTO(out, rc = 0);
1362
1363 out:
1364         if (obj != NULL)
1365                 lfsck_object_put(env, obj);
1366
1367         if (rc < 0) {
1368                 cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
1369                                              lc_link)
1370                         com->lc_ops->lfsck_post(env, com, rc, true);
1371
1372                 return rc;
1373         }
1374
1375         rc = 0;
1376         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
1377         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1378                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
1379                 if (rc != 0)
1380                         break;
1381         }
1382
1383         lfsck->li_time_last_checkpoint = cfs_time_current();
1384         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1385                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1386         return rc;
1387 }
1388
1389 int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
1390                    struct dt_object *obj)
1391 {
1392         struct lfsck_component *com;
1393         const struct dt_it_ops *iops;
1394         struct dt_it           *di;
1395         int                     rc;
1396         ENTRY;
1397
1398         LASSERT(lfsck->li_obj_dir == NULL);
1399
1400         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1401                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
1402                 if (rc != 0)
1403                         RETURN(rc);
1404         }
1405
1406         rc = lfsck_needs_scan_dir(env, lfsck, obj);
1407         if (rc <= 0)
1408                 GOTO(out, rc);
1409
1410         if (unlikely(!dt_try_as_dir(env, obj)))
1411                 GOTO(out, rc = -ENOTDIR);
1412
1413         iops = &obj->do_index_ops->dio_it;
1414         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
1415         if (IS_ERR(di))
1416                 GOTO(out, rc = PTR_ERR(di));
1417
1418         rc = iops->load(env, di, 0);
1419         if (rc == 0)
1420                 rc = iops->next(env, di);
1421         else if (rc > 0)
1422                 rc = 0;
1423
1424         if (rc != 0) {
1425                 iops->put(env, di);
1426                 iops->fini(env, di);
1427                 GOTO(out, rc);
1428         }
1429
1430         lfsck->li_obj_dir = lfsck_object_get(obj);
1431         lfsck->li_cookie_dir = iops->store(env, di);
1432         spin_lock(&lfsck->li_lock);
1433         lfsck->li_di_dir = di;
1434         spin_unlock(&lfsck->li_lock);
1435
1436         GOTO(out, rc = 0);
1437
1438 out:
1439         if (rc < 0)
1440                 lfsck_fail(env, lfsck, false);
1441         return (rc > 0 ? 0 : rc);
1442 }
1443
1444 int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
1445                    struct dt_object *obj, struct lu_dirent *ent)
1446 {
1447         struct lfsck_component *com;
1448         int                     rc;
1449
1450         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1451                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
1452                 if (rc != 0)
1453                         return rc;
1454         }
1455         return 0;
1456 }
1457
1458 int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
1459                int result)
1460 {
1461         struct lfsck_component *com;
1462         struct lfsck_component *next;
1463         int                     rc  = 0;
1464         int                     rc1 = 0;
1465
1466         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
1467         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1468                 rc = com->lc_ops->lfsck_post(env, com, result, false);
1469                 if (rc != 0)
1470                         rc1 = rc;
1471         }
1472
1473         lfsck->li_time_last_checkpoint = cfs_time_current();
1474         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1475                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1476
1477         /* Ignore some component post failure to make other can go ahead. */
1478         return result;
1479 }
1480
1481 static void lfsck_interpret(const struct lu_env *env,
1482                             struct lfsck_instance *lfsck,
1483                             struct ptlrpc_request *req, void *args, int result)
1484 {
1485         struct lfsck_async_interpret_args *laia = args;
1486         struct lfsck_component            *com;
1487
1488         LASSERT(laia->laia_com == NULL);
1489         LASSERT(laia->laia_shared);
1490
1491         spin_lock(&lfsck->li_lock);
1492         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1493                 if (com->lc_ops->lfsck_interpret != NULL) {
1494                         laia->laia_com = com;
1495                         com->lc_ops->lfsck_interpret(env, req, laia, result);
1496                 }
1497         }
1498
1499         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1500                 if (com->lc_ops->lfsck_interpret != NULL) {
1501                         laia->laia_com = com;
1502                         com->lc_ops->lfsck_interpret(env, req, laia, result);
1503                 }
1504         }
1505         spin_unlock(&lfsck->li_lock);
1506 }
1507
1508 int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck)
1509 {
1510         struct lfsck_component *com;
1511         struct lfsck_component *next;
1512         struct l_wait_info      lwi = { 0 };
1513         int                     rc  = 0;
1514         int                     rc1 = 0;
1515
1516         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1517                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1518                         com->lc_journal = 0;
1519
1520                 rc = com->lc_ops->lfsck_double_scan(env, com);
1521                 if (rc != 0)
1522                         rc1 = rc;
1523         }
1524
1525         l_wait_event(lfsck->li_thread.t_ctl_waitq,
1526                      atomic_read(&lfsck->li_double_scan_count) == 0,
1527                      &lwi);
1528
1529         if (lfsck->li_status != LS_PAUSED &&
1530             lfsck->li_status != LS_CO_PAUSED) {
1531                 list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1532                                          lc_link) {
1533                         spin_lock(&lfsck->li_lock);
1534                         list_del_init(&com->lc_link);
1535                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1536                         spin_unlock(&lfsck->li_lock);
1537                 }
1538         }
1539
1540         return rc1 != 0 ? rc1 : rc;
1541 }
1542
1543 static int lfsck_stop_notify(const struct lu_env *env,
1544                              struct lfsck_instance *lfsck,
1545                              struct lfsck_tgt_descs *ltds,
1546                              struct lfsck_tgt_desc *ltd, __u16 type)
1547 {
1548         struct ptlrpc_request_set *set;
1549         struct lfsck_component    *com;
1550         int                        rc  = 0;
1551         ENTRY;
1552
1553         spin_lock(&lfsck->li_lock);
1554         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
1555         if (com == NULL)
1556                 com = __lfsck_component_find(lfsck, type,
1557                                              &lfsck->li_list_double_scan);
1558         if (com != NULL)
1559                 lfsck_component_get(com);
1560         spin_lock(&lfsck->li_lock);
1561
1562         if (com != NULL) {
1563                 if (com->lc_ops->lfsck_stop_notify != NULL) {
1564                         set = ptlrpc_prep_set();
1565                         if (set == NULL) {
1566                                 lfsck_component_put(env, com);
1567
1568                                 RETURN(-ENOMEM);
1569                         }
1570
1571                         rc = com->lc_ops->lfsck_stop_notify(env, com, ltds,
1572                                                             ltd, set);
1573                         if (rc == 0)
1574                                 rc = ptlrpc_set_wait(set);
1575
1576                         ptlrpc_set_destroy(set);
1577                 }
1578
1579                 lfsck_component_put(env, com);
1580         }
1581
1582         RETURN(rc);
1583 }
1584
1585 void lfsck_quit(const struct lu_env *env, struct lfsck_instance *lfsck)
1586 {
1587         struct lfsck_component *com;
1588         struct lfsck_component *next;
1589
1590         list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
1591                                  lc_link) {
1592                 if (com->lc_ops->lfsck_quit != NULL)
1593                         com->lc_ops->lfsck_quit(env, com);
1594
1595                 spin_lock(&lfsck->li_lock);
1596                 list_del_init(&com->lc_link);
1597                 list_del_init(&com->lc_link_dir);
1598                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1599                 spin_unlock(&lfsck->li_lock);
1600         }
1601
1602         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1603                                  lc_link) {
1604                 if (com->lc_ops->lfsck_quit != NULL)
1605                         com->lc_ops->lfsck_quit(env, com);
1606
1607                 spin_lock(&lfsck->li_lock);
1608                 list_del_init(&com->lc_link);
1609                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1610                 spin_unlock(&lfsck->li_lock);
1611         }
1612 }
1613
1614 static int lfsck_async_interpret(const struct lu_env *env,
1615                                  struct ptlrpc_request *req,
1616                                  void *args, int rc)
1617 {
1618         struct lfsck_async_interpret_args *laia = args;
1619         struct lfsck_instance             *lfsck;
1620
1621         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
1622                               li_mdt_descs);
1623         lfsck_interpret(env, lfsck, req, laia, rc);
1624         lfsck_tgt_put(laia->laia_ltd);
1625         if (rc != 0 && laia->laia_result != -EALREADY)
1626                 laia->laia_result = rc;
1627
1628         return 0;
1629 }
1630
1631 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
1632                         struct lfsck_request *lr,
1633                         struct ptlrpc_request_set *set,
1634                         ptlrpc_interpterer_t interpreter,
1635                         void *args, int request)
1636 {
1637         struct lfsck_async_interpret_args *laia;
1638         struct ptlrpc_request             *req;
1639         struct lfsck_request              *tmp;
1640         struct req_format                 *format;
1641         int                                rc;
1642
1643         switch (request) {
1644         case LFSCK_NOTIFY:
1645                 format = &RQF_LFSCK_NOTIFY;
1646                 break;
1647         case LFSCK_QUERY:
1648                 format = &RQF_LFSCK_QUERY;
1649                 break;
1650         default:
1651                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
1652                        exp->exp_obd->obd_name, request, -EINVAL);
1653                 return -EINVAL;
1654         }
1655
1656         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
1657         if (req == NULL)
1658                 return -ENOMEM;
1659
1660         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
1661         if (rc != 0) {
1662                 ptlrpc_request_free(req);
1663
1664                 return rc;
1665         }
1666
1667         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1668         *tmp = *lr;
1669         ptlrpc_request_set_replen(req);
1670
1671         laia = ptlrpc_req_async_args(req);
1672         *laia = *(struct lfsck_async_interpret_args *)args;
1673         if (laia->laia_com != NULL)
1674                 lfsck_component_get(laia->laia_com);
1675         req->rq_interpret_reply = interpreter;
1676         ptlrpc_set_add_req(set, req);
1677
1678         return 0;
1679 }
1680
1681 /* external interfaces */
1682
1683 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
1684 {
1685         struct lu_env           env;
1686         struct lfsck_instance  *lfsck;
1687         int                     rc;
1688         ENTRY;
1689
1690         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1691         if (rc != 0)
1692                 RETURN(rc);
1693
1694         lfsck = lfsck_instance_find(key, true, false);
1695         if (likely(lfsck != NULL)) {
1696                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
1697                 lfsck_instance_put(&env, lfsck);
1698         } else {
1699                 rc = -ENXIO;
1700         }
1701
1702         lu_env_fini(&env);
1703
1704         RETURN(rc);
1705 }
1706 EXPORT_SYMBOL(lfsck_get_speed);
1707
1708 int lfsck_set_speed(struct dt_device *key, int val)
1709 {
1710         struct lu_env           env;
1711         struct lfsck_instance  *lfsck;
1712         int                     rc;
1713         ENTRY;
1714
1715         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1716         if (rc != 0)
1717                 RETURN(rc);
1718
1719         lfsck = lfsck_instance_find(key, true, false);
1720         if (likely(lfsck != NULL)) {
1721                 mutex_lock(&lfsck->li_mutex);
1722                 if (__lfsck_set_speed(lfsck, val))
1723                         rc = lfsck_bookmark_store(&env, lfsck);
1724                 mutex_unlock(&lfsck->li_mutex);
1725                 lfsck_instance_put(&env, lfsck);
1726         } else {
1727                 rc = -ENXIO;
1728         }
1729
1730         lu_env_fini(&env);
1731
1732         RETURN(rc);
1733 }
1734 EXPORT_SYMBOL(lfsck_set_speed);
1735
1736 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
1737 {
1738         struct lu_env           env;
1739         struct lfsck_instance  *lfsck;
1740         int                     rc;
1741         ENTRY;
1742
1743         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1744         if (rc != 0)
1745                 RETURN(rc);
1746
1747         lfsck = lfsck_instance_find(key, true, false);
1748         if (likely(lfsck != NULL)) {
1749                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
1750                 lfsck_instance_put(&env, lfsck);
1751         } else {
1752                 rc = -ENXIO;
1753         }
1754
1755         lu_env_fini(&env);
1756
1757         RETURN(rc);
1758 }
1759 EXPORT_SYMBOL(lfsck_get_windows);
1760
1761 int lfsck_set_windows(struct dt_device *key, int val)
1762 {
1763         struct lu_env           env;
1764         struct lfsck_instance  *lfsck;
1765         int                     rc;
1766         ENTRY;
1767
1768         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1769         if (rc != 0)
1770                 RETURN(rc);
1771
1772         lfsck = lfsck_instance_find(key, true, false);
1773         if (likely(lfsck != NULL)) {
1774                 if (val > LFSCK_ASYNC_WIN_MAX) {
1775                         CWARN("%s: Too large async window size, which "
1776                               "may cause memory issues. The valid range "
1777                               "is [0 - %u]. If you do not want to restrict "
1778                               "the window size for async requests pipeline, "
1779                               "just set it as 0.\n",
1780                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
1781                         rc = -EINVAL;
1782                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
1783                         mutex_lock(&lfsck->li_mutex);
1784                         lfsck->li_bookmark_ram.lb_async_windows = val;
1785                         rc = lfsck_bookmark_store(&env, lfsck);
1786                         mutex_unlock(&lfsck->li_mutex);
1787                 }
1788                 lfsck_instance_put(&env, lfsck);
1789         } else {
1790                 rc = -ENXIO;
1791         }
1792
1793         lu_env_fini(&env);
1794
1795         RETURN(rc);
1796 }
1797 EXPORT_SYMBOL(lfsck_set_windows);
1798
1799 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
1800 {
1801         struct lu_env           env;
1802         struct lfsck_instance  *lfsck;
1803         struct lfsck_component *com;
1804         int                     rc;
1805         ENTRY;
1806
1807         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1808         if (rc != 0)
1809                 RETURN(rc);
1810
1811         lfsck = lfsck_instance_find(key, true, false);
1812         if (likely(lfsck != NULL)) {
1813                 com = lfsck_component_find(lfsck, type);
1814                 if (likely(com != NULL)) {
1815                         rc = com->lc_ops->lfsck_dump(&env, com, m);
1816                         lfsck_component_put(&env, com);
1817                 } else {
1818                         rc = -ENOTSUPP;
1819                 }
1820
1821                 lfsck_instance_put(&env, lfsck);
1822         } else {
1823                 rc = -ENXIO;
1824         }
1825
1826         lu_env_fini(&env);
1827
1828         RETURN(rc);
1829 }
1830 EXPORT_SYMBOL(lfsck_dump);
1831
1832 static int lfsck_stop_all(const struct lu_env *env,
1833                           struct lfsck_instance *lfsck,
1834                           struct lfsck_stop *stop)
1835 {
1836         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1837         struct lfsck_request              *lr     = &info->lti_lr;
1838         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1839         struct ptlrpc_request_set         *set;
1840         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1841         struct lfsck_tgt_desc             *ltd;
1842         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1843         __u32                              idx;
1844         int                                rc     = 0;
1845         int                                rc1    = 0;
1846         ENTRY;
1847
1848         LASSERT(stop->ls_flags & LPF_BROADCAST);
1849
1850         set = ptlrpc_prep_set();
1851         if (unlikely(set == NULL))
1852                 RETURN(-ENOMEM);
1853
1854         memset(lr, 0, sizeof(*lr));
1855         lr->lr_event = LE_STOP;
1856         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1857         lr->lr_status = stop->ls_status;
1858         lr->lr_version = bk->lb_version;
1859         lr->lr_active = LFSCK_TYPES_ALL;
1860         lr->lr_param = stop->ls_flags;
1861
1862         laia->laia_com = NULL;
1863         laia->laia_ltds = ltds;
1864         laia->laia_lr = lr;
1865         laia->laia_result = 0;
1866         laia->laia_shared = 1;
1867
1868         down_read(&ltds->ltd_rw_sem);
1869         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1870                 ltd = lfsck_tgt_get(ltds, idx);
1871                 LASSERT(ltd != NULL);
1872
1873                 laia->laia_ltd = ltd;
1874                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1875                                          lfsck_async_interpret, laia,
1876                                          LFSCK_NOTIFY);
1877                 if (rc != 0) {
1878                         lfsck_interpret(env, lfsck, NULL, laia, rc);
1879                         lfsck_tgt_put(ltd);
1880                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
1881                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
1882                         rc1 = rc;
1883                 }
1884         }
1885         up_read(&ltds->ltd_rw_sem);
1886
1887         rc = ptlrpc_set_wait(set);
1888         ptlrpc_set_destroy(set);
1889
1890         if (rc == 0)
1891                 rc = laia->laia_result;
1892
1893         if (rc == -EALREADY)
1894                 rc = 0;
1895
1896         if (rc != 0)
1897                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
1898                        lfsck_lfsck2name(lfsck), rc);
1899
1900         RETURN(rc != 0 ? rc : rc1);
1901 }
1902
1903 static int lfsck_start_all(const struct lu_env *env,
1904                            struct lfsck_instance *lfsck,
1905                            struct lfsck_start *start)
1906 {
1907         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1908         struct lfsck_request              *lr     = &info->lti_lr;
1909         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1910         struct ptlrpc_request_set         *set;
1911         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1912         struct lfsck_tgt_desc             *ltd;
1913         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1914         __u32                              idx;
1915         int                                rc     = 0;
1916         ENTRY;
1917
1918         LASSERT(start->ls_flags & LPF_BROADCAST);
1919
1920         set = ptlrpc_prep_set();
1921         if (unlikely(set == NULL))
1922                 RETURN(-ENOMEM);
1923
1924         memset(lr, 0, sizeof(*lr));
1925         lr->lr_event = LE_START;
1926         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1927         lr->lr_speed = bk->lb_speed_limit;
1928         lr->lr_version = bk->lb_version;
1929         lr->lr_active = start->ls_active;
1930         lr->lr_param = start->ls_flags;
1931         lr->lr_async_windows = bk->lb_async_windows;
1932         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
1933                        LSV_ASYNC_WINDOWS;
1934
1935         laia->laia_com = NULL;
1936         laia->laia_ltds = ltds;
1937         laia->laia_lr = lr;
1938         laia->laia_result = 0;
1939         laia->laia_shared = 1;
1940
1941         down_read(&ltds->ltd_rw_sem);
1942         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1943                 ltd = lfsck_tgt_get(ltds, idx);
1944                 LASSERT(ltd != NULL);
1945
1946                 laia->laia_ltd = ltd;
1947                 ltd->ltd_layout_done = 0;
1948                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1949                                          lfsck_async_interpret, laia,
1950                                          LFSCK_NOTIFY);
1951                 if (rc != 0) {
1952                         lfsck_interpret(env, lfsck, NULL, laia, rc);
1953                         lfsck_tgt_put(ltd);
1954                         CERROR("%s: cannot notify MDT %x for LFSCK "
1955                                "start, failout: rc = %d\n",
1956                                lfsck_lfsck2name(lfsck), idx, rc);
1957                         break;
1958                 }
1959         }
1960         up_read(&ltds->ltd_rw_sem);
1961
1962         if (rc != 0) {
1963                 ptlrpc_set_destroy(set);
1964
1965                 RETURN(rc);
1966         }
1967
1968         rc = ptlrpc_set_wait(set);
1969         ptlrpc_set_destroy(set);
1970
1971         if (rc == 0)
1972                 rc = laia->laia_result;
1973
1974         if (rc != 0) {
1975                 struct lfsck_stop *stop = &info->lti_stop;
1976
1977                 CERROR("%s: cannot start LFSCK on some MDTs, "
1978                        "stop all: rc = %d\n",
1979                        lfsck_lfsck2name(lfsck), rc);
1980                 if (rc != -EALREADY) {
1981                         stop->ls_status = LS_FAILED;
1982                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
1983                         lfsck_stop_all(env, lfsck, stop);
1984                 }
1985         }
1986
1987         RETURN(rc);
1988 }
1989
/**
 * Start the LFSCK on the instance registered for \a key, or join a run that
 * is already in progress.
 *
 * The function looks up the instance, and under lfsck_instance::li_mutex:
 * - if the engine thread is neither in its initial state nor stopped, it
 *   asks every requested component to join via its lfsck_join() method;
 * - otherwise it resets the per-run state, selects the components to scan,
 *   applies the start parameters, launches the lfsck_master_engine kthread
 *   and waits until that thread reports running or stopped.
 * When LPF_BROADCAST is requested, li_mutex is released and the start is
 * propagated with lfsck_start_all() before the engine is unplugged.
 *
 * \param[in] env  execution environment
 * \param[in] key  device used to look up the LFSCK instance
 * \param[in] lsp  start parameters; lsp->lsp_start == NULL means that a
 *                 paused LFSCK is being triggered automatically
 *
 * \retval 0            success (any positive internal rc is reported as 0)
 * \retval -ENXIO       no LFSCK instance was found for \a key
 * \retval -EAGAIN      the instance has no namespace registered yet
 * \retval -EALREADY    the LFSCK is already running
 * \retval -EOPNOTSUPP  a requested component is not on the scan or
 *                      double-scan list of the running LFSCK
 * \retval -EPERM       LPF_BROADCAST was requested on a non-master instance
 * \retval -ENOTSUPP    unsupported component types were requested
 * \retval negative     other errors from the helpers called below
 */
1990 int lfsck_start(const struct lu_env *env, struct dt_device *key,
1991                 struct lfsck_start_param *lsp)
1992 {
1993         struct lfsck_start              *start  = lsp->lsp_start;
1994         struct lfsck_instance           *lfsck;
1995         struct lfsck_bookmark           *bk;
1996         struct ptlrpc_thread            *thread;
1997         struct lfsck_component          *com;
1998         struct l_wait_info               lwi    = { 0 };
1999         struct lfsck_thread_args        *lta;
2000         struct task_struct              *task;
2001         int                              rc     = 0;
2002         __u16                            valid  = 0;
2003         __u16                            flags  = 0;
2004         __u16                            type   = 1;
2005         ENTRY;
2006
2007         lfsck = lfsck_instance_find(key, true, false);
2008         if (unlikely(lfsck == NULL))
2009                 RETURN(-ENXIO);
2010
2011         /* System is not ready, try again later. */
2012         if (unlikely(lfsck->li_namespace == NULL))
2013                 GOTO(put, rc = -EAGAIN);
2014
2015         /* start == NULL means auto trigger paused LFSCK. */
2016         if ((start == NULL) &&
2017             (cfs_list_empty(&lfsck->li_list_scan) ||
2018              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2019                 GOTO(put, rc = 0);
2020
2021         bk = &lfsck->li_bookmark_ram;
2022         thread = &lfsck->li_thread;
             /* From here on li_mutex is held; the thread state is examined
              * under li_lock. */
2023         mutex_lock(&lfsck->li_mutex);
2024         spin_lock(&lfsck->li_lock);
             /* The engine thread is neither in its initial state nor stopped:
              * do not start a second one, only try to join the requested
              * components. */
2025         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2026                 rc = -EALREADY;
2027                 if (unlikely(start == NULL)) {
2028                         spin_unlock(&lfsck->li_lock);
2029                         GOTO(out, rc);
2030                 }
2031
                     /* Walk the bits of ls_active from the lowest one; "type"
                      * is the bit under test and each handled bit is cleared
                      * from ls_active. */
2032                 while (start->ls_active != 0) {
2033                         if (!(type & start->ls_active)) {
2034                                 type <<= 1;
2035                                 continue;
2036                         }
2037
2038                         com = __lfsck_component_find(lfsck, type,
2039                                                      &lfsck->li_list_scan);
2040                         if (com == NULL)
2041                                 com = __lfsck_component_find(lfsck, type,
2042                                                 &lfsck->li_list_double_scan);
2043                         if (com == NULL) {
2044                                 rc = -EOPNOTSUPP;
2045                                 break;
2046                         }
2047
2048                         if (com->lc_ops->lfsck_join != NULL) {
2049                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2050                                 if (rc != 0 && rc != -EALREADY)
2051                                         break;
2052                         }
2053                         start->ls_active &= ~type;
2054                         type <<= 1;
2055                 }
2056                 spin_unlock(&lfsck->li_lock);
2057                 GOTO(out, rc);
2058         }
2059         spin_unlock(&lfsck->li_lock);
2060
             /* No run in progress: reset the per-run state. */
2061         lfsck->li_status = 0;
2062         lfsck->li_oit_over = 0;
2063         lfsck->li_start_unplug = 0;
2064         lfsck->li_drop_dryrun = 0;
2065         lfsck->li_new_scanned = 0;
2066
2067         /* For auto trigger. */
2068         if (start == NULL)
2069                 goto trigger;
2070
2071         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2072                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2073                        lfsck_lfsck2name(lfsck));
2074
2075                 GOTO(out, rc = -EPERM);
2076         }
2077
2078         start->ls_version = bk->lb_version;
2079
2080         if (start->ls_active != 0) {
2081                 struct lfsck_component *next;
2082
2083                 if (start->ls_active == LFSCK_TYPES_ALL)
2084                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2085
                     /* On failure ls_active is left holding only the
                      * unsupported bits. */
2086                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2087                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2088                         GOTO(out, rc = -ENOTSUPP);
2089                 }
2090
                     /* Call lfsck_post() on every component of the scan list
                      * that was not requested this time.
                      * NOTE(review): the _safe iterator suggests lfsck_post()
                      * may unlink the component from li_list_scan - confirm. */
2091                 list_for_each_entry_safe(com, next,
2092                                          &lfsck->li_list_scan, lc_link) {
2093                         if (!(com->lc_type & start->ls_active)) {
2094                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2095                                                              false);
2096                                 if (rc != 0)
2097                                         GOTO(out, rc);
2098                         }
2099                 }
2100
                     /* Move each requested component found on the idle list
                      * to the tail of the scan list; ls_active ends up 0. */
2101                 while (start->ls_active != 0) {
2102                         if (type & start->ls_active) {
2103                                 com = __lfsck_component_find(lfsck, type,
2104                                                         &lfsck->li_list_idle);
2105                                 if (com != NULL) {
2106                                         /* The component status will be updated
2107                                          * when its prep() is called later by
2108                                          * the LFSCK main engine. */
2109                                         list_del_init(&com->lc_link);
2110                                         list_add_tail(&com->lc_link,
2111                                                       &lfsck->li_list_scan);
2112                                 }
2113                                 start->ls_active &= ~type;
2114                         }
2115                         type <<= 1;
2116                 }
2117         }
2118
2119         if (list_empty(&lfsck->li_list_scan)) {
2120                 /* The speed limit will be used to control both the LFSCK and
2121                  * low layer scrub (if applied), need to be handled firstly. */
2122                 if (start->ls_valid & LSV_SPEED_LIMIT) {
2123                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
2124                                 rc = lfsck_bookmark_store(env, lfsck);
2125                                 if (rc != 0)
2126                                         GOTO(out, rc);
2127                         }
2128                 }
2129
2130                 goto trigger;
2131         }
2132
2133         if (start->ls_flags & LPF_RESET)
2134                 flags |= DOIF_RESET;
2135
2136         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
2137         if (rc != 0)
2138                 GOTO(out, rc);
2139
             /* Report the components that will really be scanned back in
              * ls_active, resetting each of them if LPF_RESET was given. */
2140         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2141                 start->ls_active |= com->lc_type;
2142                 if (flags & DOIF_RESET) {
2143                         rc = com->lc_ops->lfsck_reset(env, com, false);
2144                         if (rc != 0)
2145                                 GOTO(out, rc);
2146                 }
2147         }
2148
2149 trigger:
             /* Build the directory and object-table iteration arguments from
              * the bookmark and the start flags. */
2150         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
2151         if (bk->lb_param & LPF_DRYRUN)
2152                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
2153
2154         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
2155                 valid |= DOIV_ERROR_HANDLE;
2156                 if (start->ls_flags & LPF_FAILOUT)
2157                         flags |= DOIF_FAILOUT;
2158         }
2159
2160         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
2161                 valid |= DOIV_DRYRUN;
2162                 if (start->ls_flags & LPF_DRYRUN)
2163                         flags |= DOIF_DRYRUN;
2164         }
2165
2166         if (!list_empty(&lfsck->li_list_scan))
2167                 flags |= DOIF_OUTUSED;
2168
2169         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2170         thread_set_flags(thread, 0);
2171         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
2172         if (IS_ERR(lta))
2173                 GOTO(out, rc = PTR_ERR(lta));
2174
2175         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
2176         task = kthread_run(lfsck_master_engine, lta, "lfsck");
2177         if (IS_ERR(task)) {
2178                 rc = PTR_ERR(task);
2179                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
2180                        lfsck_lfsck2name(lfsck), rc);
2181                 lfsck_thread_args_fini(lta);
2182
2183                 GOTO(out, rc);
2184         }
2185
             /* Wait until the engine thread reports running or stopped. */
2186         l_wait_event(thread->t_ctl_waitq,
2187                      thread_is_running(thread) ||
2188                      thread_is_stopped(thread),
2189                      &lwi);
             /* Local start only: set li_start_unplug and wake the waiters on
              * the control waitqueue right away. */
2190         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
2191                 lfsck->li_start_unplug = 1;
2192                 wake_up_all(&thread->t_ctl_waitq);
2193
2194                 GOTO(out, rc = 0);
2195         }
2196
2197         /* release lfsck::li_mutex to avoid deadlock. */
2198         mutex_unlock(&lfsck->li_mutex);
2199         rc = lfsck_start_all(env, lfsck, start);
2200         if (rc != 0) {
                     /* The broadcast failed: unless the local engine has
                      * already stopped, mark it failed, ask it to stop and
                      * wait for it to do so. */
2201                 spin_lock(&lfsck->li_lock);
2202                 if (thread_is_stopped(thread)) {
2203                         spin_unlock(&lfsck->li_lock);
2204                 } else {
2205                         lfsck->li_status = LS_FAILED;
2206                         lfsck->li_flags = 0;
2207                         thread_set_flags(thread, SVC_STOPPING);
2208                         spin_unlock(&lfsck->li_lock);
2209
2210                         lfsck->li_start_unplug = 1;
2211                         wake_up_all(&thread->t_ctl_waitq);
2212                         l_wait_event(thread->t_ctl_waitq,
2213                                      thread_is_stopped(thread),
2214                                      &lwi);
2215                 }
2216         } else {
2217                 lfsck->li_start_unplug = 1;
2218                 wake_up_all(&thread->t_ctl_waitq);
2219         }
2220
             /* li_mutex was already released above, so skip the "out" label. */
2221         GOTO(put, rc);
2222
2223 out:
2224         mutex_unlock(&lfsck->li_mutex);
2225
2226 put:
2227         lfsck_instance_put(env, lfsck);
2228
             /* Only negative values are reported as errors. */
2229         return rc < 0 ? rc : 0;
2230 }
2231 EXPORT_SYMBOL(lfsck_start);
2232
2233 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2234                struct lfsck_stop *stop)
2235 {
2236         struct lfsck_instance   *lfsck;
2237         struct ptlrpc_thread    *thread;
2238         struct l_wait_info       lwi    = { 0 };
2239         int                      rc     = 0;
2240         int                      rc1    = 0;
2241         ENTRY;
2242
2243         lfsck = lfsck_instance_find(key, true, false);
2244         if (unlikely(lfsck == NULL))
2245                 RETURN(-ENXIO);
2246
2247         thread = &lfsck->li_thread;
2248         /* release lfsck::li_mutex to avoid deadlock. */
2249         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2250                 if (!lfsck->li_master) {
2251                         CERROR("%s: only allow to specify '-A' via MDS\n",
2252                                lfsck_lfsck2name(lfsck));
2253
2254                         GOTO(out, rc = -EPERM);
2255                 }
2256
2257                 rc1 = lfsck_stop_all(env, lfsck, stop);
2258         }
2259
2260         mutex_lock(&lfsck->li_mutex);
2261         spin_lock(&lfsck->li_lock);
2262         /* no error if LFSCK is already stopped, or was never started */
2263         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2264                 spin_unlock(&lfsck->li_lock);
2265                 GOTO(out, rc = 0);
2266         }
2267
2268         if (stop != NULL) {
2269                 lfsck->li_status = stop->ls_status;
2270                 lfsck->li_flags = stop->ls_flags;
2271         } else {
2272                 lfsck->li_status = LS_STOPPED;
2273                 lfsck->li_flags = 0;
2274         }
2275
2276         thread_set_flags(thread, SVC_STOPPING);
2277         spin_unlock(&lfsck->li_lock);
2278
2279         wake_up_all(&thread->t_ctl_waitq);
2280         l_wait_event(thread->t_ctl_waitq,
2281                      thread_is_stopped(thread),
2282                      &lwi);
2283
2284         GOTO(out, rc = 0);
2285
2286 out:
2287         mutex_unlock(&lfsck->li_mutex);
2288         lfsck_instance_put(env, lfsck);
2289
2290         return rc != 0 ? rc : rc1;
2291 }
2292 EXPORT_SYMBOL(lfsck_stop);
2293
2294 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2295                     struct lfsck_request *lr)
2296 {
2297         int rc = -EOPNOTSUPP;
2298         ENTRY;
2299
2300         switch (lr->lr_event) {
2301         case LE_START: {
2302                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
2303                 struct lfsck_start_param  lsp;
2304
2305                 memset(start, 0, sizeof(*start));
2306                 start->ls_valid = lr->lr_valid;
2307                 start->ls_speed_limit = lr->lr_speed;
2308                 start->ls_version = lr->lr_version;
2309                 start->ls_active = lr->lr_active;
2310                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2311                 start->ls_async_windows = lr->lr_async_windows;
2312
2313                 lsp.lsp_start = start;
2314                 lsp.lsp_index = lr->lr_index;
2315                 lsp.lsp_index_valid = 1;
2316                 rc = lfsck_start(env, key, &lsp);
2317                 break;
2318         }
2319         case LE_STOP: {
2320                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2321
2322                 memset(stop, 0, sizeof(*stop));
2323                 stop->ls_status = lr->lr_status;
2324                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2325                 rc = lfsck_stop(env, key, stop);
2326                 break;
2327         }
2328         case LE_PHASE1_DONE:
2329         case LE_PHASE2_DONE:
2330         case LE_FID_ACCESSED:
2331         case LE_PEER_EXIT:
2332         case LE_CONDITIONAL_DESTROY:
2333         case LE_PAIRS_VERIFY: {
2334                 struct lfsck_instance  *lfsck;
2335                 struct lfsck_component *com;
2336
2337                 lfsck = lfsck_instance_find(key, true, false);
2338                 if (unlikely(lfsck == NULL))
2339                         RETURN(-ENXIO);
2340
2341                 com = lfsck_component_find(lfsck, lr->lr_active);
2342                 if (likely(com != NULL)) {
2343                         rc = com->lc_ops->lfsck_in_notify(env, com, lr);
2344                         lfsck_component_put(env, com);
2345                 }
2346
2347                 lfsck_instance_put(env, lfsck);
2348                 break;
2349         }
2350         default:
2351                 break;
2352         }
2353
2354         RETURN(rc);
2355 }
2356 EXPORT_SYMBOL(lfsck_in_notify);
2357
2358 int lfsck_query(const struct lu_env *env, struct dt_device *key,
2359                 struct lfsck_request *lr)
2360 {
2361         struct lfsck_instance  *lfsck;
2362         struct lfsck_component *com;
2363         int                     rc;
2364         ENTRY;
2365
2366         lfsck = lfsck_instance_find(key, true, false);
2367         if (unlikely(lfsck == NULL))
2368                 RETURN(-ENXIO);
2369
2370         com = lfsck_component_find(lfsck, lr->lr_active);
2371         if (likely(com != NULL)) {
2372                 rc = com->lc_ops->lfsck_query(env, com);
2373                 lfsck_component_put(env, com);
2374         } else {
2375                 rc = -ENOTSUPP;
2376         }
2377
2378         lfsck_instance_put(env, lfsck);
2379
2380         RETURN(rc);
2381 }
2382 EXPORT_SYMBOL(lfsck_query);
2383
2384 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
2385                              struct ldlm_namespace *ns)
2386 {
2387         struct lfsck_instance  *lfsck;
2388         int                     rc      = -ENXIO;
2389
2390         lfsck = lfsck_instance_find(key, true, false);
2391         if (likely(lfsck != NULL)) {
2392                 lfsck->li_namespace = ns;
2393                 lfsck_instance_put(env, lfsck);
2394                 rc = 0;
2395         }
2396
2397         return rc;
2398 }
2399 EXPORT_SYMBOL(lfsck_register_namespace);
2400
2401 int lfsck_register(const struct lu_env *env, struct dt_device *key,
2402                    struct dt_device *next, struct obd_device *obd,
2403                    lfsck_out_notify notify, void *notify_data, bool master)
2404 {
2405         struct lfsck_instance   *lfsck;
2406         struct dt_object        *root  = NULL;
2407         struct dt_object        *obj   = NULL;
2408         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
2409         int                      rc;
2410         ENTRY;
2411
2412         lfsck = lfsck_instance_find(key, false, false);
2413         if (unlikely(lfsck != NULL))
2414                 RETURN(-EEXIST);
2415
2416         OBD_ALLOC_PTR(lfsck);
2417         if (lfsck == NULL)
2418                 RETURN(-ENOMEM);
2419
2420         mutex_init(&lfsck->li_mutex);
2421         spin_lock_init(&lfsck->li_lock);
2422         CFS_INIT_LIST_HEAD(&lfsck->li_link);
2423         CFS_INIT_LIST_HEAD(&lfsck->li_list_scan);
2424         CFS_INIT_LIST_HEAD(&lfsck->li_list_dir);
2425         CFS_INIT_LIST_HEAD(&lfsck->li_list_double_scan);
2426         CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
2427         atomic_set(&lfsck->li_ref, 1);
2428         atomic_set(&lfsck->li_double_scan_count, 0);
2429         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
2430         lfsck->li_out_notify = notify;
2431         lfsck->li_out_notify_data = notify_data;
2432         lfsck->li_next = next;
2433         lfsck->li_bottom = key;
2434         lfsck->li_obd = obd;
2435
2436         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
2437         if (rc != 0)
2438                 GOTO(out, rc);
2439
2440         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
2441         if (rc != 0)
2442                 GOTO(out, rc);
2443
2444         fid->f_seq = FID_SEQ_LOCAL_NAME;
2445         fid->f_oid = 1;
2446         fid->f_ver = 0;
2447         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
2448         if (rc != 0)
2449                 GOTO(out, rc);
2450
2451         rc = dt_root_get(env, key, fid);
2452         if (rc != 0)
2453                 GOTO(out, rc);
2454
2455         root = dt_locate(env, key, fid);
2456         if (IS_ERR(root))
2457                 GOTO(out, rc = PTR_ERR(root));
2458
2459         if (unlikely(!dt_try_as_dir(env, root)))
2460                 GOTO(out, rc = -ENOTDIR);
2461
2462         lfsck->li_local_root_fid = *fid;
2463         if (master) {
2464                 lfsck->li_master = 1;
2465                 if (lfsck_dev_idx(key) == 0) {
2466                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
2467                         const struct lu_name *cname;
2468
2469                         rc = dt_lookup(env, root,
2470                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
2471                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
2472                         if (rc != 0)
2473                                 GOTO(out, rc);
2474
2475                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
2476                         if (IS_ERR(obj))
2477                                 GOTO(out, rc = PTR_ERR(obj));
2478
2479                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
2480                                 (const struct dt_key *)dotlustre, BYPASS_CAPA);
2481                         if (rc != 0)
2482                                 GOTO(out, rc);
2483
2484                         lu_object_put(env, &obj->do_lu);
2485                         obj = dt_locate(env, key, fid);
2486                         if (IS_ERR(obj))
2487                                 GOTO(out, rc = PTR_ERR(obj));
2488
2489                         cname = lfsck_name_get_const(env, dotlustre,
2490                                                      strlen(dotlustre));
2491                         rc = lfsck_verify_linkea(env, key, obj, cname,
2492                                                  &lfsck->li_global_root_fid);
2493                         if (rc != 0)
2494                                 GOTO(out, rc);
2495
2496                         *pfid = *fid;
2497                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
2498                                        (const struct dt_key *)lostfound,
2499                                        BYPASS_CAPA);
2500                         if (rc != 0)
2501                                 GOTO(out, rc);
2502
2503                         lu_object_put(env, &obj->do_lu);
2504                         obj = dt_locate(env, key, fid);
2505                         if (IS_ERR(obj))
2506                                 GOTO(out, rc = PTR_ERR(obj));
2507
2508                         cname = lfsck_name_get_const(env, lostfound,
2509                                                      strlen(lostfound));
2510                         rc = lfsck_verify_linkea(env, key, obj, cname, pfid);
2511                         if (rc != 0)
2512                                 GOTO(out, rc);
2513
2514                         lu_object_put(env, &obj->do_lu);
2515                         obj = NULL;
2516                 }
2517         }
2518
2519         fid->f_seq = FID_SEQ_LOCAL_FILE;
2520         fid->f_oid = OTABLE_IT_OID;
2521         fid->f_ver = 0;
2522         obj = dt_locate(env, key, fid);
2523         if (IS_ERR(obj))
2524                 GOTO(out, rc = PTR_ERR(obj));
2525
2526         lu_object_get(&obj->do_lu);
2527         lfsck->li_obj_oit = obj;
2528         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
2529         if (rc != 0)
2530                 GOTO(out, rc);
2531
2532         rc = lfsck_bookmark_setup(env, lfsck);
2533         if (rc != 0)
2534                 GOTO(out, rc);
2535
2536         if (master) {
2537                 rc = lfsck_fid_init(lfsck);
2538                 if (rc < 0)
2539                         GOTO(out, rc);
2540
2541                 rc = lfsck_namespace_setup(env, lfsck);
2542                 if (rc < 0)
2543                         GOTO(out, rc);
2544         }
2545
2546         rc = lfsck_layout_setup(env, lfsck);
2547         if (rc < 0)
2548                 GOTO(out, rc);
2549
2550         /* XXX: more LFSCK components initialization to be added here. */
2551
2552         rc = lfsck_instance_add(lfsck);
2553         if (rc == 0)
2554                 rc = lfsck_add_target_from_orphan(env, lfsck);
2555 out:
2556         if (obj != NULL && !IS_ERR(obj))
2557                 lu_object_put(env, &obj->do_lu);
2558         if (root != NULL && !IS_ERR(root))
2559                 lu_object_put(env, &root->do_lu);
2560         if (rc != 0)
2561                 lfsck_instance_cleanup(env, lfsck);
2562         return rc;
2563 }
2564 EXPORT_SYMBOL(lfsck_register);
2565
2566 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
2567 {
2568         struct lfsck_instance *lfsck;
2569
2570         lfsck = lfsck_instance_find(key, false, true);
2571         if (lfsck != NULL)
2572                 lfsck_instance_put(env, lfsck);
2573 }
2574 EXPORT_SYMBOL(lfsck_degister);
2575
2576 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
2577                      struct dt_device *tgt, struct obd_export *exp,
2578                      __u32 index, bool for_ost)
2579 {
2580         struct lfsck_instance   *lfsck;
2581         struct lfsck_tgt_desc   *ltd;
2582         int                      rc;
2583         ENTRY;
2584
2585         OBD_ALLOC_PTR(ltd);
2586         if (ltd == NULL)
2587                 RETURN(-ENOMEM);
2588
2589         ltd->ltd_tgt = tgt;
2590         ltd->ltd_key = key;
2591         ltd->ltd_exp = exp;
2592         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
2593         INIT_LIST_HEAD(&ltd->ltd_layout_list);
2594         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
2595         atomic_set(&ltd->ltd_ref, 1);
2596         ltd->ltd_index = index;
2597
2598         spin_lock(&lfsck_instance_lock);
2599         lfsck = __lfsck_instance_find(key, true, false);
2600         if (lfsck == NULL) {
2601                 if (for_ost)
2602                         list_add_tail(&ltd->ltd_orphan_list,
2603                                       &lfsck_ost_orphan_list);
2604                 else
2605                         list_add_tail(&ltd->ltd_orphan_list,
2606                                       &lfsck_mdt_orphan_list);
2607                 spin_unlock(&lfsck_instance_lock);
2608
2609                 RETURN(0);
2610         }
2611         spin_unlock(&lfsck_instance_lock);
2612
2613         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
2614         if (rc != 0)
2615                 lfsck_tgt_put(ltd);
2616
2617         lfsck_instance_put(env, lfsck);
2618
2619         RETURN(rc);
2620 }
2621 EXPORT_SYMBOL(lfsck_add_target);
2622
2623 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
2624                       struct dt_device *tgt, __u32 index, bool for_ost)
2625 {
2626         struct lfsck_instance   *lfsck;
2627         struct lfsck_tgt_descs  *ltds;
2628         struct lfsck_tgt_desc   *ltd    = NULL;
2629         struct list_head        *head;
2630
2631         if (for_ost)
2632                 head = &lfsck_ost_orphan_list;
2633         else
2634                 head = &lfsck_mdt_orphan_list;
2635
2636         spin_lock(&lfsck_instance_lock);
2637         list_for_each_entry(ltd, head, ltd_orphan_list) {
2638                 if (ltd->ltd_tgt == tgt) {
2639                         list_del_init(&ltd->ltd_orphan_list);
2640                         spin_unlock(&lfsck_instance_lock);
2641                         lfsck_tgt_put(ltd);
2642
2643                         return;
2644                 }
2645         }
2646
2647         lfsck = __lfsck_instance_find(key, true, false);
2648         spin_unlock(&lfsck_instance_lock);
2649         if (unlikely(lfsck == NULL))
2650                 return;
2651
2652         if (for_ost)
2653                 ltds = &lfsck->li_ost_descs;
2654         else
2655                 ltds = &lfsck->li_mdt_descs;
2656
2657         down_write(&ltds->ltd_rw_sem);
2658         LASSERT(ltds->ltd_tgts_bitmap != NULL);
2659
2660         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
2661                 goto unlock;
2662
2663         ltd = LTD_TGT(ltds, index);
2664         if (unlikely(ltd == NULL))
2665                 goto unlock;
2666
2667         LASSERT(ltds->ltd_tgtnr > 0);
2668
2669         ltds->ltd_tgtnr--;
2670         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
2671         LTD_TGT(ltds, index) = NULL;
2672
2673 unlock:
2674         if (ltd == NULL) {
2675                 if (for_ost)
2676                         head = &lfsck->li_ost_descs.ltd_orphan;
2677                 else
2678                         head = &lfsck->li_ost_descs.ltd_orphan;
2679
2680                 list_for_each_entry(ltd, head, ltd_orphan_list) {
2681                         if (ltd->ltd_tgt == tgt) {
2682                                 list_del_init(&ltd->ltd_orphan_list);
2683                                 break;
2684                         }
2685                 }
2686         }
2687
2688         up_write(&ltds->ltd_rw_sem);
2689         if (ltd != NULL) {
2690                 spin_lock(&ltds->ltd_lock);
2691                 ltd->ltd_dead = 1;
2692                 spin_unlock(&ltds->ltd_lock);
2693                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
2694                 lfsck_tgt_put(ltd);
2695         }
2696
2697         lfsck_instance_put(env, lfsck);
2698 }
2699 EXPORT_SYMBOL(lfsck_del_target);
2700
2701 static int __init lfsck_init(void)
2702 {
2703         int rc;
2704
2705         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
2706         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
2707         lfsck_key_init_generic(&lfsck_thread_key, NULL);
2708         rc = lu_context_key_register(&lfsck_thread_key);
2709         if (rc == 0) {
2710                 tgt_register_lfsck_in_notify(lfsck_in_notify);
2711                 tgt_register_lfsck_query(lfsck_query);
2712         }
2713
2714         return rc;
2715 }
2716
2717 static void __exit lfsck_exit(void)
2718 {
2719         struct lfsck_tgt_desc *ltd;
2720         struct lfsck_tgt_desc *next;
2721
2722         LASSERT(cfs_list_empty(&lfsck_instance_list));
2723
2724         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
2725                                  ltd_orphan_list) {
2726                 list_del_init(&ltd->ltd_orphan_list);
2727                 lfsck_tgt_put(ltd);
2728         }
2729
2730         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
2731                                  ltd_orphan_list) {
2732                 list_del_init(&ltd->ltd_orphan_list);
2733                 lfsck_tgt_put(ltd);
2734         }
2735
2736         lu_context_key_degister(&lfsck_thread_key);
2737 }
2738
/*
 * Module metadata, followed by the module declaration that names
 * lfsck_init() and lfsck_exit().
 * NOTE(review): cfs_module() presumably installs them as the load and
 * unload entry points - confirm against the libcfs definition.
 */
MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
MODULE_DESCRIPTION("LFSCK");
MODULE_LICENSE("GPL");

cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);