Whamcloud - gitweb
LU-3950 lfsck: control all LFSCK nodes via single command (2)
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 /* define lfsck thread key */
46 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
47
48 static void lfsck_key_fini(const struct lu_context *ctx,
49                            struct lu_context_key *key, void *data)
50 {
51         struct lfsck_thread_info *info = data;
52
53         lu_buf_free(&info->lti_linkea_buf);
54         lu_buf_free(&info->lti_big_buf);
55         OBD_FREE_PTR(info);
56 }
57
58 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
59 LU_KEY_INIT_GENERIC(lfsck);
60
/*
 * Global LFSCK bookkeeping. The instance list and both orphan lists are
 * walked under lfsck_instance_lock.
 *
 * All three heads are statically initialized so that iterating any of them
 * is safe regardless of whether a runtime INIT_LIST_HEAD() has run yet;
 * re-initializing an empty head later is harmless.
 */
static CFS_LIST_HEAD(lfsck_instance_list);
static CFS_LIST_HEAD(lfsck_ost_orphan_list);
static CFS_LIST_HEAD(lfsck_mdt_orphan_list);
static DEFINE_SPINLOCK(lfsck_instance_lock);
65
66 static const char *lfsck_status_names[] = {
67         [LS_INIT]               = "init",
68         [LS_SCANNING_PHASE1]    = "scanning-phase1",
69         [LS_SCANNING_PHASE2]    = "scanning-phase2",
70         [LS_COMPLETED]          = "completed",
71         [LS_FAILED]             = "failed",
72         [LS_STOPPED]            = "stopped",
73         [LS_PAUSED]             = "paused",
74         [LS_CRASHED]            = "crashed",
75         [LS_PARTIAL]            = "partial",
76         [LS_CO_FAILED]          = "co-failed",
77         [LS_CO_STOPPED]         = "co-stopped",
78         [LS_CO_PAUSED]          = "co-paused"
79 };
80
/*
 * Printable names of the LFSCK flag bits; entry N names bit N.
 * NOTE(review): presumably handed to lfsck_bits_dump() as its "names"
 * table - confirm against the callers.
 */
const char *lfsck_flags_names[] = {
        [0] = "scanned-once",
        [1] = "inconsistent",
        [2] = "upgrade",
        [3] = "incomplete",
        [4] = "crashed_lastid",
        [5] = NULL,
};
89
/*
 * Printable names of the LFSCK parameter bits; entry N names bit N.
 * Bit 0 has no printable name, hence the leading NULL.
 * NOTE(review): presumably handed to lfsck_bits_dump() as its "names"
 * table - confirm against the callers.
 */
const char *lfsck_param_names[] = {
        [0] = NULL,
        [1] = "failout",
        [2] = "dryrun",
        [3] = "all_targets",
        [4] = NULL,
};
97
98 const char *lfsck_status2names(enum lfsck_status status)
99 {
100         if (unlikely(status < 0 || status >= LS_MAX))
101                 return "unknown";
102
103         return lfsck_status_names[status];
104 }
105
106 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
107 {
108         spin_lock_init(&ltds->ltd_lock);
109         init_rwsem(&ltds->ltd_rw_sem);
110         INIT_LIST_HEAD(&ltds->ltd_orphan);
111         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
112         if (ltds->ltd_tgts_bitmap == NULL)
113                 return -ENOMEM;
114
115         return 0;
116 }
117
118 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
119 {
120         struct lfsck_tgt_desc   *ltd;
121         struct lfsck_tgt_desc   *next;
122         int                      idx;
123
124         down_write(&ltds->ltd_rw_sem);
125
126         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
127                                  ltd_orphan_list) {
128                 list_del_init(&ltd->ltd_orphan_list);
129                 lfsck_tgt_put(ltd);
130         }
131
132         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
133                 up_write(&ltds->ltd_rw_sem);
134
135                 return;
136         }
137
138         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
139                 ltd = LTD_TGT(ltds, idx);
140                 if (likely(ltd != NULL)) {
141                         LASSERT(list_empty(&ltd->ltd_layout_list));
142                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
143
144                         ltds->ltd_tgtnr--;
145                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
146                         LTD_TGT(ltds, idx) = NULL;
147                         lfsck_tgt_put(ltd);
148                 }
149         }
150
151         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
152                  ltds->ltd_tgtnr);
153
154         for (idx = 0; idx < TGT_PTRS; idx++) {
155                 if (ltds->ltd_tgts_idx[idx] != NULL) {
156                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
157                         ltds->ltd_tgts_idx[idx] = NULL;
158                 }
159         }
160
161         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
162         ltds->ltd_tgts_bitmap = NULL;
163         up_write(&ltds->ltd_rw_sem);
164 }
165
166 static int __lfsck_add_target(const struct lu_env *env,
167                               struct lfsck_instance *lfsck,
168                               struct lfsck_tgt_desc *ltd,
169                               bool for_ost, bool locked)
170 {
171         struct lfsck_tgt_descs *ltds;
172         __u32                   index = ltd->ltd_index;
173         int                     rc    = 0;
174         ENTRY;
175
176         if (for_ost)
177                 ltds = &lfsck->li_ost_descs;
178         else
179                 ltds = &lfsck->li_mdt_descs;
180
181         if (!locked)
182                 down_write(&ltds->ltd_rw_sem);
183
184         LASSERT(ltds->ltd_tgts_bitmap != NULL);
185
186         if (index >= ltds->ltd_tgts_bitmap->size) {
187                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
188                                     (__u32)BITS_PER_LONG);
189                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
190                 cfs_bitmap_t *new_bitmap;
191
192                 while (newsize < index + 1)
193                         newsize <<= 1;
194
195                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
196                 if (new_bitmap == NULL)
197                         GOTO(unlock, rc = -ENOMEM);
198
199                 if (ltds->ltd_tgtnr > 0)
200                         cfs_bitmap_copy(new_bitmap, old_bitmap);
201                 ltds->ltd_tgts_bitmap = new_bitmap;
202                 CFS_FREE_BITMAP(old_bitmap);
203         }
204
205         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
206                 CERROR("%s: the device %s (%u) is registered already\n",
207                        lfsck_lfsck2name(lfsck),
208                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
209                 GOTO(unlock, rc = -EEXIST);
210         }
211
212         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
213                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
214                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
215                         GOTO(unlock, rc = -ENOMEM);
216         }
217
218         LTD_TGT(ltds, index) = ltd;
219         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
220         ltds->ltd_tgtnr++;
221
222         GOTO(unlock, rc = 0);
223
224 unlock:
225         if (!locked)
226                 up_write(&ltds->ltd_rw_sem);
227
228         return rc;
229 }
230
231 static int lfsck_add_target_from_orphan(const struct lu_env *env,
232                                         struct lfsck_instance *lfsck)
233 {
234         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
235         struct lfsck_tgt_desc   *ltd;
236         struct lfsck_tgt_desc   *next;
237         struct list_head        *head    = &lfsck_ost_orphan_list;
238         int                      rc;
239         bool                     for_ost = true;
240
241 again:
242         spin_lock(&lfsck_instance_lock);
243         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
244                 if (ltd->ltd_key == lfsck->li_bottom) {
245                         list_del_init(&ltd->ltd_orphan_list);
246                         list_add_tail(&ltd->ltd_orphan_list,
247                                       &ltds->ltd_orphan);
248                 }
249         }
250         spin_unlock(&lfsck_instance_lock);
251
252         down_write(&ltds->ltd_rw_sem);
253         while (!list_empty(&ltds->ltd_orphan)) {
254                 ltd = list_entry(ltds->ltd_orphan.next,
255                                  struct lfsck_tgt_desc,
256                                  ltd_orphan_list);
257                 list_del_init(&ltd->ltd_orphan_list);
258                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
259                 /* Do not hold the semaphore for too long time. */
260                 up_write(&ltds->ltd_rw_sem);
261                 if (rc != 0)
262                         return rc;
263
264                 down_write(&ltds->ltd_rw_sem);
265         }
266         up_write(&ltds->ltd_rw_sem);
267
268         if (for_ost) {
269                 ltds = &lfsck->li_mdt_descs;
270                 head = &lfsck_mdt_orphan_list;
271                 for_ost = false;
272                 goto again;
273         }
274
275         return 0;
276 }
277
278 static inline struct lfsck_component *
279 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type, cfs_list_t *list)
280 {
281         struct lfsck_component *com;
282
283         cfs_list_for_each_entry(com, list, lc_link) {
284                 if (com->lc_type == type)
285                         return com;
286         }
287         return NULL;
288 }
289
290 static struct lfsck_component *
291 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
292 {
293         struct lfsck_component *com;
294
295         spin_lock(&lfsck->li_lock);
296         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
297         if (com != NULL)
298                 goto unlock;
299
300         com = __lfsck_component_find(lfsck, type,
301                                      &lfsck->li_list_double_scan);
302         if (com != NULL)
303                 goto unlock;
304
305         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
306
307 unlock:
308         if (com != NULL)
309                 lfsck_component_get(com);
310         spin_unlock(&lfsck->li_lock);
311         return com;
312 }
313
314 void lfsck_component_cleanup(const struct lu_env *env,
315                              struct lfsck_component *com)
316 {
317         if (!cfs_list_empty(&com->lc_link))
318                 cfs_list_del_init(&com->lc_link);
319         if (!cfs_list_empty(&com->lc_link_dir))
320                 cfs_list_del_init(&com->lc_link_dir);
321
322         lfsck_component_put(env, com);
323 }
324
325 void lfsck_instance_cleanup(const struct lu_env *env,
326                             struct lfsck_instance *lfsck)
327 {
328         struct ptlrpc_thread    *thread = &lfsck->li_thread;
329         struct lfsck_component  *com;
330         ENTRY;
331
332         LASSERT(list_empty(&lfsck->li_link));
333         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
334
335         if (lfsck->li_obj_oit != NULL) {
336                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
337                 lfsck->li_obj_oit = NULL;
338         }
339
340         LASSERT(lfsck->li_obj_dir == NULL);
341
342         while (!cfs_list_empty(&lfsck->li_list_scan)) {
343                 com = cfs_list_entry(lfsck->li_list_scan.next,
344                                      struct lfsck_component,
345                                      lc_link);
346                 lfsck_component_cleanup(env, com);
347         }
348
349         LASSERT(cfs_list_empty(&lfsck->li_list_dir));
350
351         while (!cfs_list_empty(&lfsck->li_list_double_scan)) {
352                 com = cfs_list_entry(lfsck->li_list_double_scan.next,
353                                      struct lfsck_component,
354                                      lc_link);
355                 lfsck_component_cleanup(env, com);
356         }
357
358         while (!cfs_list_empty(&lfsck->li_list_idle)) {
359                 com = cfs_list_entry(lfsck->li_list_idle.next,
360                                      struct lfsck_component,
361                                      lc_link);
362                 lfsck_component_cleanup(env, com);
363         }
364
365         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
366         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
367
368         if (lfsck->li_bookmark_obj != NULL) {
369                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
370                 lfsck->li_bookmark_obj = NULL;
371         }
372
373         if (lfsck->li_los != NULL) {
374                 local_oid_storage_fini(env, lfsck->li_los);
375                 lfsck->li_los = NULL;
376         }
377
378         OBD_FREE_PTR(lfsck);
379 }
380
381 static inline struct lfsck_instance *
382 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
383 {
384         struct lfsck_instance *lfsck;
385
386         cfs_list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
387                 if (lfsck->li_bottom == key) {
388                         if (ref)
389                                 lfsck_instance_get(lfsck);
390                         if (unlink)
391                                 list_del_init(&lfsck->li_link);
392
393                         return lfsck;
394                 }
395         }
396
397         return NULL;
398 }
399
400 static inline struct lfsck_instance *lfsck_instance_find(struct dt_device *key,
401                                                          bool ref, bool unlink)
402 {
403         struct lfsck_instance *lfsck;
404
405         spin_lock(&lfsck_instance_lock);
406         lfsck = __lfsck_instance_find(key, ref, unlink);
407         spin_unlock(&lfsck_instance_lock);
408
409         return lfsck;
410 }
411
412 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
413 {
414         struct lfsck_instance *tmp;
415
416         spin_lock(&lfsck_instance_lock);
417         cfs_list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
418                 if (lfsck->li_bottom == tmp->li_bottom) {
419                         spin_unlock(&lfsck_instance_lock);
420                         return -EEXIST;
421                 }
422         }
423
424         cfs_list_add_tail(&lfsck->li_link, &lfsck_instance_list);
425         spin_unlock(&lfsck_instance_lock);
426         return 0;
427 }
428
/*
 * Append "<prefix>: name1,name2,...\n" to a text buffer, one name per bit
 * set in "bits". With no bit set the output is "<prefix>:\n".
 *
 * buf     in/out: current write position, advanced past what was written
 * len     in/out: space left at *buf, reduced by what was written
 * bits    the bit set to describe
 * names   names[i] is the name of bit i; a NULL entry is skipped
 * prefix  label printed in front of the names
 *
 * Returns the number of characters appended, or -ENOSPC if the output
 * does not fit. A piece that would be truncated is reported as -ENOSPC and
 * *buf / *len are not advanced past it, so they never point outside the
 * caller's buffer.
 */
