Whamcloud - gitweb
566a99e466f01a0e0e9e6df9cc8239c78be30699
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 /* define lfsck thread key */
46 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
47
48 static void lfsck_key_fini(const struct lu_context *ctx,
49                            struct lu_context_key *key, void *data)
50 {
51         struct lfsck_thread_info *info = data;
52
53         lu_buf_free(&info->lti_linkea_buf);
54         lu_buf_free(&info->lti_big_buf);
55         OBD_FREE_PTR(info);
56 }
57
58 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
59 LU_KEY_INIT_GENERIC(lfsck);
60
/* Module-wide lists. lfsck_instance_list is the list searched by
 * __lfsck_instance_find(); the two orphan lists hold target descriptors that
 * lfsck_add_target_from_orphan() later claims for the instance whose
 * li_bottom equals the descriptor's ltd_key. */
static struct list_head lfsck_instance_list;
static struct list_head lfsck_ost_orphan_list;
static struct list_head lfsck_mdt_orphan_list;
/* Held by lfsck_add_target_from_orphan() while it walks an orphan list. */
static DEFINE_SPINLOCK(lfsck_instance_lock);
65
/* Printable name for each enum lfsck_status value, indexed by the value
 * itself. Read through lfsck_status2names(), which range-checks the index. */
static const char *lfsck_status_names[] = {
        [LS_INIT]               = "init",
        [LS_SCANNING_PHASE1]    = "scanning-phase1",
        [LS_SCANNING_PHASE2]    = "scanning-phase2",
        [LS_COMPLETED]          = "completed",
        [LS_FAILED]             = "failed",
        [LS_STOPPED]            = "stopped",
        [LS_PAUSED]             = "paused",
        [LS_CRASHED]            = "crashed",
        [LS_PARTIAL]            = "partial",
        [LS_CO_FAILED]          = "co-failed",
        [LS_CO_STOPPED]         = "co-stopped",
        [LS_CO_PAUSED]          = "co-paused"
};
80
/* NULL-terminated table of flag names.
 * NOTE(review): presumably entry N is the name of flag bit N -- confirm
 * against the code that consumes this table. */
const char *lfsck_flags_names[] = {
        "scanned-once",
        "inconsistent",
        "upgrade",
        "incomplete",
        "crashed_lastid",
        NULL
};
89
/* Table of parameter names, terminated by NULL. The first slot is NULL as
 * well, so a consumer cannot simply stop at the first NULL it meets.
 * NOTE(review): presumably entry N is the name of param bit N and bit 0 has
 * no printable name -- confirm against the code that consumes this table. */
const char *lfsck_param_names[] = {
        NULL,
        "failout",
        "dryrun",
        "all_targets",
        "broadcast",
        "orphan",
        "create_ostobj",
        NULL
};
100
101 const char *lfsck_status2names(enum lfsck_status status)
102 {
103         if (unlikely(status < 0 || status >= LS_MAX))
104                 return "unknown";
105
106         return lfsck_status_names[status];
107 }
108
109 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
110 {
111         spin_lock_init(&ltds->ltd_lock);
112         init_rwsem(&ltds->ltd_rw_sem);
113         INIT_LIST_HEAD(&ltds->ltd_orphan);
114         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
115         if (ltds->ltd_tgts_bitmap == NULL)
116                 return -ENOMEM;
117
118         return 0;
119 }
120
121 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
122 {
123         struct lfsck_tgt_desc   *ltd;
124         struct lfsck_tgt_desc   *next;
125         int                      idx;
126
127         down_write(&ltds->ltd_rw_sem);
128
129         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
130                                  ltd_orphan_list) {
131                 list_del_init(&ltd->ltd_orphan_list);
132                 lfsck_tgt_put(ltd);
133         }
134
135         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
136                 up_write(&ltds->ltd_rw_sem);
137
138                 return;
139         }
140
141         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
142                 ltd = LTD_TGT(ltds, idx);
143                 if (likely(ltd != NULL)) {
144                         LASSERT(list_empty(&ltd->ltd_layout_list));
145                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
146
147                         ltds->ltd_tgtnr--;
148                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
149                         LTD_TGT(ltds, idx) = NULL;
150                         lfsck_tgt_put(ltd);
151                 }
152         }
153
154         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
155                  ltds->ltd_tgtnr);
156
157         for (idx = 0; idx < TGT_PTRS; idx++) {
158                 if (ltds->ltd_tgts_idx[idx] != NULL) {
159                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
160                         ltds->ltd_tgts_idx[idx] = NULL;
161                 }
162         }
163
164         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
165         ltds->ltd_tgts_bitmap = NULL;
166         up_write(&ltds->ltd_rw_sem);
167 }
168
169 static int __lfsck_add_target(const struct lu_env *env,
170                               struct lfsck_instance *lfsck,
171                               struct lfsck_tgt_desc *ltd,
172                               bool for_ost, bool locked)
173 {
174         struct lfsck_tgt_descs *ltds;
175         __u32                   index = ltd->ltd_index;
176         int                     rc    = 0;
177         ENTRY;
178
179         if (for_ost)
180                 ltds = &lfsck->li_ost_descs;
181         else
182                 ltds = &lfsck->li_mdt_descs;
183
184         if (!locked)
185                 down_write(&ltds->ltd_rw_sem);
186
187         LASSERT(ltds->ltd_tgts_bitmap != NULL);
188
189         if (index >= ltds->ltd_tgts_bitmap->size) {
190                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
191                                     (__u32)BITS_PER_LONG);
192                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
193                 cfs_bitmap_t *new_bitmap;
194
195                 while (newsize < index + 1)
196                         newsize <<= 1;
197
198                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
199                 if (new_bitmap == NULL)
200                         GOTO(unlock, rc = -ENOMEM);
201
202                 if (ltds->ltd_tgtnr > 0)
203                         cfs_bitmap_copy(new_bitmap, old_bitmap);
204                 ltds->ltd_tgts_bitmap = new_bitmap;
205                 CFS_FREE_BITMAP(old_bitmap);
206         }
207
208         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
209                 CERROR("%s: the device %s (%u) is registered already\n",
210                        lfsck_lfsck2name(lfsck),
211                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
212                 GOTO(unlock, rc = -EEXIST);
213         }
214
215         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
216                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
217                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
218                         GOTO(unlock, rc = -ENOMEM);
219         }
220
221         LTD_TGT(ltds, index) = ltd;
222         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
223         ltds->ltd_tgtnr++;
224
225         GOTO(unlock, rc = 0);
226
227 unlock:
228         if (!locked)
229                 up_write(&ltds->ltd_rw_sem);
230
231         return rc;
232 }
233
234 static int lfsck_add_target_from_orphan(const struct lu_env *env,
235                                         struct lfsck_instance *lfsck)
236 {
237         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
238         struct lfsck_tgt_desc   *ltd;
239         struct lfsck_tgt_desc   *next;
240         struct list_head        *head    = &lfsck_ost_orphan_list;
241         int                      rc;
242         bool                     for_ost = true;
243
244 again:
245         spin_lock(&lfsck_instance_lock);
246         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
247                 if (ltd->ltd_key == lfsck->li_bottom)
248                         list_move_tail(&ltd->ltd_orphan_list,
249                                        &ltds->ltd_orphan);
250         }
251         spin_unlock(&lfsck_instance_lock);
252
253         down_write(&ltds->ltd_rw_sem);
254         while (!list_empty(&ltds->ltd_orphan)) {
255                 ltd = list_entry(ltds->ltd_orphan.next,
256                                  struct lfsck_tgt_desc,
257                                  ltd_orphan_list);
258                 list_del_init(&ltd->ltd_orphan_list);
259                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
260                 /* Do not hold the semaphore for too long time. */
261                 up_write(&ltds->ltd_rw_sem);
262                 if (rc != 0)
263                         return rc;
264
265                 down_write(&ltds->ltd_rw_sem);
266         }
267         up_write(&ltds->ltd_rw_sem);
268
269         if (for_ost) {
270                 ltds = &lfsck->li_mdt_descs;
271                 head = &lfsck_mdt_orphan_list;
272                 for_ost = false;
273                 goto again;
274         }
275
276         return 0;
277 }
278
279 static inline struct lfsck_component *
280 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
281                        struct list_head *list)
282 {
283         struct lfsck_component *com;
284
285         list_for_each_entry(com, list, lc_link) {
286                 if (com->lc_type == type)
287                         return com;
288         }
289         return NULL;
290 }
291
292 struct lfsck_component *
293 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
294 {
295         struct lfsck_component *com;
296
297         spin_lock(&lfsck->li_lock);
298         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
299         if (com != NULL)
300                 goto unlock;
301
302         com = __lfsck_component_find(lfsck, type,
303                                      &lfsck->li_list_double_scan);
304         if (com != NULL)
305                 goto unlock;
306
307         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
308
309 unlock:
310         if (com != NULL)
311                 lfsck_component_get(com);
312         spin_unlock(&lfsck->li_lock);
313         return com;
314 }
315
316 void lfsck_component_cleanup(const struct lu_env *env,
317                              struct lfsck_component *com)
318 {
319         if (!list_empty(&com->lc_link))
320                 list_del_init(&com->lc_link);
321         if (!list_empty(&com->lc_link_dir))
322                 list_del_init(&com->lc_link_dir);
323
324         lfsck_component_put(env, com);
325 }
326
327 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
328                     struct lu_fid *fid, bool locked)
329 {
330         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
331         int                      rc = 0;
332         ENTRY;
333
334         if (!locked)
335                 mutex_lock(&lfsck->li_mutex);
336
337         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
338         if (rc >= 0) {
339                 bk->lb_last_fid = *fid;
340                 /* We do not care about whether the subsequent sub-operations
341                  * failed or not. The worst case is that one FID is lost that
342                  * is not a big issue for the LFSCK since it is relative rare
343                  * for LFSCK create. */
344                 rc = lfsck_bookmark_store(env, lfsck);
345         }
346
347         if (!locked)
348                 mutex_unlock(&lfsck->li_mutex);
349
350         RETURN(rc);
351 }
352
/* Fixed entry names. dot and dotdot are used as the keys of the "." and ".."
 * entries inserted into a newly created directory by the lfsck_create_lpf_*()
 * helpers below; dotlustre and lostfound are not referenced in that code. */
