Whamcloud - gitweb
LU-1267 lfsck: enhance RPCs (3) for MDT-OST consistency
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
/* Define lfsck thread key: instantiate the LU_KEY_INIT() boilerplate for
 * struct lfsck_thread_info, the per-context data that lfsck_key_fini()
 * releases.
 * NOTE(review): the macro is defined outside this file; presumably it
 * generates the matching lfsck_key_init() allocator -- confirm. */
LU_KEY_INIT(lfsck, struct lfsck_thread_info);
47
48 static void lfsck_key_fini(const struct lu_context *ctx,
49                            struct lu_context_key *key, void *data)
50 {
51         struct lfsck_thread_info *info = data;
52
53         lu_buf_free(&info->lti_linkea_buf);
54         OBD_FREE_PTR(info);
55 }
56
/* Define the lfsck context key with the LCT_MD_THREAD | LCT_DT_THREAD tags
 * and instantiate the generic key helpers for it.
 * NOTE(review): both macros are defined outside this file; presumably they
 * bind the key to lfsck_key_fini() above -- confirm in lu_object.h. */
LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
LU_KEY_INIT_GENERIC(lfsck);
59
/* All registered LFSCK instances, linked through lfsck_instance::li_link. */
static CFS_LIST_HEAD(lfsck_instance_list);
/* OST / MDT target descriptors that have no LFSCK instance yet; they are
 * claimed by lfsck_add_target_from_orphan() when ltd_key matches li_bottom.
 * NOTE(review): unlike lfsck_instance_list these two heads carry no static
 * initializer here, so they must be INIT_LIST_HEAD()ed before first use --
 * confirm that happens elsewhere (e.g. module init). */
static struct list_head lfsck_ost_orphan_list;
static struct list_head lfsck_mdt_orphan_list;
/* Taken around traversal/modification of lfsck_instance_list and of the
 * two global orphan lists. */
static DEFINE_SPINLOCK(lfsck_instance_lock);
64
/* Printable name for each enum lfsck_status value, indexed by the status
 * itself; looked up through lfsck_status2names(). */
static const char *lfsck_status_names[] = {
        [LS_INIT]               = "init",
        [LS_SCANNING_PHASE1]    = "scanning-phase1",
        [LS_SCANNING_PHASE2]    = "scanning-phase2",
        [LS_COMPLETED]          = "completed",
        [LS_FAILED]             = "failed",
        [LS_STOPPED]            = "stopped",
        [LS_PAUSED]             = "paused",
        [LS_CRASHED]            = "crashed",
        [LS_PARTIAL]            = "partial",
        [LS_CO_FAILED]          = "co-failed",
        [LS_CO_STOPPED]         = "co-stopped",
        [LS_CO_PAUSED]          = "co-paused"
};
79
/* NULL-terminated table of LFSCK flag names.
 * NOTE(review): presumably entry N names flag bit N, as consumed by
 * lfsck_bits_dump() -- confirm against the callers and the flag enum. */
const char *lfsck_flags_names[] = {
        "scanned-once",
        "inconsistent",
        "upgrade",
        "incomplete",
        "crashed_lastid",
        NULL
};
88
/* NULL-terminated table of LFSCK parameter names; the first slot is
 * deliberately NULL (no printable name).
 * NOTE(review): presumably entry N names parameter bit N, as consumed by
 * lfsck_bits_dump(), which skips NULL names -- confirm against the callers. */
