Whamcloud - gitweb
LU-2914 lfsck: split LFSCK code from mdd to lfsck
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <libcfs/list.h>
37 #include <lu_object.h>
38 #include <dt_object.h>
39 #include <lustre_lib.h>
40 #include <lustre_net.h>
41 #include <lustre_lfsck.h>
42 #include <lustre/lustre_lfsck_user.h>
43
44 #include "lfsck_internal.h"
45
46 /* define lfsck thread key */
47 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
48
49 static void lfsck_key_fini(const struct lu_context *ctx,
50                            struct lu_context_key *key, void *data)
51 {
52         struct lfsck_thread_info *info = data;
53
54         lu_buf_free(&info->lti_linkea_buf);
55         OBD_FREE_PTR(info);
56 }
57
58 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
59 LU_KEY_INIT_GENERIC(lfsck);
60
61 static CFS_LIST_HEAD(lfsck_instance_list);
62 static DEFINE_SPINLOCK(lfsck_instance_lock);
63
64 const char *lfsck_status_names[] = {
65         "init",
66         "scanning-phase1",
67         "scanning-phase2",
68         "completed",
69         "failed",
70         "stopped",
71         "paused",
72         "crashed",
73         NULL
74 };
75
76 const char *lfsck_flags_names[] = {
77         "scanned-once",
78         "inconsistent",
79         "upgrade",
80         NULL
81 };
82
83 const char *lfsck_param_names[] = {
84         "failout",
85         "dryrun",
86         NULL
87 };
88
89 static inline mdsno_t lfsck_dev_idx(struct dt_device *dev)
90 {
91         return dev->dd_lu_dev.ld_site->ld_seq_site->ss_node_id;
92 }
93
94 static inline void lfsck_component_get(struct lfsck_component *com)
95 {
96         atomic_inc(&com->lc_ref);
97 }
98
99 static inline void lfsck_component_put(const struct lu_env *env,
100                                        struct lfsck_component *com)
101 {
102         if (atomic_dec_and_test(&com->lc_ref)) {
103                 if (com->lc_obj != NULL)
104                         lu_object_put(env, &com->lc_obj->do_lu);
105                 if (com->lc_file_ram != NULL)
106                         OBD_FREE(com->lc_file_ram, com->lc_file_size);
107                 if (com->lc_file_disk != NULL)
108                         OBD_FREE(com->lc_file_disk, com->lc_file_size);
109                 OBD_FREE_PTR(com);
110         }
111 }
112
113 static inline struct lfsck_component *
114 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type, cfs_list_t *list)
115 {
116         struct lfsck_component *com;
117
118         cfs_list_for_each_entry(com, list, lc_link) {
119                 if (com->lc_type == type)
120                         return com;
121         }
122         return NULL;
123 }
124
125 static struct lfsck_component *
126 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
127 {
128         struct lfsck_component *com;
129
130         spin_lock(&lfsck->li_lock);
131         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
132         if (com != NULL)
133                 goto unlock;
134
135         com = __lfsck_component_find(lfsck, type,
136                                      &lfsck->li_list_double_scan);
137         if (com != NULL)
138                 goto unlock;
139
140         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
141
142 unlock:
143         if (com != NULL)
144                 lfsck_component_get(com);
145         spin_unlock(&lfsck->li_lock);
146         return com;
147 }
148
149 void lfsck_component_cleanup(const struct lu_env *env,
150                              struct lfsck_component *com)
151 {
152         if (!cfs_list_empty(&com->lc_link))
153                 cfs_list_del_init(&com->lc_link);
154         if (!cfs_list_empty(&com->lc_link_dir))
155                 cfs_list_del_init(&com->lc_link_dir);
156
157         lfsck_component_put(env, com);
158 }
159
160 static void lfsck_instance_cleanup(const struct lu_env *env,
161                                    struct lfsck_instance *lfsck)
162 {
163         struct ptlrpc_thread    *thread = &lfsck->li_thread;
164         struct lfsck_component  *com;
165         ENTRY;
166
167         LASSERT(list_empty(&lfsck->li_link));
168         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
169
170         if (lfsck->li_obj_oit != NULL) {
171                 lu_object_put(env, &lfsck->li_obj_oit->do_lu);
172                 lfsck->li_obj_oit = NULL;
173         }
174
175         LASSERT(lfsck->li_obj_dir == NULL);
176
177         while (!cfs_list_empty(&lfsck->li_list_scan)) {
178                 com = cfs_list_entry(lfsck->li_list_scan.next,
179                                      struct lfsck_component,
180                                      lc_link);
181                 lfsck_component_cleanup(env, com);
182         }
183
184         LASSERT(cfs_list_empty(&lfsck->li_list_dir));
185
186         while (!cfs_list_empty(&lfsck->li_list_double_scan)) {
187                 com = cfs_list_entry(lfsck->li_list_double_scan.next,
188                                      struct lfsck_component,
189                                      lc_link);
190                 lfsck_component_cleanup(env, com);
191         }
192
193         while (!cfs_list_empty(&lfsck->li_list_idle)) {
194                 com = cfs_list_entry(lfsck->li_list_idle.next,
195                                      struct lfsck_component,
196                                      lc_link);
197                 lfsck_component_cleanup(env, com);
198         }
199
200         if (lfsck->li_bookmark_obj != NULL) {
201                 lu_object_put(env, &lfsck->li_bookmark_obj->do_lu);
202                 lfsck->li_bookmark_obj = NULL;
203         }
204
205         if (lfsck->li_los != NULL) {
206                 local_oid_storage_fini(env, lfsck->li_los);
207                 lfsck->li_los = NULL;
208         }
209
210         if (lfsck->li_local_root != NULL) {
211                 lu_object_put(env, &lfsck->li_local_root->do_lu);
212                 lfsck->li_local_root = NULL;
213         }
214
215         OBD_FREE_PTR(lfsck);
216 }
217
218 static inline void lfsck_instance_get(struct lfsck_instance *lfsck)
219 {
220         atomic_inc(&lfsck->li_ref);
221 }
222
223 static inline void lfsck_instance_put(const struct lu_env *env,
224                                       struct lfsck_instance *lfsck)
225 {
226         if (atomic_dec_and_test(&lfsck->li_ref))
227                 lfsck_instance_cleanup(env, lfsck);
228 }
229
230 static inline struct lfsck_instance *lfsck_instance_find(struct dt_device *key,
231                                                          bool ref, bool unlink)
232 {
233         struct lfsck_instance *lfsck;
234
235         spin_lock(&lfsck_instance_lock);
236         cfs_list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
237                 if (lfsck->li_bottom == key) {
238                         if (ref)
239                                 lfsck_instance_get(lfsck);
240                         if (unlink)
241                                 list_del_init(&lfsck->li_link);
242                         spin_unlock(&lfsck_instance_lock);
243                         return lfsck;
244                 }
245         }
246         spin_unlock(&lfsck_instance_lock);
247         return NULL;
248 }
249
250 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
251 {
252         struct lfsck_instance *tmp;
253
254         spin_lock(&lfsck_instance_lock);
255         cfs_list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
256                 if (lfsck->li_bottom == tmp->li_bottom) {
257                         spin_unlock(&lfsck_instance_lock);
258                         return -EEXIST;
259                 }
260         }
261
262         cfs_list_add_tail(&lfsck->li_link, &lfsck_instance_list);
263         spin_unlock(&lfsck_instance_lock);
264         return 0;
265 }
266
267 int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
268                     const char *prefix)
269 {
270         int save = *len;
271         int flag;
272         int rc;
273         int i;
274
275         rc = snprintf(*buf, *len, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
276         if (rc <= 0)
277                 return -ENOSPC;
278
279         *buf += rc;
280         *len -= rc;
281         for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
282                 if (flag & bits) {
283                         bits &= ~flag;
284                         rc = snprintf(*buf, *len, "%s%c", names[i],
285                                       bits != 0 ? ',' : '\n');
286                         if (rc <= 0)
287                                 return -ENOSPC;
288
289                         *buf += rc;
290                         *len -= rc;
291                 }
292         }
293         return save - *len;
294 }
295
296 int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix)
297 {
298         int rc;
299
300         if (time != 0)
301                 rc = snprintf(*buf, *len, "%s: "LPU64" seconds\n", prefix,
302                               cfs_time_current_sec() - time);
303         else
304                 rc = snprintf(*buf, *len, "%s: N/A\n", prefix);
305         if (rc <= 0)
306                 return -ENOSPC;
307
308         *buf += rc;
309         *len -= rc;
310         return rc;
311 }
312
313 int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
314                    const char *prefix)
315 {
316         int rc;
317
318         if (fid_is_zero(&pos->lp_dir_parent)) {
319                 if (pos->lp_oit_cookie == 0)
320                         rc = snprintf(*buf, *len, "%s: N/A, N/A, N/A\n",
321                                       prefix);
322                 else
323                         rc = snprintf(*buf, *len, "%s: "LPU64", N/A, N/A\n",
324                                       prefix, pos->lp_oit_cookie);
325         } else {
326                 rc = snprintf(*buf, *len, "%s: "LPU64", "DFID", "LPU64"\n",
327                               prefix, pos->lp_oit_cookie,
328                               PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
329         }
330         if (rc <= 0)
331                 return -ENOSPC;
332
333         *buf += rc;
334         *len -= rc;
335         return rc;
336 }
337
338 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
339                     struct lfsck_position *pos, bool init)
340 {
341         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
342
343         spin_lock(&lfsck->li_lock);
344         if (unlikely(lfsck->li_di_oit == NULL)) {
345                 spin_unlock(&lfsck->li_lock);
346                 memset(pos, 0, sizeof(*pos));
347                 return;
348         }
349
350         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
351         if (!lfsck->li_current_oit_processed && !init)
352                 pos->lp_oit_cookie--;
353
354         LASSERT(pos->lp_oit_cookie > 0);
355
356         if (lfsck->li_di_dir != NULL) {
357                 struct dt_object *dto = lfsck->li_obj_dir;
358
359                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
360                                                         lfsck->li_di_dir);
361
362                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
363                         fid_zero(&pos->lp_dir_parent);
364                         pos->lp_dir_cookie = 0;
365                 } else {
366                         pos->lp_dir_parent = *lu_object_fid(&dto->do_lu);
367                 }
368         } else {
369                 fid_zero(&pos->lp_dir_parent);
370                 pos->lp_dir_cookie = 0;
371         }
372         spin_unlock(&lfsck->li_lock);
373 }
374
375 static void __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
376 {
377         lfsck->li_bookmark_ram.lb_speed_limit = limit;
378         if (limit != LFSCK_SPEED_NO_LIMIT) {
379                 if (limit > CFS_HZ) {
380                         lfsck->li_sleep_rate = limit / CFS_HZ;
381                         lfsck->li_sleep_jif = 1;
382                 } else {
383                         lfsck->li_sleep_rate = 1;
384                         lfsck->li_sleep_jif = CFS_HZ / limit;
385                 }
386         } else {
387                 lfsck->li_sleep_jif = 0;
388                 lfsck->li_sleep_rate = 0;
389         }
390 }
391
392 void lfsck_control_speed(struct lfsck_instance *lfsck)
393 {
394         struct ptlrpc_thread *thread = &lfsck->li_thread;
395         struct l_wait_info    lwi;
396
397         if (lfsck->li_sleep_jif > 0 &&
398             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
399                 spin_lock(&lfsck->li_lock);
400                 if (likely(lfsck->li_sleep_jif > 0 &&
401                            lfsck->li_new_scanned >= lfsck->li_sleep_rate)) {
402                         lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
403                                                LWI_ON_SIGNAL_NOOP, NULL);
404                         spin_unlock(&lfsck->li_lock);
405
406                         l_wait_event(thread->t_ctl_waitq,
407                                      !thread_is_running(thread),
408                                      &lwi);
409                         lfsck->li_new_scanned = 0;
410                 } else {
411                         spin_unlock(&lfsck->li_lock);
412                 }
413         }
414 }
415
416 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
417                             struct lu_fid *fid)
418 {
419         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
420                      !dt_try_as_dir(env, obj)))
421                 return -ENOTDIR;
422
423         return dt_lookup(env, obj, (struct dt_rec *)fid,
424                          (const struct dt_key *)"..", BYPASS_CAPA);
425 }
426
427 static int lfsck_needs_scan_dir(const struct lu_env *env,
428                                 struct lfsck_instance *lfsck,
429                                 struct dt_object *obj)
430 {
431         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
432         int            depth = 0;
433         int            rc;
434
435         if (!lfsck->li_master || !S_ISDIR(lfsck_object_type(obj)) ||
436             cfs_list_empty(&lfsck->li_list_dir))
437                RETURN(0);
438
439         while (1) {
440                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
441                  *      which is the agent directory to manage the objects
442                  *      which name entries reside on remote MDTs. Related
443                  *      consistency verification will be processed in LFSCK
444                  *      phase III. */
445                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
446                         if (depth > 0)
447                                 lfsck_object_put(env, obj);
448                         return 1;
449                 }
450
451                 /* .lustre doesn't contain "real" user objects, no need lfsck */
452                 if (fid_is_dot_lustre(lfsck_dto2fid(obj))) {
453                         if (depth > 0)
454                                 lfsck_object_put(env, obj);
455                         return 0;
456                 }
457
458                 dt_read_lock(env, obj, MOR_TGT_CHILD);
459                 if (unlikely(lfsck_is_dead_obj(obj))) {
460                         dt_read_unlock(env, obj);
461                         if (depth > 0)
462                                 lfsck_object_put(env, obj);
463                         return 0;
464                 }
465
466                 rc = dt_xattr_get(env, obj,
467                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
468                                   BYPASS_CAPA);
469                 dt_read_unlock(env, obj);
470                 if (rc >= 0) {
471                         if (depth > 0)
472                                 lfsck_object_put(env, obj);
473                         return 1;
474                 }
475
476                 if (rc < 0 && rc != -ENODATA) {
477                         if (depth > 0)
478                                 lfsck_object_put(env, obj);
479                         return rc;
480                 }
481
482                 rc = lfsck_parent_fid(env, obj, fid);
483                 if (depth > 0)
484                         lfsck_object_put(env, obj);
485                 if (rc != 0)
486                         return rc;
487
488                 if (unlikely(lu_fid_eq(fid,
489                                        lfsck_dto2fid(lfsck->li_local_root))))
490                         return 0;
491
492                 obj = lfsck_object_find(env, lfsck, fid);
493                 if (obj == NULL)
494                         return 0;
495                 else if (IS_ERR(obj))
496                         return PTR_ERR(obj);
497
498                 if (!dt_object_exists(obj)) {
499                         lfsck_object_put(env, obj);
500                         return 0;
501                 }
502
503                 /* Currently, only client visible directory can be remote. */
504                 if (dt_object_remote(obj)) {
505                         lfsck_object_put(env, obj);
506                         return 1;
507                 }
508
509                 depth++;
510         }
511         return 0;
512 }
513
514 /* LFSCK wrap functions */
515
516 void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
517                 bool new_checked)
518 {
519         struct lfsck_component *com;
520
521         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
522                 com->lc_ops->lfsck_fail(env, com, new_checked);
523         }
524 }
525
526 int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck)
527 {
528         struct lfsck_component *com;
529         int                     rc;
530
531         if (likely(cfs_time_beforeq(cfs_time_current(),
532                                     lfsck->li_time_next_checkpoint)))
533                 return 0;
534
535         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
536         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
537                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
538                 if (rc != 0)
539                         return rc;;
540         }
541
542         lfsck->li_time_last_checkpoint = cfs_time_current();
543         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
544                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
545         return 0;
546 }
547
548 int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck)
549 {
550         struct dt_object       *obj     = NULL;
551         struct lfsck_component *com;
552         struct lfsck_component *next;
553         struct lfsck_position  *pos     = NULL;
554         const struct dt_it_ops *iops    =
555                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
556         struct dt_it           *di;
557         int                     rc;
558         ENTRY;
559
560         LASSERT(lfsck->li_obj_dir == NULL);
561         LASSERT(lfsck->li_di_dir == NULL);
562
563         lfsck->li_current_oit_processed = 0;
564         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
565                 com->lc_new_checked = 0;
566                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
567                         com->lc_journal = 0;
568
569                 rc = com->lc_ops->lfsck_prep(env, com);
570                 if (rc != 0)
571                         RETURN(rc);
572
573                 if ((pos == NULL) ||
574                     (!lfsck_pos_is_zero(&com->lc_pos_start) &&
575                      lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
576                         pos = &com->lc_pos_start;
577         }
578
579         /* Init otable-based iterator. */
580         if (pos == NULL) {
581                 rc = iops->load(env, lfsck->li_di_oit, 0);
582                 if (rc > 0) {
583                         lfsck->li_oit_over = 1;
584                         rc = 0;
585                 }
586
587                 GOTO(out, rc);
588         }
589
590         rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
591         if (rc < 0)
592                 GOTO(out, rc);
593         else if (rc > 0)
594                 lfsck->li_oit_over = 1;
595
596         if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
597                 GOTO(out, rc = 0);
598
599         /* Find the directory for namespace-based traverse. */
600         obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
601         if (obj == NULL)
602                 GOTO(out, rc = 0);
603         else if (IS_ERR(obj))
604                 RETURN(PTR_ERR(obj));
605
606         /* XXX: Currently, skip remote object, the consistency for
607          *      remote object will be processed in LFSCK phase III. */
608         if (!dt_object_exists(obj) || dt_object_remote(obj) ||
609             unlikely(!S_ISDIR(lfsck_object_type(obj))))
610                 GOTO(out, rc = 0);
611
612         if (unlikely(!dt_try_as_dir(env, obj)))
613                 GOTO(out, rc = -ENOTDIR);
614
615         /* Init the namespace-based directory traverse. */
616         iops = &obj->do_index_ops->dio_it;
617         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
618         if (IS_ERR(di))
619                 GOTO(out, rc = PTR_ERR(di));
620
621         LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
622
623         rc = iops->load(env, di, pos->lp_dir_cookie);
624         if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
625                 rc = iops->next(env, di);
626         else if (rc > 0)
627                 rc = 0;
628
629         if (rc != 0) {
630                 iops->put(env, di);
631                 iops->fini(env, di);
632                 GOTO(out, rc);
633         }
634
635         lfsck->li_obj_dir = lfsck_object_get(obj);
636         spin_lock(&lfsck->li_lock);
637         lfsck->li_di_dir = di;
638         spin_unlock(&lfsck->li_lock);
639
640         GOTO(out, rc = 0);
641
642 out:
643         if (obj != NULL)
644                 lfsck_object_put(env, obj);
645
646         if (rc < 0) {
647                 cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
648                                              lc_link)
649                         com->lc_ops->lfsck_post(env, com, rc, true);
650
651                 return rc;
652         }
653
654         rc = 0;
655         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
656         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
657                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
658                 if (rc != 0)
659                         break;
660         }
661
662         lfsck->li_time_last_checkpoint = cfs_time_current();
663         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
664                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
665         return rc;
666 }
667
668 int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
669                    struct dt_object *obj)
670 {
671         struct lfsck_component *com;
672         const struct dt_it_ops *iops;
673         struct dt_it           *di;
674         int                     rc;
675         ENTRY;
676
677         LASSERT(lfsck->li_obj_dir == NULL);
678
679         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
680                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
681                 if (rc != 0)
682                         RETURN(rc);
683         }
684
685         rc = lfsck_needs_scan_dir(env, lfsck, obj);
686         if (rc <= 0)
687                 GOTO(out, rc);
688
689         if (unlikely(!dt_try_as_dir(env, obj)))
690                 GOTO(out, rc = -ENOTDIR);
691
692         iops = &obj->do_index_ops->dio_it;
693         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
694         if (IS_ERR(di))
695                 GOTO(out, rc = PTR_ERR(di));
696
697         rc = iops->load(env, di, 0);
698         if (rc == 0)
699                 rc = iops->next(env, di);
700         else if (rc > 0)
701                 rc = 0;
702
703         if (rc != 0) {
704                 iops->put(env, di);
705                 iops->fini(env, di);
706                 GOTO(out, rc);
707         }
708
709         lfsck->li_obj_dir = lfsck_object_get(obj);
710         spin_lock(&lfsck->li_lock);
711         lfsck->li_di_dir = di;
712         spin_unlock(&lfsck->li_lock);
713
714         GOTO(out, rc = 0);
715
716 out:
717         if (rc < 0)
718                 lfsck_fail(env, lfsck, false);
719         return (rc > 0 ? 0 : rc);
720 }
721
722 int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
723                    struct dt_object *obj, struct lu_dirent *ent)
724 {
725         struct lfsck_component *com;
726         int                     rc;
727
728         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
729                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
730                 if (rc != 0)
731                         return rc;
732         }
733         return 0;
734 }
735
736 int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
737                int result)
738 {
739         struct lfsck_component *com;
740         struct lfsck_component *next;
741         int                     rc;
742
743         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
744         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
745                 rc = com->lc_ops->lfsck_post(env, com, result, false);
746                 if (rc != 0)
747                         return rc;
748         }
749
750         lfsck->li_time_last_checkpoint = cfs_time_current();
751         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
752                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
753         return result;
754 }
755
756 int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck)
757 {
758         struct lfsck_component *com;
759         struct lfsck_component *next;
760         int                     rc;
761
762         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
763                                      lc_link) {
764                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
765                         com->lc_journal = 0;
766
767                 rc = com->lc_ops->lfsck_double_scan(env, com);
768                 if (rc != 0)
769                         return rc;
770         }
771         return 0;
772 }
773
774 /* external interfaces */
775
776 int lfsck_get_speed(struct dt_device *key, void *buf, int len)
777 {
778         struct lu_env           env;
779         struct lfsck_instance  *lfsck;
780         int                     rc;
781         ENTRY;
782
783         lfsck = lfsck_instance_find(key, true, false);
784         if (unlikely(lfsck == NULL))
785                 RETURN(-ENODEV);
786
787         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
788         if (rc != 0)
789                 GOTO(out, rc);
790
791         rc = snprintf(buf, len, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
792         lu_env_fini(&env);
793
794         GOTO(out, rc);
795
796 out:
797         lfsck_instance_put(&env, lfsck);
798         return rc;
799 }
800 EXPORT_SYMBOL(lfsck_get_speed);
801
802 int lfsck_set_speed(struct dt_device *key, int val)
803 {
804         struct lu_env           env;
805         struct lfsck_instance  *lfsck;
806         int                     rc;
807         ENTRY;
808
809         lfsck = lfsck_instance_find(key, true, false);
810         if (unlikely(lfsck == NULL))
811                 RETURN(-ENODEV);
812
813         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
814         if (rc != 0)
815                 GOTO(out, rc);
816
817         mutex_lock(&lfsck->li_mutex);
818         __lfsck_set_speed(lfsck, val);
819         rc = lfsck_bookmark_store(&env, lfsck);
820         mutex_unlock(&lfsck->li_mutex);
821         lu_env_fini(&env);
822
823         GOTO(out, rc);
824
825 out:
826         lfsck_instance_put(&env, lfsck);
827         return rc;
828 }
829 EXPORT_SYMBOL(lfsck_set_speed);
830
831 int lfsck_dump(struct dt_device *key, void *buf, int len, __u16 type)
832 {
833         struct lu_env           env;
834         struct lfsck_instance  *lfsck;
835         struct lfsck_component *com   = NULL;
836         int                     rc;
837         ENTRY;
838
839         lfsck = lfsck_instance_find(key, true, false);
840         if (unlikely(lfsck == NULL))
841                 RETURN(-ENODEV);
842
843         com = lfsck_component_find(lfsck, type);
844         if (com == NULL)
845                 GOTO(out, rc = -ENOTSUPP);
846
847         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
848         if (rc != 0)
849                 GOTO(out, rc);
850
851         rc = com->lc_ops->lfsck_dump(&env, com, buf, len);
852         lu_env_fini(&env);
853
854         GOTO(out, rc);
855
856 out:
857         if (com != NULL)
858                 lfsck_component_put(&env, com);
859         lfsck_instance_put(&env, lfsck);
860         return rc;
861 }
862 EXPORT_SYMBOL(lfsck_dump);
863
864 int lfsck_start(const struct lu_env *env, struct dt_device *key,
865                 struct lfsck_start_param *lsp)
866 {
867         struct lfsck_start     *start  = lsp->lsp_start;
868         struct lfsck_instance  *lfsck;
869         struct lfsck_bookmark  *bk;
870         struct ptlrpc_thread   *thread;
871         struct lfsck_component *com;
872         struct l_wait_info      lwi    = { 0 };
873         bool                    dirty  = false;
874         int                     rc     = 0;
875         __u16                   valid  = 0;
876         __u16                   flags  = 0;
877         ENTRY;
878
879         lfsck = lfsck_instance_find(key, true, false);
880         if (unlikely(lfsck == NULL))
881                 RETURN(-ENODEV);
882
883         /* start == NULL means auto trigger paused LFSCK. */
884         if ((start == NULL) &&
885             (cfs_list_empty(&lfsck->li_list_scan) ||
886              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
887                 GOTO(put, rc = 0);
888
889         bk = &lfsck->li_bookmark_ram;
890         thread = &lfsck->li_thread;
891         mutex_lock(&lfsck->li_mutex);
892         spin_lock(&lfsck->li_lock);
893         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
894                 spin_unlock(&lfsck->li_lock);
895                 GOTO(out, rc = -EALREADY);
896         }
897
898         spin_unlock(&lfsck->li_lock);
899
900         lfsck->li_namespace = lsp->lsp_namespace;
901         lfsck->li_paused = 0;
902         lfsck->li_oit_over = 0;
903         lfsck->li_drop_dryrun = 0;
904         lfsck->li_new_scanned = 0;
905
906         /* For auto trigger. */
907         if (start == NULL)
908                 goto trigger;
909
910         start->ls_version = bk->lb_version;
911         if (start->ls_valid & LSV_SPEED_LIMIT) {
912                 __lfsck_set_speed(lfsck, start->ls_speed_limit);
913                 dirty = true;
914         }
915
916         if (start->ls_valid & LSV_ERROR_HANDLE) {
917                 valid |= DOIV_ERROR_HANDLE;
918                 if (start->ls_flags & LPF_FAILOUT)
919                         flags |= DOIF_FAILOUT;
920
921                 if ((start->ls_flags & LPF_FAILOUT) &&
922                     !(bk->lb_param & LPF_FAILOUT)) {
923                         bk->lb_param |= LPF_FAILOUT;
924                         dirty = true;
925                 } else if (!(start->ls_flags & LPF_FAILOUT) &&
926                            (bk->lb_param & LPF_FAILOUT)) {
927                         bk->lb_param &= ~LPF_FAILOUT;
928                         dirty = true;
929                 }
930         }
931
932         if (start->ls_valid & LSV_DRYRUN) {
933                 if ((start->ls_flags & LPF_DRYRUN) &&
934                     !(bk->lb_param & LPF_DRYRUN)) {
935                         bk->lb_param |= LPF_DRYRUN;
936                         dirty = true;
937                 } else if (!(start->ls_flags & LPF_DRYRUN) &&
938                            (bk->lb_param & LPF_DRYRUN)) {
939                         bk->lb_param &= ~LPF_DRYRUN;
940                         lfsck->li_drop_dryrun = 1;
941                         dirty = true;
942                 }
943         }
944
945         if (dirty) {
946                 rc = lfsck_bookmark_store(env, lfsck);
947                 if (rc != 0)
948                         GOTO(out, rc);
949         }
950
951         if (start->ls_flags & LPF_RESET)
952                 flags |= DOIF_RESET;
953
954         if (start->ls_active != 0) {
955                 struct lfsck_component *next;
956                 __u16 type = 1;
957
958                 if (start->ls_active == LFSCK_TYPES_ALL)
959                         start->ls_active = LFSCK_TYPES_SUPPORTED;
960
961                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
962                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
963                         GOTO(out, rc = -ENOTSUPP);
964                 }
965
966                 cfs_list_for_each_entry_safe(com, next,
967                                              &lfsck->li_list_scan, lc_link) {
968                         if (!(com->lc_type & start->ls_active)) {
969                                 rc = com->lc_ops->lfsck_post(env, com, 0,
970                                                              false);
971                                 if (rc != 0)
972                                         GOTO(out, rc);
973                         }
974                 }
975
976                 while (start->ls_active != 0) {
977                         if (type & start->ls_active) {
978                                 com = __lfsck_component_find(lfsck, type,
979                                                         &lfsck->li_list_idle);
980                                 if (com != NULL) {
981                                         /* The component status will be updated
982                                          * when its prep() is called later by
983                                          * the LFSCK main engine. */
984                                         cfs_list_del_init(&com->lc_link);
985                                         cfs_list_add_tail(&com->lc_link,
986                                                           &lfsck->li_list_scan);
987                                 }
988                                 start->ls_active &= ~type;
989                         }
990                         type <<= 1;
991                 }
992         }
993
994         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
995                 start->ls_active |= com->lc_type;
996                 if (flags & DOIF_RESET) {
997                         rc = com->lc_ops->lfsck_reset(env, com, false);
998                         if (rc != 0)
999                                 GOTO(out, rc);
1000                 }
1001         }
1002
1003 trigger:
1004         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
1005         if (bk->lb_param & LPF_DRYRUN)
1006                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
1007
1008         if (bk->lb_param & LPF_FAILOUT) {
1009                 valid |= DOIV_ERROR_HANDLE;
1010                 flags |= DOIF_FAILOUT;
1011         }
1012
1013         if (!cfs_list_empty(&lfsck->li_list_scan))
1014                 flags |= DOIF_OUTUSED;
1015
1016         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
1017         thread_set_flags(thread, 0);
1018         if (lfsck->li_master)
1019                 rc = cfs_create_thread(lfsck_master_engine, lfsck, 0);
1020         if (rc < 0)
1021                 CERROR("%s: cannot start LFSCK thread, rc = %d\n",
1022                        lfsck_lfsck2name(lfsck), rc);
1023         else
1024                 l_wait_event(thread->t_ctl_waitq,
1025                              thread_is_running(thread) ||
1026                              thread_is_stopped(thread),
1027                              &lwi);
1028
1029         GOTO(out, rc = 0);
1030
1031 out:
1032         mutex_unlock(&lfsck->li_mutex);
1033 put:
1034         lfsck_instance_put(env, lfsck);
1035         return (rc < 0 ? rc : 0);
1036 }
1037 EXPORT_SYMBOL(lfsck_start);
1038
1039 int lfsck_stop(const struct lu_env *env, struct dt_device *key, bool pause)
1040 {
1041         struct lfsck_instance   *lfsck;
1042         struct ptlrpc_thread    *thread;
1043         struct l_wait_info       lwi    = { 0 };
1044         ENTRY;
1045
1046         lfsck = lfsck_instance_find(key, true, false);
1047         if (unlikely(lfsck == NULL))
1048                 RETURN(-ENODEV);
1049
1050         thread = &lfsck->li_thread;
1051         mutex_lock(&lfsck->li_mutex);
1052         spin_lock(&lfsck->li_lock);
1053         if (thread_is_init(thread) || thread_is_stopped(thread)) {
1054                 spin_unlock(&lfsck->li_lock);
1055                 mutex_unlock(&lfsck->li_mutex);
1056                 lfsck_instance_put(env, lfsck);
1057                 RETURN(-EALREADY);
1058         }
1059
1060         if (pause)
1061                 lfsck->li_paused = 1;
1062         thread_set_flags(thread, SVC_STOPPING);
1063         /* The LFSCK thread may be sleeping on low layer wait queue,
1064          * wake it up. */
1065         if (likely(lfsck->li_di_oit != NULL))
1066                 lfsck->li_obj_oit->do_index_ops->dio_it.put(env,
1067                                                             lfsck->li_di_oit);
1068         spin_unlock(&lfsck->li_lock);
1069
1070         cfs_waitq_broadcast(&thread->t_ctl_waitq);
1071         l_wait_event(thread->t_ctl_waitq,
1072                      thread_is_stopped(thread),
1073                      &lwi);
1074         mutex_unlock(&lfsck->li_mutex);
1075         lfsck_instance_put(env, lfsck);
1076
1077         RETURN(0);
1078 }
1079 EXPORT_SYMBOL(lfsck_stop);
1080
1081 int lfsck_register(const struct lu_env *env, struct dt_device *key,
1082                    struct dt_device *next, bool master)
1083 {
1084         struct lfsck_instance   *lfsck;
1085         struct dt_object        *root;
1086         struct dt_object        *obj;
1087         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
1088         int                      rc;
1089         ENTRY;
1090
1091         lfsck = lfsck_instance_find(key, false, false);
1092         if (unlikely(lfsck != NULL))
1093                 RETURN(-EEXIST);
1094
1095         OBD_ALLOC_PTR(lfsck);
1096         if (lfsck == NULL)
1097                 RETURN(-ENOMEM);
1098
1099         mutex_init(&lfsck->li_mutex);
1100         spin_lock_init(&lfsck->li_lock);
1101         CFS_INIT_LIST_HEAD(&lfsck->li_link);
1102         CFS_INIT_LIST_HEAD(&lfsck->li_list_scan);
1103         CFS_INIT_LIST_HEAD(&lfsck->li_list_dir);
1104         CFS_INIT_LIST_HEAD(&lfsck->li_list_double_scan);
1105         CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
1106         atomic_set(&lfsck->li_ref, 1);
1107         cfs_waitq_init(&lfsck->li_thread.t_ctl_waitq);
1108         lfsck->li_next = next;
1109         lfsck->li_bottom = key;
1110
1111         fid->f_seq = FID_SEQ_LOCAL_NAME;
1112         fid->f_oid = 1;
1113         fid->f_ver = 0;
1114         rc = local_oid_storage_init(env, lfsck->li_bottom, fid, &lfsck->li_los);
1115         if (rc != 0)
1116                 GOTO(out, rc);
1117
1118         rc = dt_root_get(env, key, fid);
1119         if (rc != 0)
1120                 GOTO(out, rc);
1121
1122         root = dt_locate(env, lfsck->li_bottom, fid);
1123         if (IS_ERR(root))
1124                 GOTO(out, rc = PTR_ERR(root));
1125
1126         lfsck->li_local_root = root;
1127         dt_try_as_dir(env, root);
1128         if (master) {
1129                 lfsck->li_master = 1;
1130                 if (lfsck_dev_idx(lfsck->li_bottom) == 0) {
1131                         rc = dt_lookup(env, root,
1132                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
1133                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
1134                         if (rc != 0)
1135                                 GOTO(out, rc);
1136                 }
1137         }
1138
1139         fid->f_seq = FID_SEQ_LOCAL_FILE;
1140         fid->f_oid = OTABLE_IT_OID;
1141         fid->f_ver = 0;
1142         obj = dt_locate(env, lfsck->li_bottom, fid);
1143         if (IS_ERR(obj))
1144                 GOTO(out, rc = PTR_ERR(obj));
1145
1146         lfsck->li_obj_oit = obj;
1147         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
1148         if (rc != 0) {
1149                 if (rc == -ENOTSUPP)
1150                         GOTO(add, rc = 0);
1151
1152                 GOTO(out, rc);
1153         }
1154
1155         rc = lfsck_bookmark_setup(env, lfsck);
1156         if (rc != 0)
1157                 GOTO(out, rc);
1158
1159         if (master) {
1160                 rc = lfsck_namespace_setup(env, lfsck);
1161                 if (rc < 0)
1162                         GOTO(out, rc);
1163         }
1164
1165         /* XXX: more LFSCK components initialization to be added here. */
1166
1167 add:
1168         rc = lfsck_instance_add(lfsck);
1169 out:
1170         if (rc != 0)
1171                 lfsck_instance_cleanup(env, lfsck);
1172         return rc;
1173 }
1174 EXPORT_SYMBOL(lfsck_register);
1175
1176 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
1177 {
1178         struct lfsck_instance *lfsck;
1179
1180         lfsck = lfsck_instance_find(key, false, true);
1181         if (lfsck != NULL)
1182                 lfsck_instance_put(env, lfsck);
1183 }
1184 EXPORT_SYMBOL(lfsck_degister);
1185
1186 static int __init lfsck_init(void)
1187 {
1188         int rc;
1189
1190         lfsck_key_init_generic(&lfsck_thread_key, NULL);
1191         rc = lu_context_key_register(&lfsck_thread_key);
1192         return rc;
1193 }
1194
1195 static void __exit lfsck_exit(void)
1196 {
1197         LASSERT(cfs_list_empty(&lfsck_instance_list));
1198
1199         lu_context_key_degister(&lfsck_thread_key);
1200 }
1201
1202 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
1203 MODULE_DESCRIPTION("LFSCK");
1204 MODULE_LICENSE("GPL");
1205
1206 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);