static const char dot[] = ".";
static const char dotdot[] = "..";
static const char dotlustre[] = ".lustre";
static const char lostfound[] = "lost+found";
357
358 static int lfsck_create_lpf_local(const struct lu_env *env,
359                                   struct lfsck_instance *lfsck,
360                                   struct dt_object *parent,
361                                   struct dt_object *child,
362                                   struct lu_attr *la,
363                                   struct dt_object_format *dof,
364                                   const char *name)
365 {
366         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
367         struct dt_device        *dev    = lfsck->li_bottom;
368         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
369         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
370         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
371         struct thandle          *th     = NULL;
372         struct linkea_data       ldata  = { 0 };
373         struct lu_buf            linkea_buf;
374         const struct lu_name    *cname;
375         loff_t                   pos    = 0;
376         int                      len    = sizeof(struct lfsck_bookmark);
377         int                      rc;
378         ENTRY;
379
380         rc = linkea_data_new(&ldata,
381                              &lfsck_env_info(env)->lti_linkea_buf);
382         if (rc != 0)
383                 RETURN(rc);
384
385         cname = lfsck_name_get_const(env, name, strlen(name));
386         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
387         if (rc != 0)
388                 RETURN(rc);
389
390         th = dt_trans_create(env, dev);
391         if (IS_ERR(th))
392                 RETURN(PTR_ERR(th));
393
394         /* 1a. create child */
395         rc = dt_declare_create(env, child, la, NULL, dof, th);
396         if (rc != 0)
397                 GOTO(stop, rc);
398
399         /* 2a. increase child nlink */
400         rc = dt_declare_ref_add(env, child, th);
401         if (rc != 0)
402                 GOTO(stop, rc);
403
404         /* 3a. insert linkEA for child */
405         linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
406         linkea_buf.lb_len = ldata.ld_leh->leh_len;
407         rc = dt_declare_xattr_set(env, child, &linkea_buf,
408                                   XATTR_NAME_LINK, 0, th);
409         if (rc != 0)
410                 GOTO(stop, rc);
411
412         /* 4a. insert name into parent dir */
413         rec->rec_type = S_IFDIR;
414         rec->rec_fid = cfid;
415         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
416                                (const struct dt_key *)name, th);
417         if (rc != 0)
418                 GOTO(stop, rc);
419
420         /* 5a. increase parent nlink */
421         rc = dt_declare_ref_add(env, parent, th);
422         if (rc != 0)
423                 GOTO(stop, rc);
424
425         /* 6a. update bookmark */
426         rc = dt_declare_record_write(env, bk_obj,
427                                      lfsck_buf_get(env, bk, len), 0, th);
428         if (rc != 0)
429                 GOTO(stop, rc);
430
431         rc = dt_trans_start_local(env, dev, th);
432         if (rc != 0)
433                 GOTO(stop, rc);
434
435         dt_write_lock(env, child, 0);
436         /* 1b.1. create child */
437         rc = dt_create(env, child, la, NULL, dof, th);
438         if (rc != 0)
439                 GOTO(unlock, rc);
440
441         if (unlikely(!dt_try_as_dir(env, child)))
442                 GOTO(unlock, rc = -ENOTDIR);
443
444         /* 1b.2. insert dot into child dir */
445         rec->rec_fid = cfid;
446         rc = dt_insert(env, child, (const struct dt_rec *)rec,
447                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
448         if (rc != 0)
449                 GOTO(unlock, rc);
450
451         /* 1b.3. insert dotdot into child dir */
452         rec->rec_fid = &LU_LPF_FID;
453         rc = dt_insert(env, child, (const struct dt_rec *)rec,
454                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
455         if (rc != 0)
456                 GOTO(unlock, rc);
457
458         /* 2b. increase child nlink */
459         rc = dt_ref_add(env, child, th);
460         if (rc != 0)
461                 GOTO(unlock, rc);
462
463         /* 3b. insert linkEA for child. */
464         rc = dt_xattr_set(env, child, &linkea_buf,
465                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
466         dt_write_unlock(env, child);
467         if (rc != 0)
468                 GOTO(stop, rc);
469
470         /* 4b. insert name into parent dir */
471         rec->rec_fid = cfid;
472         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
473                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
474         if (rc != 0)
475                 GOTO(stop, rc);
476
477         dt_write_lock(env, parent, 0);
478         /* 5b. increase parent nlink */
479         rc = dt_ref_add(env, parent, th);
480         dt_write_unlock(env, parent);
481         if (rc != 0)
482                 GOTO(stop, rc);
483
484         bk->lb_lpf_fid = *cfid;
485         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
486
487         /* 6b. update bookmark */
488         rc = dt_record_write(env, bk_obj,
489                              lfsck_buf_get(env, bk, len), &pos, th);
490
491         GOTO(stop, rc);
492
493 unlock:
494         dt_write_unlock(env, child);
495
496 stop:
497         dt_trans_stop(env, dev, th);
498
499         return rc;
500 }
501
502 static int lfsck_create_lpf_remote(const struct lu_env *env,
503                                    struct lfsck_instance *lfsck,
504                                    struct dt_object *parent,
505                                    struct dt_object *child,
506                                    struct lu_attr *la,
507                                    struct dt_object_format *dof,
508                                    const char *name)
509 {
510         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
511         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
512         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
513         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
514         struct thandle          *th     = NULL;
515         struct linkea_data       ldata  = { 0 };
516         struct lu_buf            linkea_buf;
517         const struct lu_name    *cname;
518         struct dt_device        *dev;
519         loff_t                   pos    = 0;
520         int                      len    = sizeof(struct lfsck_bookmark);
521         int                      rc;
522         ENTRY;
523
524         rc = linkea_data_new(&ldata,
525                              &lfsck_env_info(env)->lti_linkea_buf);
526         if (rc != 0)
527                 RETURN(rc);
528
529         cname = lfsck_name_get_const(env, name, strlen(name));
530         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
531         if (rc != 0)
532                 RETURN(rc);
533
534         /* Create .lustre/lost+found/MDTxxxx. */
535
536         /* XXX: Currently, cross-MDT create operation needs to create the child
537          *      object firstly, then insert name into the parent directory. For
538          *      this case, the child object resides on current MDT (local), but
539          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
540          *      easy to contain all the sub-modifications orderly within single
541          *      transaction.
542          *
543          *      To avoid more inconsistency, we split the create operation into
544          *      two transactions:
545          *
546          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
547          *         locally.
548          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
549          *         remotely.
550          *
551          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
552          *      repair such inconsistency when LFSCK run next time. */
553
554         /* Transaction I: locally */
555
556         dev = lfsck->li_bottom;
557         th = dt_trans_create(env, dev);
558         if (IS_ERR(th))
559                 RETURN(PTR_ERR(th));
560
561         /* 1a. create child */
562         rc = dt_declare_create(env, child, la, NULL, dof, th);
563         if (rc != 0)
564                 GOTO(stop, rc);
565
566         /* 2a. increase child nlink */
567         rc = dt_declare_ref_add(env, child, th);
568         if (rc != 0)
569                 GOTO(stop, rc);
570
571         /* 3a. insert linkEA for child */
572         linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
573         linkea_buf.lb_len = ldata.ld_leh->leh_len;
574         rc = dt_declare_xattr_set(env, child, &linkea_buf,
575                                   XATTR_NAME_LINK, 0, th);
576         if (rc != 0)
577                 GOTO(stop, rc);
578
579         /* 4a. update bookmark */
580         rc = dt_declare_record_write(env, bk_obj,
581                                      lfsck_buf_get(env, bk, len), 0, th);
582         if (rc != 0)
583                 GOTO(stop, rc);
584
585         rc = dt_trans_start_local(env, dev, th);
586         if (rc != 0)
587                 GOTO(stop, rc);
588
589         dt_write_lock(env, child, 0);
590         /* 1b.1. create child */
591         rc = dt_create(env, child, la, NULL, dof, th);
592         if (rc != 0)
593                 GOTO(unlock, rc);
594
595         if (unlikely(!dt_try_as_dir(env, child)))
596                 GOTO(unlock, rc = -ENOTDIR);
597
598         /* 1b.2. insert dot into child dir */
599         rec->rec_type = S_IFDIR;
600         rec->rec_fid = cfid;
601         rc = dt_insert(env, child, (const struct dt_rec *)rec,
602                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
603         if (rc != 0)
604                 GOTO(unlock, rc);
605
606         /* 1b.3. insert dotdot into child dir */
607         rec->rec_fid = &LU_LPF_FID;
608         rc = dt_insert(env, child, (const struct dt_rec *)rec,
609                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
610         if (rc != 0)
611                 GOTO(unlock, rc);
612
613         /* 2b. increase child nlink */
614         rc = dt_ref_add(env, child, th);
615         if (rc != 0)
616                 GOTO(unlock, rc);
617
618         /* 3b. insert linkEA for child */
619         rc = dt_xattr_set(env, child, &linkea_buf,
620                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
621         if (rc != 0)
622                 GOTO(unlock, rc);
623
624         bk->lb_lpf_fid = *cfid;
625         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
626
627         /* 4b. update bookmark */
628         rc = dt_record_write(env, bk_obj,
629                              lfsck_buf_get(env, bk, len), &pos, th);
630
631         dt_write_unlock(env, child);
632         dt_trans_stop(env, dev, th);
633         if (rc != 0)
634                 RETURN(rc);
635
636         /* Transaction II: remotely */
637
638         dev = lfsck->li_next;
639         th = dt_trans_create(env, dev);
640         if (IS_ERR(th))
641                 RETURN(PTR_ERR(th));
642
643         /* 5a. insert name into parent dir */
644         rec->rec_fid = cfid;
645         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
646                                (const struct dt_key *)name, th);
647         if (rc != 0)
648                 GOTO(stop, rc);
649
650         /* 6a. increase parent nlink */
651         rc = dt_declare_ref_add(env, parent, th);
652         if (rc != 0)
653                 GOTO(stop, rc);
654
655         rc = dt_trans_start(env, dev, th);
656         if (rc != 0)
657                 GOTO(stop, rc);
658
659         /* 5b. insert name into parent dir */
660         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
661                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
662         if (rc != 0)
663                 GOTO(stop, rc);
664
665         dt_write_lock(env, parent, 0);
666         /* 6b. increase parent nlink */
667         rc = dt_ref_add(env, parent, th);
668         dt_write_unlock(env, parent);
669
670         GOTO(stop, rc);
671
672 unlock:
673         dt_write_unlock(env, child);
674 stop:
675         dt_trans_stop(env, dev, th);
676
677         if (rc != 0 && dev == lfsck->li_next)
678                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
679                        "for orphans, but failed to insert the name %s "
680                        "to the .lustre/lost+found/. Such inconsistency "
681                        "will be repaired when LFSCK run next time: rc = %d\n",
682                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
683
684         return rc;
685 }
686
687 /* Do NOT create .lustre/lost+found/MDTxxxx when register the lfsck instance,
688  * because the MDT0 maybe not reaady for sequence allocation yet. We do that
689  * only when it is required, such as orphan OST-objects repairing. */
690 int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
691 {
692         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
693         struct lfsck_thread_info *info  = lfsck_env_info(env);
694         struct lu_fid            *cfid  = &info->lti_fid2;
695         struct lu_attr           *la    = &info->lti_la;
696         struct dt_object_format  *dof   = &info->lti_dof;
697         struct dt_object         *parent = NULL;
698         struct dt_object         *child = NULL;
699         char                      name[8];
700         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
701         int                       rc    = 0;
702         ENTRY;
703
704         LASSERT(lfsck->li_master);
705
706         sprintf(name, "MDT%04x", node);
707         if (node == 0) {
708                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
709                                                   &LU_LPF_FID);
710         } else {
711                 struct lfsck_tgt_desc *ltd;
712
713                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
714                 if (unlikely(ltd == NULL))
715                         RETURN(-ENXIO);
716
717                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
718                                                   &LU_LPF_FID);
719                 lfsck_tgt_put(ltd);
720         }
721         if (IS_ERR(parent))
722                 RETURN(PTR_ERR(parent));
723
724         if (unlikely(!dt_try_as_dir(env, parent)))
725                 GOTO(out, rc = -ENOTDIR);
726
727         mutex_lock(&lfsck->li_mutex);
728         if (lfsck->li_lpf_obj != NULL)
729                 GOTO(unlock, rc = 0);
730
731         if (fid_is_zero(&bk->lb_lpf_fid)) {
732                 /* There is corner case that: in former LFSCK scanning we have
733                  * created the .lustre/lost+found/MDTxxxx but failed to update
734                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
735                  * it from MDT0 firstly. */
736                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
737                                (const struct dt_key *)name, BYPASS_CAPA);
738                 if (rc != 0 && rc != -ENOENT)
739                         GOTO(unlock, rc);
740
741                 if (rc == 0) {
742                         bk->lb_lpf_fid = *cfid;
743                         rc = lfsck_bookmark_store(env, lfsck);
744                 } else {
745                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
746                 }
747                 if (rc != 0)
748                         GOTO(unlock, rc);
749         } else {
750                 *cfid = bk->lb_lpf_fid;
751         }
752
753         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
754         if (IS_ERR(child))
755                 GOTO(unlock, rc = PTR_ERR(child));
756
757         if (dt_object_exists(child) != 0) {
758                 if (unlikely(!dt_try_as_dir(env, child)))
759                         rc = -ENOTDIR;
760                 else
761                         lfsck->li_lpf_obj = child;
762
763                 GOTO(unlock, rc);
764         }
765
766         memset(la, 0, sizeof(*la));
767         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
768         la->la_mode = S_IFDIR | S_IRWXU;
769         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
770                        LA_UID | LA_GID;
771         memset(dof, 0, sizeof(*dof));
772         dof->dof_type = dt_mode_to_dft(S_IFDIR);
773
774         if (node == 0)
775                 rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
776                                             dof, name);
777         else
778                 rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
779                                              dof, name);
780         if (rc == 0)
781                 lfsck->li_lpf_obj = child;
782
783         GOTO(unlock, rc);
784
785 unlock:
786         mutex_unlock(&lfsck->li_mutex);
787         if (rc != 0 && child != NULL && !IS_ERR(child))
788                 lu_object_put(env, &child->do_lu);
789 out:
790         if (parent != NULL && !IS_ERR(parent))
791                 lu_object_put(env, &parent->do_lu);
792
793         return rc;
794 }
795
796 static int lfsck_fid_init(struct lfsck_instance *lfsck)
797 {
798         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
799         struct seq_server_site  *ss;
800         char                    *prefix;
801         int                      rc     = 0;
802         ENTRY;
803
804         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
805         if (unlikely(ss == NULL))
806                 RETURN(-ENXIO);
807
808         OBD_ALLOC_PTR(lfsck->li_seq);
809         if (lfsck->li_seq == NULL)
810                 RETURN(-ENOMEM);
811
812         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
813         if (prefix == NULL)
814                 GOTO(out, rc = -ENOMEM);
815
816         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
817         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
818                              ss->ss_server_seq);
819         OBD_FREE(prefix, MAX_OBD_NAME + 7);
820         if (rc != 0)
821                 GOTO(out, rc);
822
823         if (fid_is_sane(&bk->lb_last_fid))
824                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
825
826         RETURN(0);
827
828 out:
829         OBD_FREE_PTR(lfsck->li_seq);
830         lfsck->li_seq = NULL;
831
832         return rc;
833 }
834
835 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
836 {
837         if (lfsck->li_seq != NULL) {
838                 seq_client_fini(lfsck->li_seq);
839                 OBD_FREE_PTR(lfsck->li_seq);
840                 lfsck->li_seq = NULL;
841         }
842 }
843
844 void lfsck_instance_cleanup(const struct lu_env *env,
845                             struct lfsck_instance *lfsck)
846 {
847         struct ptlrpc_thread    *thread = &lfsck->li_thread;
848         struct lfsck_component  *com;
849         struct lfsck_component  *next;
850         ENTRY;
851
852         LASSERT(list_empty(&lfsck->li_link));
853         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
854
855         if (lfsck->li_obj_oit != NULL) {
856                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
857                 lfsck->li_obj_oit = NULL;
858         }
859
860         LASSERT(lfsck->li_obj_dir == NULL);
861
862         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
863                 lfsck_component_cleanup(env, com);
864         }
865
866         LASSERT(list_empty(&lfsck->li_list_dir));
867
868         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
869                                  lc_link) {
870                 lfsck_component_cleanup(env, com);
871         }
872
873         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
874                 lfsck_component_cleanup(env, com);
875         }
876
877         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
878         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
879
880         if (lfsck->li_bookmark_obj != NULL) {
881                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
882                 lfsck->li_bookmark_obj = NULL;
883         }
884
885         if (lfsck->li_lpf_obj != NULL) {
886                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
887                 lfsck->li_lpf_obj = NULL;
888         }
889
890         if (lfsck->li_los != NULL) {
891                 local_oid_storage_fini(env, lfsck->li_los);
892                 lfsck->li_los = NULL;
893         }
894
895         lfsck_fid_fini(lfsck);
896
897         OBD_FREE_PTR(lfsck);
898 }
899
900 static inline struct lfsck_instance *
901 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
902 {
903         struct lfsck_instance *lfsck;
904
905         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
906                 if (lfsck->li_bottom == key) {
907                         if (ref)
908                                 lfsck_instance_get(lfsck);
909                         if (unlink)
910                                 list_del_init(&lfsck->li_link);
911
912                         return lfsck;
913                 }
914         }
915
916         return NULL;
917 }
918
919 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
920                                            bool unlink)
921 {
922         struct lfsck_instance *lfsck;
923
924         spin_lock(&lfsck_instance_lock);
925         lfsck = __lfsck_instance_find(key, ref, unlink);
926         spin_unlock(&lfsck_instance_lock);
927
928         return lfsck;
929 }
930
931 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
932 {
933         struct lfsck_instance *tmp;
934
935         spin_lock(&lfsck_instance_lock);
936         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
937                 if (lfsck->li_bottom == tmp->li_bottom) {
938                         spin_unlock(&lfsck_instance_lock);
939                         return -EEXIST;
940                 }
941         }
942
943         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
944         spin_unlock(&lfsck_instance_lock);
945         return 0;
946 }
947
948 int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
949                     const char *prefix)
950 {
951         int flag;
952         int i;
953         bool newline = (bits != 0 ? false : true);
954
955         seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
956
957         for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
958                 if (flag & bits) {
959                         bits &= ~flag;
960                         if (names[i] != NULL) {
961                                 if (bits == 0)
962                                         newline = true;
963
964                                 seq_printf(m, "%s%c", names[i],
965                                            newline ? '\n' : ',');
966                         }
967                 }
968         }
969
970         if (!newline)
971                 seq_printf(m, "\n");
972         return 0;
973 }
974
975 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *prefix)
976 {
977         if (time != 0)
978                 seq_printf(m, "%s: "LPU64" seconds\n", prefix,
979                           cfs_time_current_sec() - time);
980         else
981                 seq_printf(m, "%s: N/A\n", prefix);
982         return 0;
983 }
984
985 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
986                    const char *prefix)
987 {
988         if (fid_is_zero(&pos->lp_dir_parent)) {
989                 if (pos->lp_oit_cookie == 0)
990                         seq_printf(m, "%s: N/A, N/A, N/A\n",
991                                    prefix);
992                 else
993                         seq_printf(m, "%s: "LPU64", N/A, N/A\n",
994                                    prefix, pos->lp_oit_cookie);
995         } else {
996                 seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
997                            prefix, pos->lp_oit_cookie,
998                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
999         }
1000         return 0;
1001 }
1002
1003 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1004                     struct lfsck_position *pos, bool init)
1005 {
1006         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1007
1008         if (unlikely(lfsck->li_di_oit == NULL)) {
1009                 memset(pos, 0, sizeof(*pos));
1010                 return;
1011         }
1012
1013         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1014         if (!lfsck->li_current_oit_processed && !init)
1015                 pos->lp_oit_cookie--;
1016
1017         LASSERT(pos->lp_oit_cookie > 0);
1018
1019         if (lfsck->li_di_dir != NULL) {
1020                 struct dt_object *dto = lfsck->li_obj_dir;
1021
1022                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1023                                                         lfsck->li_di_dir);
1024
1025                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1026                         fid_zero(&pos->lp_dir_parent);
1027                         pos->lp_dir_cookie = 0;
1028                 } else {
1029                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1030                 }
1031         } else {
1032                 fid_zero(&pos->lp_dir_parent);
1033                 pos->lp_dir_cookie = 0;
1034         }
1035 }
1036
1037 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1038 {
1039         bool dirty = false;
1040
1041         if (limit != LFSCK_SPEED_NO_LIMIT) {
1042                 if (limit > HZ) {
1043                         lfsck->li_sleep_rate = limit / HZ;
1044                         lfsck->li_sleep_jif = 1;
1045                 } else {
1046                         lfsck->li_sleep_rate = 1;
1047                         lfsck->li_sleep_jif = HZ / limit;
1048                 }
1049         } else {
1050                 lfsck->li_sleep_jif = 0;
1051                 lfsck->li_sleep_rate = 0;
1052         }
1053
1054         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1055                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1056                 dirty = true;
1057         }
1058
1059         return dirty;
1060 }
1061
1062 void lfsck_control_speed(struct lfsck_instance *lfsck)
1063 {
1064         struct ptlrpc_thread *thread = &lfsck->li_thread;
1065         struct l_wait_info    lwi;
1066
1067         if (lfsck->li_sleep_jif > 0 &&
1068             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1069                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1070                                        LWI_ON_SIGNAL_NOOP, NULL);
1071
1072                 l_wait_event(thread->t_ctl_waitq,
1073                              !thread_is_running(thread),
1074                              &lwi);
1075                 lfsck->li_new_scanned = 0;
1076         }
1077 }
1078
1079 void lfsck_control_speed_by_self(struct lfsck_component *com)
1080 {
1081         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1082         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1083         struct l_wait_info       lwi;
1084
1085         if (lfsck->li_sleep_jif > 0 &&
1086             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1087                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1088                                        LWI_ON_SIGNAL_NOOP, NULL);
1089
1090                 l_wait_event(thread->t_ctl_waitq,
1091                              !thread_is_running(thread),
1092                              &lwi);
1093                 com->lc_new_scanned = 0;
1094         }
1095 }
1096
1097 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
1098                             struct lu_fid *fid)
1099 {
1100         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
1101                      !dt_try_as_dir(env, obj)))
1102                 return -ENOTDIR;
1103
1104         return dt_lookup(env, obj, (struct dt_rec *)fid,
1105                          (const struct dt_key *)"..", BYPASS_CAPA);
1106 }
1107
1108 static int lfsck_needs_scan_dir(const struct lu_env *env,
1109                                 struct lfsck_instance *lfsck,
1110                                 struct dt_object *obj)
1111 {
1112         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
1113         int            depth = 0;
1114         int            rc;
1115
1116         if (!lfsck->li_master || !S_ISDIR(lfsck_object_type(obj)) ||
1117             list_empty(&lfsck->li_list_dir))
1118                RETURN(0);
1119
1120         while (1) {
1121                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
1122                  *      which is the agent directory to manage the objects
1123                  *      which name entries reside on remote MDTs. Related
1124                  *      consistency verification will be processed in LFSCK
1125                  *      phase III. */
1126                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
1127                         if (depth > 0)
1128                                 lfsck_object_put(env, obj);
1129                         return 1;
1130                 }
1131
1132                 /* No need to check .lustre and its children. */
1133                 if (fid_seq_is_dot_lustre(fid_seq(lfsck_dto2fid(obj)))) {
1134                         if (depth > 0)
1135                                 lfsck_object_put(env, obj);
1136                         return 0;
1137                 }
1138
1139                 dt_read_lock(env, obj, MOR_TGT_CHILD);
1140                 if (unlikely(lfsck_is_dead_obj(obj))) {
1141                         dt_read_unlock(env, obj);
1142                         if (depth > 0)
1143                                 lfsck_object_put(env, obj);
1144                         return 0;
1145                 }
1146
1147                 rc = dt_xattr_get(env, obj,
1148                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
1149                                   BYPASS_CAPA);
1150                 dt_read_unlock(env, obj);
1151                 if (rc >= 0) {
1152                         if (depth > 0)
1153                                 lfsck_object_put(env, obj);
1154                         return 1;
1155                 }
1156
1157                 if (rc < 0 && rc != -ENODATA) {
1158                         if (depth > 0)
1159                                 lfsck_object_put(env, obj);
1160                         return rc;
1161                 }
1162
1163                 rc = lfsck_parent_fid(env, obj, fid);
1164                 if (depth > 0)
1165                         lfsck_object_put(env, obj);
1166                 if (rc != 0)
1167                         return rc;
1168
1169                 if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
1170                         return 0;
1171
1172                 obj = lfsck_object_find(env, lfsck, fid);
1173                 if (obj == NULL)
1174                         return 0;
1175                 else if (IS_ERR(obj))
1176                         return PTR_ERR(obj);
1177
1178                 if (!dt_object_exists(obj)) {
1179                         lfsck_object_put(env, obj);
1180                         return 0;
1181                 }
1182
1183                 if (dt_object_remote(obj)) {
1184                         /* .lustre/lost+found/MDTxxx can be remote directory. */
1185                         if (fid_seq_is_dot_lustre(fid_seq(lfsck_dto2fid(obj))))
1186                                 rc = 0;
1187                         else
1188                                 /* Other remote directory should be client
1189                                  * visible and need to be checked. */
1190                                 rc = 1;
1191                         lfsck_object_put(env, obj);
1192                         return rc;
1193                 }
1194
1195                 depth++;
1196         }
1197         return 0;
1198 }
1199
1200 struct lfsck_thread_args *lfsck_thread_args_init(struct lfsck_instance *lfsck,
1201                                                  struct lfsck_component *com,
1202                                                  struct lfsck_start_param *lsp)
1203 {
1204         struct lfsck_thread_args *lta;
1205         int                       rc;
1206
1207         OBD_ALLOC_PTR(lta);
1208         if (lta == NULL)
1209                 return ERR_PTR(-ENOMEM);
1210
1211         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1212         if (rc != 0) {
1213                 OBD_FREE_PTR(lta);
1214                 return ERR_PTR(rc);
1215         }
1216
1217         lta->lta_lfsck = lfsck_instance_get(lfsck);
1218         if (com != NULL)
1219                 lta->lta_com = lfsck_component_get(com);
1220
1221         lta->lta_lsp = lsp;
1222
1223         return lta;
1224 }
1225
1226 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1227 {
1228         if (lta->lta_com != NULL)
1229                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1230         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1231         lu_env_fini(&lta->lta_env);
1232         OBD_FREE_PTR(lta);
1233 }
1234
1235 /* LFSCK wrap functions */
1236
1237 void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
1238                 bool new_checked)
1239 {
1240         struct lfsck_component *com;
1241
1242         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1243                 com->lc_ops->lfsck_fail(env, com, new_checked);
1244         }
1245 }
1246
1247 int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck)
1248 {
1249         struct lfsck_component *com;
1250         int                     rc  = 0;
1251         int                     rc1 = 0;
1252
1253         if (likely(cfs_time_beforeq(cfs_time_current(),
1254                                     lfsck->li_time_next_checkpoint)))
1255                 return 0;
1256
1257         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
1258         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1259                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
1260                 if (rc != 0)
1261                         rc1 = rc;
1262         }
1263
1264         lfsck->li_time_last_checkpoint = cfs_time_current();
1265         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1266                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1267         return rc1 != 0 ? rc1 : rc;
1268 }
1269
1270 int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck,
1271                struct lfsck_start_param *lsp)
1272 {
1273         struct dt_object       *obj     = NULL;
1274         struct lfsck_component *com;
1275         struct lfsck_component *next;
1276         struct lfsck_position  *pos     = NULL;
1277         const struct dt_it_ops *iops    =
1278                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
1279         struct dt_it           *di;
1280         int                     rc;
1281         ENTRY;
1282
1283         LASSERT(lfsck->li_obj_dir == NULL);
1284         LASSERT(lfsck->li_di_dir == NULL);
1285
1286         lfsck->li_current_oit_processed = 0;
1287         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1288                 com->lc_new_checked = 0;
1289                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1290                         com->lc_journal = 0;
1291
1292                 rc = com->lc_ops->lfsck_prep(env, com, lsp);
1293                 if (rc != 0)
1294                         GOTO(out, rc);
1295
1296                 if ((pos == NULL) ||
1297                     (!lfsck_pos_is_zero(&com->lc_pos_start) &&
1298                      lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
1299                         pos = &com->lc_pos_start;
1300         }
1301
1302         /* Init otable-based iterator. */
1303         if (pos == NULL) {
1304                 rc = iops->load(env, lfsck->li_di_oit, 0);
1305                 if (rc > 0) {
1306                         lfsck->li_oit_over = 1;
1307                         rc = 0;
1308                 }
1309
1310                 GOTO(out, rc);
1311         }
1312
1313         rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
1314         if (rc < 0)
1315                 GOTO(out, rc);
1316         else if (rc > 0)
1317                 lfsck->li_oit_over = 1;
1318
1319         if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
1320                 GOTO(out, rc = 0);
1321
1322         /* Find the directory for namespace-based traverse. */
1323         obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
1324         if (obj == NULL)
1325                 GOTO(out, rc = 0);
1326         else if (IS_ERR(obj))
1327                 RETURN(PTR_ERR(obj));
1328
1329         /* XXX: Currently, skip remote object, the consistency for
1330          *      remote object will be processed in LFSCK phase III. */
1331         if (!dt_object_exists(obj) || dt_object_remote(obj) ||
1332             unlikely(!S_ISDIR(lfsck_object_type(obj))))
1333                 GOTO(out, rc = 0);
1334
1335         if (unlikely(!dt_try_as_dir(env, obj)))
1336                 GOTO(out, rc = -ENOTDIR);
1337
1338         /* Init the namespace-based directory traverse. */
1339         iops = &obj->do_index_ops->dio_it;
1340         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
1341         if (IS_ERR(di))
1342                 GOTO(out, rc = PTR_ERR(di));
1343
1344         LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
1345
1346         rc = iops->load(env, di, pos->lp_dir_cookie);
1347         if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
1348                 rc = iops->next(env, di);
1349         else if (rc > 0)
1350                 rc = 0;
1351
1352         if (rc != 0) {
1353                 iops->put(env, di);
1354                 iops->fini(env, di);
1355                 GOTO(out, rc);
1356         }
1357
1358         lfsck->li_obj_dir = lfsck_object_get(obj);
1359         lfsck->li_cookie_dir = iops->store(env, di);
1360         spin_lock(&lfsck->li_lock);
1361         lfsck->li_di_dir = di;
1362         spin_unlock(&lfsck->li_lock);
1363
1364         GOTO(out, rc = 0);
1365
1366 out:
1367         if (obj != NULL)
1368                 lfsck_object_put(env, obj);
1369
1370         if (rc < 0) {
1371                 list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
1372                                          lc_link)
1373                         com->lc_ops->lfsck_post(env, com, rc, true);
1374
1375                 return rc;
1376         }
1377
1378         rc = 0;
1379         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
1380         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1381                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
1382                 if (rc != 0)
1383                         break;
1384         }
1385
1386         lfsck->li_time_last_checkpoint = cfs_time_current();
1387         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1388                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1389         return rc;
1390 }
1391
1392 int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
1393                    struct dt_object *obj)
1394 {
1395         struct lfsck_component *com;
1396         const struct dt_it_ops *iops;
1397         struct dt_it           *di;
1398         int                     rc;
1399         ENTRY;
1400
1401         LASSERT(lfsck->li_obj_dir == NULL);
1402
1403         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1404                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
1405                 if (rc != 0)
1406                         RETURN(rc);
1407         }
1408
1409         rc = lfsck_needs_scan_dir(env, lfsck, obj);
1410         if (rc <= 0)
1411                 GOTO(out, rc);
1412
1413         if (unlikely(!dt_try_as_dir(env, obj)))
1414                 GOTO(out, rc = -ENOTDIR);
1415
1416         iops = &obj->do_index_ops->dio_it;
1417         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
1418         if (IS_ERR(di))
1419                 GOTO(out, rc = PTR_ERR(di));
1420
1421         rc = iops->load(env, di, 0);
1422         if (rc == 0)
1423                 rc = iops->next(env, di);
1424         else if (rc > 0)
1425                 rc = 0;
1426
1427         if (rc != 0) {
1428                 iops->put(env, di);
1429                 iops->fini(env, di);
1430                 GOTO(out, rc);
1431         }
1432
1433         lfsck->li_obj_dir = lfsck_object_get(obj);
1434         lfsck->li_cookie_dir = iops->store(env, di);
1435         spin_lock(&lfsck->li_lock);
1436         lfsck->li_di_dir = di;
1437         spin_unlock(&lfsck->li_lock);
1438
1439         GOTO(out, rc = 0);
1440
1441 out:
1442         if (rc < 0)
1443                 lfsck_fail(env, lfsck, false);
1444         return (rc > 0 ? 0 : rc);
1445 }
1446
1447 int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
1448                    struct dt_object *obj, struct lu_dirent *ent)
1449 {
1450         struct lfsck_component *com;
1451         int                     rc;
1452
1453         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1454                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
1455                 if (rc != 0)
1456                         return rc;
1457         }
1458         return 0;
1459 }
1460
1461 int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
1462                int result)
1463 {
1464         struct lfsck_component *com;
1465         struct lfsck_component *next;
1466         int                     rc  = 0;
1467         int                     rc1 = 0;
1468
1469         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
1470         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1471                 rc = com->lc_ops->lfsck_post(env, com, result, false);
1472                 if (rc != 0)
1473                         rc1 = rc;
1474         }
1475
1476         lfsck->li_time_last_checkpoint = cfs_time_current();
1477         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1478                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1479
1480         /* Ignore some component post failure to make other can go ahead. */
1481         return result;
1482 }
1483
1484 static void lfsck_interpret(const struct lu_env *env,
1485                             struct lfsck_instance *lfsck,
1486                             struct ptlrpc_request *req, void *args, int result)
1487 {
1488         struct lfsck_async_interpret_args *laia = args;
1489         struct lfsck_component            *com;
1490
1491         LASSERT(laia->laia_com == NULL);
1492         LASSERT(laia->laia_shared);
1493
1494         spin_lock(&lfsck->li_lock);
1495         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1496                 if (com->lc_ops->lfsck_interpret != NULL) {
1497                         laia->laia_com = com;
1498                         com->lc_ops->lfsck_interpret(env, req, laia, result);
1499                 }
1500         }
1501
1502         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1503                 if (com->lc_ops->lfsck_interpret != NULL) {
1504                         laia->laia_com = com;
1505                         com->lc_ops->lfsck_interpret(env, req, laia, result);
1506                 }
1507         }
1508         spin_unlock(&lfsck->li_lock);
1509 }
1510
1511 int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck)
1512 {
1513         struct lfsck_component *com;
1514         struct lfsck_component *next;
1515         struct l_wait_info      lwi = { 0 };
1516         int                     rc  = 0;
1517         int                     rc1 = 0;
1518
1519         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1520                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1521                         com->lc_journal = 0;
1522
1523                 rc = com->lc_ops->lfsck_double_scan(env, com);
1524                 if (rc != 0)
1525                         rc1 = rc;
1526         }
1527
1528         l_wait_event(lfsck->li_thread.t_ctl_waitq,
1529                      atomic_read(&lfsck->li_double_scan_count) == 0,
1530                      &lwi);
1531
1532         if (lfsck->li_status != LS_PAUSED &&
1533             lfsck->li_status != LS_CO_PAUSED) {
1534                 list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1535                                          lc_link) {
1536                         spin_lock(&lfsck->li_lock);
1537                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
1538                         spin_unlock(&lfsck->li_lock);
1539                 }
1540         }
1541
1542         return rc1 != 0 ? rc1 : rc;
1543 }
1544
1545 static int lfsck_stop_notify(const struct lu_env *env,
1546                              struct lfsck_instance *lfsck,
1547                              struct lfsck_tgt_descs *ltds,
1548                              struct lfsck_tgt_desc *ltd, __u16 type)
1549 {
1550         struct ptlrpc_request_set *set;
1551         struct lfsck_component    *com;
1552         int                        rc  = 0;
1553         ENTRY;
1554
1555         spin_lock(&lfsck->li_lock);
1556         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
1557         if (com == NULL)
1558                 com = __lfsck_component_find(lfsck, type,
1559                                              &lfsck->li_list_double_scan);
1560         if (com != NULL)
1561                 lfsck_component_get(com);
1562         spin_unlock(&lfsck->li_lock);
1563
1564         if (com != NULL) {
1565                 if (com->lc_ops->lfsck_stop_notify != NULL) {
1566                         set = ptlrpc_prep_set();
1567                         if (set == NULL) {
1568                                 lfsck_component_put(env, com);
1569
1570                                 RETURN(-ENOMEM);
1571                         }
1572
1573                         rc = com->lc_ops->lfsck_stop_notify(env, com, ltds,
1574                                                             ltd, set);
1575                         if (rc == 0)
1576                                 rc = ptlrpc_set_wait(set);
1577
1578                         ptlrpc_set_destroy(set);
1579                 }
1580
1581                 lfsck_component_put(env, com);
1582         }
1583
1584         RETURN(rc);
1585 }
1586
1587 void lfsck_quit(const struct lu_env *env, struct lfsck_instance *lfsck)
1588 {
1589         struct lfsck_component *com;
1590         struct lfsck_component *next;
1591
1592         list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
1593                                  lc_link) {
1594                 if (com->lc_ops->lfsck_quit != NULL)
1595                         com->lc_ops->lfsck_quit(env, com);
1596
1597                 spin_lock(&lfsck->li_lock);
1598                 list_del_init(&com->lc_link_dir);
1599                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
1600                 spin_unlock(&lfsck->li_lock);
1601         }
1602
1603         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1604                                  lc_link) {
1605                 if (com->lc_ops->lfsck_quit != NULL)
1606                         com->lc_ops->lfsck_quit(env, com);
1607
1608                 spin_lock(&lfsck->li_lock);
1609                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
1610                 spin_unlock(&lfsck->li_lock);
1611         }
1612 }
1613
1614 static int lfsck_async_interpret(const struct lu_env *env,
1615                                  struct ptlrpc_request *req,
1616                                  void *args, int rc)
1617 {
1618         struct lfsck_async_interpret_args *laia = args;
1619         struct lfsck_instance             *lfsck;
1620
1621         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
1622                               li_mdt_descs);
1623         lfsck_interpret(env, lfsck, req, laia, rc);
1624         lfsck_tgt_put(laia->laia_ltd);
1625         if (rc != 0 && laia->laia_result != -EALREADY)
1626                 laia->laia_result = rc;
1627
1628         return 0;
1629 }
1630
1631 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
1632                         struct lfsck_request *lr,
1633                         struct ptlrpc_request_set *set,
1634                         ptlrpc_interpterer_t interpreter,
1635                         void *args, int request)
1636 {
1637         struct lfsck_async_interpret_args *laia;
1638         struct ptlrpc_request             *req;
1639         struct lfsck_request              *tmp;
1640         struct req_format                 *format;
1641         int                                rc;
1642
1643         switch (request) {
1644         case LFSCK_NOTIFY:
1645                 format = &RQF_LFSCK_NOTIFY;
1646                 break;
1647         case LFSCK_QUERY:
1648                 format = &RQF_LFSCK_QUERY;
1649                 break;
1650         default:
1651                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
1652                        exp->exp_obd->obd_name, request, -EINVAL);
1653                 return -EINVAL;
1654         }
1655
1656         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
1657         if (req == NULL)
1658                 return -ENOMEM;
1659
1660         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
1661         if (rc != 0) {
1662                 ptlrpc_request_free(req);
1663
1664                 return rc;
1665         }
1666
1667         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1668         *tmp = *lr;
1669         ptlrpc_request_set_replen(req);
1670
1671         laia = ptlrpc_req_async_args(req);
1672         *laia = *(struct lfsck_async_interpret_args *)args;
1673         if (laia->laia_com != NULL)
1674                 lfsck_component_get(laia->laia_com);
1675         req->rq_interpret_reply = interpreter;
1676         ptlrpc_set_add_req(set, req);
1677
1678         return 0;
1679 }
1680
1681 /* external interfaces */
1682
1683 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
1684 {
1685         struct lu_env           env;
1686         struct lfsck_instance  *lfsck;
1687         int                     rc;
1688         ENTRY;
1689
1690         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1691         if (rc != 0)
1692                 RETURN(rc);
1693
1694         lfsck = lfsck_instance_find(key, true, false);
1695         if (likely(lfsck != NULL)) {
1696                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
1697                 lfsck_instance_put(&env, lfsck);
1698         } else {
1699                 rc = -ENXIO;
1700         }
1701
1702         lu_env_fini(&env);
1703
1704         RETURN(rc);
1705 }
1706 EXPORT_SYMBOL(lfsck_get_speed);
1707
1708 int lfsck_set_speed(struct dt_device *key, int val)
1709 {
1710         struct lu_env           env;
1711         struct lfsck_instance  *lfsck;
1712         int                     rc;
1713         ENTRY;
1714
1715         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1716         if (rc != 0)
1717                 RETURN(rc);
1718
1719         lfsck = lfsck_instance_find(key, true, false);
1720         if (likely(lfsck != NULL)) {
1721                 mutex_lock(&lfsck->li_mutex);
1722                 if (__lfsck_set_speed(lfsck, val))
1723                         rc = lfsck_bookmark_store(&env, lfsck);
1724                 mutex_unlock(&lfsck->li_mutex);
1725                 lfsck_instance_put(&env, lfsck);
1726         } else {
1727                 rc = -ENXIO;
1728         }
1729
1730         lu_env_fini(&env);
1731
1732         RETURN(rc);
1733 }
1734 EXPORT_SYMBOL(lfsck_set_speed);
1735
1736 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
1737 {
1738         struct lu_env           env;
1739         struct lfsck_instance  *lfsck;
1740         int                     rc;
1741         ENTRY;
1742
1743         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1744         if (rc != 0)
1745                 RETURN(rc);
1746
1747         lfsck = lfsck_instance_find(key, true, false);
1748         if (likely(lfsck != NULL)) {
1749                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
1750                 lfsck_instance_put(&env, lfsck);
1751         } else {
1752                 rc = -ENXIO;
1753         }
1754
1755         lu_env_fini(&env);
1756
1757         RETURN(rc);
1758 }
1759 EXPORT_SYMBOL(lfsck_get_windows);
1760
1761 int lfsck_set_windows(struct dt_device *key, int val)
1762 {
1763         struct lu_env           env;
1764         struct lfsck_instance  *lfsck;
1765         int                     rc;
1766         ENTRY;
1767
1768         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1769         if (rc != 0)
1770                 RETURN(rc);
1771
1772         lfsck = lfsck_instance_find(key, true, false);
1773         if (likely(lfsck != NULL)) {
1774                 if (val > LFSCK_ASYNC_WIN_MAX) {
1775                         CWARN("%s: Too large async window size, which "
1776                               "may cause memory issues. The valid range "
1777                               "is [0 - %u]. If you do not want to restrict "
1778                               "the window size for async requests pipeline, "
1779                               "just set it as 0.\n",
1780                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
1781                         rc = -EINVAL;
1782                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
1783                         mutex_lock(&lfsck->li_mutex);
1784                         lfsck->li_bookmark_ram.lb_async_windows = val;
1785                         rc = lfsck_bookmark_store(&env, lfsck);
1786                         mutex_unlock(&lfsck->li_mutex);
1787                 }
1788                 lfsck_instance_put(&env, lfsck);
1789         } else {
1790                 rc = -ENXIO;
1791         }
1792
1793         lu_env_fini(&env);
1794
1795         RETURN(rc);
1796 }
1797 EXPORT_SYMBOL(lfsck_set_windows);
1798
1799 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
1800 {
1801         struct lu_env           env;
1802         struct lfsck_instance  *lfsck;
1803         struct lfsck_component *com;
1804         int                     rc;
1805         ENTRY;
1806
1807         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1808         if (rc != 0)
1809                 RETURN(rc);
1810
1811         lfsck = lfsck_instance_find(key, true, false);
1812         if (likely(lfsck != NULL)) {
1813                 com = lfsck_component_find(lfsck, type);
1814                 if (likely(com != NULL)) {
1815                         rc = com->lc_ops->lfsck_dump(&env, com, m);
1816                         lfsck_component_put(&env, com);
1817                 } else {
1818                         rc = -ENOTSUPP;
1819                 }
1820
1821                 lfsck_instance_put(&env, lfsck);
1822         } else {
1823                 rc = -ENXIO;
1824         }
1825
1826         lu_env_fini(&env);
1827
1828         RETURN(rc);
1829 }
1830 EXPORT_SYMBOL(lfsck_dump);
1831
1832 static int lfsck_stop_all(const struct lu_env *env,
1833                           struct lfsck_instance *lfsck,
1834                           struct lfsck_stop *stop)
1835 {
1836         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1837         struct lfsck_request              *lr     = &info->lti_lr;
1838         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1839         struct ptlrpc_request_set         *set;
1840         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1841         struct lfsck_tgt_desc             *ltd;
1842         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1843         __u32                              idx;
1844         int                                rc     = 0;
1845         int                                rc1    = 0;
1846         ENTRY;
1847
1848         LASSERT(stop->ls_flags & LPF_BROADCAST);
1849
1850         set = ptlrpc_prep_set();
1851         if (unlikely(set == NULL))
1852                 RETURN(-ENOMEM);
1853
1854         memset(lr, 0, sizeof(*lr));
1855         lr->lr_event = LE_STOP;
1856         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1857         lr->lr_status = stop->ls_status;
1858         lr->lr_version = bk->lb_version;
1859         lr->lr_active = LFSCK_TYPES_ALL;
1860         lr->lr_param = stop->ls_flags;
1861
1862         laia->laia_com = NULL;
1863         laia->laia_ltds = ltds;
1864         laia->laia_lr = lr;
1865         laia->laia_result = 0;
1866         laia->laia_shared = 1;
1867
1868         down_read(&ltds->ltd_rw_sem);
1869         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1870                 ltd = lfsck_tgt_get(ltds, idx);
1871                 LASSERT(ltd != NULL);
1872
1873                 laia->laia_ltd = ltd;
1874                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1875                                          lfsck_async_interpret, laia,
1876                                          LFSCK_NOTIFY);
1877                 if (rc != 0) {
1878                         lfsck_interpret(env, lfsck, NULL, laia, rc);
1879                         lfsck_tgt_put(ltd);
1880                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
1881                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
1882                         rc1 = rc;
1883                 }
1884         }
1885         up_read(&ltds->ltd_rw_sem);
1886
1887         rc = ptlrpc_set_wait(set);
1888         ptlrpc_set_destroy(set);
1889
1890         if (rc == 0)
1891                 rc = laia->laia_result;
1892
1893         if (rc == -EALREADY)
1894                 rc = 0;
1895
1896         if (rc != 0)
1897                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
1898                        lfsck_lfsck2name(lfsck), rc);
1899
1900         RETURN(rc != 0 ? rc : rc1);
1901 }
1902
1903 static int lfsck_start_all(const struct lu_env *env,
1904                            struct lfsck_instance *lfsck,
1905                            struct lfsck_start *start)
1906 {
1907         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1908         struct lfsck_request              *lr     = &info->lti_lr;
1909         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1910         struct ptlrpc_request_set         *set;
1911         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1912         struct lfsck_tgt_desc             *ltd;
1913         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1914         __u32                              idx;
1915         int                                rc     = 0;
1916         ENTRY;
1917
1918         LASSERT(start->ls_flags & LPF_BROADCAST);
1919
1920         set = ptlrpc_prep_set();
1921         if (unlikely(set == NULL))
1922                 RETURN(-ENOMEM);
1923
1924         memset(lr, 0, sizeof(*lr));
1925         lr->lr_event = LE_START;
1926         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1927         lr->lr_speed = bk->lb_speed_limit;
1928         lr->lr_version = bk->lb_version;
1929         lr->lr_active = start->ls_active;
1930         lr->lr_param = start->ls_flags;
1931         lr->lr_async_windows = bk->lb_async_windows;
1932         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
1933                        LSV_ASYNC_WINDOWS;
1934
1935         laia->laia_com = NULL;
1936         laia->laia_ltds = ltds;
1937         laia->laia_lr = lr;
1938         laia->laia_result = 0;
1939         laia->laia_shared = 1;
1940
1941         down_read(&ltds->ltd_rw_sem);
1942         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1943                 ltd = lfsck_tgt_get(ltds, idx);
1944                 LASSERT(ltd != NULL);
1945
1946                 laia->laia_ltd = ltd;
1947                 ltd->ltd_layout_done = 0;
1948                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1949                                          lfsck_async_interpret, laia,
1950                                          LFSCK_NOTIFY);
1951                 if (rc != 0) {
1952                         lfsck_interpret(env, lfsck, NULL, laia, rc);
1953                         lfsck_tgt_put(ltd);
1954                         CERROR("%s: cannot notify MDT %x for LFSCK "
1955                                "start, failout: rc = %d\n",
1956                                lfsck_lfsck2name(lfsck), idx, rc);
1957                         break;
1958                 }
1959         }
1960         up_read(&ltds->ltd_rw_sem);
1961
1962         if (rc != 0) {
1963                 ptlrpc_set_destroy(set);
1964
1965                 RETURN(rc);
1966         }
1967
1968         rc = ptlrpc_set_wait(set);
1969         ptlrpc_set_destroy(set);
1970
1971         if (rc == 0)
1972                 rc = laia->laia_result;
1973
1974         if (rc != 0) {
1975                 struct lfsck_stop *stop = &info->lti_stop;
1976
1977                 CERROR("%s: cannot start LFSCK on some MDTs, "
1978                        "stop all: rc = %d\n",
1979                        lfsck_lfsck2name(lfsck), rc);
1980                 if (rc != -EALREADY) {
1981                         stop->ls_status = LS_FAILED;
1982                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
1983                         lfsck_stop_all(env, lfsck, stop);
1984                 }
1985         }
1986
1987         RETURN(rc);
1988 }
1989
/*
 * Start the LFSCK engine thread for the instance found for \a key, or,
 * when an engine is already active, try to join the requested components
 * to the running scan.
 *
 * \a lsp->lsp_start == NULL means "auto trigger a paused LFSCK": nothing
 * is done (0 is returned) if the scan list is empty or the
 * OBD_FAIL_LFSCK_NO_AUTO fail check fires.
 *
 * Errors visible in this function:
 *   -ENXIO      no LFSCK instance is found for \a key
 *   -EAGAIN     lfsck::li_namespace is not set yet
 *   -EALREADY   an engine is already active (neither init nor stopped)
 *   -EOPNOTSUPP join: a requested type is on neither scan list
 *   -EPERM      LPF_BROADCAST requested on an instance that is not master
 *   -ENOTSUPP   unsupported types requested; ls_active is left holding
 *               only the unsupported bits
 * plus whatever the called helpers and component operations return.
 * Only negative values are returned; non-negative results become 0.
 */