const char *lfsck_param_names[] = {
        NULL,
        "failout",
        "dryrun",
        NULL
};
95
96 const char *lfsck_status2names(enum lfsck_status status)
97 {
98         if (unlikely(status < 0 || status >= LS_MAX))
99                 return "unknown";
100
101         return lfsck_status_names[status];
102 }
103
104 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
105 {
106         spin_lock_init(&ltds->ltd_lock);
107         init_rwsem(&ltds->ltd_rw_sem);
108         INIT_LIST_HEAD(&ltds->ltd_orphan);
109         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
110         if (ltds->ltd_tgts_bitmap == NULL)
111                 return -ENOMEM;
112
113         return 0;
114 }
115
116 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
117 {
118         struct lfsck_tgt_desc   *ltd;
119         struct lfsck_tgt_desc   *next;
120         int                      idx;
121
122         down_write(&ltds->ltd_rw_sem);
123
124         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
125                                  ltd_orphan_list) {
126                 list_del_init(&ltd->ltd_orphan_list);
127                 lfsck_tgt_put(ltd);
128         }
129
130         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
131                 up_write(&ltds->ltd_rw_sem);
132
133                 return;
134         }
135
136         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
137                 ltd = LTD_TGT(ltds, idx);
138                 if (likely(ltd != NULL)) {
139                         LASSERT(list_empty(&ltd->ltd_layout_list));
140                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
141
142                         ltds->ltd_tgtnr--;
143                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
144                         LTD_TGT(ltds, idx) = NULL;
145                         lfsck_tgt_put(ltd);
146                 }
147         }
148
149         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
150                  ltds->ltd_tgtnr);
151
152         for (idx = 0; idx < TGT_PTRS; idx++) {
153                 if (ltds->ltd_tgts_idx[idx] != NULL) {
154                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
155                         ltds->ltd_tgts_idx[idx] = NULL;
156                 }
157         }
158
159         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
160         ltds->ltd_tgts_bitmap = NULL;
161         up_write(&ltds->ltd_rw_sem);
162 }
163
164 static int __lfsck_add_target(const struct lu_env *env,
165                               struct lfsck_instance *lfsck,
166                               struct lfsck_tgt_desc *ltd,
167                               bool for_ost, bool locked)
168 {
169         struct lfsck_tgt_descs *ltds;
170         __u32                   index = ltd->ltd_index;
171         int                     rc    = 0;
172         ENTRY;
173
174         if (for_ost)
175                 ltds = &lfsck->li_ost_descs;
176         else
177                 ltds = &lfsck->li_mdt_descs;
178
179         if (!locked)
180                 down_write(&ltds->ltd_rw_sem);
181
182         LASSERT(ltds->ltd_tgts_bitmap != NULL);
183
184         if (index >= ltds->ltd_tgts_bitmap->size) {
185                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
186                                     (__u32)BITS_PER_LONG);
187                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
188                 cfs_bitmap_t *new_bitmap;
189
190                 while (newsize < index + 1)
191                         newsize <<= 1;
192
193                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
194                 if (new_bitmap == NULL)
195                         GOTO(unlock, rc = -ENOMEM);
196
197                 if (ltds->ltd_tgtnr > 0)
198                         cfs_bitmap_copy(new_bitmap, old_bitmap);
199                 ltds->ltd_tgts_bitmap = new_bitmap;
200                 CFS_FREE_BITMAP(old_bitmap);
201         }
202
203         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
204                 CERROR("%s: the device %s (%u) is registered already\n",
205                        lfsck_lfsck2name(lfsck),
206                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
207                 GOTO(unlock, rc = -EEXIST);
208         }
209
210         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
211                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
212                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
213                         GOTO(unlock, rc = -ENOMEM);
214         }
215
216         LTD_TGT(ltds, index) = ltd;
217         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
218         ltds->ltd_tgtnr++;
219
220         GOTO(unlock, rc = 0);
221
222 unlock:
223         if (!locked)
224                 up_write(&ltds->ltd_rw_sem);
225
226         return rc;
227 }
228
229 static int lfsck_add_target_from_orphan(const struct lu_env *env,
230                                         struct lfsck_instance *lfsck)
231 {
232         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
233         struct lfsck_tgt_desc   *ltd;
234         struct lfsck_tgt_desc   *next;
235         struct list_head        *head    = &lfsck_ost_orphan_list;
236         int                      rc;
237         bool                     for_ost = true;
238
239 again:
240         spin_lock(&lfsck_instance_lock);
241         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
242                 if (ltd->ltd_key == lfsck->li_bottom) {
243                         list_del_init(&ltd->ltd_orphan_list);
244                         list_add_tail(&ltd->ltd_orphan_list,
245                                       &ltds->ltd_orphan);
246                 }
247         }
248         spin_unlock(&lfsck_instance_lock);
249
250         down_write(&ltds->ltd_rw_sem);
251         while (!list_empty(&ltds->ltd_orphan)) {
252                 ltd = list_entry(ltds->ltd_orphan.next,
253                                  struct lfsck_tgt_desc,
254                                  ltd_orphan_list);
255                 list_del_init(&ltd->ltd_orphan_list);
256                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
257                 /* Do not hold the semaphore for too long time. */
258                 up_write(&ltds->ltd_rw_sem);
259                 if (rc != 0)
260                         return rc;
261
262                 down_write(&ltds->ltd_rw_sem);
263         }
264         up_write(&ltds->ltd_rw_sem);
265
266         if (for_ost) {
267                 ltds = &lfsck->li_mdt_descs;
268                 head = &lfsck_mdt_orphan_list;
269                 for_ost = false;
270                 goto again;
271         }
272
273         return 0;
274 }
275
276 static inline struct lfsck_component *
277 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type, cfs_list_t *list)
278 {
279         struct lfsck_component *com;
280
281         cfs_list_for_each_entry(com, list, lc_link) {
282                 if (com->lc_type == type)
283                         return com;
284         }
285         return NULL;
286 }
287
288 static struct lfsck_component *
289 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
290 {
291         struct lfsck_component *com;
292
293         spin_lock(&lfsck->li_lock);
294         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
295         if (com != NULL)
296                 goto unlock;
297
298         com = __lfsck_component_find(lfsck, type,
299                                      &lfsck->li_list_double_scan);
300         if (com != NULL)
301                 goto unlock;
302
303         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
304
305 unlock:
306         if (com != NULL)
307                 lfsck_component_get(com);
308         spin_unlock(&lfsck->li_lock);
309         return com;
310 }
311
312 void lfsck_component_cleanup(const struct lu_env *env,
313                              struct lfsck_component *com)
314 {
315         if (!cfs_list_empty(&com->lc_link))
316                 cfs_list_del_init(&com->lc_link);
317         if (!cfs_list_empty(&com->lc_link_dir))
318                 cfs_list_del_init(&com->lc_link_dir);
319
320         lfsck_component_put(env, com);
321 }
322
323 void lfsck_instance_cleanup(const struct lu_env *env,
324                             struct lfsck_instance *lfsck)
325 {
326         struct ptlrpc_thread    *thread = &lfsck->li_thread;
327         struct lfsck_component  *com;
328         ENTRY;
329
330         LASSERT(list_empty(&lfsck->li_link));
331         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
332
333         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
334         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
335
336         if (lfsck->li_obj_oit != NULL) {
337                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
338                 lfsck->li_obj_oit = NULL;
339         }
340
341         LASSERT(lfsck->li_obj_dir == NULL);
342
343         while (!cfs_list_empty(&lfsck->li_list_scan)) {
344                 com = cfs_list_entry(lfsck->li_list_scan.next,
345                                      struct lfsck_component,
346                                      lc_link);
347                 lfsck_component_cleanup(env, com);
348         }
349
350         LASSERT(cfs_list_empty(&lfsck->li_list_dir));
351
352         while (!cfs_list_empty(&lfsck->li_list_double_scan)) {
353                 com = cfs_list_entry(lfsck->li_list_double_scan.next,
354                                      struct lfsck_component,
355                                      lc_link);
356                 lfsck_component_cleanup(env, com);
357         }
358
359         while (!cfs_list_empty(&lfsck->li_list_idle)) {
360                 com = cfs_list_entry(lfsck->li_list_idle.next,
361                                      struct lfsck_component,
362                                      lc_link);
363                 lfsck_component_cleanup(env, com);
364         }
365
366         if (lfsck->li_bookmark_obj != NULL) {
367                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
368                 lfsck->li_bookmark_obj = NULL;
369         }
370
371         if (lfsck->li_los != NULL) {
372                 local_oid_storage_fini(env, lfsck->li_los);
373                 lfsck->li_los = NULL;
374         }
375
376         OBD_FREE_PTR(lfsck);
377 }
378
379 static inline struct lfsck_instance *
380 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
381 {
382         struct lfsck_instance *lfsck;
383
384         cfs_list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
385                 if (lfsck->li_bottom == key) {
386                         if (ref)
387                                 lfsck_instance_get(lfsck);
388                         if (unlink)
389                                 list_del_init(&lfsck->li_link);
390
391                         return lfsck;
392                 }
393         }
394
395         return NULL;
396 }
397
398 static inline struct lfsck_instance *lfsck_instance_find(struct dt_device *key,
399                                                          bool ref, bool unlink)
400 {
401         struct lfsck_instance *lfsck;
402
403         spin_lock(&lfsck_instance_lock);
404         lfsck = __lfsck_instance_find(key, ref, unlink);
405         spin_unlock(&lfsck_instance_lock);
406
407         return lfsck;
408 }
409
410 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
411 {
412         struct lfsck_instance *tmp;
413
414         spin_lock(&lfsck_instance_lock);
415         cfs_list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
416                 if (lfsck->li_bottom == tmp->li_bottom) {
417                         spin_unlock(&lfsck_instance_lock);
418                         return -EEXIST;
419                 }
420         }
421
422         cfs_list_add_tail(&lfsck->li_link, &lfsck_instance_list);
423         spin_unlock(&lfsck_instance_lock);
424         return 0;
425 }
426
/**
 * Format the names of the bits set in \a bits as "<prefix>: a,b,c\n".
 *
 * When \a bits is zero only "<prefix>:\n" is produced. Bit N is named by
 * names[N]; a set bit whose name is NULL is consumed without output.
 *
 * \param[in,out] buf   cursor into the output buffer, advanced past the
 *                      text that was written
 * \param[in,out] len   space left at *buf, reduced by the amount written
 * \param[in] bits      the bit set to dump
 * \param[in] names     names indexed by bit number; it must cover every bit
 *                      that can be set in \a bits
 * \param[in] prefix    label printed in front of the names
 *
 * \retval              number of characters written (terminator excluded)
 * \retval -ENOSPC      if the text does not fit; nothing past the end of
 *                      the buffer is written and *len never goes negative
 */
