Whamcloud - gitweb
LU-1267 lfsck: framework (3) for MDT-OST consistency
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 /* define lfsck thread key */
46 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
47
48 static void lfsck_key_fini(const struct lu_context *ctx,
49                            struct lu_context_key *key, void *data)
50 {
51         struct lfsck_thread_info *info = data;
52
53         lu_buf_free(&info->lti_linkea_buf);
54         OBD_FREE_PTR(info);
55 }
56
57 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
58 LU_KEY_INIT_GENERIC(lfsck);
59
/* All registered LFSCK instances, linked through lfsck_instance::li_link. */
static CFS_LIST_HEAD(lfsck_instance_list);
/*
 * Target descriptors that are not attached to an instance yet; they are
 * claimed by lfsck_add_target_from_orphan().  Initialised statically, the
 * same way as lfsck_instance_list, so that the lists are valid (empty)
 * from load time instead of being all-zero until some later
 * INIT_LIST_HEAD(); a later INIT_LIST_HEAD() on an empty list is harmless.
 */
static CFS_LIST_HEAD(lfsck_ost_orphan_list);
static CFS_LIST_HEAD(lfsck_mdt_orphan_list);
/* Protects lfsck_instance_list and the two global orphan lists above. */
static DEFINE_SPINLOCK(lfsck_instance_lock);
64
65 static const char *lfsck_status_names[] = {
66         [LS_INIT]               = "init",
67         [LS_SCANNING_PHASE1]    = "scanning-phase1",
68         [LS_SCANNING_PHASE2]    = "scanning-phase2",
69         [LS_COMPLETED]          = "completed",
70         [LS_FAILED]             = "failed",
71         [LS_STOPPED]            = "stopped",
72         [LS_PAUSED]             = "paused",
73         [LS_CRASHED]            = "crashed",
74         [LS_PARTIAL]            = "partial",
75         [LS_CO_FAILED]          = "co-failed",
76         [LS_CO_STOPPED]         = "co-stopped",
77         [LS_CO_PAUSED]          = "co-paused"
78 };
79
80 const char *lfsck_flags_names[] = {
81         "scanned-once",
82         "inconsistent",
83         "upgrade",
84         "incomplete",
85         "crashed_lastid",
86         NULL
87 };
88
89 const char *lfsck_param_names[] = {
90         NULL,
91         "failout",
92         "dryrun",
93         NULL
94 };
95
96 const char *lfsck_status2names(enum lfsck_status status)
97 {
98         if (unlikely(status < 0 || status >= LS_MAX))
99                 return "unknown";
100
101         return lfsck_status_names[status];
102 }
103
104 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
105 {
106         spin_lock_init(&ltds->ltd_lock);
107         init_rwsem(&ltds->ltd_rw_sem);
108         INIT_LIST_HEAD(&ltds->ltd_orphan);
109         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
110         if (ltds->ltd_tgts_bitmap == NULL)
111                 return -ENOMEM;
112
113         return 0;
114 }
115
116 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
117 {
118         struct lfsck_tgt_desc   *ltd;
119         struct lfsck_tgt_desc   *next;
120         int                      idx;
121
122         down_write(&ltds->ltd_rw_sem);
123
124         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
125                                  ltd_orphan_list) {
126                 list_del_init(&ltd->ltd_orphan_list);
127                 lfsck_tgt_put(ltd);
128         }
129
130         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
131                 up_write(&ltds->ltd_rw_sem);
132
133                 return;
134         }
135
136         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
137                 ltd = LTD_TGT(ltds, idx);
138                 if (likely(ltd != NULL)) {
139                         LASSERT(list_empty(&ltd->ltd_layout_list));
140
141                         ltds->ltd_tgtnr--;
142                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
143                         LTD_TGT(ltds, idx) = NULL;
144                         lfsck_tgt_put(ltd);
145                 }
146         }
147
148         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
149                  ltds->ltd_tgtnr);
150
151         for (idx = 0; idx < TGT_PTRS; idx++) {
152                 if (ltds->ltd_tgts_idx[idx] != NULL) {
153                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
154                         ltds->ltd_tgts_idx[idx] = NULL;
155                 }
156         }
157
158         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
159         ltds->ltd_tgts_bitmap = NULL;
160         up_write(&ltds->ltd_rw_sem);
161 }
162
163 static int __lfsck_add_target(const struct lu_env *env,
164                               struct lfsck_instance *lfsck,
165                               struct lfsck_tgt_desc *ltd,
166                               bool for_ost, bool locked)
167 {
168         struct lfsck_tgt_descs *ltds;
169         __u32                   index = ltd->ltd_index;
170         int                     rc    = 0;
171         ENTRY;
172
173         if (for_ost)
174                 ltds = &lfsck->li_ost_descs;
175         else
176                 ltds = &lfsck->li_mdt_descs;
177
178         if (!locked)
179                 down_write(&ltds->ltd_rw_sem);
180
181         LASSERT(ltds->ltd_tgts_bitmap != NULL);
182
183         if (index >= ltds->ltd_tgts_bitmap->size) {
184                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
185                                     (__u32)BITS_PER_LONG);
186                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
187                 cfs_bitmap_t *new_bitmap;
188
189                 while (newsize < index + 1)
190                         newsize <<= 1;
191
192                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
193                 if (new_bitmap == NULL)
194                         GOTO(unlock, rc = -ENOMEM);
195
196                 if (ltds->ltd_tgtnr > 0)
197                         cfs_bitmap_copy(new_bitmap, old_bitmap);
198                 ltds->ltd_tgts_bitmap = new_bitmap;
199                 CFS_FREE_BITMAP(old_bitmap);
200         }
201
202         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
203                 CERROR("%s: the device %s (%u) is registered already\n",
204                        lfsck_lfsck2name(lfsck),
205                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
206                 GOTO(unlock, rc = -EEXIST);
207         }
208
209         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
210                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
211                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
212                         GOTO(unlock, rc = -ENOMEM);
213         }
214
215         LTD_TGT(ltds, index) = ltd;
216         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
217         ltds->ltd_tgtnr++;
218
219         GOTO(unlock, rc = 0);
220
221 unlock:
222         if (!locked)
223                 up_write(&ltds->ltd_rw_sem);
224
225         return rc;
226 }
227
228 static int lfsck_add_target_from_orphan(const struct lu_env *env,
229                                         struct lfsck_instance *lfsck)
230 {
231         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
232         struct lfsck_tgt_desc   *ltd;
233         struct lfsck_tgt_desc   *next;
234         struct list_head        *head    = &lfsck_ost_orphan_list;
235         int                      rc;
236         bool                     for_ost = true;
237
238 again:
239         spin_lock(&lfsck_instance_lock);
240         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
241                 if (ltd->ltd_key == lfsck->li_bottom) {
242                         list_del_init(&ltd->ltd_orphan_list);
243                         list_add_tail(&ltd->ltd_orphan_list,
244                                       &ltds->ltd_orphan);
245                 }
246         }
247         spin_unlock(&lfsck_instance_lock);
248
249         down_write(&ltds->ltd_rw_sem);
250         while (!list_empty(&ltds->ltd_orphan)) {
251                 ltd = list_entry(ltds->ltd_orphan.next,
252                                  struct lfsck_tgt_desc,
253                                  ltd_orphan_list);
254                 list_del_init(&ltd->ltd_orphan_list);
255                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
256                 /* Do not hold the semaphore for too long time. */
257                 up_write(&ltds->ltd_rw_sem);
258                 if (rc != 0)
259                         return rc;
260
261                 down_write(&ltds->ltd_rw_sem);
262         }
263         up_write(&ltds->ltd_rw_sem);
264
265         if (for_ost) {
266                 ltds = &lfsck->li_mdt_descs;
267                 head = &lfsck_mdt_orphan_list;
268                 for_ost = false;
269                 goto again;
270         }
271
272         return 0;
273 }
274
275 static inline struct lfsck_component *
276 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type, cfs_list_t *list)
277 {
278         struct lfsck_component *com;
279
280         cfs_list_for_each_entry(com, list, lc_link) {
281                 if (com->lc_type == type)
282                         return com;
283         }
284         return NULL;
285 }
286
287 static struct lfsck_component *
288 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
289 {
290         struct lfsck_component *com;
291
292         spin_lock(&lfsck->li_lock);
293         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
294         if (com != NULL)
295                 goto unlock;
296
297         com = __lfsck_component_find(lfsck, type,
298                                      &lfsck->li_list_double_scan);
299         if (com != NULL)
300                 goto unlock;
301
302         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
303
304 unlock:
305         if (com != NULL)
306                 lfsck_component_get(com);
307         spin_unlock(&lfsck->li_lock);
308         return com;
309 }
310
311 void lfsck_component_cleanup(const struct lu_env *env,
312                              struct lfsck_component *com)
313 {
314         if (!cfs_list_empty(&com->lc_link))
315                 cfs_list_del_init(&com->lc_link);
316         if (!cfs_list_empty(&com->lc_link_dir))
317                 cfs_list_del_init(&com->lc_link_dir);
318
319         lfsck_component_put(env, com);
320 }
321
322 void lfsck_instance_cleanup(const struct lu_env *env,
323                             struct lfsck_instance *lfsck)
324 {
325         struct ptlrpc_thread    *thread = &lfsck->li_thread;
326         struct lfsck_component  *com;
327         ENTRY;
328
329         LASSERT(list_empty(&lfsck->li_link));
330         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
331
332         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
333         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
334
335         if (lfsck->li_obj_oit != NULL) {
336                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
337                 lfsck->li_obj_oit = NULL;
338         }
339
340         LASSERT(lfsck->li_obj_dir == NULL);
341
342         while (!cfs_list_empty(&lfsck->li_list_scan)) {
343                 com = cfs_list_entry(lfsck->li_list_scan.next,
344                                      struct lfsck_component,
345                                      lc_link);
346                 lfsck_component_cleanup(env, com);
347         }
348
349         LASSERT(cfs_list_empty(&lfsck->li_list_dir));
350
351         while (!cfs_list_empty(&lfsck->li_list_double_scan)) {
352                 com = cfs_list_entry(lfsck->li_list_double_scan.next,
353                                      struct lfsck_component,
354                                      lc_link);
355                 lfsck_component_cleanup(env, com);
356         }
357
358         while (!cfs_list_empty(&lfsck->li_list_idle)) {
359                 com = cfs_list_entry(lfsck->li_list_idle.next,
360                                      struct lfsck_component,
361                                      lc_link);
362                 lfsck_component_cleanup(env, com);
363         }
364
365         if (lfsck->li_bookmark_obj != NULL) {
366                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
367                 lfsck->li_bookmark_obj = NULL;
368         }
369
370         if (lfsck->li_los != NULL) {
371                 local_oid_storage_fini(env, lfsck->li_los);
372                 lfsck->li_los = NULL;
373         }
374
375         OBD_FREE_PTR(lfsck);
376 }
377
378 static inline struct lfsck_instance *
379 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
380 {
381         struct lfsck_instance *lfsck;
382
383         cfs_list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
384                 if (lfsck->li_bottom == key) {
385                         if (ref)
386                                 lfsck_instance_get(lfsck);
387                         if (unlink)
388                                 list_del_init(&lfsck->li_link);
389
390                         return lfsck;
391                 }
392         }
393
394         return NULL;
395 }
396
397 static inline struct lfsck_instance *lfsck_instance_find(struct dt_device *key,
398                                                          bool ref, bool unlink)
399 {
400         struct lfsck_instance *lfsck;
401
402         spin_lock(&lfsck_instance_lock);
403         lfsck = __lfsck_instance_find(key, ref, unlink);
404         spin_unlock(&lfsck_instance_lock);
405
406         return lfsck;
407 }
408
409 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
410 {
411         struct lfsck_instance *tmp;
412
413         spin_lock(&lfsck_instance_lock);
414         cfs_list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
415                 if (lfsck->li_bottom == tmp->li_bottom) {
416                         spin_unlock(&lfsck_instance_lock);
417                         return -EEXIST;
418                 }
419         }
420
421         cfs_list_add_tail(&lfsck->li_link, &lfsck_instance_list);
422         spin_unlock(&lfsck_instance_lock);
423         return 0;
424 }
425
/**
 * Print "<prefix>: name,name,...\n" for the bits set in \a bits.
 *
 * Bit i is printed as names[i]; a set bit whose names[] entry is NULL is
 * skipped silently.  The caller must pass a names[] table that covers the
 * highest bit set in \a bits, this is not (and cannot be) checked here.
 * With no bit set only "<prefix>:\n" is printed.
 *
 * \param[in,out] buf   output cursor, advanced past the printed text
 * \param[in,out] len   space left at *buf, reduced by the printed length
 * \param[in] bits      the bit mask to be printed
 * \param[in] names     names of the bits, indexed by bit number
 * \param[in] prefix    label printed in front of the names
 *
 * \retval positive     number of characters printed
 * \retval -ENOSPC      the output does not fit into the buffer; *buf and
 *                      *len only account for the pieces printed completely
 */