1990 int lfsck_start(const struct lu_env *env, struct dt_device *key,
1991                 struct lfsck_start_param *lsp)
1992 {
1993         struct lfsck_start              *start  = lsp->lsp_start;
1994         struct lfsck_instance           *lfsck;
1995         struct lfsck_bookmark           *bk;
1996         struct ptlrpc_thread            *thread;
1997         struct lfsck_component          *com;
1998         struct l_wait_info               lwi    = { 0 };
1999         struct lfsck_thread_args        *lta;
2000         struct task_struct              *task;
2001         int                              rc     = 0;
2002         __u16                            valid  = 0;
2003         __u16                            flags  = 0;
2004         __u16                            type   = 1;
2005         ENTRY;
2006
2007         lfsck = lfsck_instance_find(key, true, false);
2008         if (unlikely(lfsck == NULL))
2009                 RETURN(-ENXIO);
2010
2011         /* System is not ready, try again later. */
2012         if (unlikely(lfsck->li_namespace == NULL))
2013                 GOTO(put, rc = -EAGAIN);
2014
2015         /* start == NULL means auto trigger paused LFSCK. */
2016         if ((start == NULL) &&
2017             (list_empty(&lfsck->li_list_scan) ||
2018              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2019                 GOTO(put, rc = 0);
2020
2021         bk = &lfsck->li_bookmark_ram;
2022         thread = &lfsck->li_thread;
2023         mutex_lock(&lfsck->li_mutex);
2024         spin_lock(&lfsck->li_lock);
             /* The engine is neither in its initial state nor stopped: do
              * not start a second one, only try to join the requested
              * component types to the active scan. */
2025         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2026                 rc = -EALREADY;
2027                 if (unlikely(start == NULL)) {
2028                         spin_unlock(&lfsck->li_lock);
2029                         GOTO(out, rc);
2030                 }
2031
                     /* Walk the requested type bits from the lowest one;
                      * each handled bit is cleared from ls_active. */
2032                 while (start->ls_active != 0) {
2033                         if (!(type & start->ls_active)) {
2034                                 type <<= 1;
2035                                 continue;
2036                         }
2037
2038                         com = __lfsck_component_find(lfsck, type,
2039                                                      &lfsck->li_list_scan);
2040                         if (com == NULL)
2041                                 com = __lfsck_component_find(lfsck, type,
2042                                                 &lfsck->li_list_double_scan);
2043                         if (com == NULL) {
2044                                 rc = -EOPNOTSUPP;
2045                                 break;
2046                         }
2047
2048                         if (com->lc_ops->lfsck_join != NULL) {
2049                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2050                                 if (rc != 0 && rc != -EALREADY)
2051                                         break;
2052                         }
2053                         start->ls_active &= ~type;
2054                         type <<= 1;
2055                 }
2056                 spin_unlock(&lfsck->li_lock);
2057                 GOTO(out, rc);
2058         }
2059         spin_unlock(&lfsck->li_lock);
2060
             /* Fresh start: reset the per-run state of the instance. */
2061         lfsck->li_status = 0;
2062         lfsck->li_oit_over = 0;
2063         lfsck->li_start_unplug = 0;
2064         lfsck->li_drop_dryrun = 0;
2065         lfsck->li_new_scanned = 0;
2066
2067         /* For auto trigger. */
2068         if (start == NULL)
2069                 goto trigger;
2070
2071         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2072                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2073                        lfsck_lfsck2name(lfsck));
2074
2075                 GOTO(out, rc = -EPERM);
2076         }
2077
2078         start->ls_version = bk->lb_version;
2079
2080         if (start->ls_active != 0) {
2081                 struct lfsck_component *next;
2082
2083                 if (start->ls_active == LFSCK_TYPES_ALL)
2084                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2085
                     /* On unsupported types, leave only the unsupported
                      * bits in ls_active before failing. */
2086                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2087                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2088                         GOTO(out, rc = -ENOTSUPP);
2089                 }
2090
                     /* Components on the scan list that were not requested
                      * this time go through their lfsck_post() operation. */