int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
                    const char *prefix)
{
        /* Work on an unsigned copy: shifting into bit 31 of a signed int
         * is undefined behavior. */
        unsigned int    left = (unsigned int)bits;
        unsigned int    flag;
        int             save = *len;
        int             rc;
        int             i;

        if (*len <= 0)
                return -ENOSPC;

        rc = snprintf(*buf, *len, "%s:%c", prefix, left != 0 ? ' ' : '\n');
        /* snprintf() reports the length it wanted to write, so a value
         * of *len or more means the output was truncated. */
        if (rc <= 0 || rc >= *len)
                return -ENOSPC;

        *buf += rc;
        *len -= rc;
        for (i = 0; left != 0; i++) {
                /* Computed here, while a bit is still pending, so the
                 * shift count can never reach the width of the type. */
                flag = 1U << i;
                if (!(flag & left))
                        continue;

                left &= ~flag;
                if (names[i] == NULL)
                        continue;

                rc = snprintf(*buf, *len, "%s%c", names[i],
                              left != 0 ? ',' : '\n');
                if (rc <= 0 || rc >= *len)
                        return -ENOSPC;

                *buf += rc;
                *len -= rc;
        }

        return save - *len;
}
457
458 int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix)
459 {
460         int rc;
461
462         if (time != 0)
463                 rc = snprintf(*buf, *len, "%s: "LPU64" seconds\n", prefix,
464                               cfs_time_current_sec() - time);
465         else
466                 rc = snprintf(*buf, *len, "%s: N/A\n", prefix);
467         if (rc <= 0)
468                 return -ENOSPC;
469
470         *buf += rc;
471         *len -= rc;
472         return rc;
473 }
474
475 int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
476                    const char *prefix)
477 {
478         int rc;
479
480         if (fid_is_zero(&pos->lp_dir_parent)) {
481                 if (pos->lp_oit_cookie == 0)
482                         rc = snprintf(*buf, *len, "%s: N/A, N/A, N/A\n",
483                                       prefix);
484                 else
485                         rc = snprintf(*buf, *len, "%s: "LPU64", N/A, N/A\n",
486                                       prefix, pos->lp_oit_cookie);
487         } else {
488                 rc = snprintf(*buf, *len, "%s: "LPU64", "DFID", "LPU64"\n",
489                               prefix, pos->lp_oit_cookie,
490                               PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
491         }
492         if (rc <= 0)
493                 return -ENOSPC;
494
495         *buf += rc;
496         *len -= rc;
497         return rc;
498 }
499
500 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
501                     struct lfsck_position *pos, bool init)
502 {
503         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
504
505         if (unlikely(lfsck->li_di_oit == NULL)) {
506                 memset(pos, 0, sizeof(*pos));
507                 return;
508         }
509
510         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
511         if (!lfsck->li_current_oit_processed && !init)
512                 pos->lp_oit_cookie--;
513
514         LASSERT(pos->lp_oit_cookie > 0);
515
516         if (lfsck->li_di_dir != NULL) {
517                 struct dt_object *dto = lfsck->li_obj_dir;
518
519                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
520                                                         lfsck->li_di_dir);
521
522                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
523                         fid_zero(&pos->lp_dir_parent);
524                         pos->lp_dir_cookie = 0;
525                 } else {
526                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
527                 }
528         } else {
529                 fid_zero(&pos->lp_dir_parent);
530                 pos->lp_dir_cookie = 0;
531         }
532 }
533
534 static void __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
535 {
536         lfsck->li_bookmark_ram.lb_speed_limit = limit;
537         if (limit != LFSCK_SPEED_NO_LIMIT) {
538                 if (limit > HZ) {
539                         lfsck->li_sleep_rate = limit / HZ;
540                         lfsck->li_sleep_jif = 1;
541                 } else {
542                         lfsck->li_sleep_rate = 1;
543                         lfsck->li_sleep_jif = HZ / limit;
544                 }
545         } else {
546                 lfsck->li_sleep_jif = 0;
547                 lfsck->li_sleep_rate = 0;
548         }
549 }
550
551 void lfsck_control_speed(struct lfsck_instance *lfsck)
552 {
553         struct ptlrpc_thread *thread = &lfsck->li_thread;
554         struct l_wait_info    lwi;
555
556         if (lfsck->li_sleep_jif > 0 &&
557             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
558                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
559                                        LWI_ON_SIGNAL_NOOP, NULL);
560
561                 l_wait_event(thread->t_ctl_waitq,
562                              !thread_is_running(thread),
563                              &lwi);
564                 lfsck->li_new_scanned = 0;
565         }
566 }
567
568 void lfsck_control_speed_by_self(struct lfsck_component *com)
569 {
570         struct lfsck_instance   *lfsck  = com->lc_lfsck;
571         struct ptlrpc_thread    *thread = &lfsck->li_thread;
572         struct l_wait_info       lwi;
573
574         if (lfsck->li_sleep_jif > 0 &&
575             com->lc_new_scanned >= lfsck->li_sleep_rate) {
576                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
577                                        LWI_ON_SIGNAL_NOOP, NULL);
578
579                 l_wait_event(thread->t_ctl_waitq,
580                              !thread_is_running(thread),
581                              &lwi);
582                 com->lc_new_scanned = 0;
583         }
584 }
585
586 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
587                             struct lu_fid *fid)
588 {
589         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
590                      !dt_try_as_dir(env, obj)))
591                 return -ENOTDIR;
592
593         return dt_lookup(env, obj, (struct dt_rec *)fid,
594                          (const struct dt_key *)"..", BYPASS_CAPA);
595 }
596
597 static int lfsck_needs_scan_dir(const struct lu_env *env,
598                                 struct lfsck_instance *lfsck,
599                                 struct dt_object *obj)
600 {
601         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
602         int            depth = 0;
603         int            rc;
604
605         if (!lfsck->li_master || !S_ISDIR(lfsck_object_type(obj)) ||
606             cfs_list_empty(&lfsck->li_list_dir))
607                RETURN(0);
608
609         while (1) {
610                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
611                  *      which is the agent directory to manage the objects
612                  *      which name entries reside on remote MDTs. Related
613                  *      consistency verification will be processed in LFSCK
614                  *      phase III. */
615                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
616                         if (depth > 0)
617                                 lfsck_object_put(env, obj);
618                         return 1;
619                 }
620
621                 /* .lustre doesn't contain "real" user objects, no need lfsck */
622                 if (fid_is_dot_lustre(lfsck_dto2fid(obj))) {
623                         if (depth > 0)
624                                 lfsck_object_put(env, obj);
625                         return 0;
626                 }
627
628                 dt_read_lock(env, obj, MOR_TGT_CHILD);
629                 if (unlikely(lfsck_is_dead_obj(obj))) {
630                         dt_read_unlock(env, obj);
631                         if (depth > 0)
632                                 lfsck_object_put(env, obj);
633                         return 0;
634                 }
635
636                 rc = dt_xattr_get(env, obj,
637                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
638                                   BYPASS_CAPA);
639                 dt_read_unlock(env, obj);
640                 if (rc >= 0) {
641                         if (depth > 0)
642                                 lfsck_object_put(env, obj);
643                         return 1;
644                 }
645
646                 if (rc < 0 && rc != -ENODATA) {
647                         if (depth > 0)
648                                 lfsck_object_put(env, obj);
649                         return rc;
650                 }
651
652                 rc = lfsck_parent_fid(env, obj, fid);
653                 if (depth > 0)
654                         lfsck_object_put(env, obj);
655                 if (rc != 0)
656                         return rc;
657
658                 if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
659                         return 0;
660
661                 obj = lfsck_object_find(env, lfsck, fid);
662                 if (obj == NULL)
663                         return 0;
664                 else if (IS_ERR(obj))
665                         return PTR_ERR(obj);
666
667                 if (!dt_object_exists(obj)) {
668                         lfsck_object_put(env, obj);
669                         return 0;
670                 }
671
672                 /* Currently, only client visible directory can be remote. */
673                 if (dt_object_remote(obj)) {
674                         lfsck_object_put(env, obj);
675                         return 1;
676                 }
677
678                 depth++;
679         }
680         return 0;
681 }
682
683 struct lfsck_thread_args *lfsck_thread_args_init(struct lfsck_instance *lfsck,
684                                                  struct lfsck_component *com,
685                                                  struct lfsck_start_param *lsp)
686 {
687         struct lfsck_thread_args *lta;
688         int                       rc;
689
690         OBD_ALLOC_PTR(lta);
691         if (lta == NULL)
692                 return ERR_PTR(-ENOMEM);
693
694         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
695         if (rc != 0) {
696                 OBD_FREE_PTR(lta);
697                 return ERR_PTR(rc);
698         }
699
700         lta->lta_lfsck = lfsck_instance_get(lfsck);
701         if (com != NULL)
702                 lta->lta_com = lfsck_component_get(com);
703
704         lta->lta_lsp = lsp;
705
706         return lta;
707 }
708
709 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
710 {
711         if (lta->lta_com != NULL)
712                 lfsck_component_put(&lta->lta_env, lta->lta_com);
713         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
714         lu_env_fini(&lta->lta_env);
715         OBD_FREE_PTR(lta);
716 }
717
718 /* LFSCK wrap functions */
719
720 void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
721                 bool new_checked)
722 {
723         struct lfsck_component *com;
724
725         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
726                 com->lc_ops->lfsck_fail(env, com, new_checked);
727         }
728 }
729
730 int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck)
731 {
732         struct lfsck_component *com;
733         int                     rc  = 0;
734         int                     rc1 = 0;
735
736         if (likely(cfs_time_beforeq(cfs_time_current(),
737                                     lfsck->li_time_next_checkpoint)))
738                 return 0;
739
740         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
741         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
742                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
743                 if (rc != 0)
744                         rc1 = rc;
745         }
746
747         lfsck->li_time_last_checkpoint = cfs_time_current();
748         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
749                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
750         return rc1 != 0 ? rc1 : rc;
751 }
752
753 int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck,
754                struct lfsck_start_param *lsp)
755 {
756         struct dt_object       *obj     = NULL;
757         struct lfsck_component *com;
758         struct lfsck_component *next;
759         struct lfsck_position  *pos     = NULL;
760         const struct dt_it_ops *iops    =
761                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
762         struct dt_it           *di;
763         int                     rc;
764         ENTRY;
765
766         LASSERT(lfsck->li_obj_dir == NULL);
767         LASSERT(lfsck->li_di_dir == NULL);
768
769         lfsck->li_current_oit_processed = 0;
770         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
771                 com->lc_new_checked = 0;
772                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
773                         com->lc_journal = 0;
774
775                 rc = com->lc_ops->lfsck_prep(env, com, lsp);
776                 if (rc != 0)
777                         GOTO(out, rc);
778
779                 if ((pos == NULL) ||
780                     (!lfsck_pos_is_zero(&com->lc_pos_start) &&
781                      lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
782                         pos = &com->lc_pos_start;
783         }
784
785         /* Init otable-based iterator. */
786         if (pos == NULL) {
787                 rc = iops->load(env, lfsck->li_di_oit, 0);
788                 if (rc > 0) {
789                         lfsck->li_oit_over = 1;
790                         rc = 0;
791                 }
792
793                 GOTO(out, rc);
794         }
795
796         rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
797         if (rc < 0)
798                 GOTO(out, rc);
799         else if (rc > 0)
800                 lfsck->li_oit_over = 1;
801
802         if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
803                 GOTO(out, rc = 0);
804
805         /* Find the directory for namespace-based traverse. */
806         obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
807         if (obj == NULL)
808                 GOTO(out, rc = 0);
809         else if (IS_ERR(obj))
810                 RETURN(PTR_ERR(obj));
811
812         /* XXX: Currently, skip remote object, the consistency for
813          *      remote object will be processed in LFSCK phase III. */
814         if (!dt_object_exists(obj) || dt_object_remote(obj) ||
815             unlikely(!S_ISDIR(lfsck_object_type(obj))))
816                 GOTO(out, rc = 0);
817
818         if (unlikely(!dt_try_as_dir(env, obj)))
819                 GOTO(out, rc = -ENOTDIR);
820
821         /* Init the namespace-based directory traverse. */
822         iops = &obj->do_index_ops->dio_it;
823         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
824         if (IS_ERR(di))
825                 GOTO(out, rc = PTR_ERR(di));
826
827         LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
828
829         rc = iops->load(env, di, pos->lp_dir_cookie);
830         if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
831                 rc = iops->next(env, di);
832         else if (rc > 0)
833                 rc = 0;
834
835         if (rc != 0) {
836                 iops->put(env, di);
837                 iops->fini(env, di);
838                 GOTO(out, rc);
839         }
840
841         lfsck->li_obj_dir = lfsck_object_get(obj);
842         lfsck->li_cookie_dir = iops->store(env, di);
843         spin_lock(&lfsck->li_lock);
844         lfsck->li_di_dir = di;
845         spin_unlock(&lfsck->li_lock);
846
847         GOTO(out, rc = 0);
848
849 out:
850         if (obj != NULL)
851                 lfsck_object_put(env, obj);
852
853         if (rc < 0) {
854                 cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
855                                              lc_link)
856                         com->lc_ops->lfsck_post(env, com, rc, true);
857
858                 return rc;
859         }
860
861         rc = 0;
862         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
863         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
864                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
865                 if (rc != 0)
866                         break;
867         }
868
869         lfsck->li_time_last_checkpoint = cfs_time_current();
870         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
871                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
872         return rc;
873 }
874
875 int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
876                    struct dt_object *obj)
877 {
878         struct lfsck_component *com;
879         const struct dt_it_ops *iops;
880         struct dt_it           *di;
881         int                     rc;
882         ENTRY;
883
884         LASSERT(lfsck->li_obj_dir == NULL);
885
886         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
887                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
888                 if (rc != 0)
889                         RETURN(rc);
890         }
891
892         rc = lfsck_needs_scan_dir(env, lfsck, obj);
893         if (rc <= 0)
894                 GOTO(out, rc);
895
896         if (unlikely(!dt_try_as_dir(env, obj)))
897                 GOTO(out, rc = -ENOTDIR);
898
899         iops = &obj->do_index_ops->dio_it;
900         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
901         if (IS_ERR(di))
902                 GOTO(out, rc = PTR_ERR(di));
903
904         rc = iops->load(env, di, 0);
905         if (rc == 0)
906                 rc = iops->next(env, di);
907         else if (rc > 0)
908                 rc = 0;
909
910         if (rc != 0) {
911                 iops->put(env, di);
912                 iops->fini(env, di);
913                 GOTO(out, rc);
914         }
915
916         lfsck->li_obj_dir = lfsck_object_get(obj);
917         lfsck->li_cookie_dir = iops->store(env, di);
918         spin_lock(&lfsck->li_lock);
919         lfsck->li_di_dir = di;
920         spin_unlock(&lfsck->li_lock);
921
922         GOTO(out, rc = 0);
923
924 out:
925         if (rc < 0)
926                 lfsck_fail(env, lfsck, false);
927         return (rc > 0 ? 0 : rc);
928 }
929
930 int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
931                    struct dt_object *obj, struct lu_dirent *ent)
932 {
933         struct lfsck_component *com;
934         int                     rc;
935
936         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
937                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
938                 if (rc != 0)
939                         return rc;
940         }
941         return 0;
942 }
943
944 int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
945                int result)
946 {
947         struct lfsck_component *com;
948         struct lfsck_component *next;
949         int                     rc  = 0;
950         int                     rc1 = 0;
951
952         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
953         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
954                 rc = com->lc_ops->lfsck_post(env, com, result, false);
955                 if (rc != 0)
956                         rc1 = rc;
957         }
958
959         lfsck->li_time_last_checkpoint = cfs_time_current();
960         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
961                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
962
963         /* Ignore some component post failure to make other can go ahead. */
964         return result;
965 }
966
967 int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck)
968 {
969         struct lfsck_component *com;
970         struct lfsck_component *next;
971         struct l_wait_info      lwi = { 0 };
972         int                     rc  = 0;
973         int                     rc1 = 0;
974
975         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
976                                      lc_link) {
977                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
978                         com->lc_journal = 0;
979
980                 rc = com->lc_ops->lfsck_double_scan(env, com);
981                 if (rc != 0)
982                         rc1 = rc;
983         }
984
985         l_wait_event(lfsck->li_thread.t_ctl_waitq,
986                      atomic_read(&lfsck->li_double_scan_count) == 0,
987                      &lwi);
988
989         return rc1 != 0 ? rc1 : rc;
990 }
991
992 int lfsck_stop_notify(const struct lu_env *env, struct lfsck_instance *lfsck,
993                       struct lfsck_tgt_descs *ltds, struct lfsck_tgt_desc *ltd)
994 {
995         struct ptlrpc_request_set *set;
996         struct lfsck_component    *com;
997         int                        cnt = 0;
998         int                        rc  = 0;
999         int                        rc1 = 0;
1000
1001         set = ptlrpc_prep_set();
1002         if (set == NULL)
1003                 return -ENOMEM;
1004
1005         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1006                 if (com->lc_ops->lfsck_stop_notify != NULL) {
1007                         rc = com->lc_ops->lfsck_stop_notify(env, com, ltds,
1008                                                             ltd, set);
1009                         if (rc != 0)
1010                                 rc1 = rc;
1011                         else
1012                                 cnt++;
1013                 }
1014         }
1015
1016         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1017                 if (com->lc_ops->lfsck_stop_notify != NULL) {
1018                         rc = com->lc_ops->lfsck_stop_notify(env, com, ltds,
1019                                                             ltd, set);
1020                         if (rc != 0)
1021                                 rc1 = rc;
1022                         else
1023                                 cnt++;
1024                 }
1025         }
1026
1027         if (cnt > 0)
1028                 rc = ptlrpc_set_wait(set);
1029         ptlrpc_set_destroy(set);
1030
1031         return rc1 != 0 ? rc1 : rc;
1032 }
1033
1034 void lfsck_quit(const struct lu_env *env, struct lfsck_instance *lfsck)
1035 {
1036         struct lfsck_component *com;
1037         struct lfsck_component *next;
1038
1039         list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
1040                                  lc_link) {
1041                 if (com->lc_ops->lfsck_quit != NULL)
1042                         com->lc_ops->lfsck_quit(env, com);
1043         }
1044
1045         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1046                                  lc_link) {
1047                 if (com->lc_ops->lfsck_quit != NULL)
1048                         com->lc_ops->lfsck_quit(env, com);
1049         }
1050 }
1051
1052 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
1053                         struct lfsck_request *lr,
1054                         struct ptlrpc_request_set *set,
1055                         ptlrpc_interpterer_t interpreter,
1056                         void *args, int request)
1057 {
1058         struct lfsck_async_interpret_args *laia;
1059         struct ptlrpc_request             *req;
1060         struct lfsck_request              *tmp;
1061         struct req_format                 *format;
1062         int                                rc;
1063
1064         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
1065                 return -EOPNOTSUPP;
1066
1067         switch (request) {
1068         case LFSCK_NOTIFY:
1069                 format = &RQF_LFSCK_NOTIFY;
1070                 break;
1071         case LFSCK_QUERY:
1072                 format = &RQF_LFSCK_QUERY;
1073                 break;
1074         default:
1075                 CERROR("%s: unknown async request: opc = %d\n",
1076                        exp->exp_obd->obd_name, request);
1077                 return -EINVAL;
1078         }
1079
1080         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
1081         if (req == NULL)
1082                 return -ENOMEM;
1083
1084         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
1085         if (rc != 0) {
1086                 ptlrpc_request_free(req);
1087
1088                 return rc;
1089         }
1090
1091         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1092         *tmp = *lr;
1093         ptlrpc_request_set_replen(req);
1094
1095         laia = ptlrpc_req_async_args(req);
1096         *laia = *(struct lfsck_async_interpret_args *)args;
1097         req->rq_interpret_reply = interpreter;
1098         ptlrpc_set_add_req(set, req);
1099
1100         return 0;
1101 }
1102
1103 /* external interfaces */
1104
1105 int lfsck_get_speed(struct dt_device *key, void *buf, int len)
1106 {
1107         struct lu_env           env;
1108         struct lfsck_instance  *lfsck;
1109         int                     rc;
1110         ENTRY;
1111
1112         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1113         if (rc != 0)
1114                 RETURN(rc);
1115
1116         lfsck = lfsck_instance_find(key, true, false);
1117         if (likely(lfsck != NULL)) {
1118                 rc = snprintf(buf, len, "%u\n",
1119                               lfsck->li_bookmark_ram.lb_speed_limit);
1120                 lfsck_instance_put(&env, lfsck);
1121         } else {
1122                 rc = -ENODEV;
1123         }
1124
1125         lu_env_fini(&env);
1126
1127         RETURN(rc);
1128 }
1129 EXPORT_SYMBOL(lfsck_get_speed);
1130
1131 int lfsck_set_speed(struct dt_device *key, int val)
1132 {
1133         struct lu_env           env;
1134         struct lfsck_instance  *lfsck;
1135         int                     rc;
1136         ENTRY;
1137
1138         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1139         if (rc != 0)
1140                 RETURN(rc);
1141
1142         lfsck = lfsck_instance_find(key, true, false);
1143         if (likely(lfsck != NULL)) {
1144                 mutex_lock(&lfsck->li_mutex);
1145                 __lfsck_set_speed(lfsck, val);
1146                 rc = lfsck_bookmark_store(&env, lfsck);
1147                 mutex_unlock(&lfsck->li_mutex);
1148                 lfsck_instance_put(&env, lfsck);
1149         } else {
1150                 rc = -ENODEV;
1151         }
1152
1153         lu_env_fini(&env);
1154
1155         RETURN(rc);
1156 }
1157 EXPORT_SYMBOL(lfsck_set_speed);
1158
1159 int lfsck_get_windows(struct dt_device *key, void *buf, int len)
1160 {
1161         struct lu_env           env;
1162         struct lfsck_instance  *lfsck;
1163         int                     rc;
1164         ENTRY;
1165
1166         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1167         if (rc != 0)
1168                 RETURN(rc);
1169
1170         lfsck = lfsck_instance_find(key, true, false);
1171         if (likely(lfsck != NULL)) {
1172                 rc = snprintf(buf, len, "%u\n",
1173                               lfsck->li_bookmark_ram.lb_async_windows);
1174                 lfsck_instance_put(&env, lfsck);
1175         } else {
1176                 rc = -ENODEV;
1177         }
1178
1179         lu_env_fini(&env);
1180
1181         RETURN(rc);
1182 }
1183 EXPORT_SYMBOL(lfsck_get_windows);
1184
1185 int lfsck_set_windows(struct dt_device *key, int val)
1186 {
1187         struct lu_env           env;
1188         struct lfsck_instance  *lfsck;
1189         int                     rc;
1190         ENTRY;
1191
1192         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1193         if (rc != 0)
1194                 RETURN(rc);
1195
1196         lfsck = lfsck_instance_find(key, true, false);
1197         if (likely(lfsck != NULL)) {
1198                 if (val > LFSCK_ASYNC_WIN_MAX) {
1199                         CERROR("%s: Too large async windows size, which "
1200                                "may cause memory issues. The valid range "
1201                                "is [0 - %u]. If you do not want to restrict "
1202                                "the windows size for async requests pipeline, "
1203                                "just set it as 0.\n",
1204                                lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
1205                         rc = -EINVAL;
1206                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
1207                         mutex_lock(&lfsck->li_mutex);
1208                         lfsck->li_bookmark_ram.lb_async_windows = val;
1209                         rc = lfsck_bookmark_store(&env, lfsck);
1210                         mutex_unlock(&lfsck->li_mutex);
1211                 }
1212                 lfsck_instance_put(&env, lfsck);
1213         } else {
1214                 rc = -ENODEV;
1215         }
1216
1217         lu_env_fini(&env);
1218
1219         RETURN(rc);
1220 }
1221 EXPORT_SYMBOL(lfsck_set_windows);
1222
1223 int lfsck_dump(struct dt_device *key, void *buf, int len, enum lfsck_type type)
1224 {
1225         struct lu_env           env;
1226         struct lfsck_instance  *lfsck;
1227         struct lfsck_component *com;
1228         int                     rc;
1229         ENTRY;
1230
1231         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1232         if (rc != 0)
1233                 RETURN(rc);
1234
1235         lfsck = lfsck_instance_find(key, true, false);
1236         if (likely(lfsck != NULL)) {
1237                 com = lfsck_component_find(lfsck, type);
1238                 if (likely(com != NULL)) {
1239                         rc = com->lc_ops->lfsck_dump(&env, com, buf, len);
1240                         lfsck_component_put(&env, com);
1241                 } else {
1242                         rc = -ENOTSUPP;
1243                 }
1244
1245                 lfsck_instance_put(&env, lfsck);
1246         } else {
1247                 rc = -ENODEV;
1248         }
1249
1250         lu_env_fini(&env);
1251
1252         RETURN(rc);
1253 }
1254 EXPORT_SYMBOL(lfsck_dump);
1255
1256 int lfsck_start(const struct lu_env *env, struct dt_device *key,
1257                 struct lfsck_start_param *lsp)
1258 {
1259         struct lfsck_start              *start  = lsp->lsp_start;
1260         struct lfsck_instance           *lfsck;
1261         struct lfsck_bookmark           *bk;
1262         struct ptlrpc_thread            *thread;
1263         struct lfsck_component          *com;
1264         struct l_wait_info               lwi    = { 0 };
1265         struct lfsck_thread_args        *lta;
1266         bool                             dirty  = false;
1267         long                             rc     = 0;
1268         __u16                            valid  = 0;
1269         __u16                            flags  = 0;
1270         __u16                            type   = 1;
1271         ENTRY;
1272
1273         lfsck = lfsck_instance_find(key, true, false);
1274         if (unlikely(lfsck == NULL))
1275                 RETURN(-ENODEV);
1276
1277         /* start == NULL means auto trigger paused LFSCK. */
1278         if ((start == NULL) &&
1279             (cfs_list_empty(&lfsck->li_list_scan) ||
1280              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
1281                 GOTO(put, rc = 0);
1282
1283         bk = &lfsck->li_bookmark_ram;
1284         thread = &lfsck->li_thread;
1285         mutex_lock(&lfsck->li_mutex);
1286         spin_lock(&lfsck->li_lock);
1287         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
1288                 rc = -EALREADY;
1289                 while (start->ls_active != 0) {
1290                         if (type & start->ls_active) {
1291                                 com = __lfsck_component_find(lfsck, type,
1292                                                         &lfsck->li_list_scan);
1293                                 if (com == NULL)
1294                                         com = __lfsck_component_find(lfsck,
1295                                                 type,
1296                                                 &lfsck->li_list_double_scan);
1297                                 if (com == NULL) {
1298                                         rc = -EBUSY;
1299                                         break;
1300                                 } else {
1301                                         start->ls_active &= ~type;
1302                                 }
1303                         }
1304                         type <<= 1;
1305                 }
1306                 spin_unlock(&lfsck->li_lock);
1307                 GOTO(out, rc);
1308         }
1309         spin_unlock(&lfsck->li_lock);
1310
1311         lfsck->li_namespace = lsp->lsp_namespace;
1312         lfsck->li_status = 0;
1313         lfsck->li_oit_over = 0;
1314         lfsck->li_drop_dryrun = 0;
1315         lfsck->li_new_scanned = 0;
1316
1317         /* For auto trigger. */
1318         if (start == NULL)
1319                 goto trigger;
1320
1321         start->ls_version = bk->lb_version;
1322         if (start->ls_valid & LSV_SPEED_LIMIT) {
1323                 __lfsck_set_speed(lfsck, start->ls_speed_limit);
1324                 dirty = true;
1325         }
1326
1327         if (start->ls_valid & LSV_ASYNC_WINDOWS &&
1328             bk->lb_async_windows != start->ls_async_windows) {
1329                 bk->lb_async_windows = start->ls_async_windows;
1330                 dirty = true;
1331         }
1332
1333         if (start->ls_valid & LSV_ERROR_HANDLE) {
1334                 valid |= DOIV_ERROR_HANDLE;
1335                 if (start->ls_flags & LPF_FAILOUT)
1336                         flags |= DOIF_FAILOUT;
1337
1338                 if ((start->ls_flags & LPF_FAILOUT) &&
1339                     !(bk->lb_param & LPF_FAILOUT)) {
1340                         bk->lb_param |= LPF_FAILOUT;
1341                         dirty = true;
1342                 } else if (!(start->ls_flags & LPF_FAILOUT) &&
1343                            (bk->lb_param & LPF_FAILOUT)) {
1344                         bk->lb_param &= ~LPF_FAILOUT;
1345                         dirty = true;
1346                 }
1347         }
1348
1349         if (start->ls_valid & LSV_DRYRUN) {
1350                 valid |= DOIV_DRYRUN;
1351                 if (start->ls_flags & LPF_DRYRUN)
1352                         flags |= DOIF_DRYRUN;
1353
1354                 if ((start->ls_flags & LPF_DRYRUN) &&
1355                     !(bk->lb_param & LPF_DRYRUN)) {
1356                         bk->lb_param |= LPF_DRYRUN;
1357                         dirty = true;
1358                 } else if (!(start->ls_flags & LPF_DRYRUN) &&
1359                            (bk->lb_param & LPF_DRYRUN)) {
1360                         bk->lb_param &= ~LPF_DRYRUN;
1361                         lfsck->li_drop_dryrun = 1;
1362                         dirty = true;
1363                 }
1364         }
1365
1366         if (dirty) {
1367                 rc = lfsck_bookmark_store(env, lfsck);
1368                 if (rc != 0)
1369                         GOTO(out, rc);
1370         }
1371
1372         if (start->ls_flags & LPF_RESET)
1373                 flags |= DOIF_RESET;
1374
1375         if (start->ls_active != 0) {
1376                 struct lfsck_component *next;
1377
1378                 if (start->ls_active == LFSCK_TYPES_ALL)
1379                         start->ls_active = LFSCK_TYPES_SUPPORTED;
1380
1381                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
1382                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
1383                         GOTO(out, rc = -ENOTSUPP);
1384                 }
1385
1386                 cfs_list_for_each_entry_safe(com, next,
1387                                              &lfsck->li_list_scan, lc_link) {
1388                         if (!(com->lc_type & start->ls_active)) {
1389                                 rc = com->lc_ops->lfsck_post(env, com, 0,
1390                                                              false);
1391                                 if (rc != 0)
1392                                         GOTO(out, rc);
1393                         }
1394                 }
1395
1396                 while (start->ls_active != 0) {
1397                         if (type & start->ls_active) {
1398                                 com = __lfsck_component_find(lfsck, type,
1399                                                         &lfsck->li_list_idle);
1400                                 if (com != NULL) {
1401                                         /* The component status will be updated
1402                                          * when its prep() is called later by
1403                                          * the LFSCK main engine. */
1404                                         cfs_list_del_init(&com->lc_link);
1405                                         cfs_list_add_tail(&com->lc_link,
1406                                                           &lfsck->li_list_scan);
1407                                 }
1408                                 start->ls_active &= ~type;
1409                         }
1410                         type <<= 1;
1411                 }
1412         }
1413
1414         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1415                 start->ls_active |= com->lc_type;
1416                 if (flags & DOIF_RESET) {
1417                         rc = com->lc_ops->lfsck_reset(env, com, false);
1418                         if (rc != 0)
1419                                 GOTO(out, rc);
1420                 }
1421         }
1422
1423 trigger:
1424         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
1425         if (bk->lb_param & LPF_DRYRUN) {
1426                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
1427                 valid |= DOIV_DRYRUN;
1428                 flags |= DOIF_DRYRUN;
1429         }
1430
1431         if (bk->lb_param & LPF_FAILOUT) {
1432                 valid |= DOIV_ERROR_HANDLE;
1433                 flags |= DOIF_FAILOUT;
1434         }
1435
1436         if (!cfs_list_empty(&lfsck->li_list_scan))
1437                 flags |= DOIF_OUTUSED;
1438
1439         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
1440         thread_set_flags(thread, 0);
1441         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
1442         if (IS_ERR(lta))
1443                 GOTO(out, rc = PTR_ERR(lta));
1444
1445         rc = PTR_ERR(kthread_run(lfsck_master_engine, lta, "lfsck"));
1446         if (IS_ERR_VALUE(rc)) {
1447                 CERROR("%s: cannot start LFSCK thread: rc = %ld\n",
1448                        lfsck_lfsck2name(lfsck), rc);
1449                 lfsck_thread_args_fini(lta);
1450         } else {
1451                 rc = 0;
1452                 l_wait_event(thread->t_ctl_waitq,
1453                              thread_is_running(thread) ||
1454                              thread_is_stopped(thread),
1455                              &lwi);
1456         }
1457
1458         GOTO(out, rc);
1459
1460 out:
1461         mutex_unlock(&lfsck->li_mutex);
1462 put:
1463         lfsck_instance_put(env, lfsck);
1464         return (rc < 0 ? rc : 0);
1465 }
1466 EXPORT_SYMBOL(lfsck_start);
1467
1468 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
1469                struct lfsck_stop *stop)
1470 {
1471         struct lfsck_instance   *lfsck;
1472         struct ptlrpc_thread    *thread;
1473         struct l_wait_info       lwi    = { 0 };
1474         int                      rc     = 0;
1475         ENTRY;
1476
1477         lfsck = lfsck_instance_find(key, true, false);
1478         if (unlikely(lfsck == NULL))
1479                 RETURN(-ENODEV);
1480
1481         thread = &lfsck->li_thread;
1482         mutex_lock(&lfsck->li_mutex);
1483         spin_lock(&lfsck->li_lock);
1484         if (thread_is_init(thread) || thread_is_stopped(thread)) {
1485                 spin_unlock(&lfsck->li_lock);
1486                 GOTO(out, rc = -EALREADY);
1487         }
1488
1489         if (stop != NULL)
1490                 lfsck->li_status = stop->ls_status;
1491         else
1492                 lfsck->li_status = LS_STOPPED;
1493
1494         thread_set_flags(thread, SVC_STOPPING);
1495         spin_unlock(&lfsck->li_lock);
1496
1497         wake_up_all(&thread->t_ctl_waitq);
1498         l_wait_event(thread->t_ctl_waitq,
1499                      thread_is_stopped(thread),
1500                      &lwi);
1501
1502         GOTO(out, rc = 0);
1503
1504 out:
1505         mutex_unlock(&lfsck->li_mutex);
1506         lfsck_instance_put(env, lfsck);
1507
1508         return rc;
1509 }
1510 EXPORT_SYMBOL(lfsck_stop);
1511
1512 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
1513                     struct lfsck_request *lr)
1514 {
1515         struct lfsck_instance  *lfsck;
1516         struct lfsck_component *com;
1517         int                     rc;
1518         ENTRY;
1519
1520         switch (lr->lr_event) {
1521         case LE_STOP:
1522         case LE_PHASE1_DONE:
1523         case LE_PHASE2_DONE:
1524                 break;
1525         default:
1526                 RETURN(-EOPNOTSUPP);
1527         }
1528
1529         lfsck = lfsck_instance_find(key, true, false);
1530         if (unlikely(lfsck == NULL))
1531                 RETURN(-ENODEV);
1532
1533         com = lfsck_component_find(lfsck, lr->lr_active);
1534         if (likely(com != NULL)) {
1535                 rc = com->lc_ops->lfsck_in_notify(env, com, lr);
1536                 lfsck_component_put(env, com);
1537         } else {
1538                 rc = -ENOTSUPP;
1539         }
1540
1541         lfsck_instance_put(env, lfsck);
1542
1543         RETURN(rc);
1544 }
1545 EXPORT_SYMBOL(lfsck_in_notify);
1546
1547 int lfsck_query(const struct lu_env *env, struct dt_device *key,
1548                 struct lfsck_request *lr)
1549 {
1550         struct lfsck_instance  *lfsck;
1551         struct lfsck_component *com;
1552         int                     rc;
1553         ENTRY;
1554
1555         lfsck = lfsck_instance_find(key, true, false);
1556         if (unlikely(lfsck == NULL))
1557                 RETURN(-ENODEV);
1558
1559         com = lfsck_component_find(lfsck, lr->lr_active);
1560         if (likely(com != NULL)) {
1561                 rc = com->lc_ops->lfsck_query(env, com);
1562                 lfsck_component_put(env, com);
1563         } else {
1564                 rc = -ENOTSUPP;
1565         }
1566
1567         lfsck_instance_put(env, lfsck);
1568
1569         RETURN(rc);
1570 }
1571 EXPORT_SYMBOL(lfsck_query);
1572
1573 int lfsck_register(const struct lu_env *env, struct dt_device *key,
1574                    struct dt_device *next, struct obd_device *obd,
1575                    lfsck_out_notify notify, void *notify_data, bool master)
1576 {
1577         struct lfsck_instance   *lfsck;
1578         struct dt_object        *root  = NULL;
1579         struct dt_object        *obj;
1580         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
1581         int                      rc;
1582         ENTRY;
1583
1584         lfsck = lfsck_instance_find(key, false, false);
1585         if (unlikely(lfsck != NULL))
1586                 RETURN(-EEXIST);
1587
1588         OBD_ALLOC_PTR(lfsck);
1589         if (lfsck == NULL)
1590                 RETURN(-ENOMEM);
1591
1592         mutex_init(&lfsck->li_mutex);
1593         spin_lock_init(&lfsck->li_lock);
1594         CFS_INIT_LIST_HEAD(&lfsck->li_link);
1595         CFS_INIT_LIST_HEAD(&lfsck->li_list_scan);
1596         CFS_INIT_LIST_HEAD(&lfsck->li_list_dir);
1597         CFS_INIT_LIST_HEAD(&lfsck->li_list_double_scan);
1598         CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
1599         atomic_set(&lfsck->li_ref, 1);
1600         atomic_set(&lfsck->li_double_scan_count, 0);
1601         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
1602         lfsck->li_out_notify = notify;
1603         lfsck->li_out_notify_data = notify_data;
1604         lfsck->li_next = next;
1605         lfsck->li_bottom = key;
1606         lfsck->li_obd = obd;
1607
1608         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
1609         if (rc != 0)
1610                 GOTO(out, rc);
1611
1612         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
1613         if (rc != 0)
1614                 GOTO(out, rc);
1615
1616         fid->f_seq = FID_SEQ_LOCAL_NAME;
1617         fid->f_oid = 1;
1618         fid->f_ver = 0;
1619         rc = local_oid_storage_init(env, lfsck->li_bottom, fid, &lfsck->li_los);
1620         if (rc != 0)
1621                 GOTO(out, rc);
1622
1623         rc = dt_root_get(env, key, fid);
1624         if (rc != 0)
1625                 GOTO(out, rc);
1626
1627         root = dt_locate(env, lfsck->li_bottom, fid);
1628         if (IS_ERR(root))
1629                 GOTO(out, rc = PTR_ERR(root));
1630
1631         if (unlikely(!dt_try_as_dir(env, root)))
1632                 GOTO(out, rc = -ENOTDIR);
1633
1634         lfsck->li_local_root_fid = *fid;
1635         if (master) {
1636                 lfsck->li_master = 1;
1637                 if (lfsck_dev_idx(lfsck->li_bottom) == 0) {
1638                         rc = dt_lookup(env, root,
1639                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
1640                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
1641                         if (rc != 0)
1642                                 GOTO(out, rc);
1643                 }
1644         }
1645
1646         fid->f_seq = FID_SEQ_LOCAL_FILE;
1647         fid->f_oid = OTABLE_IT_OID;
1648         fid->f_ver = 0;
1649         obj = dt_locate(env, lfsck->li_bottom, fid);
1650         if (IS_ERR(obj))
1651                 GOTO(out, rc = PTR_ERR(obj));
1652
1653         lfsck->li_obj_oit = obj;
1654         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
1655         if (rc != 0) {
1656                 if (rc == -ENOTSUPP)
1657                         GOTO(add, rc = 0);
1658
1659                 GOTO(out, rc);
1660         }
1661
1662         rc = lfsck_bookmark_setup(env, lfsck);
1663         if (rc != 0)
1664                 GOTO(out, rc);
1665
1666         if (master) {
1667                 rc = lfsck_namespace_setup(env, lfsck);
1668                 if (rc < 0)
1669                         GOTO(out, rc);
1670         }
1671
1672         rc = lfsck_layout_setup(env, lfsck);
1673         if (rc < 0)
1674                 GOTO(out, rc);
1675
1676         /* XXX: more LFSCK components initialization to be added here. */
1677
1678 add:
1679         rc = lfsck_instance_add(lfsck);
1680         if (rc == 0)
1681                 rc = lfsck_add_target_from_orphan(env, lfsck);
1682 out:
1683         if (root != NULL && !IS_ERR(root))
1684                 lu_object_put(env, &root->do_lu);
1685         if (rc != 0)
1686                 lfsck_instance_cleanup(env, lfsck);
1687         return rc;
1688 }
1689 EXPORT_SYMBOL(lfsck_register);
1690
1691 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
1692 {
1693         struct lfsck_instance *lfsck;
1694
1695         lfsck = lfsck_instance_find(key, false, true);
1696         if (lfsck != NULL)
1697                 lfsck_instance_put(env, lfsck);
1698 }
1699 EXPORT_SYMBOL(lfsck_degister);
1700
1701 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
1702                      struct dt_device *tgt, struct obd_export *exp,
1703                      __u32 index, bool for_ost)
1704 {
1705         struct lfsck_instance   *lfsck;
1706         struct lfsck_tgt_desc   *ltd;
1707         int                      rc;
1708         ENTRY;
1709
1710         OBD_ALLOC_PTR(ltd);
1711         if (ltd == NULL)
1712                 RETURN(-ENOMEM);
1713
1714         ltd->ltd_tgt = tgt;
1715         ltd->ltd_key = key;
1716         ltd->ltd_exp = exp;
1717         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
1718         INIT_LIST_HEAD(&ltd->ltd_layout_list);
1719         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
1720         atomic_set(&ltd->ltd_ref, 1);
1721         ltd->ltd_index = index;
1722
1723         spin_lock(&lfsck_instance_lock);
1724         lfsck = __lfsck_instance_find(key, true, false);
1725         if (lfsck == NULL) {
1726                 if (for_ost)
1727                         list_add_tail(&ltd->ltd_orphan_list,
1728                                       &lfsck_ost_orphan_list);
1729                 else
1730                         list_add_tail(&ltd->ltd_orphan_list,
1731                                       &lfsck_mdt_orphan_list);
1732                 spin_unlock(&lfsck_instance_lock);
1733
1734                 RETURN(0);
1735         }
1736         spin_unlock(&lfsck_instance_lock);
1737
1738         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
1739         if (rc != 0)
1740                 lfsck_tgt_put(ltd);
1741
1742         lfsck_instance_put(env, lfsck);
1743
1744         RETURN(rc);
1745 }
1746 EXPORT_SYMBOL(lfsck_add_target);
1747
1748 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
1749                       struct dt_device *tgt, __u32 index, bool for_ost)
1750 {
1751         struct lfsck_instance   *lfsck;
1752         struct lfsck_tgt_descs  *ltds;
1753         struct lfsck_tgt_desc   *ltd;
1754         struct list_head        *head;
1755         bool                     found = false;
1756         bool                     stop  = false;
1757
1758         if (for_ost)
1759                 head = &lfsck_ost_orphan_list;
1760         else
1761                 head = &lfsck_mdt_orphan_list;
1762
1763         spin_lock(&lfsck_instance_lock);
1764         list_for_each_entry(ltd, head, ltd_orphan_list) {
1765                 if (ltd->ltd_tgt == tgt) {
1766                         list_del_init(&ltd->ltd_orphan_list);
1767                         spin_unlock(&lfsck_instance_lock);
1768                         lfsck_tgt_put(ltd);
1769
1770                         return;
1771                 }
1772         }
1773
1774         lfsck = __lfsck_instance_find(key, true, false);
1775         spin_unlock(&lfsck_instance_lock);
1776         if (unlikely(lfsck == NULL))
1777                 return;
1778
1779         if (for_ost)
1780                 ltds = &lfsck->li_ost_descs;
1781         else
1782                 ltds = &lfsck->li_mdt_descs;
1783
1784         down_write(&ltds->ltd_rw_sem);
1785
1786         LASSERT(ltds->ltd_tgts_bitmap != NULL);
1787
1788         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
1789                 goto unlock;
1790
1791         ltd = LTD_TGT(ltds, index);
1792         if (unlikely(ltd == NULL))
1793                 goto unlock;
1794
1795         found = true;
1796         spin_lock(&ltds->ltd_lock);
1797         ltd->ltd_dead = 1;
1798         if (!list_empty(&ltd->ltd_layout_list)) {
1799                 list_del_init(&ltd->ltd_layout_list);
1800                 stop = true;
1801         } else {
1802                 LASSERT(list_empty(&ltd->ltd_layout_phase_list));
1803         }
1804         spin_unlock(&ltds->ltd_lock);
1805
1806         if (stop && lfsck->li_master)
1807                 lfsck_stop_notify(env, lfsck, ltds, ltd);
1808
1809         LASSERT(ltds->ltd_tgtnr > 0);
1810
1811         ltds->ltd_tgtnr--;
1812         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
1813         LTD_TGT(ltds, index) = NULL;
1814         lfsck_tgt_put(ltd);
1815
1816 unlock:
1817         if (!found) {
1818                 if (for_ost)
1819                         head = &lfsck->li_ost_descs.ltd_orphan;
1820                 else
1821                         head = &lfsck->li_ost_descs.ltd_orphan;
1822
1823                 list_for_each_entry(ltd, head, ltd_orphan_list) {
1824                         if (ltd->ltd_tgt == tgt) {
1825                                 list_del_init(&ltd->ltd_orphan_list);
1826                                 lfsck_tgt_put(ltd);
1827                                 break;
1828                         }
1829                 }
1830         }
1831
1832         up_write(&ltds->ltd_rw_sem);
1833         lfsck_instance_put(env, lfsck);
1834 }
1835 EXPORT_SYMBOL(lfsck_del_target);
1836
1837 static int __init lfsck_init(void)
1838 {
1839         int rc;
1840
1841         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
1842         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
1843         lfsck_key_init_generic(&lfsck_thread_key, NULL);
1844         rc = lu_context_key_register(&lfsck_thread_key);
1845         if (rc == 0) {
1846                 tgt_register_lfsck_start(lfsck_start);
1847                 tgt_register_lfsck_in_notify(lfsck_in_notify);
1848                 tgt_register_lfsck_query(lfsck_query);
1849         }
1850
1851         return rc;
1852 }
1853
1854 static void __exit lfsck_exit(void)
1855 {
1856         struct lfsck_tgt_desc *ltd;
1857         struct lfsck_tgt_desc *next;
1858
1859         LASSERT(cfs_list_empty(&lfsck_instance_list));
1860
1861         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
1862                                  ltd_orphan_list) {
1863                 list_del_init(&ltd->ltd_orphan_list);
1864                 lfsck_tgt_put(ltd);
1865         }
1866
1867         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
1868                                  ltd_orphan_list) {
1869                 list_del_init(&ltd->ltd_orphan_list);
1870                 lfsck_tgt_put(ltd);
1871         }
1872
1873         lu_context_key_degister(&lfsck_thread_key);
1874 }
1875
1876 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
1877 MODULE_DESCRIPTION("LFSCK");
1878 MODULE_LICENSE("GPL");
1879
1880 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);