int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
                    const char *prefix)
{
        /* Work on an unsigned copy: shifting into or beyond the sign bit
         * of an int is undefined. */
        unsigned int    left = (unsigned int)bits;
        int             save = *len;
        int             rc;
        int             i;

        /* A non-positive length would become a huge size_t in snprintf. */
        if (*len <= 0)
                return -ENOSPC;

        rc = snprintf(*buf, *len, "%s:%c", prefix, left != 0 ? ' ' : '\n');
        /* snprintf returns the length it wanted to write, so "rc >= *len"
         * means the output was truncated. */
        if (rc <= 0 || rc >= *len)
                return -ENOSPC;

        *buf += rc;
        *len -= rc;
        /* The loop ends as soon as the last set bit was handled, so "i"
         * never exceeds 31 and the shift below stays defined. */
        for (i = 0; left != 0; i++) {
                unsigned int flag = 1U << i;

                if (!(left & flag))
                        continue;

                left &= ~flag;
                if (names[i] == NULL)
                        continue;

                rc = snprintf(*buf, *len, "%s%c", names[i],
                              left != 0 ? ',' : '\n');
                if (rc <= 0 || rc >= *len)
                        return -ENOSPC;

                *buf += rc;
                *len -= rc;
        }

        return save - *len;
}
456
457 int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix)
458 {
459         int rc;
460
461         if (time != 0)
462                 rc = snprintf(*buf, *len, "%s: "LPU64" seconds\n", prefix,
463                               cfs_time_current_sec() - time);
464         else
465                 rc = snprintf(*buf, *len, "%s: N/A\n", prefix);
466         if (rc <= 0)
467                 return -ENOSPC;
468
469         *buf += rc;
470         *len -= rc;
471         return rc;
472 }
473
474 int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
475                    const char *prefix)
476 {
477         int rc;
478
479         if (fid_is_zero(&pos->lp_dir_parent)) {
480                 if (pos->lp_oit_cookie == 0)
481                         rc = snprintf(*buf, *len, "%s: N/A, N/A, N/A\n",
482                                       prefix);
483                 else
484                         rc = snprintf(*buf, *len, "%s: "LPU64", N/A, N/A\n",
485                                       prefix, pos->lp_oit_cookie);
486         } else {
487                 rc = snprintf(*buf, *len, "%s: "LPU64", "DFID", "LPU64"\n",
488                               prefix, pos->lp_oit_cookie,
489                               PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
490         }
491         if (rc <= 0)
492                 return -ENOSPC;
493
494         *buf += rc;
495         *len -= rc;
496         return rc;
497 }
498
499 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
500                     struct lfsck_position *pos, bool init)
501 {
502         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
503
504         if (unlikely(lfsck->li_di_oit == NULL)) {
505                 memset(pos, 0, sizeof(*pos));
506                 return;
507         }
508
509         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
510         if (!lfsck->li_current_oit_processed && !init)
511                 pos->lp_oit_cookie--;
512
513         LASSERT(pos->lp_oit_cookie > 0);
514
515         if (lfsck->li_di_dir != NULL) {
516                 struct dt_object *dto = lfsck->li_obj_dir;
517
518                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
519                                                         lfsck->li_di_dir);
520
521                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
522                         fid_zero(&pos->lp_dir_parent);
523                         pos->lp_dir_cookie = 0;
524                 } else {
525                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
526                 }
527         } else {
528                 fid_zero(&pos->lp_dir_parent);
529                 pos->lp_dir_cookie = 0;
530         }
531 }
532
533 static void __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
534 {
535         lfsck->li_bookmark_ram.lb_speed_limit = limit;
536         if (limit != LFSCK_SPEED_NO_LIMIT) {
537                 if (limit > HZ) {
538                         lfsck->li_sleep_rate = limit / HZ;
539                         lfsck->li_sleep_jif = 1;
540                 } else {
541                         lfsck->li_sleep_rate = 1;
542                         lfsck->li_sleep_jif = HZ / limit;
543                 }
544         } else {
545                 lfsck->li_sleep_jif = 0;
546                 lfsck->li_sleep_rate = 0;
547         }
548 }
549
550 void lfsck_control_speed(struct lfsck_instance *lfsck)
551 {
552         struct ptlrpc_thread *thread = &lfsck->li_thread;
553         struct l_wait_info    lwi;
554
555         if (lfsck->li_sleep_jif > 0 &&
556             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
557                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
558                                        LWI_ON_SIGNAL_NOOP, NULL);
559
560                 l_wait_event(thread->t_ctl_waitq,
561                              !thread_is_running(thread),
562                              &lwi);
563                 lfsck->li_new_scanned = 0;
564         }
565 }
566
567 void lfsck_control_speed_by_self(struct lfsck_component *com)
568 {
569         struct lfsck_instance   *lfsck  = com->lc_lfsck;
570         struct ptlrpc_thread    *thread = &lfsck->li_thread;
571         struct l_wait_info       lwi;
572
573         if (lfsck->li_sleep_jif > 0 &&
574             com->lc_new_scanned >= lfsck->li_sleep_rate) {
575                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
576                                        LWI_ON_SIGNAL_NOOP, NULL);
577
578                 l_wait_event(thread->t_ctl_waitq,
579                              !thread_is_running(thread),
580                              &lwi);
581                 com->lc_new_scanned = 0;
582         }
583 }
584
585 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
586                             struct lu_fid *fid)
587 {
588         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
589                      !dt_try_as_dir(env, obj)))
590                 return -ENOTDIR;
591
592         return dt_lookup(env, obj, (struct dt_rec *)fid,
593                          (const struct dt_key *)"..", BYPASS_CAPA);
594 }
595
596 static int lfsck_needs_scan_dir(const struct lu_env *env,
597                                 struct lfsck_instance *lfsck,
598                                 struct dt_object *obj)
599 {
600         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
601         int            depth = 0;
602         int            rc;
603
604         if (!lfsck->li_master || !S_ISDIR(lfsck_object_type(obj)) ||
605             cfs_list_empty(&lfsck->li_list_dir))
606                RETURN(0);
607
608         while (1) {
609                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
610                  *      which is the agent directory to manage the objects
611                  *      which name entries reside on remote MDTs. Related
612                  *      consistency verification will be processed in LFSCK
613                  *      phase III. */
614                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
615                         if (depth > 0)
616                                 lfsck_object_put(env, obj);
617                         return 1;
618                 }
619
620                 /* .lustre doesn't contain "real" user objects, no need lfsck */
621                 if (fid_is_dot_lustre(lfsck_dto2fid(obj))) {
622                         if (depth > 0)
623                                 lfsck_object_put(env, obj);
624                         return 0;
625                 }
626
627                 dt_read_lock(env, obj, MOR_TGT_CHILD);
628                 if (unlikely(lfsck_is_dead_obj(obj))) {
629                         dt_read_unlock(env, obj);
630                         if (depth > 0)
631                                 lfsck_object_put(env, obj);
632                         return 0;
633                 }
634
635                 rc = dt_xattr_get(env, obj,
636                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
637                                   BYPASS_CAPA);
638                 dt_read_unlock(env, obj);
639                 if (rc >= 0) {
640                         if (depth > 0)
641                                 lfsck_object_put(env, obj);
642                         return 1;
643                 }
644
645                 if (rc < 0 && rc != -ENODATA) {
646                         if (depth > 0)
647                                 lfsck_object_put(env, obj);
648                         return rc;
649                 }
650
651                 rc = lfsck_parent_fid(env, obj, fid);
652                 if (depth > 0)
653                         lfsck_object_put(env, obj);
654                 if (rc != 0)
655                         return rc;
656
657                 if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
658                         return 0;
659
660                 obj = lfsck_object_find(env, lfsck, fid);
661                 if (obj == NULL)
662                         return 0;
663                 else if (IS_ERR(obj))
664                         return PTR_ERR(obj);
665
666                 if (!dt_object_exists(obj)) {
667                         lfsck_object_put(env, obj);
668                         return 0;
669                 }
670
671                 /* Currently, only client visible directory can be remote. */
672                 if (dt_object_remote(obj)) {
673                         lfsck_object_put(env, obj);
674                         return 1;
675                 }
676
677                 depth++;
678         }
679         return 0;
680 }
681
682 struct lfsck_thread_args *lfsck_thread_args_init(struct lfsck_instance *lfsck,
683                                                  struct lfsck_component *com)
684 {
685         struct lfsck_thread_args *lta;
686         int                       rc;
687
688         OBD_ALLOC_PTR(lta);
689         if (lta == NULL)
690                 return ERR_PTR(-ENOMEM);
691
692         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
693         if (rc != 0) {
694                 OBD_FREE_PTR(lta);
695                 return ERR_PTR(rc);
696         }
697
698         lta->lta_lfsck = lfsck_instance_get(lfsck);
699         if (com != NULL)
700                 lta->lta_com = lfsck_component_get(com);
701
702         return lta;
703 }
704
705 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
706 {
707         if (lta->lta_com != NULL)
708                 lfsck_component_put(&lta->lta_env, lta->lta_com);
709         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
710         lu_env_fini(&lta->lta_env);
711         OBD_FREE_PTR(lta);
712 }
713
714 /* LFSCK wrap functions */
715
716 void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
717                 bool new_checked)
718 {
719         struct lfsck_component *com;
720
721         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
722                 com->lc_ops->lfsck_fail(env, com, new_checked);
723         }
724 }
725
726 int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck)
727 {
728         struct lfsck_component *com;
729         int                     rc  = 0;
730         int                     rc1 = 0;
731
732         if (likely(cfs_time_beforeq(cfs_time_current(),
733                                     lfsck->li_time_next_checkpoint)))
734                 return 0;
735
736         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
737         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
738                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
739                 if (rc != 0)
740                         rc1 = rc;
741         }
742
743         lfsck->li_time_last_checkpoint = cfs_time_current();
744         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
745                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
746         return rc1 != 0 ? rc1 : rc;
747 }
748
749 int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck)
750 {
751         struct dt_object       *obj     = NULL;
752         struct lfsck_component *com;
753         struct lfsck_component *next;
754         struct lfsck_position  *pos     = NULL;
755         const struct dt_it_ops *iops    =
756                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
757         struct dt_it           *di;
758         int                     rc;
759         ENTRY;
760
761         LASSERT(lfsck->li_obj_dir == NULL);
762         LASSERT(lfsck->li_di_dir == NULL);
763
764         lfsck->li_current_oit_processed = 0;
765         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
766                 com->lc_new_checked = 0;
767                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
768                         com->lc_journal = 0;
769
770                 rc = com->lc_ops->lfsck_prep(env, com);
771                 if (rc != 0)
772                         GOTO(out, rc);
773
774                 if ((pos == NULL) ||
775                     (!lfsck_pos_is_zero(&com->lc_pos_start) &&
776                      lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
777                         pos = &com->lc_pos_start;
778         }
779
780         /* Init otable-based iterator. */
781         if (pos == NULL) {
782                 rc = iops->load(env, lfsck->li_di_oit, 0);
783                 if (rc > 0) {
784                         lfsck->li_oit_over = 1;
785                         rc = 0;
786                 }
787
788                 GOTO(out, rc);
789         }
790
791         rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
792         if (rc < 0)
793                 GOTO(out, rc);
794         else if (rc > 0)
795                 lfsck->li_oit_over = 1;
796
797         if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
798                 GOTO(out, rc = 0);
799
800         /* Find the directory for namespace-based traverse. */
801         obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
802         if (obj == NULL)
803                 GOTO(out, rc = 0);
804         else if (IS_ERR(obj))
805                 RETURN(PTR_ERR(obj));
806
807         /* XXX: Currently, skip remote object, the consistency for
808          *      remote object will be processed in LFSCK phase III. */
809         if (!dt_object_exists(obj) || dt_object_remote(obj) ||
810             unlikely(!S_ISDIR(lfsck_object_type(obj))))
811                 GOTO(out, rc = 0);
812
813         if (unlikely(!dt_try_as_dir(env, obj)))
814                 GOTO(out, rc = -ENOTDIR);
815
816         /* Init the namespace-based directory traverse. */
817         iops = &obj->do_index_ops->dio_it;
818         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
819         if (IS_ERR(di))
820                 GOTO(out, rc = PTR_ERR(di));
821
822         LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
823
824         rc = iops->load(env, di, pos->lp_dir_cookie);
825         if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
826                 rc = iops->next(env, di);
827         else if (rc > 0)
828                 rc = 0;
829
830         if (rc != 0) {
831                 iops->put(env, di);
832                 iops->fini(env, di);
833                 GOTO(out, rc);
834         }
835
836         lfsck->li_obj_dir = lfsck_object_get(obj);
837         lfsck->li_cookie_dir = iops->store(env, di);
838         spin_lock(&lfsck->li_lock);
839         lfsck->li_di_dir = di;
840         spin_unlock(&lfsck->li_lock);
841
842         GOTO(out, rc = 0);
843
844 out:
845         if (obj != NULL)
846                 lfsck_object_put(env, obj);
847
848         if (rc < 0) {
849                 cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
850                                              lc_link)
851                         com->lc_ops->lfsck_post(env, com, rc, true);
852
853                 return rc;
854         }
855
856         rc = 0;
857         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
858         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
859                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
860                 if (rc != 0)
861                         break;
862         }
863
864         lfsck->li_time_last_checkpoint = cfs_time_current();
865         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
866                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
867         return rc;
868 }
869
870 int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
871                    struct dt_object *obj)
872 {
873         struct lfsck_component *com;
874         const struct dt_it_ops *iops;
875         struct dt_it           *di;
876         int                     rc;
877         ENTRY;
878
879         LASSERT(lfsck->li_obj_dir == NULL);
880
881         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
882                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
883                 if (rc != 0)
884                         RETURN(rc);
885         }
886
887         rc = lfsck_needs_scan_dir(env, lfsck, obj);
888         if (rc <= 0)
889                 GOTO(out, rc);
890
891         if (unlikely(!dt_try_as_dir(env, obj)))
892                 GOTO(out, rc = -ENOTDIR);
893
894         iops = &obj->do_index_ops->dio_it;
895         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
896         if (IS_ERR(di))
897                 GOTO(out, rc = PTR_ERR(di));
898
899         rc = iops->load(env, di, 0);
900         if (rc == 0)
901                 rc = iops->next(env, di);
902         else if (rc > 0)
903                 rc = 0;
904
905         if (rc != 0) {
906                 iops->put(env, di);
907                 iops->fini(env, di);
908                 GOTO(out, rc);
909         }
910
911         lfsck->li_obj_dir = lfsck_object_get(obj);
912         lfsck->li_cookie_dir = iops->store(env, di);
913         spin_lock(&lfsck->li_lock);
914         lfsck->li_di_dir = di;
915         spin_unlock(&lfsck->li_lock);
916
917         GOTO(out, rc = 0);
918
919 out:
920         if (rc < 0)
921                 lfsck_fail(env, lfsck, false);
922         return (rc > 0 ? 0 : rc);
923 }
924
925 int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
926                    struct dt_object *obj, struct lu_dirent *ent)
927 {
928         struct lfsck_component *com;
929         int                     rc;
930
931         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
932                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
933                 if (rc != 0)
934                         return rc;
935         }
936         return 0;
937 }
938
939 int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
940                int result)
941 {
942         struct lfsck_component *com;
943         struct lfsck_component *next;
944         int                     rc  = 0;
945         int                     rc1 = 0;
946
947         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
948         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
949                 rc = com->lc_ops->lfsck_post(env, com, result, false);
950                 if (rc != 0)
951                         rc1 = rc;
952         }
953
954         lfsck->li_time_last_checkpoint = cfs_time_current();
955         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
956                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
957
958         /* Ignore some component post failure to make other can go ahead. */
959         return result;
960 }
961
962 int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck)
963 {
964         struct lfsck_component *com;
965         struct lfsck_component *next;
966         struct l_wait_info      lwi = { 0 };
967         int                     rc  = 0;
968         int                     rc1 = 0;
969
970         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
971                                      lc_link) {
972                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
973                         com->lc_journal = 0;
974
975                 rc = com->lc_ops->lfsck_double_scan(env, com);
976                 if (rc != 0)
977                         rc1 = rc;
978         }
979
980         l_wait_event(lfsck->li_thread.t_ctl_waitq,
981                      atomic_read(&lfsck->li_double_scan_count) == 0,
982                      &lwi);
983
984         return (rc1 != 0 ? rc1 : rc);
985 }
986
987 void lfsck_quit(const struct lu_env *env, struct lfsck_instance *lfsck)
988 {
989         struct lfsck_component *com;
990         struct lfsck_component *next;
991
992         list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
993                                  lc_link) {
994                 if (com->lc_ops->lfsck_quit != NULL)
995                         com->lc_ops->lfsck_quit(env, com);
996         }
997
998         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
999                                  lc_link) {
1000                 if (com->lc_ops->lfsck_quit != NULL)
1001                         com->lc_ops->lfsck_quit(env, com);
1002         }
1003 }
1004
1005 /* external interfaces */
1006
1007 int lfsck_get_speed(struct dt_device *key, void *buf, int len)
1008 {
1009         struct lu_env           env;
1010         struct lfsck_instance  *lfsck;
1011         int                     rc;
1012         ENTRY;
1013
1014         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1015         if (rc != 0)
1016                 RETURN(rc);
1017
1018         lfsck = lfsck_instance_find(key, true, false);
1019         if (likely(lfsck != NULL)) {
1020                 rc = snprintf(buf, len, "%u\n",
1021                               lfsck->li_bookmark_ram.lb_speed_limit);
1022                 lfsck_instance_put(&env, lfsck);
1023         } else {
1024                 rc = -ENODEV;
1025         }
1026
1027         lu_env_fini(&env);
1028
1029         RETURN(rc);
1030 }
1031 EXPORT_SYMBOL(lfsck_get_speed);
1032
1033 int lfsck_set_speed(struct dt_device *key, int val)
1034 {
1035         struct lu_env           env;
1036         struct lfsck_instance  *lfsck;
1037         int                     rc;
1038         ENTRY;
1039
1040         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1041         if (rc != 0)
1042                 RETURN(rc);
1043
1044         lfsck = lfsck_instance_find(key, true, false);
1045         if (likely(lfsck != NULL)) {
1046                 mutex_lock(&lfsck->li_mutex);
1047                 __lfsck_set_speed(lfsck, val);
1048                 rc = lfsck_bookmark_store(&env, lfsck);
1049                 mutex_unlock(&lfsck->li_mutex);
1050                 lfsck_instance_put(&env, lfsck);
1051         } else {
1052                 rc = -ENODEV;
1053         }
1054
1055         lu_env_fini(&env);
1056
1057         RETURN(rc);
1058 }
1059 EXPORT_SYMBOL(lfsck_set_speed);
1060
1061 int lfsck_get_windows(struct dt_device *key, void *buf, int len)
1062 {
1063         struct lu_env           env;
1064         struct lfsck_instance  *lfsck;
1065         int                     rc;
1066         ENTRY;
1067
1068         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1069         if (rc != 0)
1070                 RETURN(rc);
1071
1072         lfsck = lfsck_instance_find(key, true, false);
1073         if (likely(lfsck != NULL)) {
1074                 rc = snprintf(buf, len, "%u\n",
1075                               lfsck->li_bookmark_ram.lb_async_windows);
1076                 lfsck_instance_put(&env, lfsck);
1077         } else {
1078                 rc = -ENODEV;
1079         }
1080
1081         lu_env_fini(&env);
1082
1083         RETURN(rc);
1084 }
1085 EXPORT_SYMBOL(lfsck_get_windows);
1086
1087 int lfsck_set_windows(struct dt_device *key, int val)
1088 {
1089         struct lu_env           env;
1090         struct lfsck_instance  *lfsck;
1091         int                     rc;
1092         ENTRY;
1093
1094         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1095         if (rc != 0)
1096                 RETURN(rc);
1097
1098         lfsck = lfsck_instance_find(key, true, false);
1099         if (likely(lfsck != NULL)) {
1100                 if (val > LFSCK_ASYNC_WIN_MAX) {
1101                         CERROR("%s: Too large async windows size, which "
1102                                "may cause memory issues. The valid range "
1103                                "is [0 - %u]. If you do not want to restrict "
1104                                "the windows size for async requests pipeline, "
1105                                "just set it as 0.\n",
1106                                lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
1107                         rc = -EINVAL;
1108                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
1109                         mutex_lock(&lfsck->li_mutex);
1110                         lfsck->li_bookmark_ram.lb_async_windows = val;
1111                         rc = lfsck_bookmark_store(&env, lfsck);
1112                         mutex_unlock(&lfsck->li_mutex);
1113                 }
1114                 lfsck_instance_put(&env, lfsck);
1115         } else {
1116                 rc = -ENODEV;
1117         }
1118
1119         lu_env_fini(&env);
1120
1121         RETURN(rc);
1122 }
1123 EXPORT_SYMBOL(lfsck_set_windows);
1124
1125 int lfsck_dump(struct dt_device *key, void *buf, int len, enum lfsck_type type)
1126 {
1127         struct lu_env           env;
1128         struct lfsck_instance  *lfsck;
1129         struct lfsck_component *com;
1130         int                     rc;
1131         ENTRY;
1132
1133         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1134         if (rc != 0)
1135                 RETURN(rc);
1136
1137         lfsck = lfsck_instance_find(key, true, false);
1138         if (likely(lfsck != NULL)) {
1139                 com = lfsck_component_find(lfsck, type);
1140                 if (likely(com != NULL)) {
1141                         rc = com->lc_ops->lfsck_dump(&env, com, buf, len);
1142                         lfsck_component_put(&env, com);
1143                 } else {
1144                         rc = -ENOTSUPP;
1145                 }
1146
1147                 lfsck_instance_put(&env, lfsck);
1148         } else {
1149                 rc = -ENODEV;
1150         }
1151
1152         lu_env_fini(&env);
1153
1154         RETURN(rc);
1155 }
1156 EXPORT_SYMBOL(lfsck_dump);
1157
1158 int lfsck_start(const struct lu_env *env, struct dt_device *key,
1159                 struct lfsck_start_param *lsp)
1160 {
1161         struct lfsck_start              *start  = lsp->lsp_start;
1162         struct lfsck_instance           *lfsck;
1163         struct lfsck_bookmark           *bk;
1164         struct ptlrpc_thread            *thread;
1165         struct lfsck_component          *com;
1166         struct l_wait_info               lwi    = { 0 };
1167         struct lfsck_thread_args        *lta;
1168         bool                             dirty  = false;
1169         long                             rc     = 0;
1170         __u16                            valid  = 0;
1171         __u16                            flags  = 0;
1172         __u16                            type   = 1;
1173         ENTRY;
1174
1175         lfsck = lfsck_instance_find(key, true, false);
1176         if (unlikely(lfsck == NULL))
1177                 RETURN(-ENODEV);
1178
1179         /* start == NULL means auto trigger paused LFSCK. */
1180         if ((start == NULL) &&
1181             (cfs_list_empty(&lfsck->li_list_scan) ||
1182              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
1183                 GOTO(put, rc = 0);
1184
1185         bk = &lfsck->li_bookmark_ram;
1186         thread = &lfsck->li_thread;
1187         mutex_lock(&lfsck->li_mutex);
1188         spin_lock(&lfsck->li_lock);
1189         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
1190                 rc = -EALREADY;
1191                 while (start->ls_active != 0) {
1192                         if (type & start->ls_active) {
1193                                 com = __lfsck_component_find(lfsck, type,
1194                                                         &lfsck->li_list_scan);
1195                                 if (com == NULL)
1196                                         com = __lfsck_component_find(lfsck,
1197                                                 type,
1198                                                 &lfsck->li_list_double_scan);
1199                                 if (com == NULL) {
1200                                         rc = -EBUSY;
1201                                         break;
1202                                 } else {
1203                                         start->ls_active &= ~type;
1204                                 }
1205                         }
1206                         type <<= 1;
1207                 }
1208                 spin_unlock(&lfsck->li_lock);
1209                 GOTO(out, rc);
1210         }
1211         spin_unlock(&lfsck->li_lock);
1212
1213         lfsck->li_namespace = lsp->lsp_namespace;
1214         lfsck->li_paused = 0;
1215         lfsck->li_oit_over = 0;
1216         lfsck->li_drop_dryrun = 0;
1217         lfsck->li_new_scanned = 0;
1218
1219         /* For auto trigger. */
1220         if (start == NULL)
1221                 goto trigger;
1222
1223         start->ls_version = bk->lb_version;
1224         if (start->ls_valid & LSV_SPEED_LIMIT) {
1225                 __lfsck_set_speed(lfsck, start->ls_speed_limit);
1226                 dirty = true;
1227         }
1228
1229         if (start->ls_valid & LSV_ASYNC_WINDOWS &&
1230             bk->lb_async_windows != start->ls_async_windows) {
1231                 bk->lb_async_windows = start->ls_async_windows;
1232                 dirty = true;
1233         }
1234
1235         if (start->ls_valid & LSV_ERROR_HANDLE) {
1236                 valid |= DOIV_ERROR_HANDLE;
1237                 if (start->ls_flags & LPF_FAILOUT)
1238                         flags |= DOIF_FAILOUT;
1239
1240                 if ((start->ls_flags & LPF_FAILOUT) &&
1241                     !(bk->lb_param & LPF_FAILOUT)) {
1242                         bk->lb_param |= LPF_FAILOUT;
1243                         dirty = true;
1244                 } else if (!(start->ls_flags & LPF_FAILOUT) &&
1245                            (bk->lb_param & LPF_FAILOUT)) {
1246                         bk->lb_param &= ~LPF_FAILOUT;
1247                         dirty = true;
1248                 }
1249         }
1250
1251         if (start->ls_valid & LSV_DRYRUN) {
1252                 valid |= DOIV_DRYRUN;
1253                 if (start->ls_flags & LPF_DRYRUN)
1254                         flags |= DOIF_DRYRUN;
1255
1256                 if ((start->ls_flags & LPF_DRYRUN) &&
1257                     !(bk->lb_param & LPF_DRYRUN)) {
1258                         bk->lb_param |= LPF_DRYRUN;
1259                         dirty = true;
1260                 } else if (!(start->ls_flags & LPF_DRYRUN) &&
1261                            (bk->lb_param & LPF_DRYRUN)) {
1262                         bk->lb_param &= ~LPF_DRYRUN;
1263                         lfsck->li_drop_dryrun = 1;
1264                         dirty = true;
1265                 }
1266         }
1267
1268         if (dirty) {
1269                 rc = lfsck_bookmark_store(env, lfsck);
1270                 if (rc != 0)
1271                         GOTO(out, rc);
1272         }
1273
1274         if (start->ls_flags & LPF_RESET)
1275                 flags |= DOIF_RESET;
1276
1277         if (start->ls_active != 0) {
1278                 struct lfsck_component *next;
1279
1280                 if (start->ls_active == LFSCK_TYPES_ALL)
1281                         start->ls_active = LFSCK_TYPES_SUPPORTED;
1282
1283                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
1284                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
1285                         GOTO(out, rc = -ENOTSUPP);
1286                 }
1287
1288                 cfs_list_for_each_entry_safe(com, next,
1289                                              &lfsck->li_list_scan, lc_link) {
1290                         if (!(com->lc_type & start->ls_active)) {
1291                                 rc = com->lc_ops->lfsck_post(env, com, 0,
1292                                                              false);
1293                                 if (rc != 0)
1294                                         GOTO(out, rc);
1295                         }
1296                 }
1297
1298                 while (start->ls_active != 0) {
1299                         if (type & start->ls_active) {
1300                                 com = __lfsck_component_find(lfsck, type,
1301                                                         &lfsck->li_list_idle);
1302                                 if (com != NULL) {
1303                                         /* The component status will be updated
1304                                          * when its prep() is called later by
1305                                          * the LFSCK main engine. */
1306                                         cfs_list_del_init(&com->lc_link);
1307                                         cfs_list_add_tail(&com->lc_link,
1308                                                           &lfsck->li_list_scan);
1309                                 }
1310                                 start->ls_active &= ~type;
1311                         }
1312                         type <<= 1;
1313                 }
1314         }
1315
1316         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1317                 start->ls_active |= com->lc_type;
1318                 if (flags & DOIF_RESET) {
1319                         rc = com->lc_ops->lfsck_reset(env, com, false);
1320                         if (rc != 0)
1321                                 GOTO(out, rc);
1322                 }
1323         }
1324
1325 trigger:
1326         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
1327         if (bk->lb_param & LPF_DRYRUN) {
1328                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
1329                 valid |= DOIV_DRYRUN;
1330                 flags |= DOIF_DRYRUN;
1331         }
1332
1333         if (bk->lb_param & LPF_FAILOUT) {
1334                 valid |= DOIV_ERROR_HANDLE;
1335                 flags |= DOIF_FAILOUT;
1336         }
1337
1338         if (!cfs_list_empty(&lfsck->li_list_scan))
1339                 flags |= DOIF_OUTUSED;
1340
1341         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
1342         thread_set_flags(thread, 0);
1343         lta = lfsck_thread_args_init(lfsck, NULL);
1344         if (IS_ERR(lta))
1345                 GOTO(out, rc = PTR_ERR(lta));
1346
1347         rc = PTR_ERR(kthread_run(lfsck_master_engine, lta, "lfsck"));
1348         if (IS_ERR_VALUE(rc)) {
1349                 CERROR("%s: cannot start LFSCK thread: rc = %ld\n",
1350                        lfsck_lfsck2name(lfsck), rc);
1351                 lfsck_thread_args_fini(lta);
1352         } else {
1353                 rc = 0;
1354                 l_wait_event(thread->t_ctl_waitq,
1355                              thread_is_running(thread) ||
1356                              thread_is_stopped(thread),
1357                              &lwi);
1358         }
1359
1360         GOTO(out, rc);
1361
1362 out:
1363         mutex_unlock(&lfsck->li_mutex);
1364 put:
1365         lfsck_instance_put(env, lfsck);
1366         return (rc < 0 ? rc : 0);
1367 }
1368 EXPORT_SYMBOL(lfsck_start);
1369
1370 int lfsck_stop(const struct lu_env *env, struct dt_device *key, bool pause)
1371 {
1372         struct lfsck_instance   *lfsck;
1373         struct ptlrpc_thread    *thread;
1374         struct l_wait_info       lwi    = { 0 };
1375         ENTRY;
1376
1377         lfsck = lfsck_instance_find(key, true, false);
1378         if (unlikely(lfsck == NULL))
1379                 RETURN(-ENODEV);
1380
1381         thread = &lfsck->li_thread;
1382         mutex_lock(&lfsck->li_mutex);
1383         spin_lock(&lfsck->li_lock);
1384         if (thread_is_init(thread) || thread_is_stopped(thread)) {
1385                 spin_unlock(&lfsck->li_lock);
1386                 mutex_unlock(&lfsck->li_mutex);
1387                 lfsck_instance_put(env, lfsck);
1388                 RETURN(-EALREADY);
1389         }
1390
1391         if (pause)
1392                 lfsck->li_paused = 1;
1393         thread_set_flags(thread, SVC_STOPPING);
1394         spin_unlock(&lfsck->li_lock);
1395
1396         wake_up_all(&thread->t_ctl_waitq);
1397         l_wait_event(thread->t_ctl_waitq,
1398                      thread_is_stopped(thread),
1399                      &lwi);
1400         mutex_unlock(&lfsck->li_mutex);
1401         lfsck_instance_put(env, lfsck);
1402
1403         RETURN(0);
1404 }
1405 EXPORT_SYMBOL(lfsck_stop);
1406
1407 int lfsck_register(const struct lu_env *env, struct dt_device *key,
1408                    struct dt_device *next, lfsck_out_notify notify,
1409                    void *notify_data, bool master)
1410 {
1411         struct lfsck_instance   *lfsck;
1412         struct dt_object        *root  = NULL;
1413         struct dt_object        *obj;
1414         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
1415         int                      rc;
1416         ENTRY;
1417
1418         lfsck = lfsck_instance_find(key, false, false);
1419         if (unlikely(lfsck != NULL))
1420                 RETURN(-EEXIST);
1421
1422         OBD_ALLOC_PTR(lfsck);
1423         if (lfsck == NULL)
1424                 RETURN(-ENOMEM);
1425
1426         mutex_init(&lfsck->li_mutex);
1427         spin_lock_init(&lfsck->li_lock);
1428         CFS_INIT_LIST_HEAD(&lfsck->li_link);
1429         CFS_INIT_LIST_HEAD(&lfsck->li_list_scan);
1430         CFS_INIT_LIST_HEAD(&lfsck->li_list_dir);
1431         CFS_INIT_LIST_HEAD(&lfsck->li_list_double_scan);
1432         CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
1433         atomic_set(&lfsck->li_ref, 1);
1434         atomic_set(&lfsck->li_double_scan_count, 0);
1435         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
1436         lfsck->li_out_notify = notify;
1437         lfsck->li_out_notify_data = notify_data;
1438         lfsck->li_next = next;
1439         lfsck->li_bottom = key;
1440
1441         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
1442         if (rc != 0)
1443                 GOTO(out, rc);
1444
1445         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
1446         if (rc != 0)
1447                 GOTO(out, rc);
1448
1449         fid->f_seq = FID_SEQ_LOCAL_NAME;
1450         fid->f_oid = 1;
1451         fid->f_ver = 0;
1452         rc = local_oid_storage_init(env, lfsck->li_bottom, fid, &lfsck->li_los);
1453         if (rc != 0)
1454                 GOTO(out, rc);
1455
1456         rc = dt_root_get(env, key, fid);
1457         if (rc != 0)
1458                 GOTO(out, rc);
1459
1460         root = dt_locate(env, lfsck->li_bottom, fid);
1461         if (IS_ERR(root))
1462                 GOTO(out, rc = PTR_ERR(root));
1463
1464         if (unlikely(!dt_try_as_dir(env, root)))
1465                 GOTO(out, rc = -ENOTDIR);
1466
1467         lfsck->li_local_root_fid = *fid;
1468         if (master) {
1469                 lfsck->li_master = 1;
1470                 if (lfsck_dev_idx(lfsck->li_bottom) == 0) {
1471                         rc = dt_lookup(env, root,
1472                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
1473                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
1474                         if (rc != 0)
1475                                 GOTO(out, rc);
1476                 }
1477         }
1478
1479         fid->f_seq = FID_SEQ_LOCAL_FILE;
1480         fid->f_oid = OTABLE_IT_OID;
1481         fid->f_ver = 0;
1482         obj = dt_locate(env, lfsck->li_bottom, fid);
1483         if (IS_ERR(obj))
1484                 GOTO(out, rc = PTR_ERR(obj));
1485
1486         lfsck->li_obj_oit = obj;
1487         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
1488         if (rc != 0) {
1489                 if (rc == -ENOTSUPP)
1490                         GOTO(add, rc = 0);
1491
1492                 GOTO(out, rc);
1493         }
1494
1495         rc = lfsck_bookmark_setup(env, lfsck);
1496         if (rc != 0)
1497                 GOTO(out, rc);
1498
1499         if (master) {
1500                 rc = lfsck_namespace_setup(env, lfsck);
1501                 if (rc < 0)
1502                         GOTO(out, rc);
1503         }
1504
1505         rc = lfsck_layout_setup(env, lfsck);
1506         if (rc < 0)
1507                 GOTO(out, rc);
1508
1509         /* XXX: more LFSCK components initialization to be added here. */
1510
1511 add:
1512         rc = lfsck_instance_add(lfsck);
1513         if (rc == 0)
1514                 rc = lfsck_add_target_from_orphan(env, lfsck);
1515 out:
1516         if (root != NULL && !IS_ERR(root))
1517                 lu_object_put(env, &root->do_lu);
1518         if (rc != 0)
1519                 lfsck_instance_cleanup(env, lfsck);
1520         return rc;
1521 }
1522 EXPORT_SYMBOL(lfsck_register);
1523
1524 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
1525 {
1526         struct lfsck_instance *lfsck;
1527
1528         lfsck = lfsck_instance_find(key, false, true);
1529         if (lfsck != NULL)
1530                 lfsck_instance_put(env, lfsck);
1531 }
1532 EXPORT_SYMBOL(lfsck_degister);
1533
1534 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
1535                      struct dt_device *tgt, struct obd_export *exp,
1536                      __u32 index, bool for_ost)
1537 {
1538         struct lfsck_instance   *lfsck;
1539         struct lfsck_tgt_desc   *ltd;
1540         int                      rc;
1541         ENTRY;
1542
1543         OBD_ALLOC_PTR(ltd);
1544         if (ltd == NULL)
1545                 RETURN(-ENOMEM);
1546
1547         ltd->ltd_tgt = tgt;
1548         ltd->ltd_key = key;
1549         ltd->ltd_exp = exp;
1550         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
1551         INIT_LIST_HEAD(&ltd->ltd_layout_list);
1552         atomic_set(&ltd->ltd_ref, 1);
1553         ltd->ltd_index = index;
1554
1555         spin_lock(&lfsck_instance_lock);
1556         lfsck = __lfsck_instance_find(key, true, false);
1557         if (lfsck == NULL) {
1558                 if (for_ost)
1559                         list_add_tail(&ltd->ltd_orphan_list,
1560                                       &lfsck_ost_orphan_list);
1561                 else
1562                         list_add_tail(&ltd->ltd_orphan_list,
1563                                       &lfsck_mdt_orphan_list);
1564                 spin_unlock(&lfsck_instance_lock);
1565
1566                 RETURN(0);
1567         }
1568         spin_unlock(&lfsck_instance_lock);
1569
1570         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
1571         if (rc != 0)
1572                 lfsck_tgt_put(ltd);
1573
1574         lfsck_instance_put(env, lfsck);
1575
1576         RETURN(rc);
1577 }
1578 EXPORT_SYMBOL(lfsck_add_target);
1579
1580 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
1581                       struct dt_device *tgt, __u32 index, bool for_ost)
1582 {
1583         struct lfsck_instance   *lfsck;
1584         struct lfsck_tgt_descs  *ltds;
1585         struct lfsck_tgt_desc   *ltd;
1586         struct list_head        *head;
1587         bool                     found = false;
1588
1589         if (for_ost)
1590                 head = &lfsck_ost_orphan_list;
1591         else
1592                 head = &lfsck_mdt_orphan_list;
1593
1594         spin_lock(&lfsck_instance_lock);
1595         list_for_each_entry(ltd, head, ltd_orphan_list) {
1596                 if (ltd->ltd_tgt == tgt) {
1597                         list_del_init(&ltd->ltd_orphan_list);
1598                         spin_unlock(&lfsck_instance_lock);
1599                         lfsck_tgt_put(ltd);
1600
1601                         return;
1602                 }
1603         }
1604
1605         lfsck = __lfsck_instance_find(key, true, false);
1606         spin_unlock(&lfsck_instance_lock);
1607         if (unlikely(lfsck == NULL))
1608                 return;
1609
1610         if (for_ost)
1611                 ltds = &lfsck->li_ost_descs;
1612         else
1613                 ltds = &lfsck->li_mdt_descs;
1614
1615         down_write(&ltds->ltd_rw_sem);
1616
1617         LASSERT(ltds->ltd_tgts_bitmap != NULL);
1618
1619         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
1620                 goto unlock;
1621
1622         ltd = LTD_TGT(ltds, index);
1623         if (unlikely(ltd == NULL))
1624                 goto unlock;
1625
1626         found = true;
1627         if (!list_empty(&ltd->ltd_layout_list)) {
1628                 spin_lock(&ltds->ltd_lock);
1629                 list_del_init(&ltd->ltd_layout_list);
1630                 spin_unlock(&ltds->ltd_lock);
1631         }
1632
1633         LASSERT(ltds->ltd_tgtnr > 0);
1634
1635         ltds->ltd_tgtnr--;
1636         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
1637         LTD_TGT(ltds, index) = NULL;
1638         lfsck_tgt_put(ltd);
1639
1640 unlock:
1641         if (!found) {
1642                 if (for_ost)
1643                         head = &lfsck->li_ost_descs.ltd_orphan;
1644                 else
1645                         head = &lfsck->li_ost_descs.ltd_orphan;
1646
1647                 list_for_each_entry(ltd, head, ltd_orphan_list) {
1648                         if (ltd->ltd_tgt == tgt) {
1649                                 list_del_init(&ltd->ltd_orphan_list);
1650                                 lfsck_tgt_put(ltd);
1651                                 break;
1652                         }
1653                 }
1654         }
1655
1656         up_write(&ltds->ltd_rw_sem);
1657         lfsck_instance_put(env, lfsck);
1658 }
1659 EXPORT_SYMBOL(lfsck_del_target);
1660
1661 static int __init lfsck_init(void)
1662 {
1663         int rc;
1664
1665         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
1666         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
1667         lfsck_key_init_generic(&lfsck_thread_key, NULL);
1668         rc = lu_context_key_register(&lfsck_thread_key);
1669
1670         return rc;
1671 }
1672
1673 static void __exit lfsck_exit(void)
1674 {
1675         struct lfsck_tgt_desc *ltd;
1676         struct lfsck_tgt_desc *next;
1677
1678         LASSERT(cfs_list_empty(&lfsck_instance_list));
1679
1680         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
1681                                  ltd_orphan_list) {
1682                 list_del_init(&ltd->ltd_orphan_list);
1683                 lfsck_tgt_put(ltd);
1684         }
1685
1686         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
1687                                  ltd_orphan_list) {
1688                 list_del_init(&ltd->ltd_orphan_list);
1689                 lfsck_tgt_put(ltd);
1690         }
1691
1692         lu_context_key_degister(&lfsck_thread_key);
1693 }
1694
1695 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
1696 MODULE_DESCRIPTION("LFSCK");
1697 MODULE_LICENSE("GPL");
1698
1699 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);