2091                 list_for_each_entry_safe(com, next,
2092                                          &lfsck->li_list_scan, lc_link) {
2093                         if (!(com->lc_type & start->ls_active)) {
2094                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2095                                                              false);
2096                                 if (rc != 0)
2097                                         GOTO(out, rc);
2098                         }
2099                 }
2100
                     /* Move every requested component that sits on the idle
                      * list to the scan list; ls_active ends up as 0. */
2101                 while (start->ls_active != 0) {
2102                         if (type & start->ls_active) {
2103                                 com = __lfsck_component_find(lfsck, type,
2104                                                         &lfsck->li_list_idle);
2105                                 if (com != NULL)
2106                                         /* The component status will be updated
2107                                          * when its prep() is called later by
2108                                          * the LFSCK main engine. */
2109                                         list_move_tail(&com->lc_link,
2110                                                        &lfsck->li_list_scan);
2111                                 start->ls_active &= ~type;
2112                         }
2113                         type <<= 1;
2114                 }
2115         }
2116
2117         if (list_empty(&lfsck->li_list_scan)) {
2118                 /* The speed limit will be used to control both the LFSCK and
2119                  * low layer scrub (if applied), need to be handled firstly. */
2120                 if (start->ls_valid & LSV_SPEED_LIMIT) {
2121                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
2122                                 rc = lfsck_bookmark_store(env, lfsck);
2123                                 if (rc != 0)
2124                                         GOTO(out, rc);
2125                         }
2126                 }
2127
2128                 goto trigger;
2129         }
2130
2131         if (start->ls_flags & LPF_RESET)
2132                 flags |= DOIF_RESET;
2133
2134         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
2135         if (rc != 0)
2136                 GOTO(out, rc);
2137
             /* Rebuild ls_active from the components actually on the scan
              * list, resetting each of them when LPF_RESET was given. */