int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
                    const char *prefix)
{
        /* Work on an unsigned copy: shifting into the sign bit of an int
         * is undefined. */
        unsigned int remain = (unsigned int)bits;
        unsigned int flag;
        int save = *len;
        int rc;
        int i;

        if (*len <= 0)
                return -ENOSPC;

        rc = snprintf(*buf, *len, "%s:%c", prefix, remain != 0 ? ' ' : '\n');
        /* snprintf() returns the untruncated length: rc >= *len means the
         * text did not fit. */
        if (rc <= 0 || rc >= *len)
                return -ENOSPC;

        *buf += rc;
        *len -= rc;
        for (i = 0; remain != 0; i++) {
                flag = 1U << i;
                if (!(flag & remain))
                        continue;

                remain &= ~flag;
                if (names[i] == NULL)
                        continue;

                rc = snprintf(*buf, *len, "%s%c", names[i],
                              remain != 0 ? ',' : '\n');
                if (rc <= 0 || rc >= *len)
                        return -ENOSPC;

                *buf += rc;
                *len -= rc;
        }
        return save - *len;
}
459
460 int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix)
461 {
462         int rc;
463
464         if (time != 0)
465                 rc = snprintf(*buf, *len, "%s: "LPU64" seconds\n", prefix,
466                               cfs_time_current_sec() - time);
467         else
468                 rc = snprintf(*buf, *len, "%s: N/A\n", prefix);
469         if (rc <= 0)
470                 return -ENOSPC;
471
472         *buf += rc;
473         *len -= rc;
474         return rc;
475 }
476
477 int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
478                    const char *prefix)
479 {
480         int rc;
481
482         if (fid_is_zero(&pos->lp_dir_parent)) {
483                 if (pos->lp_oit_cookie == 0)
484                         rc = snprintf(*buf, *len, "%s: N/A, N/A, N/A\n",
485                                       prefix);
486                 else
487                         rc = snprintf(*buf, *len, "%s: "LPU64", N/A, N/A\n",
488                                       prefix, pos->lp_oit_cookie);
489         } else {
490                 rc = snprintf(*buf, *len, "%s: "LPU64", "DFID", "LPU64"\n",
491                               prefix, pos->lp_oit_cookie,
492                               PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
493         }
494         if (rc <= 0)
495                 return -ENOSPC;
496
497         *buf += rc;
498         *len -= rc;
499         return rc;
500 }
501
502 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
503                     struct lfsck_position *pos, bool init)
504 {
505         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
506
507         if (unlikely(lfsck->li_di_oit == NULL)) {
508                 memset(pos, 0, sizeof(*pos));
509                 return;
510         }
511
512         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
513         if (!lfsck->li_current_oit_processed && !init)
514                 pos->lp_oit_cookie--;
515
516         LASSERT(pos->lp_oit_cookie > 0);
517
518         if (lfsck->li_di_dir != NULL) {
519                 struct dt_object *dto = lfsck->li_obj_dir;
520
521                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
522                                                         lfsck->li_di_dir);
523
524                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
525                         fid_zero(&pos->lp_dir_parent);
526                         pos->lp_dir_cookie = 0;
527                 } else {
528                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
529                 }
530         } else {
531                 fid_zero(&pos->lp_dir_parent);
532                 pos->lp_dir_cookie = 0;
533         }
534 }
535
536 static void __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
537 {
538         lfsck->li_bookmark_ram.lb_speed_limit = limit;
539         if (limit != LFSCK_SPEED_NO_LIMIT) {
540                 if (limit > HZ) {
541                         lfsck->li_sleep_rate = limit / HZ;
542                         lfsck->li_sleep_jif = 1;
543                 } else {
544                         lfsck->li_sleep_rate = 1;
545                         lfsck->li_sleep_jif = HZ / limit;
546                 }
547         } else {
548                 lfsck->li_sleep_jif = 0;
549                 lfsck->li_sleep_rate = 0;
550         }
551 }
552
553 void lfsck_control_speed(struct lfsck_instance *lfsck)
554 {
555         struct ptlrpc_thread *thread = &lfsck->li_thread;
556         struct l_wait_info    lwi;
557
558         if (lfsck->li_sleep_jif > 0 &&
559             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
560                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
561                                        LWI_ON_SIGNAL_NOOP, NULL);
562
563                 l_wait_event(thread->t_ctl_waitq,
564                              !thread_is_running(thread),
565                              &lwi);
566                 lfsck->li_new_scanned = 0;
567         }
568 }
569
570 void lfsck_control_speed_by_self(struct lfsck_component *com)
571 {
572         struct lfsck_instance   *lfsck  = com->lc_lfsck;
573         struct ptlrpc_thread    *thread = &lfsck->li_thread;
574         struct l_wait_info       lwi;
575
576         if (lfsck->li_sleep_jif > 0 &&
577             com->lc_new_scanned >= lfsck->li_sleep_rate) {
578                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
579                                        LWI_ON_SIGNAL_NOOP, NULL);
580
581                 l_wait_event(thread->t_ctl_waitq,
582                              !thread_is_running(thread),
583                              &lwi);
584                 com->lc_new_scanned = 0;
585         }
586 }
587
588 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
589                             struct lu_fid *fid)
590 {
591         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
592                      !dt_try_as_dir(env, obj)))
593                 return -ENOTDIR;
594
595         return dt_lookup(env, obj, (struct dt_rec *)fid,
596                          (const struct dt_key *)"..", BYPASS_CAPA);
597 }
598
599 static int lfsck_needs_scan_dir(const struct lu_env *env,
600                                 struct lfsck_instance *lfsck,
601                                 struct dt_object *obj)
602 {
603         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
604         int            depth = 0;
605         int            rc;
606
607         if (!lfsck->li_master || !S_ISDIR(lfsck_object_type(obj)) ||
608             cfs_list_empty(&lfsck->li_list_dir))
609                RETURN(0);
610
611         while (1) {
612                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
613                  *      which is the agent directory to manage the objects
614                  *      which name entries reside on remote MDTs. Related
615                  *      consistency verification will be processed in LFSCK
616                  *      phase III. */
617                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
618                         if (depth > 0)
619                                 lfsck_object_put(env, obj);
620                         return 1;
621                 }
622
623                 /* .lustre doesn't contain "real" user objects, no need lfsck */
624                 if (fid_is_dot_lustre(lfsck_dto2fid(obj))) {
625                         if (depth > 0)
626                                 lfsck_object_put(env, obj);
627                         return 0;
628                 }
629
630                 dt_read_lock(env, obj, MOR_TGT_CHILD);
631                 if (unlikely(lfsck_is_dead_obj(obj))) {
632                         dt_read_unlock(env, obj);
633                         if (depth > 0)
634                                 lfsck_object_put(env, obj);
635                         return 0;
636                 }
637
638                 rc = dt_xattr_get(env, obj,
639                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
640                                   BYPASS_CAPA);
641                 dt_read_unlock(env, obj);
642                 if (rc >= 0) {
643                         if (depth > 0)
644                                 lfsck_object_put(env, obj);
645                         return 1;
646                 }
647
648                 if (rc < 0 && rc != -ENODATA) {
649                         if (depth > 0)
650                                 lfsck_object_put(env, obj);
651                         return rc;
652                 }
653
654                 rc = lfsck_parent_fid(env, obj, fid);
655                 if (depth > 0)
656                         lfsck_object_put(env, obj);
657                 if (rc != 0)
658                         return rc;
659
660                 if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
661                         return 0;
662
663                 obj = lfsck_object_find(env, lfsck, fid);
664                 if (obj == NULL)
665                         return 0;
666                 else if (IS_ERR(obj))
667                         return PTR_ERR(obj);
668
669                 if (!dt_object_exists(obj)) {
670                         lfsck_object_put(env, obj);
671                         return 0;
672                 }
673
674                 /* Currently, only client visible directory can be remote. */
675                 if (dt_object_remote(obj)) {
676                         lfsck_object_put(env, obj);
677                         return 1;
678                 }
679
680                 depth++;
681         }
682         return 0;
683 }
684
685 struct lfsck_thread_args *lfsck_thread_args_init(struct lfsck_instance *lfsck,
686                                                  struct lfsck_component *com,
687                                                  struct lfsck_start_param *lsp)
688 {
689         struct lfsck_thread_args *lta;
690         int                       rc;
691
692         OBD_ALLOC_PTR(lta);
693         if (lta == NULL)
694                 return ERR_PTR(-ENOMEM);
695
696         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
697         if (rc != 0) {
698                 OBD_FREE_PTR(lta);
699                 return ERR_PTR(rc);
700         }
701
702         lta->lta_lfsck = lfsck_instance_get(lfsck);
703         if (com != NULL)
704                 lta->lta_com = lfsck_component_get(com);
705
706         lta->lta_lsp = lsp;
707
708         return lta;
709 }
710
711 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
712 {
713         if (lta->lta_com != NULL)
714                 lfsck_component_put(&lta->lta_env, lta->lta_com);
715         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
716         lu_env_fini(&lta->lta_env);
717         OBD_FREE_PTR(lta);
718 }
719
/* LFSCK wrapper functions */
721
722 void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
723                 bool new_checked)
724 {
725         struct lfsck_component *com;
726
727         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
728                 com->lc_ops->lfsck_fail(env, com, new_checked);
729         }
730 }
731
732 int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck)
733 {
734         struct lfsck_component *com;
735         int                     rc  = 0;
736         int                     rc1 = 0;
737
738         if (likely(cfs_time_beforeq(cfs_time_current(),
739                                     lfsck->li_time_next_checkpoint)))
740                 return 0;
741
742         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
743         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
744                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
745                 if (rc != 0)
746                         rc1 = rc;
747         }
748
749         lfsck->li_time_last_checkpoint = cfs_time_current();
750         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
751                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
752         return rc1 != 0 ? rc1 : rc;
753 }
754
755 int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck,
756                struct lfsck_start_param *lsp)
757 {
758         struct dt_object       *obj     = NULL;
759         struct lfsck_component *com;
760         struct lfsck_component *next;
761         struct lfsck_position  *pos     = NULL;
762         const struct dt_it_ops *iops    =
763                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
764         struct dt_it           *di;
765         int                     rc;
766         ENTRY;
767
768         LASSERT(lfsck->li_obj_dir == NULL);
769         LASSERT(lfsck->li_di_dir == NULL);
770
771         lfsck->li_current_oit_processed = 0;
772         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
773                 com->lc_new_checked = 0;
774                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
775                         com->lc_journal = 0;
776
777                 rc = com->lc_ops->lfsck_prep(env, com, lsp);
778                 if (rc != 0)
779                         GOTO(out, rc);
780
781                 if ((pos == NULL) ||
782                     (!lfsck_pos_is_zero(&com->lc_pos_start) &&
783                      lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
784                         pos = &com->lc_pos_start;
785         }
786
787         /* Init otable-based iterator. */
788         if (pos == NULL) {
789                 rc = iops->load(env, lfsck->li_di_oit, 0);
790                 if (rc > 0) {
791                         lfsck->li_oit_over = 1;
792                         rc = 0;
793                 }
794
795                 GOTO(out, rc);
796         }
797
798         rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
799         if (rc < 0)
800                 GOTO(out, rc);
801         else if (rc > 0)
802                 lfsck->li_oit_over = 1;
803
804         if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
805                 GOTO(out, rc = 0);
806
807         /* Find the directory for namespace-based traverse. */
808         obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
809         if (obj == NULL)
810                 GOTO(out, rc = 0);
811         else if (IS_ERR(obj))
812                 RETURN(PTR_ERR(obj));
813
814         /* XXX: Currently, skip remote object, the consistency for
815          *      remote object will be processed in LFSCK phase III. */
816         if (!dt_object_exists(obj) || dt_object_remote(obj) ||
817             unlikely(!S_ISDIR(lfsck_object_type(obj))))
818                 GOTO(out, rc = 0);
819
820         if (unlikely(!dt_try_as_dir(env, obj)))
821                 GOTO(out, rc = -ENOTDIR);
822
823         /* Init the namespace-based directory traverse. */
824         iops = &obj->do_index_ops->dio_it;
825         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
826         if (IS_ERR(di))
827                 GOTO(out, rc = PTR_ERR(di));
828
829         LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
830
831         rc = iops->load(env, di, pos->lp_dir_cookie);
832         if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
833                 rc = iops->next(env, di);
834         else if (rc > 0)
835                 rc = 0;
836
837         if (rc != 0) {
838                 iops->put(env, di);
839                 iops->fini(env, di);
840                 GOTO(out, rc);
841         }
842
843         lfsck->li_obj_dir = lfsck_object_get(obj);
844         lfsck->li_cookie_dir = iops->store(env, di);
845         spin_lock(&lfsck->li_lock);
846         lfsck->li_di_dir = di;
847         spin_unlock(&lfsck->li_lock);
848
849         GOTO(out, rc = 0);
850
851 out:
852         if (obj != NULL)
853                 lfsck_object_put(env, obj);
854
855         if (rc < 0) {
856                 cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
857                                              lc_link)
858                         com->lc_ops->lfsck_post(env, com, rc, true);
859
860                 return rc;
861         }
862
863         rc = 0;
864         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
865         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
866                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
867                 if (rc != 0)
868                         break;
869         }
870
871         lfsck->li_time_last_checkpoint = cfs_time_current();
872         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
873                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
874         return rc;
875 }
876
877 int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
878                    struct dt_object *obj)
879 {
880         struct lfsck_component *com;
881         const struct dt_it_ops *iops;
882         struct dt_it           *di;
883         int                     rc;
884         ENTRY;
885
886         LASSERT(lfsck->li_obj_dir == NULL);
887
888         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
889                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
890                 if (rc != 0)
891                         RETURN(rc);
892         }
893
894         rc = lfsck_needs_scan_dir(env, lfsck, obj);
895         if (rc <= 0)
896                 GOTO(out, rc);
897
898         if (unlikely(!dt_try_as_dir(env, obj)))
899                 GOTO(out, rc = -ENOTDIR);
900
901         iops = &obj->do_index_ops->dio_it;
902         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
903         if (IS_ERR(di))
904                 GOTO(out, rc = PTR_ERR(di));
905
906         rc = iops->load(env, di, 0);
907         if (rc == 0)
908                 rc = iops->next(env, di);
909         else if (rc > 0)
910                 rc = 0;
911
912         if (rc != 0) {
913                 iops->put(env, di);
914                 iops->fini(env, di);
915                 GOTO(out, rc);
916         }
917
918         lfsck->li_obj_dir = lfsck_object_get(obj);
919         lfsck->li_cookie_dir = iops->store(env, di);
920         spin_lock(&lfsck->li_lock);
921         lfsck->li_di_dir = di;
922         spin_unlock(&lfsck->li_lock);
923
924         GOTO(out, rc = 0);
925
926 out:
927         if (rc < 0)
928                 lfsck_fail(env, lfsck, false);
929         return (rc > 0 ? 0 : rc);
930 }
931
932 int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
933                    struct dt_object *obj, struct lu_dirent *ent)
934 {
935         struct lfsck_component *com;
936         int                     rc;
937
938         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
939                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
940                 if (rc != 0)
941                         return rc;
942         }
943         return 0;
944 }
945
946 int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
947                int result)
948 {
949         struct lfsck_component *com;
950         struct lfsck_component *next;
951         int                     rc  = 0;
952         int                     rc1 = 0;
953
954         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
955         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
956                 rc = com->lc_ops->lfsck_post(env, com, result, false);
957                 if (rc != 0)
958                         rc1 = rc;
959         }
960
961         lfsck->li_time_last_checkpoint = cfs_time_current();
962         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
963                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
964
965         /* Ignore some component post failure to make other can go ahead. */
966         return result;
967 }
968
969 static void lfsck_interpret(const struct lu_env *env,
970                             struct lfsck_instance *lfsck,
971                             struct ptlrpc_request *req, void *args, int result)
972 {
973         struct lfsck_async_interpret_args *laia = args;
974         struct lfsck_component            *com;
975
976         LASSERT(laia->laia_shared);
977
978         spin_lock(&lfsck->li_lock);
979         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
980                 if (com->lc_ops->lfsck_interpret != NULL) {
981                         laia->laia_com = com;
982                         com->lc_ops->lfsck_interpret(env, req, laia, result);
983                 }
984         }
985
986         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
987                 if (com->lc_ops->lfsck_interpret != NULL) {
988                         laia->laia_com = com;
989                         com->lc_ops->lfsck_interpret(env, req, laia, result);
990                 }
991         }
992         spin_unlock(&lfsck->li_lock);
993 }
994
995 int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck)
996 {
997         struct lfsck_component *com;
998         struct lfsck_component *next;
999         struct l_wait_info      lwi = { 0 };
1000         int                     rc  = 0;
1001         int                     rc1 = 0;
1002
1003         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1004                                      lc_link) {
1005                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1006                         com->lc_journal = 0;
1007
1008                 rc = com->lc_ops->lfsck_double_scan(env, com);
1009                 if (rc != 0)
1010                         rc1 = rc;
1011         }
1012
1013         l_wait_event(lfsck->li_thread.t_ctl_waitq,
1014                      atomic_read(&lfsck->li_double_scan_count) == 0,
1015                      &lwi);
1016
1017         return rc1 != 0 ? rc1 : rc;
1018 }
1019
1020 static int lfsck_stop_notify(const struct lu_env *env,
1021                              struct lfsck_instance *lfsck,
1022                              struct lfsck_tgt_descs *ltds,
1023                              struct lfsck_tgt_desc *ltd, __u16 type)
1024 {
1025         struct ptlrpc_request_set *set;
1026         struct lfsck_component    *com;
1027         int                        rc  = 0;
1028         ENTRY;
1029
1030         spin_lock(&lfsck->li_lock);
1031         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
1032         if (com == NULL)
1033                 com = __lfsck_component_find(lfsck, type,
1034                                              &lfsck->li_list_double_scan);
1035         if (com != NULL)
1036                 lfsck_component_get(com);
1037         spin_lock(&lfsck->li_lock);
1038
1039         if (com != NULL) {
1040                 if (com->lc_ops->lfsck_stop_notify != NULL) {
1041                         set = ptlrpc_prep_set();
1042                         if (set == NULL) {
1043                                 lfsck_component_put(env, com);
1044
1045                                 RETURN(-ENOMEM);
1046                         }
1047
1048                         rc = com->lc_ops->lfsck_stop_notify(env, com, ltds,
1049                                                             ltd, set);
1050                         if (rc == 0)
1051                                 rc = ptlrpc_set_wait(set);
1052
1053                         ptlrpc_set_destroy(set);
1054                 }
1055
1056                 lfsck_component_put(env, com);
1057         }
1058
1059         RETURN(rc);
1060 }
1061
1062 void lfsck_quit(const struct lu_env *env, struct lfsck_instance *lfsck)
1063 {
1064         struct lfsck_component *com;
1065         struct lfsck_component *next;
1066
1067         list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
1068                                  lc_link) {
1069                 if (com->lc_ops->lfsck_quit != NULL)
1070                         com->lc_ops->lfsck_quit(env, com);
1071         }
1072
1073         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1074                                  lc_link) {
1075                 if (com->lc_ops->lfsck_quit != NULL)
1076                         com->lc_ops->lfsck_quit(env, com);
1077         }
1078 }
1079
1080 static int lfsck_async_interpret(const struct lu_env *env,
1081                                  struct ptlrpc_request *req,
1082                                  void *args, int rc)
1083 {
1084         struct lfsck_async_interpret_args *laia = args;
1085         struct lfsck_instance             *lfsck;
1086
1087         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
1088                               li_mdt_descs);
1089         lfsck_interpret(env, lfsck, req, laia, rc);
1090         lfsck_tgt_put(laia->laia_ltd);
1091         if (rc != 0 && laia->laia_result != -EALREADY)
1092                 laia->laia_result = rc;
1093
1094         return 0;
1095 }
1096
1097 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
1098                         struct lfsck_request *lr,
1099                         struct ptlrpc_request_set *set,
1100                         ptlrpc_interpterer_t interpreter,
1101                         void *args, int request)
1102 {
1103         struct lfsck_async_interpret_args *laia;
1104         struct ptlrpc_request             *req;
1105         struct lfsck_request              *tmp;
1106         struct req_format                 *format;
1107         int                                rc;
1108
1109         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
1110                 return -EOPNOTSUPP;
1111
1112         switch (request) {
1113         case LFSCK_NOTIFY:
1114                 format = &RQF_LFSCK_NOTIFY;
1115                 break;
1116         case LFSCK_QUERY:
1117                 format = &RQF_LFSCK_QUERY;
1118                 break;
1119         default:
1120                 CERROR("%s: unknown async request: opc = %d\n",
1121                        exp->exp_obd->obd_name, request);
1122                 return -EINVAL;
1123         }
1124
1125         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
1126         if (req == NULL)
1127                 return -ENOMEM;
1128
1129         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
1130         if (rc != 0) {
1131                 ptlrpc_request_free(req);
1132
1133                 return rc;
1134         }
1135
1136         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1137         *tmp = *lr;
1138         ptlrpc_request_set_replen(req);
1139
1140         laia = ptlrpc_req_async_args(req);
1141         *laia = *(struct lfsck_async_interpret_args *)args;
1142         if (laia->laia_com != NULL)
1143                 lfsck_component_get(laia->laia_com);
1144         req->rq_interpret_reply = interpreter;
1145         ptlrpc_set_add_req(set, req);
1146
1147         return 0;
1148 }
1149
1150 /* external interfaces */
1151
1152 int lfsck_get_speed(struct dt_device *key, void *buf, int len)
1153 {
1154         struct lu_env           env;
1155         struct lfsck_instance  *lfsck;
1156         int                     rc;
1157         ENTRY;
1158
1159         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1160         if (rc != 0)
1161                 RETURN(rc);
1162
1163         lfsck = lfsck_instance_find(key, true, false);
1164         if (likely(lfsck != NULL)) {
1165                 rc = snprintf(buf, len, "%u\n",
1166                               lfsck->li_bookmark_ram.lb_speed_limit);
1167                 lfsck_instance_put(&env, lfsck);
1168         } else {
1169                 rc = -ENODEV;
1170         }
1171
1172         lu_env_fini(&env);
1173
1174         RETURN(rc);
1175 }
1176 EXPORT_SYMBOL(lfsck_get_speed);
1177
1178 int lfsck_set_speed(struct dt_device *key, int val)
1179 {
1180         struct lu_env           env;
1181         struct lfsck_instance  *lfsck;
1182         int                     rc;
1183         ENTRY;
1184
1185         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1186         if (rc != 0)
1187                 RETURN(rc);
1188
1189         lfsck = lfsck_instance_find(key, true, false);
1190         if (likely(lfsck != NULL)) {
1191                 mutex_lock(&lfsck->li_mutex);
1192                 __lfsck_set_speed(lfsck, val);
1193                 rc = lfsck_bookmark_store(&env, lfsck);
1194                 mutex_unlock(&lfsck->li_mutex);
1195                 lfsck_instance_put(&env, lfsck);
1196         } else {
1197                 rc = -ENODEV;
1198         }
1199
1200         lu_env_fini(&env);
1201
1202         RETURN(rc);
1203 }
1204 EXPORT_SYMBOL(lfsck_set_speed);
1205
1206 int lfsck_get_windows(struct dt_device *key, void *buf, int len)
1207 {
1208         struct lu_env           env;
1209         struct lfsck_instance  *lfsck;
1210         int                     rc;
1211         ENTRY;
1212
1213         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1214         if (rc != 0)
1215                 RETURN(rc);
1216
1217         lfsck = lfsck_instance_find(key, true, false);
1218         if (likely(lfsck != NULL)) {
1219                 rc = snprintf(buf, len, "%u\n",
1220                               lfsck->li_bookmark_ram.lb_async_windows);
1221                 lfsck_instance_put(&env, lfsck);
1222         } else {
1223                 rc = -ENODEV;
1224         }
1225
1226         lu_env_fini(&env);
1227
1228         RETURN(rc);
1229 }
1230 EXPORT_SYMBOL(lfsck_get_windows);
1231
1232 int lfsck_set_windows(struct dt_device *key, int val)
1233 {
1234         struct lu_env           env;
1235         struct lfsck_instance  *lfsck;
1236         int                     rc;
1237         ENTRY;
1238
1239         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1240         if (rc != 0)
1241                 RETURN(rc);
1242
1243         lfsck = lfsck_instance_find(key, true, false);
1244         if (likely(lfsck != NULL)) {
1245                 if (val > LFSCK_ASYNC_WIN_MAX) {
1246                         CERROR("%s: Too large async windows size, which "
1247                                "may cause memory issues. The valid range "
1248                                "is [0 - %u]. If you do not want to restrict "
1249                                "the windows size for async requests pipeline, "
1250                                "just set it as 0.\n",
1251                                lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
1252                         rc = -EINVAL;
1253                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
1254                         mutex_lock(&lfsck->li_mutex);
1255                         lfsck->li_bookmark_ram.lb_async_windows = val;
1256                         rc = lfsck_bookmark_store(&env, lfsck);
1257                         mutex_unlock(&lfsck->li_mutex);
1258                 }
1259                 lfsck_instance_put(&env, lfsck);
1260         } else {
1261                 rc = -ENODEV;
1262         }
1263
1264         lu_env_fini(&env);
1265
1266         RETURN(rc);
1267 }
1268 EXPORT_SYMBOL(lfsck_set_windows);
1269
1270 int lfsck_dump(struct dt_device *key, void *buf, int len, enum lfsck_type type)
1271 {
1272         struct lu_env           env;
1273         struct lfsck_instance  *lfsck;
1274         struct lfsck_component *com;
1275         int                     rc;
1276         ENTRY;
1277
1278         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1279         if (rc != 0)
1280                 RETURN(rc);
1281
1282         lfsck = lfsck_instance_find(key, true, false);
1283         if (likely(lfsck != NULL)) {
1284                 com = lfsck_component_find(lfsck, type);
1285                 if (likely(com != NULL)) {
1286                         rc = com->lc_ops->lfsck_dump(&env, com, buf, len);
1287                         lfsck_component_put(&env, com);
1288                 } else {
1289                         rc = -ENOTSUPP;
1290                 }
1291
1292                 lfsck_instance_put(&env, lfsck);
1293         } else {
1294                 rc = -ENODEV;
1295         }
1296
1297         lu_env_fini(&env);
1298
1299         RETURN(rc);
1300 }
1301 EXPORT_SYMBOL(lfsck_dump);
1302
1303 static int lfsck_stop_all(const struct lu_env *env,
1304                           struct lfsck_instance *lfsck,
1305                           struct lfsck_stop *stop)
1306 {
1307         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1308         struct lfsck_request              *lr     = &info->lti_lr;
1309         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1310         struct ptlrpc_request_set         *set;
1311         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1312         struct lfsck_tgt_desc             *ltd;
1313         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1314         __u32                              idx;
1315         int                                rc     = 0;
1316         int                                rc1    = 0;
1317         ENTRY;
1318
1319         LASSERT(stop->ls_flags & LPF_BROADCAST);
1320
1321         set = ptlrpc_prep_set();
1322         if (unlikely(set == NULL)) {
1323                 CERROR("%s: cannot allocate memory for stop LFSCK on "
1324                        "all targets\n", lfsck_lfsck2name(lfsck));
1325
1326                 RETURN(-ENOMEM);
1327         }
1328
1329         memset(lr, 0, sizeof(*lr));
1330         lr->lr_event = LE_STOP;
1331         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1332         lr->lr_status = stop->ls_status;
1333         lr->lr_version = bk->lb_version;
1334         lr->lr_active = LFSCK_TYPES_ALL;
1335         lr->lr_param = stop->ls_flags;
1336
1337         laia->laia_com = NULL;
1338         laia->laia_ltds = ltds;
1339         laia->laia_lr = lr;
1340         laia->laia_result = 0;
1341         laia->laia_shared = 1;
1342
1343         down_read(&ltds->ltd_rw_sem);
1344         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1345                 ltd = lfsck_tgt_get(ltds, idx);
1346                 LASSERT(ltd != NULL);
1347
1348                 laia->laia_ltd = ltd;
1349                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1350                                          lfsck_async_interpret, laia,
1351                                          LFSCK_NOTIFY);
1352                 if (rc != 0) {
1353                         lfsck_interpret(env, lfsck, NULL, laia, rc);
1354                         lfsck_tgt_put(ltd);
1355                         CWARN("%s: cannot notify MDT %x for LFSCK stop: "
1356                               "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
1357                         rc1 = rc;
1358                 }
1359         }
1360         up_read(&ltds->ltd_rw_sem);
1361
1362         rc = ptlrpc_set_wait(set);
1363         ptlrpc_set_destroy(set);
1364
1365         if (rc == 0)
1366                 rc = laia->laia_result;
1367
1368         if (rc == -EALREADY)
1369                 rc = 0;
1370
1371         if (rc != 0)
1372                 CWARN("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
1373                       lfsck_lfsck2name(lfsck), rc);
1374
1375         RETURN(rc != 0 ? rc : rc1);
1376 }
1377
1378 static int lfsck_start_all(const struct lu_env *env,
1379                            struct lfsck_instance *lfsck,
1380                            struct lfsck_start *start)
1381 {
1382         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1383         struct lfsck_request              *lr     = &info->lti_lr;
1384         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1385         struct ptlrpc_request_set         *set;
1386         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1387         struct lfsck_tgt_desc             *ltd;
1388         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1389         __u32                              idx;
1390         int                                rc     = 0;
1391         ENTRY;
1392
1393         LASSERT(start->ls_flags & LPF_BROADCAST);
1394
1395         set = ptlrpc_prep_set();
1396         if (unlikely(set == NULL)) {
1397                 if (bk->lb_param & LPF_FAILOUT) {
1398                         CERROR("%s: cannot allocate memory for start LFSCK on "
1399                                "all targets, failout.\n",
1400                                lfsck_lfsck2name(lfsck));
1401
1402                         RETURN(-ENOMEM);
1403                 } else {
1404                         CWARN("%s: cannot allocate memory for start LFSCK on "
1405                               "all targets, partly scan.\n",
1406                               lfsck_lfsck2name(lfsck));
1407
1408                         RETURN(0);
1409                 }
1410         }
1411
1412         memset(lr, 0, sizeof(*lr));
1413         lr->lr_event = LE_START;
1414         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1415         lr->lr_speed = bk->lb_speed_limit;
1416         lr->lr_version = bk->lb_version;
1417         lr->lr_active = start->ls_active;
1418         lr->lr_param = start->ls_flags;
1419         lr->lr_async_windows = bk->lb_async_windows;
1420         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
1421                        LSV_ASYNC_WINDOWS;
1422
1423         laia->laia_com = NULL;
1424         laia->laia_ltds = ltds;
1425         laia->laia_lr = lr;
1426         laia->laia_result = 0;
1427         laia->laia_shared = 1;
1428
1429         down_read(&ltds->ltd_rw_sem);
1430         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1431                 ltd = lfsck_tgt_get(ltds, idx);
1432                 LASSERT(ltd != NULL);
1433
1434                 laia->laia_ltd = ltd;
1435                 ltd->ltd_layout_done = 0;
1436                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1437                                          lfsck_async_interpret, laia,
1438                                          LFSCK_NOTIFY);
1439                 if (rc != 0) {
1440                         lfsck_interpret(env, lfsck, NULL, laia, rc);
1441                         lfsck_tgt_put(ltd);
1442                         if (bk->lb_param & LPF_FAILOUT) {
1443                                 CERROR("%s: cannot notify MDT %x for LFSCK "
1444                                        "start, failout: rc = %d\n",
1445                                        lfsck_lfsck2name(lfsck), idx, rc);
1446                                 break;
1447                         } else {
1448                                 CWARN("%s: cannot notify MDT %x for LFSCK "
1449                                       "start, partly scan: rc = %d\n",
1450                                       lfsck_lfsck2name(lfsck), idx, rc);
1451                                 rc = 0;
1452                         }
1453                 }
1454         }
1455         up_read(&ltds->ltd_rw_sem);
1456
1457         if (rc != 0) {
1458                 ptlrpc_set_destroy(set);
1459
1460                 RETURN(rc);
1461         }
1462
1463         rc = ptlrpc_set_wait(set);
1464         ptlrpc_set_destroy(set);
1465
1466         if (rc == 0)
1467                 rc = laia->laia_result;
1468
1469         if (rc != 0) {
1470                 if (bk->lb_param & LPF_FAILOUT) {
1471                         struct lfsck_stop *stop = &info->lti_stop;
1472
1473                         CERROR("%s: cannot start LFSCK on some MDTs, "
1474                                "stop all: rc = %d\n",
1475                                lfsck_lfsck2name(lfsck), rc);
1476                         if (rc != -EALREADY) {
1477                                 stop->ls_status = LS_FAILED;
1478                                 stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
1479                                 lfsck_stop_all(env, lfsck, stop);
1480                         }
1481                 } else {
1482                         CWARN("%s: cannot start LFSCK on some MDTs, "
1483                               "partly scan: rc = %d\n",
1484                               lfsck_lfsck2name(lfsck), rc);
1485                         rc = 0;
1486                 }
1487         }
1488
1489         RETURN(rc);
1490 }
1491
1492 int lfsck_start(const struct lu_env *env, struct dt_device *key,
1493                 struct lfsck_start_param *lsp)
1494 {
1495         struct lfsck_start              *start  = lsp->lsp_start;
1496         struct lfsck_instance           *lfsck;
1497         struct lfsck_bookmark           *bk;
1498         struct ptlrpc_thread            *thread;
1499         struct lfsck_component          *com;
1500         struct l_wait_info               lwi    = { 0 };
1501         struct lfsck_thread_args        *lta;
1502         bool                             dirty  = false;
1503         long                             rc     = 0;
1504         __u16                            valid  = 0;
1505         __u16                            flags  = 0;
1506         __u16                            type   = 1;
1507         ENTRY;
1508
1509         lfsck = lfsck_instance_find(key, true, false);
1510         if (unlikely(lfsck == NULL))
1511                 RETURN(-ENODEV);
1512
1513         /* System is not ready, try again later. */
1514         if (unlikely(lfsck->li_namespace == NULL))
1515                 GOTO(put, rc = -EAGAIN);
1516
1517         /* start == NULL means auto trigger paused LFSCK. */
1518         if ((start == NULL) &&
1519             (cfs_list_empty(&lfsck->li_list_scan) ||
1520              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
1521                 GOTO(put, rc = 0);
1522
1523         bk = &lfsck->li_bookmark_ram;
1524         thread = &lfsck->li_thread;
1525         mutex_lock(&lfsck->li_mutex);
1526         spin_lock(&lfsck->li_lock);
1527         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
1528                 rc = -EALREADY;
1529                 while (start->ls_active != 0) {
1530                         if (!(type & start->ls_active)) {
1531                                 type <<= 1;
1532                                 continue;
1533                         }
1534
1535                         com = __lfsck_component_find(lfsck, type,
1536                                                      &lfsck->li_list_scan);
1537                         if (com == NULL)
1538                                 com = __lfsck_component_find(lfsck, type,
1539                                                 &lfsck->li_list_double_scan);
1540                         if (com == NULL) {
1541                                 rc = -EOPNOTSUPP;
1542                                 break;
1543                         }
1544
1545                         if (com->lc_ops->lfsck_join != NULL) {
1546                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
1547                                 if (rc != 0 && rc != -EALREADY)
1548                                         break;
1549                         }
1550                         start->ls_active &= ~type;
1551                         type <<= 1;
1552                 }
1553                 spin_unlock(&lfsck->li_lock);
1554                 GOTO(out, rc);
1555         }
1556         spin_unlock(&lfsck->li_lock);
1557
1558         lfsck->li_status = 0;
1559         lfsck->li_oit_over = 0;
1560         lfsck->li_start_unplug = 0;
1561         lfsck->li_drop_dryrun = 0;
1562         lfsck->li_new_scanned = 0;
1563
1564         /* For auto trigger. */
1565         if (start == NULL)
1566                 goto trigger;
1567
1568         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
1569                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
1570                        lfsck_lfsck2name(lfsck));
1571
1572                 GOTO(out, rc = -EPERM);
1573         }
1574
1575         start->ls_version = bk->lb_version;
1576         if (start->ls_valid & LSV_SPEED_LIMIT) {
1577                 __lfsck_set_speed(lfsck, start->ls_speed_limit);
1578                 dirty = true;
1579         }
1580
1581         if (start->ls_valid & LSV_ASYNC_WINDOWS &&
1582             bk->lb_async_windows != start->ls_async_windows) {
1583                 bk->lb_async_windows = start->ls_async_windows;
1584                 dirty = true;
1585         }
1586
1587         if (start->ls_valid & LSV_ERROR_HANDLE) {
1588                 valid |= DOIV_ERROR_HANDLE;
1589                 if (start->ls_flags & LPF_FAILOUT)
1590                         flags |= DOIF_FAILOUT;
1591
1592                 if ((start->ls_flags & LPF_FAILOUT) &&
1593                     !(bk->lb_param & LPF_FAILOUT)) {
1594                         bk->lb_param |= LPF_FAILOUT;
1595                         dirty = true;
1596                 } else if (!(start->ls_flags & LPF_FAILOUT) &&
1597                            (bk->lb_param & LPF_FAILOUT)) {
1598                         bk->lb_param &= ~LPF_FAILOUT;
1599                         dirty = true;
1600                 }
1601         }
1602
1603         if (start->ls_valid & LSV_DRYRUN) {
1604                 valid |= DOIV_DRYRUN;
1605                 if (start->ls_flags & LPF_DRYRUN)
1606                         flags |= DOIF_DRYRUN;
1607
1608                 if ((start->ls_flags & LPF_DRYRUN) &&
1609                     !(bk->lb_param & LPF_DRYRUN)) {
1610                         bk->lb_param |= LPF_DRYRUN;
1611                         dirty = true;
1612                 } else if (!(start->ls_flags & LPF_DRYRUN) &&
1613                            (bk->lb_param & LPF_DRYRUN)) {
1614                         bk->lb_param &= ~LPF_DRYRUN;
1615                         lfsck->li_drop_dryrun = 1;
1616                         dirty = true;
1617                 }
1618         }
1619
1620         if (bk->lb_param & LPF_ALL_TGT &&
1621             !(start->ls_flags & LPF_ALL_TGT)) {
1622                 bk->lb_param &= ~LPF_ALL_TGT;
1623                 dirty = true;
1624         } else if (!(bk->lb_param & LPF_ALL_TGT) &&
1625                    start->ls_flags & LPF_ALL_TGT) {
1626                 bk->lb_param |= LPF_ALL_TGT;
1627                 dirty = true;
1628         }
1629
1630         if (bk->lb_param & LPF_ORPHAN &&
1631             !(start->ls_flags & LPF_ORPHAN)) {
1632                 bk->lb_param &= ~LPF_ORPHAN;
1633                 dirty = true;
1634         } else if (!(bk->lb_param & LPF_ORPHAN) &&
1635                    start->ls_flags & LPF_ORPHAN) {
1636                 bk->lb_param |= LPF_ORPHAN;
1637                 dirty = true;
1638         }
1639
1640         if (dirty) {
1641                 rc = lfsck_bookmark_store(env, lfsck);
1642                 if (rc != 0)
1643                         GOTO(out, rc);
1644         }
1645
1646         if (start->ls_flags & LPF_RESET)
1647                 flags |= DOIF_RESET;
1648
1649         if (start->ls_active != 0) {
1650                 struct lfsck_component *next;
1651
1652                 if (start->ls_active == LFSCK_TYPES_ALL)
1653                         start->ls_active = LFSCK_TYPES_SUPPORTED;
1654
1655                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
1656                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
1657                         GOTO(out, rc = -ENOTSUPP);
1658                 }
1659
1660                 cfs_list_for_each_entry_safe(com, next,
1661                                              &lfsck->li_list_scan, lc_link) {
1662                         if (!(com->lc_type & start->ls_active)) {
1663                                 rc = com->lc_ops->lfsck_post(env, com, 0,
1664                                                              false);
1665                                 if (rc != 0)
1666                                         GOTO(out, rc);
1667                         }
1668                 }
1669
1670                 while (start->ls_active != 0) {
1671                         if (type & start->ls_active) {
1672                                 com = __lfsck_component_find(lfsck, type,
1673                                                         &lfsck->li_list_idle);
1674                                 if (com != NULL) {
1675                                         /* The component status will be updated
1676                                          * when its prep() is called later by
1677                                          * the LFSCK main engine. */
1678                                         cfs_list_del_init(&com->lc_link);
1679                                         cfs_list_add_tail(&com->lc_link,
1680                                                           &lfsck->li_list_scan);
1681                                 }
1682                                 start->ls_active &= ~type;
1683                         }
1684                         type <<= 1;
1685                 }
1686         }
1687
1688         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1689                 start->ls_active |= com->lc_type;
1690                 if (flags & DOIF_RESET) {
1691                         rc = com->lc_ops->lfsck_reset(env, com, false);
1692                         if (rc != 0)
1693                                 GOTO(out, rc);
1694                 }
1695         }
1696
1697 trigger:
1698         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
1699         if (bk->lb_param & LPF_DRYRUN) {
1700                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
1701                 valid |= DOIV_DRYRUN;
1702                 flags |= DOIF_DRYRUN;
1703         }
1704
1705         if (bk->lb_param & LPF_FAILOUT) {
1706                 valid |= DOIV_ERROR_HANDLE;
1707                 flags |= DOIF_FAILOUT;
1708         }
1709
1710         if (!cfs_list_empty(&lfsck->li_list_scan))
1711                 flags |= DOIF_OUTUSED;
1712
1713         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
1714         thread_set_flags(thread, 0);
1715         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
1716         if (IS_ERR(lta))
1717                 GOTO(out, rc = PTR_ERR(lta));
1718
1719         rc = PTR_ERR(kthread_run(lfsck_master_engine, lta, "lfsck"));
1720         if (IS_ERR_VALUE(rc)) {
1721                 CERROR("%s: cannot start LFSCK thread: rc = %ld\n",
1722                        lfsck_lfsck2name(lfsck), rc);
1723                 lfsck_thread_args_fini(lta);
1724
1725                 GOTO(out, rc);
1726         }
1727
1728         l_wait_event(thread->t_ctl_waitq,
1729                      thread_is_running(thread) ||
1730                      thread_is_stopped(thread),
1731                      &lwi);
1732         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
1733                 lfsck->li_start_unplug = 1;
1734                 wake_up_all(&thread->t_ctl_waitq);
1735
1736                 GOTO(out, rc = 0);
1737         }
1738
1739         /* release lfsck::li_mutex to avoid deadlock. */
1740         mutex_unlock(&lfsck->li_mutex);
1741         rc = lfsck_start_all(env, lfsck, start);
1742         if (rc != 0) {
1743                 spin_lock(&lfsck->li_lock);
1744                 if (thread_is_stopped(thread)) {
1745                         spin_unlock(&lfsck->li_lock);
1746                 } else {
1747                         lfsck->li_status = LS_FAILED;
1748                         lfsck->li_flags = 0;
1749                         thread_set_flags(thread, SVC_STOPPING);
1750                         spin_unlock(&lfsck->li_lock);
1751
1752                         lfsck->li_start_unplug = 1;
1753                         wake_up_all(&thread->t_ctl_waitq);
1754                         l_wait_event(thread->t_ctl_waitq,
1755                                      thread_is_stopped(thread),
1756                                      &lwi);
1757                 }
1758         } else {
1759                 lfsck->li_start_unplug = 1;
1760                 wake_up_all(&thread->t_ctl_waitq);
1761         }
1762
1763         GOTO(put, rc);
1764
1765 out:
1766         mutex_unlock(&lfsck->li_mutex);
1767
1768 put:
1769         lfsck_instance_put(env, lfsck);
1770
1771         return rc < 0 ? rc : 0;
1772 }
1773 EXPORT_SYMBOL(lfsck_start);
1774
1775 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
1776                struct lfsck_stop *stop)
1777 {
1778         struct lfsck_instance   *lfsck;
1779         struct ptlrpc_thread    *thread;
1780         struct l_wait_info       lwi    = { 0 };
1781         int                      rc     = 0;
1782         int                      rc1    = 0;
1783         ENTRY;
1784
1785         lfsck = lfsck_instance_find(key, true, false);
1786         if (unlikely(lfsck == NULL))
1787                 RETURN(-ENODEV);
1788
1789         thread = &lfsck->li_thread;
1790         /* release lfsck::li_mutex to avoid deadlock. */
1791         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
1792                 if (!lfsck->li_master) {
1793                         CERROR("%s: only allow to specify '-A' via MDS\n",
1794                                lfsck_lfsck2name(lfsck));
1795
1796                         GOTO(out, rc = -EPERM);
1797                 }
1798
1799                 rc1 = lfsck_stop_all(env, lfsck, stop);
1800         }
1801
1802         mutex_lock(&lfsck->li_mutex);
1803         spin_lock(&lfsck->li_lock);
1804         if (thread_is_init(thread) || thread_is_stopped(thread)) {
1805                 spin_unlock(&lfsck->li_lock);
1806                 GOTO(out, rc = -EALREADY);
1807         }
1808
1809         if (stop != NULL) {
1810                 lfsck->li_status = stop->ls_status;
1811                 lfsck->li_flags = stop->ls_flags;
1812         } else {
1813                 lfsck->li_status = LS_STOPPED;
1814                 lfsck->li_flags = 0;
1815         }
1816
1817         thread_set_flags(thread, SVC_STOPPING);
1818         spin_unlock(&lfsck->li_lock);
1819
1820         wake_up_all(&thread->t_ctl_waitq);
1821         l_wait_event(thread->t_ctl_waitq,
1822                      thread_is_stopped(thread),
1823                      &lwi);
1824
1825         GOTO(out, rc = 0);
1826
1827 out:
1828         mutex_unlock(&lfsck->li_mutex);
1829         lfsck_instance_put(env, lfsck);
1830
1831         return rc != 0 ? rc : rc1;
1832 }
1833 EXPORT_SYMBOL(lfsck_stop);
1834
1835 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
1836                     struct lfsck_request *lr)
1837 {
1838         int rc = -EOPNOTSUPP;
1839         ENTRY;
1840
1841         switch (lr->lr_event) {
1842         case LE_START: {
1843                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
1844                 struct lfsck_start_param  lsp;
1845
1846                 memset(start, 0, sizeof(*start));
1847                 start->ls_valid = lr->lr_valid;
1848                 start->ls_speed_limit = lr->lr_speed;
1849                 start->ls_version = lr->lr_version;
1850                 start->ls_active = lr->lr_active;
1851                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
1852                 start->ls_async_windows = lr->lr_async_windows;
1853
1854                 lsp.lsp_start = start;
1855                 lsp.lsp_index = lr->lr_index;
1856                 lsp.lsp_index_valid = 1;
1857                 rc = lfsck_start(env, key, &lsp);
1858                 break;
1859         }
1860         case LE_STOP: {
1861                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
1862
1863                 memset(stop, 0, sizeof(*stop));
1864                 stop->ls_status = lr->lr_status;
1865                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
1866                 rc = lfsck_stop(env, key, stop);
1867                 break;
1868         }
1869         case LE_PHASE1_DONE:
1870         case LE_PHASE2_DONE:
1871         case LE_FID_ACCESSED:
1872         case LE_PEER_EXIT: {
1873                 struct lfsck_instance  *lfsck;
1874                 struct lfsck_component *com;
1875
1876                 lfsck = lfsck_instance_find(key, true, false);
1877                 if (unlikely(lfsck == NULL))
1878                         RETURN(-ENODEV);
1879
1880                 com = lfsck_component_find(lfsck, lr->lr_active);
1881                 if (likely(com != NULL)) {
1882                         rc = com->lc_ops->lfsck_in_notify(env, com, lr);
1883                         lfsck_component_put(env, com);
1884                 }
1885
1886                 lfsck_instance_put(env, lfsck);
1887                 break;
1888         }
1889         default:
1890                 break;
1891         }
1892
1893         RETURN(rc);
1894 }
1895 EXPORT_SYMBOL(lfsck_in_notify);
1896
1897 int lfsck_query(const struct lu_env *env, struct dt_device *key,
1898                 struct lfsck_request *lr)
1899 {
1900         struct lfsck_instance  *lfsck;
1901         struct lfsck_component *com;
1902         int                     rc;
1903         ENTRY;
1904
1905         lfsck = lfsck_instance_find(key, true, false);
1906         if (unlikely(lfsck == NULL))
1907                 RETURN(-ENODEV);
1908
1909         com = lfsck_component_find(lfsck, lr->lr_active);
1910         if (likely(com != NULL)) {
1911                 rc = com->lc_ops->lfsck_query(env, com);
1912                 lfsck_component_put(env, com);
1913         } else {
1914                 rc = -ENOTSUPP;
1915         }
1916
1917         lfsck_instance_put(env, lfsck);
1918
1919         RETURN(rc);
1920 }
1921 EXPORT_SYMBOL(lfsck_query);
1922
1923 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
1924                              struct ldlm_namespace *ns)
1925 {
1926         struct lfsck_instance  *lfsck;
1927         int                     rc      = -ENODEV;
1928
1929         lfsck = lfsck_instance_find(key, true, false);
1930         if (likely(lfsck != NULL)) {
1931                 lfsck->li_namespace = ns;
1932                 lfsck_instance_put(env, lfsck);
1933                 rc = 0;
1934         }
1935
1936         return rc;
1937 }
1938 EXPORT_SYMBOL(lfsck_register_namespace);
1939
1940 int lfsck_register(const struct lu_env *env, struct dt_device *key,
1941                    struct dt_device *next, struct obd_device *obd,
1942                    lfsck_out_notify notify, void *notify_data, bool master)
1943 {
1944         struct lfsck_instance   *lfsck;
1945         struct dt_object        *root  = NULL;
1946         struct dt_object        *obj;
1947         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
1948         int                      rc;
1949         ENTRY;
1950
1951         lfsck = lfsck_instance_find(key, false, false);
1952         if (unlikely(lfsck != NULL))
1953                 RETURN(-EEXIST);
1954
1955         OBD_ALLOC_PTR(lfsck);
1956         if (lfsck == NULL)
1957                 RETURN(-ENOMEM);
1958
1959         mutex_init(&lfsck->li_mutex);
1960         spin_lock_init(&lfsck->li_lock);
1961         CFS_INIT_LIST_HEAD(&lfsck->li_link);
1962         CFS_INIT_LIST_HEAD(&lfsck->li_list_scan);
1963         CFS_INIT_LIST_HEAD(&lfsck->li_list_dir);
1964         CFS_INIT_LIST_HEAD(&lfsck->li_list_double_scan);
1965         CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
1966         atomic_set(&lfsck->li_ref, 1);
1967         atomic_set(&lfsck->li_double_scan_count, 0);
1968         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
1969         lfsck->li_out_notify = notify;
1970         lfsck->li_out_notify_data = notify_data;
1971         lfsck->li_next = next;
1972         lfsck->li_bottom = key;
1973         lfsck->li_obd = obd;
1974
1975         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
1976         if (rc != 0)
1977                 GOTO(out, rc);
1978
1979         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
1980         if (rc != 0)
1981                 GOTO(out, rc);
1982
1983         fid->f_seq = FID_SEQ_LOCAL_NAME;
1984         fid->f_oid = 1;
1985         fid->f_ver = 0;
1986         rc = local_oid_storage_init(env, lfsck->li_bottom, fid, &lfsck->li_los);
1987         if (rc != 0)
1988                 GOTO(out, rc);
1989
1990         rc = dt_root_get(env, key, fid);
1991         if (rc != 0)
1992                 GOTO(out, rc);
1993
1994         root = dt_locate(env, lfsck->li_bottom, fid);
1995         if (IS_ERR(root))
1996                 GOTO(out, rc = PTR_ERR(root));
1997
1998         if (unlikely(!dt_try_as_dir(env, root)))
1999                 GOTO(out, rc = -ENOTDIR);
2000
2001         lfsck->li_local_root_fid = *fid;
2002         if (master) {
2003                 lfsck->li_master = 1;
2004                 if (lfsck_dev_idx(lfsck->li_bottom) == 0) {
2005                         rc = dt_lookup(env, root,
2006                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
2007                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
2008                         if (rc != 0)
2009                                 GOTO(out, rc);
2010                 }
2011         }
2012
2013         fid->f_seq = FID_SEQ_LOCAL_FILE;
2014         fid->f_oid = OTABLE_IT_OID;
2015         fid->f_ver = 0;
2016         obj = dt_locate(env, lfsck->li_bottom, fid);
2017         if (IS_ERR(obj))
2018                 GOTO(out, rc = PTR_ERR(obj));
2019
2020         lfsck->li_obj_oit = obj;
2021         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
2022         if (rc != 0) {
2023                 if (rc == -ENOTSUPP)
2024                         GOTO(add, rc = 0);
2025
2026                 GOTO(out, rc);
2027         }
2028
2029         rc = lfsck_bookmark_setup(env, lfsck);
2030         if (rc != 0)
2031                 GOTO(out, rc);
2032
2033         if (master) {
2034                 rc = lfsck_namespace_setup(env, lfsck);
2035                 if (rc < 0)
2036                         GOTO(out, rc);
2037         }
2038
2039         rc = lfsck_layout_setup(env, lfsck);
2040         if (rc < 0)
2041                 GOTO(out, rc);
2042
2043         /* XXX: more LFSCK components initialization to be added here. */
2044
2045 add:
2046         rc = lfsck_instance_add(lfsck);
2047         if (rc == 0)
2048                 rc = lfsck_add_target_from_orphan(env, lfsck);
2049 out:
2050         if (root != NULL && !IS_ERR(root))
2051                 lu_object_put(env, &root->do_lu);
2052         if (rc != 0)
2053                 lfsck_instance_cleanup(env, lfsck);
2054         return rc;
2055 }
2056 EXPORT_SYMBOL(lfsck_register);
2057
2058 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
2059 {
2060         struct lfsck_instance *lfsck;
2061
2062         lfsck = lfsck_instance_find(key, false, true);
2063         if (lfsck != NULL)
2064                 lfsck_instance_put(env, lfsck);
2065 }
2066 EXPORT_SYMBOL(lfsck_degister);
2067
2068 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
2069                      struct dt_device *tgt, struct obd_export *exp,
2070                      __u32 index, bool for_ost)
2071 {
2072         struct lfsck_instance   *lfsck;
2073         struct lfsck_tgt_desc   *ltd;
2074         int                      rc;
2075         ENTRY;
2076
2077         OBD_ALLOC_PTR(ltd);
2078         if (ltd == NULL)
2079                 RETURN(-ENOMEM);
2080
2081         ltd->ltd_tgt = tgt;
2082         ltd->ltd_key = key;
2083         ltd->ltd_exp = exp;
2084         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
2085         INIT_LIST_HEAD(&ltd->ltd_layout_list);
2086         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
2087         atomic_set(&ltd->ltd_ref, 1);
2088         ltd->ltd_index = index;
2089
2090         spin_lock(&lfsck_instance_lock);
2091         lfsck = __lfsck_instance_find(key, true, false);
2092         if (lfsck == NULL) {
2093                 if (for_ost)
2094                         list_add_tail(&ltd->ltd_orphan_list,
2095                                       &lfsck_ost_orphan_list);
2096                 else
2097                         list_add_tail(&ltd->ltd_orphan_list,
2098                                       &lfsck_mdt_orphan_list);
2099                 spin_unlock(&lfsck_instance_lock);
2100
2101                 RETURN(0);
2102         }
2103         spin_unlock(&lfsck_instance_lock);
2104
2105         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
2106         if (rc != 0)
2107                 lfsck_tgt_put(ltd);
2108
2109         lfsck_instance_put(env, lfsck);
2110
2111         RETURN(rc);
2112 }
2113 EXPORT_SYMBOL(lfsck_add_target);
2114
2115 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
2116                       struct dt_device *tgt, __u32 index, bool for_ost)
2117 {
2118         struct lfsck_instance   *lfsck;
2119         struct lfsck_tgt_descs  *ltds;
2120         struct lfsck_tgt_desc   *ltd    = NULL;
2121         struct list_head        *head;
2122
2123         if (for_ost)
2124                 head = &lfsck_ost_orphan_list;
2125         else
2126                 head = &lfsck_mdt_orphan_list;
2127
2128         spin_lock(&lfsck_instance_lock);
2129         list_for_each_entry(ltd, head, ltd_orphan_list) {
2130                 if (ltd->ltd_tgt == tgt) {
2131                         list_del_init(&ltd->ltd_orphan_list);
2132                         spin_unlock(&lfsck_instance_lock);
2133                         lfsck_tgt_put(ltd);
2134
2135                         return;
2136                 }
2137         }
2138
2139         lfsck = __lfsck_instance_find(key, true, false);
2140         spin_unlock(&lfsck_instance_lock);
2141         if (unlikely(lfsck == NULL))
2142                 return;
2143
2144         if (for_ost)
2145                 ltds = &lfsck->li_ost_descs;
2146         else
2147                 ltds = &lfsck->li_mdt_descs;
2148
2149         down_write(&ltds->ltd_rw_sem);
2150         LASSERT(ltds->ltd_tgts_bitmap != NULL);
2151
2152         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
2153                 goto unlock;
2154
2155         ltd = LTD_TGT(ltds, index);
2156         if (unlikely(ltd == NULL))
2157                 goto unlock;
2158
2159         LASSERT(ltds->ltd_tgtnr > 0);
2160
2161         ltds->ltd_tgtnr--;
2162         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
2163         LTD_TGT(ltds, index) = NULL;
2164
2165 unlock:
2166         if (ltd == NULL) {
2167                 if (for_ost)
2168                         head = &lfsck->li_ost_descs.ltd_orphan;
2169                 else
2170                         head = &lfsck->li_ost_descs.ltd_orphan;
2171
2172                 list_for_each_entry(ltd, head, ltd_orphan_list) {
2173                         if (ltd->ltd_tgt == tgt) {
2174                                 list_del_init(&ltd->ltd_orphan_list);
2175                                 break;
2176                         }
2177                 }
2178         }
2179
2180         up_write(&ltds->ltd_rw_sem);
2181         if (ltd != NULL) {
2182                 spin_lock(&ltds->ltd_lock);
2183                 ltd->ltd_dead = 1;
2184                 spin_unlock(&ltds->ltd_lock);
2185                 lfsck_stop_notify(env, lfsck, ltds, ltd, LT_LAYOUT);
2186                 lfsck_tgt_put(ltd);
2187         }
2188
2189         lfsck_instance_put(env, lfsck);
2190 }
2191 EXPORT_SYMBOL(lfsck_del_target);
2192
2193 static int __init lfsck_init(void)
2194 {
2195         int rc;
2196
2197         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
2198         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
2199         lfsck_key_init_generic(&lfsck_thread_key, NULL);
2200         rc = lu_context_key_register(&lfsck_thread_key);
2201         if (rc == 0) {
2202                 tgt_register_lfsck_in_notify(lfsck_in_notify);
2203                 tgt_register_lfsck_query(lfsck_query);
2204         }
2205
2206         return rc;
2207 }
2208
2209 static void __exit lfsck_exit(void)
2210 {
2211         struct lfsck_tgt_desc *ltd;
2212         struct lfsck_tgt_desc *next;
2213
2214         LASSERT(cfs_list_empty(&lfsck_instance_list));
2215
2216         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
2217                                  ltd_orphan_list) {
2218                 list_del_init(&ltd->ltd_orphan_list);
2219                 lfsck_tgt_put(ltd);
2220         }
2221
2222         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
2223                                  ltd_orphan_list) {
2224                 list_del_init(&ltd->ltd_orphan_list);
2225                 lfsck_tgt_put(ltd);
2226         }
2227
2228         lu_context_key_degister(&lfsck_thread_key);
2229 }
2230
/* Kernel module metadata: author, description and license. */
MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
MODULE_DESCRIPTION("LFSCK");
MODULE_LICENSE("GPL");

/* Declare the lfsck module, passing lfsck_init() and lfsck_exit() as its
 * init and exit routines. */
cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);