2138         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2139                 start->ls_active |= com->lc_type;
2140                 if (flags & DOIF_RESET) {
2141                         rc = com->lc_ops->lfsck_reset(env, com, false);
2142                         if (rc != 0)
2143                                 GOTO(out, rc);
2144                 }
2145         }
2146
2147 trigger:
             /* Compute the directory and object-table iteration arguments
              * stored in li_args_dir / li_args_oit. */
2148         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
2149         if (bk->lb_param & LPF_DRYRUN)
2150                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
2151
2152         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
2153                 valid |= DOIV_ERROR_HANDLE;
2154                 if (start->ls_flags & LPF_FAILOUT)
2155                         flags |= DOIF_FAILOUT;
2156         }
2157
2158         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
2159                 valid |= DOIV_DRYRUN;
2160                 if (start->ls_flags & LPF_DRYRUN)
2161                         flags |= DOIF_DRYRUN;
2162         }
2163
2164         if (!list_empty(&lfsck->li_list_scan))
2165                 flags |= DOIF_OUTUSED;
2166
2167         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2168         thread_set_flags(thread, 0);
2169         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
2170         if (IS_ERR(lta))
2171                 GOTO(out, rc = PTR_ERR(lta));
2172
2173         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
2174         task = kthread_run(lfsck_master_engine, lta, "lfsck");
2175         if (IS_ERR(task)) {
2176                 rc = PTR_ERR(task);
2177                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
2178                        lfsck_lfsck2name(lfsck), rc);
2179                 lfsck_thread_args_fini(lta);
2180
2181                 GOTO(out, rc);
2182         }
2183
             /* Wait until the engine thread reports running or stopped. */
2184         l_wait_event(thread->t_ctl_waitq,
2185                      thread_is_running(thread) ||
2186                      thread_is_stopped(thread),
2187                      &lwi);
             /* Local-only start: set li_start_unplug and wake the waiters on
              * t_ctl_waitq. NOTE(review): presumably the engine blocks on
              * this flag before scanning - confirm in lfsck_master_engine. */
2188         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
2189                 lfsck->li_start_unplug = 1;
2190                 wake_up_all(&thread->t_ctl_waitq);
2191
2192                 GOTO(out, rc = 0);
2193         }
2194
2195         /* release lfsck::li_mutex to avoid deadlock. */
2196         mutex_unlock(&lfsck->li_mutex);
2197         rc = lfsck_start_all(env, lfsck, start);
2198         if (rc != 0) {
                     /* Broadcast failed: unless the local engine has already
                      * stopped, mark it failed, ask it to stop, unplug it
                      * and wait until it has stopped. */
2199                 spin_lock(&lfsck->li_lock);
2200                 if (thread_is_stopped(thread)) {
2201                         spin_unlock(&lfsck->li_lock);
2202                 } else {
2203                         lfsck->li_status = LS_FAILED;
2204                         lfsck->li_flags = 0;
2205                         thread_set_flags(thread, SVC_STOPPING);
2206                         spin_unlock(&lfsck->li_lock);
2207
2208                         lfsck->li_start_unplug = 1;
2209                         wake_up_all(&thread->t_ctl_waitq);
2210                         l_wait_event(thread->t_ctl_waitq,
2211                                      thread_is_stopped(thread),
2212                                      &lwi);
2213                 }
2214         } else {
2215                 lfsck->li_start_unplug = 1;
2216                 wake_up_all(&thread->t_ctl_waitq);
2217         }
2218
             /* li_mutex was already dropped before the broadcast, so skip
              * the unlock under "out". */
2219         GOTO(put, rc);
2220
2221 out:
2222         mutex_unlock(&lfsck->li_mutex);
2223
2224 put:
2225         lfsck_instance_put(env, lfsck);
2226
             /* Only negative values are reported to the caller. */
2227         return rc < 0 ? rc : 0;
2228 }
2229 EXPORT_SYMBOL(lfsck_start);
2230
2231 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2232                struct lfsck_stop *stop)
2233 {
2234         struct lfsck_instance   *lfsck;
2235         struct ptlrpc_thread    *thread;
2236         struct l_wait_info       lwi    = { 0 };
2237         int                      rc     = 0;
2238         int                      rc1    = 0;
2239         ENTRY;
2240
2241         lfsck = lfsck_instance_find(key, true, false);
2242         if (unlikely(lfsck == NULL))
2243                 RETURN(-ENXIO);
2244
2245         thread = &lfsck->li_thread;
2246         /* release lfsck::li_mutex to avoid deadlock. */
2247         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2248                 if (!lfsck->li_master) {
2249                         CERROR("%s: only allow to specify '-A' via MDS\n",
2250                                lfsck_lfsck2name(lfsck));
2251
2252                         GOTO(out, rc = -EPERM);
2253                 }
2254
2255                 rc1 = lfsck_stop_all(env, lfsck, stop);
2256         }
2257
2258         mutex_lock(&lfsck->li_mutex);
2259         spin_lock(&lfsck->li_lock);
2260         /* no error if LFSCK is already stopped, or was never started */
2261         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2262                 spin_unlock(&lfsck->li_lock);
2263                 GOTO(out, rc = 0);
2264         }
2265
2266         if (stop != NULL) {
2267                 lfsck->li_status = stop->ls_status;
2268                 lfsck->li_flags = stop->ls_flags;
2269         } else {
2270                 lfsck->li_status = LS_STOPPED;
2271                 lfsck->li_flags = 0;
2272         }
2273
2274         thread_set_flags(thread, SVC_STOPPING);
2275         spin_unlock(&lfsck->li_lock);
2276
2277         wake_up_all(&thread->t_ctl_waitq);
2278         l_wait_event(thread->t_ctl_waitq,
2279                      thread_is_stopped(thread),
2280                      &lwi);
2281
2282         GOTO(out, rc = 0);
2283
2284 out:
2285         mutex_unlock(&lfsck->li_mutex);
2286         lfsck_instance_put(env, lfsck);
2287
2288         return rc != 0 ? rc : rc1;
2289 }
2290 EXPORT_SYMBOL(lfsck_stop);
2291
2292 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2293                     struct lfsck_request *lr)
2294 {
2295         int rc = -EOPNOTSUPP;
2296         ENTRY;
2297
2298         switch (lr->lr_event) {
2299         case LE_START: {
2300                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
2301                 struct lfsck_start_param  lsp;
2302
2303                 memset(start, 0, sizeof(*start));
2304                 start->ls_valid = lr->lr_valid;
2305                 start->ls_speed_limit = lr->lr_speed;
2306                 start->ls_version = lr->lr_version;
2307                 start->ls_active = lr->lr_active;
2308                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2309                 start->ls_async_windows = lr->lr_async_windows;
2310
2311                 lsp.lsp_start = start;
2312                 lsp.lsp_index = lr->lr_index;
2313                 lsp.lsp_index_valid = 1;
2314                 rc = lfsck_start(env, key, &lsp);
2315                 break;
2316         }
2317         case LE_STOP: {
2318                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2319
2320                 memset(stop, 0, sizeof(*stop));
2321                 stop->ls_status = lr->lr_status;
2322                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2323                 rc = lfsck_stop(env, key, stop);
2324                 break;
2325         }
2326         case LE_PHASE1_DONE:
2327         case LE_PHASE2_DONE:
2328         case LE_FID_ACCESSED:
2329         case LE_PEER_EXIT:
2330         case LE_CONDITIONAL_DESTROY:
2331         case LE_PAIRS_VERIFY: {
2332                 struct lfsck_instance  *lfsck;
2333                 struct lfsck_component *com;
2334
2335                 lfsck = lfsck_instance_find(key, true, false);
2336                 if (unlikely(lfsck == NULL))
2337                         RETURN(-ENXIO);
2338
2339                 com = lfsck_component_find(lfsck, lr->lr_active);
2340                 if (likely(com != NULL)) {
2341                         rc = com->lc_ops->lfsck_in_notify(env, com, lr);
2342                         lfsck_component_put(env, com);
2343                 }
2344
2345                 lfsck_instance_put(env, lfsck);
2346                 break;
2347         }
2348         default:
2349                 break;
2350         }
2351
2352         RETURN(rc);
2353 }
2354 EXPORT_SYMBOL(lfsck_in_notify);
2355
2356 int lfsck_query(const struct lu_env *env, struct dt_device *key,
2357                 struct lfsck_request *lr)
2358 {
2359         struct lfsck_instance  *lfsck;
2360         struct lfsck_component *com;
2361         int                     rc;
2362         ENTRY;
2363
2364         lfsck = lfsck_instance_find(key, true, false);
2365         if (unlikely(lfsck == NULL))
2366                 RETURN(-ENXIO);
2367
2368         com = lfsck_component_find(lfsck, lr->lr_active);
2369         if (likely(com != NULL)) {
2370                 rc = com->lc_ops->lfsck_query(env, com);
2371                 lfsck_component_put(env, com);
2372         } else {
2373                 rc = -ENOTSUPP;
2374         }
2375
2376         lfsck_instance_put(env, lfsck);
2377
2378         RETURN(rc);
2379 }
2380 EXPORT_SYMBOL(lfsck_query);
2381
2382 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
2383                              struct ldlm_namespace *ns)
2384 {
2385         struct lfsck_instance  *lfsck;
2386         int                     rc      = -ENXIO;
2387
2388         lfsck = lfsck_instance_find(key, true, false);
2389         if (likely(lfsck != NULL)) {
2390                 lfsck->li_namespace = ns;
2391                 lfsck_instance_put(env, lfsck);
2392                 rc = 0;
2393         }
2394
2395         return rc;
2396 }
2397 EXPORT_SYMBOL(lfsck_register_namespace);
2398
2399 int lfsck_register(const struct lu_env *env, struct dt_device *key,
2400                    struct dt_device *next, struct obd_device *obd,
2401                    lfsck_out_notify notify, void *notify_data, bool master)
2402 {
2403         struct lfsck_instance   *lfsck;
2404         struct dt_object        *root  = NULL;
2405         struct dt_object        *obj   = NULL;
2406         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
2407         int                      rc;
2408         ENTRY;
2409
2410         lfsck = lfsck_instance_find(key, false, false);
2411         if (unlikely(lfsck != NULL))
2412                 RETURN(-EEXIST);
2413
2414         OBD_ALLOC_PTR(lfsck);
2415         if (lfsck == NULL)
2416                 RETURN(-ENOMEM);
2417
2418         mutex_init(&lfsck->li_mutex);
2419         spin_lock_init(&lfsck->li_lock);
2420         INIT_LIST_HEAD(&lfsck->li_link);
2421         INIT_LIST_HEAD(&lfsck->li_list_scan);
2422         INIT_LIST_HEAD(&lfsck->li_list_dir);
2423         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
2424         INIT_LIST_HEAD(&lfsck->li_list_idle);
2425         atomic_set(&lfsck->li_ref, 1);
2426         atomic_set(&lfsck->li_double_scan_count, 0);
2427         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
2428         lfsck->li_out_notify = notify;
2429         lfsck->li_out_notify_data = notify_data;
2430         lfsck->li_next = next;
2431         lfsck->li_bottom = key;
2432         lfsck->li_obd = obd;
2433
2434         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
2435         if (rc != 0)
2436                 GOTO(out, rc);
2437
2438         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
2439         if (rc != 0)
2440                 GOTO(out, rc);
2441
2442         fid->f_seq = FID_SEQ_LOCAL_NAME;
2443         fid->f_oid = 1;
2444         fid->f_ver = 0;
2445         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
2446         if (rc != 0)
2447                 GOTO(out, rc);
2448
2449         rc = dt_root_get(env, key, fid);
2450         if (rc != 0)
2451                 GOTO(out, rc);
2452
2453         root = dt_locate(env, key, fid);
2454         if (IS_ERR(root))
2455                 GOTO(out, rc = PTR_ERR(root));
2456
2457         if (unlikely(!dt_try_as_dir(env, root)))
2458                 GOTO(out, rc = -ENOTDIR);
2459
2460         lfsck->li_local_root_fid = *fid;
2461         if (master) {
2462                 lfsck->li_master = 1;
2463                 if (lfsck_dev_idx(key) == 0) {
2464                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
2465                         const struct lu_name *cname;
2466
2467                         rc = dt_lookup(env, root,
2468                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
2469                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
2470                         if (rc != 0)
2471                                 GOTO(out, rc);
2472
2473                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
2474                         if (IS_ERR(obj))
2475                                 GOTO(out, rc = PTR_ERR(obj));
2476
2477                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
2478                                 (const struct dt_key *)dotlustre, BYPASS_CAPA);
2479                         if (rc != 0)
2480                                 GOTO(out, rc);
2481
2482                         lu_object_put(env, &obj->do_lu);
2483                         obj = dt_locate(env, key, fid);
2484                         if (IS_ERR(obj))
2485                                 GOTO(out, rc = PTR_ERR(obj));
2486
2487                         cname = lfsck_name_get_const(env, dotlustre,
2488                                                      strlen(dotlustre));
2489                         rc = lfsck_verify_linkea(env, key, obj, cname,
2490                                                  &lfsck->li_global_root_fid);
2491                         if (rc != 0)
2492                                 GOTO(out, rc);
2493
2494                         *pfid = *fid;
2495                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
2496                                        (const struct dt_key *)lostfound,
2497                                        BYPASS_CAPA);
2498                         if (rc != 0)
2499                                 GOTO(out, rc);
2500
2501                         lu_object_put(env, &obj->do_lu);
2502                         obj = dt_locate(env, key, fid);
2503                         if (IS_ERR(obj))
2504                                 GOTO(out, rc = PTR_ERR(obj));
2505
2506                         cname = lfsck_name_get_const(env, lostfound,
2507                                                      strlen(lostfound));
2508                         rc = lfsck_verify_linkea(env, key, obj, cname, pfid);
2509                         if (rc != 0)
2510                                 GOTO(out, rc);
2511
2512                         lu_object_put(env, &obj->do_lu);
2513                         obj = NULL;
2514                 }
2515         }
2516
2517         fid->f_seq = FID_SEQ_LOCAL_FILE;
2518         fid->f_oid = OTABLE_IT_OID;
2519         fid->f_ver = 0;
2520         obj = dt_locate(env, key, fid);
2521         if (IS_ERR(obj))
2522                 GOTO(out, rc = PTR_ERR(obj));
2523
2524         lu_object_get(&obj->do_lu);
2525         lfsck->li_obj_oit = obj;
2526         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
2527         if (rc != 0)
2528                 GOTO(out, rc);
2529
2530         rc = lfsck_bookmark_setup(env, lfsck);
2531         if (rc != 0)
2532                 GOTO(out, rc);
2533
2534         if (master) {
2535                 rc = lfsck_fid_init(lfsck);
2536                 if (rc < 0)
2537                         GOTO(out, rc);
2538
2539                 rc = lfsck_namespace_setup(env, lfsck);
2540                 if (rc < 0)
2541                         GOTO(out, rc);
2542         }
2543
2544         rc = lfsck_layout_setup(env, lfsck);
2545         if (rc < 0)
2546                 GOTO(out, rc);
2547
2548         /* XXX: more LFSCK components initialization to be added here. */
2549
2550         rc = lfsck_instance_add(lfsck);
2551         if (rc == 0)
2552                 rc = lfsck_add_target_from_orphan(env, lfsck);
2553 out:
2554         if (obj != NULL && !IS_ERR(obj))
2555                 lu_object_put(env, &obj->do_lu);
2556         if (root != NULL && !IS_ERR(root))
2557                 lu_object_put(env, &root->do_lu);
2558         if (rc != 0)
2559                 lfsck_instance_cleanup(env, lfsck);
2560         return rc;
2561 }
2562 EXPORT_SYMBOL(lfsck_register);
2563
2564 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
2565 {
2566         struct lfsck_instance *lfsck;
2567
2568         lfsck = lfsck_instance_find(key, false, true);
2569         if (lfsck != NULL)
2570                 lfsck_instance_put(env, lfsck);
2571 }
2572 EXPORT_SYMBOL(lfsck_degister);
2573
2574 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
2575                      struct dt_device *tgt, struct obd_export *exp,
2576                      __u32 index, bool for_ost)
2577 {
2578         struct lfsck_instance   *lfsck;
2579         struct lfsck_tgt_desc   *ltd;
2580         int                      rc;
2581         ENTRY;
2582
2583         OBD_ALLOC_PTR(ltd);
2584         if (ltd == NULL)
2585                 RETURN(-ENOMEM);
2586
2587         ltd->ltd_tgt = tgt;
2588         ltd->ltd_key = key;
2589         ltd->ltd_exp = exp;
2590         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
2591         INIT_LIST_HEAD(&ltd->ltd_layout_list);
2592         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
2593         atomic_set(&ltd->ltd_ref, 1);
2594         ltd->ltd_index = index;
2595
2596         spin_lock(&lfsck_instance_lock);
2597         lfsck = __lfsck_instance_find(key, true, false);
2598         if (lfsck == NULL) {
2599                 if (for_ost)
2600                         list_add_tail(&ltd->ltd_orphan_list,
2601                                       &lfsck_ost_orphan_list);
2602                 else
2603                         list_add_tail(&ltd->ltd_orphan_list,
2604                                       &lfsck_mdt_orphan_list);
2605                 spin_unlock(&lfsck_instance_lock);
2606
2607                 RETURN(0);
2608         }
2609         spin_unlock(&lfsck_instance_lock);
2610
2611         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
2612         if (rc != 0)
2613                 lfsck_tgt_put(ltd);
2614
2615         lfsck_instance_put(env, lfsck);
2616
2617         RETURN(rc);
2618 }
2619 EXPORT_SYMBOL(lfsck_add_target);
2620
2621 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
2622                       struct dt_device *tgt, __u32 index, bool for_ost)
2623 {
2624         struct lfsck_instance   *lfsck;
2625         struct lfsck_tgt_descs  *ltds;
2626         struct lfsck_tgt_desc   *ltd;
2627         struct list_head        *head;
2628
2629         if (for_ost)
2630                 head = &lfsck_ost_orphan_list;
2631         else
2632                 head = &lfsck_mdt_orphan_list;
2633
2634         spin_lock(&lfsck_instance_lock);
2635         list_for_each_entry(ltd, head, ltd_orphan_list) {
2636                 if (ltd->ltd_tgt == tgt) {
2637                         list_del_init(&ltd->ltd_orphan_list);
2638                         spin_unlock(&lfsck_instance_lock);
2639                         lfsck_tgt_put(ltd);
2640
2641                         return;
2642                 }
2643         }
2644
2645         ltd = NULL;
2646         lfsck = __lfsck_instance_find(key, true, false);
2647         spin_unlock(&lfsck_instance_lock);
2648         if (unlikely(lfsck == NULL))
2649                 return;
2650
2651         if (for_ost)
2652                 ltds = &lfsck->li_ost_descs;
2653         else
2654                 ltds = &lfsck->li_mdt_descs;
2655
2656         down_write(&ltds->ltd_rw_sem);
2657         LASSERT(ltds->ltd_tgts_bitmap != NULL);
2658
2659         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
2660                 goto unlock;
2661
2662         ltd = LTD_TGT(ltds, index);
2663         if (unlikely(ltd == NULL))
2664                 goto unlock;
2665
2666         LASSERT(ltds->ltd_tgtnr > 0);
2667
2668         ltds->ltd_tgtnr--;
2669         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
2670         LTD_TGT(ltds, index) = NULL;
2671
2672 unlock:
2673         if (ltd == NULL) {
2674                 if (for_ost)
2675                         head = &lfsck->li_ost_descs.ltd_orphan;
2676                 else
2677                         head = &lfsck->li_mdt_descs.ltd_orphan;
2678
2679                 list_for_each_entry(ltd, head, ltd_orphan_list) {
2680                         if (ltd->ltd_tgt == tgt) {
2681                                 list_del_init(&ltd->ltd_orphan_list);
2682                                 break;
2683                         }
2684                 }
2685         }
2686
2687         up_write(&ltds->ltd_rw_sem);
2688         if (ltd != NULL) {
2689                 spin_lock(&ltds->ltd_lock);
2690                 ltd->ltd_dead = 1;
2691                 spin_unlock(&ltds->ltd_lock);
2692                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
2693                 lfsck_tgt_put(ltd);
2694         }
2695
2696         lfsck_instance_put(env, lfsck);
2697 }
2698 EXPORT_SYMBOL(lfsck_del_target);
2699
2700 static int __init lfsck_init(void)
2701 {
2702         int rc;
2703
2704         INIT_LIST_HEAD(&lfsck_instance_list);
2705         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
2706         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
2707         lfsck_key_init_generic(&lfsck_thread_key, NULL);
2708         rc = lu_context_key_register(&lfsck_thread_key);
2709         if (rc == 0) {
2710                 tgt_register_lfsck_in_notify(lfsck_in_notify);
2711                 tgt_register_lfsck_query(lfsck_query);
2712         }
2713
2714         return rc;
2715 }
2716
2717 static void __exit lfsck_exit(void)
2718 {
2719         struct lfsck_tgt_desc *ltd;
2720         struct lfsck_tgt_desc *next;
2721
2722         LASSERT(list_empty(&lfsck_instance_list));
2723
2724         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
2725                                  ltd_orphan_list) {
2726                 list_del_init(&ltd->ltd_orphan_list);
2727                 lfsck_tgt_put(ltd);
2728         }
2729
2730         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
2731                                  ltd_orphan_list) {
2732                 list_del_init(&ltd->ltd_orphan_list);
2733                 lfsck_tgt_put(ltd);
2734         }
2735
2736         lu_context_key_degister(&lfsck_thread_key);
2737 }
2738
/* Module metadata: author, description and license. */
MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
MODULE_DESCRIPTION("LFSCK");
MODULE_LICENSE("GPL");

/*
 * Declare the "lfsck" module, passing the Lustre version string together
 * with lfsck_init() and lfsck_exit() as its init and exit routines.
 */
cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);