Whamcloud - gitweb
LU-3335 scrub: control OI scrub on OST from user space
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 /* define lfsck thread key */
46 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
47
48 static void lfsck_key_fini(const struct lu_context *ctx,
49                            struct lu_context_key *key, void *data)
50 {
51         struct lfsck_thread_info *info = data;
52
53         lu_buf_free(&info->lti_linkea_buf);
54         OBD_FREE_PTR(info);
55 }
56
57 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
58 LU_KEY_INIT_GENERIC(lfsck);
59
/* Serializes every lookup, insertion and removal on lfsck_instance_list. */
static DEFINE_SPINLOCK(lfsck_instance_lock);
/* All known LFSCK instances, chained through lfsck_instance::li_link. */
static CFS_LIST_HEAD(lfsck_instance_list);
62
/* Printable names of the LFSCK status values, NULL-terminated. */
const char *lfsck_status_names[] = {
        [0] = "init",
        [1] = "scanning-phase1",
        [2] = "scanning-phase2",
        [3] = "completed",
        [4] = "failed",
        [5] = "stopped",
        [6] = "paused",
        [7] = "crashed",
        [8] = NULL
};
74
/*
 * Printable names of the LFSCK flag bits, NULL-terminated; entry N names
 * bit N when the table is handed to lfsck_bits_dump().
 */
const char *lfsck_flags_names[] = {
        [0] = "scanned-once",
        [1] = "inconsistent",
        [2] = "upgrade",
        [3] = NULL
};
81
/*
 * Printable names of the LFSCK parameter bits, NULL-terminated; entry N
 * names bit N when the table is handed to lfsck_bits_dump().
 */
const char *lfsck_param_names[] = {
        [0] = "failout",
        [1] = "dryrun",
        [2] = NULL
};
87
88 static inline mdsno_t lfsck_dev_idx(struct dt_device *dev)
89 {
90         return dev->dd_lu_dev.ld_site->ld_seq_site->ss_node_id;
91 }
92
93 static inline void lfsck_component_get(struct lfsck_component *com)
94 {
95         atomic_inc(&com->lc_ref);
96 }
97
98 static inline void lfsck_component_put(const struct lu_env *env,
99                                        struct lfsck_component *com)
100 {
101         if (atomic_dec_and_test(&com->lc_ref)) {
102                 if (com->lc_obj != NULL)
103                         lu_object_put(env, &com->lc_obj->do_lu);
104                 if (com->lc_file_ram != NULL)
105                         OBD_FREE(com->lc_file_ram, com->lc_file_size);
106                 if (com->lc_file_disk != NULL)
107                         OBD_FREE(com->lc_file_disk, com->lc_file_size);
108                 OBD_FREE_PTR(com);
109         }
110 }
111
112 static inline struct lfsck_component *
113 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type, cfs_list_t *list)
114 {
115         struct lfsck_component *com;
116
117         cfs_list_for_each_entry(com, list, lc_link) {
118                 if (com->lc_type == type)
119                         return com;
120         }
121         return NULL;
122 }
123
124 static struct lfsck_component *
125 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
126 {
127         struct lfsck_component *com;
128
129         spin_lock(&lfsck->li_lock);
130         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
131         if (com != NULL)
132                 goto unlock;
133
134         com = __lfsck_component_find(lfsck, type,
135                                      &lfsck->li_list_double_scan);
136         if (com != NULL)
137                 goto unlock;
138
139         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
140
141 unlock:
142         if (com != NULL)
143                 lfsck_component_get(com);
144         spin_unlock(&lfsck->li_lock);
145         return com;
146 }
147
148 void lfsck_component_cleanup(const struct lu_env *env,
149                              struct lfsck_component *com)
150 {
151         if (!cfs_list_empty(&com->lc_link))
152                 cfs_list_del_init(&com->lc_link);
153         if (!cfs_list_empty(&com->lc_link_dir))
154                 cfs_list_del_init(&com->lc_link_dir);
155
156         lfsck_component_put(env, com);
157 }
158
159 static void lfsck_instance_cleanup(const struct lu_env *env,
160                                    struct lfsck_instance *lfsck)
161 {
162         struct ptlrpc_thread    *thread = &lfsck->li_thread;
163         struct lfsck_component  *com;
164         ENTRY;
165
166         LASSERT(list_empty(&lfsck->li_link));
167         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
168
169         if (lfsck->li_obj_oit != NULL) {
170                 lu_object_put(env, &lfsck->li_obj_oit->do_lu);
171                 lfsck->li_obj_oit = NULL;
172         }
173
174         LASSERT(lfsck->li_obj_dir == NULL);
175
176         while (!cfs_list_empty(&lfsck->li_list_scan)) {
177                 com = cfs_list_entry(lfsck->li_list_scan.next,
178                                      struct lfsck_component,
179                                      lc_link);
180                 lfsck_component_cleanup(env, com);
181         }
182
183         LASSERT(cfs_list_empty(&lfsck->li_list_dir));
184
185         while (!cfs_list_empty(&lfsck->li_list_double_scan)) {
186                 com = cfs_list_entry(lfsck->li_list_double_scan.next,
187                                      struct lfsck_component,
188                                      lc_link);
189                 lfsck_component_cleanup(env, com);
190         }
191
192         while (!cfs_list_empty(&lfsck->li_list_idle)) {
193                 com = cfs_list_entry(lfsck->li_list_idle.next,
194                                      struct lfsck_component,
195                                      lc_link);
196                 lfsck_component_cleanup(env, com);
197         }
198
199         if (lfsck->li_bookmark_obj != NULL) {
200                 lu_object_put(env, &lfsck->li_bookmark_obj->do_lu);
201                 lfsck->li_bookmark_obj = NULL;
202         }
203
204         if (lfsck->li_los != NULL) {
205                 local_oid_storage_fini(env, lfsck->li_los);
206                 lfsck->li_los = NULL;
207         }
208
209         if (lfsck->li_local_root != NULL) {
210                 lu_object_put(env, &lfsck->li_local_root->do_lu);
211                 lfsck->li_local_root = NULL;
212         }
213
214         OBD_FREE_PTR(lfsck);
215 }
216
217 static inline void lfsck_instance_get(struct lfsck_instance *lfsck)
218 {
219         atomic_inc(&lfsck->li_ref);
220 }
221
222 static inline void lfsck_instance_put(const struct lu_env *env,
223                                       struct lfsck_instance *lfsck)
224 {
225         if (atomic_dec_and_test(&lfsck->li_ref))
226                 lfsck_instance_cleanup(env, lfsck);
227 }
228
229 static inline struct lfsck_instance *lfsck_instance_find(struct dt_device *key,
230                                                          bool ref, bool unlink)
231 {
232         struct lfsck_instance *lfsck;
233
234         spin_lock(&lfsck_instance_lock);
235         cfs_list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
236                 if (lfsck->li_bottom == key) {
237                         if (ref)
238                                 lfsck_instance_get(lfsck);
239                         if (unlink)
240                                 list_del_init(&lfsck->li_link);
241                         spin_unlock(&lfsck_instance_lock);
242                         return lfsck;
243                 }
244         }
245         spin_unlock(&lfsck_instance_lock);
246         return NULL;
247 }
248
249 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
250 {
251         struct lfsck_instance *tmp;
252
253         spin_lock(&lfsck_instance_lock);
254         cfs_list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
255                 if (lfsck->li_bottom == tmp->li_bottom) {
256                         spin_unlock(&lfsck_instance_lock);
257                         return -EEXIST;
258                 }
259         }
260
261         cfs_list_add_tail(&lfsck->li_link, &lfsck_instance_list);
262         spin_unlock(&lfsck_instance_lock);
263         return 0;
264 }
265
/**
 * Print "<prefix>: name1,name2,...\n" for every bit set in \a bits.
 *
 * Bit N is printed as names[N]. When no bit is set only "<prefix>:\n"
 * is emitted. On success *buf is advanced past the text written and
 * *len is reduced by the same amount.
 *
 * snprintf() returns the length the output would have had, so a return
 * value >= the remaining space means the text was truncated. That case
 * is reported as -ENOSPC instead of advancing *buf beyond the buffer and
 * driving *len negative.
 *
 * \param[in,out] buf   current write position in the output buffer
 * \param[in,out] len   space remaining at *buf, in bytes
 * \param[in] bits      bitmask to decode
 * \param[in] names     table of names indexed by bit number; it must
 *                      have an entry for every bit set in \a bits
 * \param[in] prefix    label printed before the names
 *
 * \retval              number of bytes written (> 0) on success
 * \retval -ENOSPC      the output does not fit into the buffer
 */
int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
                    const char *prefix)
{
        /* Work on an unsigned copy so that testing bit 31 never shifts
         * into the sign bit of a signed int. */
        unsigned int    left = (unsigned int)bits;
        unsigned int    flag;
        int             save = *len;
        int             rc;
        int             i;

        if (*len <= 0)
                return -ENOSPC;

        rc = snprintf(*buf, *len, "%s:%c", prefix, left != 0 ? ' ' : '\n');
        if (rc <= 0 || rc >= *len)
                return -ENOSPC;

        *buf += rc;
        *len -= rc;
        for (i = 0; left != 0; i++) {
                flag = 1U << i;
                if (!(left & flag))
                        continue;

                left &= ~flag;
                /* The last name is followed by a newline, others by ','. */
                rc = snprintf(*buf, *len, "%s%c", names[i],
                              left != 0 ? ',' : '\n');
                if (rc <= 0 || rc >= *len)
                        return -ENOSPC;

                *buf += rc;
                *len -= rc;
        }
        return save - *len;
}
294
295 int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix)
296 {
297         int rc;
298
299         if (time != 0)
300                 rc = snprintf(*buf, *len, "%s: "LPU64" seconds\n", prefix,
301                               cfs_time_current_sec() - time);
302         else
303                 rc = snprintf(*buf, *len, "%s: N/A\n", prefix);
304         if (rc <= 0)
305                 return -ENOSPC;
306
307         *buf += rc;
308         *len -= rc;
309         return rc;
310 }
311
312 int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
313                    const char *prefix)
314 {
315         int rc;
316
317         if (fid_is_zero(&pos->lp_dir_parent)) {
318                 if (pos->lp_oit_cookie == 0)
319                         rc = snprintf(*buf, *len, "%s: N/A, N/A, N/A\n",
320                                       prefix);
321                 else
322                         rc = snprintf(*buf, *len, "%s: "LPU64", N/A, N/A\n",
323                                       prefix, pos->lp_oit_cookie);
324         } else {
325                 rc = snprintf(*buf, *len, "%s: "LPU64", "DFID", "LPU64"\n",
326                               prefix, pos->lp_oit_cookie,
327                               PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
328         }
329         if (rc <= 0)
330                 return -ENOSPC;
331
332         *buf += rc;
333         *len -= rc;
334         return rc;
335 }
336
337 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
338                     struct lfsck_position *pos, bool init)
339 {
340         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
341
342         if (unlikely(lfsck->li_di_oit == NULL)) {
343                 memset(pos, 0, sizeof(*pos));
344                 return;
345         }
346
347         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
348         if (!lfsck->li_current_oit_processed && !init)
349                 pos->lp_oit_cookie--;
350
351         LASSERT(pos->lp_oit_cookie > 0);
352
353         if (lfsck->li_di_dir != NULL) {
354                 struct dt_object *dto = lfsck->li_obj_dir;
355
356                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
357                                                         lfsck->li_di_dir);
358
359                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
360                         fid_zero(&pos->lp_dir_parent);
361                         pos->lp_dir_cookie = 0;
362                 } else {
363                         pos->lp_dir_parent = *lu_object_fid(&dto->do_lu);
364                 }
365         } else {
366                 fid_zero(&pos->lp_dir_parent);
367                 pos->lp_dir_cookie = 0;
368         }
369 }
370
371 static void __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
372 {
373         lfsck->li_bookmark_ram.lb_speed_limit = limit;
374         if (limit != LFSCK_SPEED_NO_LIMIT) {
375                 if (limit > CFS_HZ) {
376                         lfsck->li_sleep_rate = limit / CFS_HZ;
377                         lfsck->li_sleep_jif = 1;
378                 } else {
379                         lfsck->li_sleep_rate = 1;
380                         lfsck->li_sleep_jif = CFS_HZ / limit;
381                 }
382         } else {
383                 lfsck->li_sleep_jif = 0;
384                 lfsck->li_sleep_rate = 0;
385         }
386 }
387
388 void lfsck_control_speed(struct lfsck_instance *lfsck)
389 {
390         struct ptlrpc_thread *thread = &lfsck->li_thread;
391         struct l_wait_info    lwi;
392
393         if (lfsck->li_sleep_jif > 0 &&
394             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
395                 spin_lock(&lfsck->li_lock);
396                 if (likely(lfsck->li_sleep_jif > 0 &&
397                            lfsck->li_new_scanned >= lfsck->li_sleep_rate)) {
398                         lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
399                                                LWI_ON_SIGNAL_NOOP, NULL);
400                         spin_unlock(&lfsck->li_lock);
401
402                         l_wait_event(thread->t_ctl_waitq,
403                                      !thread_is_running(thread),
404                                      &lwi);
405                         lfsck->li_new_scanned = 0;
406                 } else {
407                         spin_unlock(&lfsck->li_lock);
408                 }
409         }
410 }
411
412 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
413                             struct lu_fid *fid)
414 {
415         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
416                      !dt_try_as_dir(env, obj)))
417                 return -ENOTDIR;
418
419         return dt_lookup(env, obj, (struct dt_rec *)fid,
420                          (const struct dt_key *)"..", BYPASS_CAPA);
421 }
422
423 static int lfsck_needs_scan_dir(const struct lu_env *env,
424                                 struct lfsck_instance *lfsck,
425                                 struct dt_object *obj)
426 {
427         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
428         int            depth = 0;
429         int            rc;
430
431         if (!lfsck->li_master || !S_ISDIR(lfsck_object_type(obj)) ||
432             cfs_list_empty(&lfsck->li_list_dir))
433                RETURN(0);
434
435         while (1) {
436                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
437                  *      which is the agent directory to manage the objects
438                  *      which name entries reside on remote MDTs. Related
439                  *      consistency verification will be processed in LFSCK
440                  *      phase III. */
441                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
442                         if (depth > 0)
443                                 lfsck_object_put(env, obj);
444                         return 1;
445                 }
446
447                 /* .lustre doesn't contain "real" user objects, no need lfsck */
448                 if (fid_is_dot_lustre(lfsck_dto2fid(obj))) {
449                         if (depth > 0)
450                                 lfsck_object_put(env, obj);
451                         return 0;
452                 }
453
454                 dt_read_lock(env, obj, MOR_TGT_CHILD);
455                 if (unlikely(lfsck_is_dead_obj(obj))) {
456                         dt_read_unlock(env, obj);
457                         if (depth > 0)
458                                 lfsck_object_put(env, obj);
459                         return 0;
460                 }
461
462                 rc = dt_xattr_get(env, obj,
463                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
464                                   BYPASS_CAPA);
465                 dt_read_unlock(env, obj);
466                 if (rc >= 0) {
467                         if (depth > 0)
468                                 lfsck_object_put(env, obj);
469                         return 1;
470                 }
471
472                 if (rc < 0 && rc != -ENODATA) {
473                         if (depth > 0)
474                                 lfsck_object_put(env, obj);
475                         return rc;
476                 }
477
478                 rc = lfsck_parent_fid(env, obj, fid);
479                 if (depth > 0)
480                         lfsck_object_put(env, obj);
481                 if (rc != 0)
482                         return rc;
483
484                 if (unlikely(lu_fid_eq(fid,
485                                        lfsck_dto2fid(lfsck->li_local_root))))
486                         return 0;
487
488                 obj = lfsck_object_find(env, lfsck, fid);
489                 if (obj == NULL)
490                         return 0;
491                 else if (IS_ERR(obj))
492                         return PTR_ERR(obj);
493
494                 if (!dt_object_exists(obj)) {
495                         lfsck_object_put(env, obj);
496                         return 0;
497                 }
498
499                 /* Currently, only client visible directory can be remote. */
500                 if (dt_object_remote(obj)) {
501                         lfsck_object_put(env, obj);
502                         return 1;
503                 }
504
505                 depth++;
506         }
507         return 0;
508 }
509
510 /* LFSCK wrap functions */
511
512 void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
513                 bool new_checked)
514 {
515         struct lfsck_component *com;
516
517         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
518                 com->lc_ops->lfsck_fail(env, com, new_checked);
519         }
520 }
521
522 int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck)
523 {
524         struct lfsck_component *com;
525         int                     rc;
526
527         if (likely(cfs_time_beforeq(cfs_time_current(),
528                                     lfsck->li_time_next_checkpoint)))
529                 return 0;
530
531         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
532         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
533                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
534                 if (rc != 0)
535                         return rc;;
536         }
537
538         lfsck->li_time_last_checkpoint = cfs_time_current();
539         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
540                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
541         return 0;
542 }
543
544 int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck)
545 {
546         struct dt_object       *obj     = NULL;
547         struct lfsck_component *com;
548         struct lfsck_component *next;
549         struct lfsck_position  *pos     = NULL;
550         const struct dt_it_ops *iops    =
551                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
552         struct dt_it           *di;
553         int                     rc;
554         ENTRY;
555
556         LASSERT(lfsck->li_obj_dir == NULL);
557         LASSERT(lfsck->li_di_dir == NULL);
558
559         lfsck->li_current_oit_processed = 0;
560         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
561                 com->lc_new_checked = 0;
562                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
563                         com->lc_journal = 0;
564
565                 rc = com->lc_ops->lfsck_prep(env, com);
566                 if (rc != 0)
567                         RETURN(rc);
568
569                 if ((pos == NULL) ||
570                     (!lfsck_pos_is_zero(&com->lc_pos_start) &&
571                      lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
572                         pos = &com->lc_pos_start;
573         }
574
575         /* Init otable-based iterator. */
576         if (pos == NULL) {
577                 rc = iops->load(env, lfsck->li_di_oit, 0);
578                 if (rc > 0) {
579                         lfsck->li_oit_over = 1;
580                         rc = 0;
581                 }
582
583                 GOTO(out, rc);
584         }
585
586         rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
587         if (rc < 0)
588                 GOTO(out, rc);
589         else if (rc > 0)
590                 lfsck->li_oit_over = 1;
591
592         if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
593                 GOTO(out, rc = 0);
594
595         /* Find the directory for namespace-based traverse. */
596         obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
597         if (obj == NULL)
598                 GOTO(out, rc = 0);
599         else if (IS_ERR(obj))
600                 RETURN(PTR_ERR(obj));
601
602         /* XXX: Currently, skip remote object, the consistency for
603          *      remote object will be processed in LFSCK phase III. */
604         if (!dt_object_exists(obj) || dt_object_remote(obj) ||
605             unlikely(!S_ISDIR(lfsck_object_type(obj))))
606                 GOTO(out, rc = 0);
607
608         if (unlikely(!dt_try_as_dir(env, obj)))
609                 GOTO(out, rc = -ENOTDIR);
610
611         /* Init the namespace-based directory traverse. */
612         iops = &obj->do_index_ops->dio_it;
613         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
614         if (IS_ERR(di))
615                 GOTO(out, rc = PTR_ERR(di));
616
617         LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
618
619         rc = iops->load(env, di, pos->lp_dir_cookie);
620         if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
621                 rc = iops->next(env, di);
622         else if (rc > 0)
623                 rc = 0;
624
625         if (rc != 0) {
626                 iops->put(env, di);
627                 iops->fini(env, di);
628                 GOTO(out, rc);
629         }
630
631         lfsck->li_obj_dir = lfsck_object_get(obj);
632         lfsck->li_cookie_dir = iops->store(env, di);
633         spin_lock(&lfsck->li_lock);
634         lfsck->li_di_dir = di;
635         spin_unlock(&lfsck->li_lock);
636
637         GOTO(out, rc = 0);
638
639 out:
640         if (obj != NULL)
641                 lfsck_object_put(env, obj);
642
643         if (rc < 0) {
644                 cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
645                                              lc_link)
646                         com->lc_ops->lfsck_post(env, com, rc, true);
647
648                 return rc;
649         }
650
651         rc = 0;
652         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
653         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
654                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
655                 if (rc != 0)
656                         break;
657         }
658
659         lfsck->li_time_last_checkpoint = cfs_time_current();
660         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
661                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
662         return rc;
663 }
664
665 int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
666                    struct dt_object *obj)
667 {
668         struct lfsck_component *com;
669         const struct dt_it_ops *iops;
670         struct dt_it           *di;
671         int                     rc;
672         ENTRY;
673
674         LASSERT(lfsck->li_obj_dir == NULL);
675
676         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
677                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
678                 if (rc != 0)
679                         RETURN(rc);
680         }
681
682         rc = lfsck_needs_scan_dir(env, lfsck, obj);
683         if (rc <= 0)
684                 GOTO(out, rc);
685
686         if (unlikely(!dt_try_as_dir(env, obj)))
687                 GOTO(out, rc = -ENOTDIR);
688
689         iops = &obj->do_index_ops->dio_it;
690         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
691         if (IS_ERR(di))
692                 GOTO(out, rc = PTR_ERR(di));
693
694         rc = iops->load(env, di, 0);
695         if (rc == 0)
696                 rc = iops->next(env, di);
697         else if (rc > 0)
698                 rc = 0;
699
700         if (rc != 0) {
701                 iops->put(env, di);
702                 iops->fini(env, di);
703                 GOTO(out, rc);
704         }
705
706         lfsck->li_obj_dir = lfsck_object_get(obj);
707         lfsck->li_cookie_dir = iops->store(env, di);
708         spin_lock(&lfsck->li_lock);
709         lfsck->li_di_dir = di;
710         spin_unlock(&lfsck->li_lock);
711
712         GOTO(out, rc = 0);
713
714 out:
715         if (rc < 0)
716                 lfsck_fail(env, lfsck, false);
717         return (rc > 0 ? 0 : rc);
718 }
719
720 int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
721                    struct dt_object *obj, struct lu_dirent *ent)
722 {
723         struct lfsck_component *com;
724         int                     rc;
725
726         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
727                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
728                 if (rc != 0)
729                         return rc;
730         }
731         return 0;
732 }
733
734 int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
735                int result)
736 {
737         struct lfsck_component *com;
738         struct lfsck_component *next;
739         int                     rc;
740
741         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
742         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
743                 rc = com->lc_ops->lfsck_post(env, com, result, false);
744                 if (rc != 0)
745                         return rc;
746         }
747
748         lfsck->li_time_last_checkpoint = cfs_time_current();
749         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
750                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
751         return result;
752 }
753
754 int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck)
755 {
756         struct lfsck_component *com;
757         struct lfsck_component *next;
758         int                     rc;
759
760         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
761                                      lc_link) {
762                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
763                         com->lc_journal = 0;
764
765                 rc = com->lc_ops->lfsck_double_scan(env, com);
766                 if (rc != 0)
767                         return rc;
768         }
769         return 0;
770 }
771
772 /* external interfaces */
773
774 int lfsck_get_speed(struct dt_device *key, void *buf, int len)
775 {
776         struct lu_env           env;
777         struct lfsck_instance  *lfsck;
778         int                     rc;
779         ENTRY;
780
781         lfsck = lfsck_instance_find(key, true, false);
782         if (unlikely(lfsck == NULL))
783                 RETURN(-ENODEV);
784
785         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
786         if (rc != 0)
787                 GOTO(out, rc);
788
789         rc = snprintf(buf, len, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
790         lu_env_fini(&env);
791
792         GOTO(out, rc);
793
794 out:
795         lfsck_instance_put(&env, lfsck);
796         return rc;
797 }
798 EXPORT_SYMBOL(lfsck_get_speed);
799
800 int lfsck_set_speed(struct dt_device *key, int val)
801 {
802         struct lu_env           env;
803         struct lfsck_instance  *lfsck;
804         int                     rc;
805         ENTRY;
806
807         lfsck = lfsck_instance_find(key, true, false);
808         if (unlikely(lfsck == NULL))
809                 RETURN(-ENODEV);
810
811         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
812         if (rc != 0)
813                 GOTO(out, rc);
814
815         mutex_lock(&lfsck->li_mutex);
816         __lfsck_set_speed(lfsck, val);
817         rc = lfsck_bookmark_store(&env, lfsck);
818         mutex_unlock(&lfsck->li_mutex);
819         lu_env_fini(&env);
820
821         GOTO(out, rc);
822
823 out:
824         lfsck_instance_put(&env, lfsck);
825         return rc;
826 }
827 EXPORT_SYMBOL(lfsck_set_speed);
828
829 int lfsck_dump(struct dt_device *key, void *buf, int len, __u16 type)
830 {
831         struct lu_env           env;
832         struct lfsck_instance  *lfsck;
833         struct lfsck_component *com   = NULL;
834         int                     rc;
835         ENTRY;
836
837         lfsck = lfsck_instance_find(key, true, false);
838         if (unlikely(lfsck == NULL))
839                 RETURN(-ENODEV);
840
841         com = lfsck_component_find(lfsck, type);
842         if (com == NULL)
843                 GOTO(out, rc = -ENOTSUPP);
844
845         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
846         if (rc != 0)
847                 GOTO(out, rc);
848
849         rc = com->lc_ops->lfsck_dump(&env, com, buf, len);
850         lu_env_fini(&env);
851
852         GOTO(out, rc);
853
854 out:
855         if (com != NULL)
856                 lfsck_component_put(&env, com);
857         lfsck_instance_put(&env, lfsck);
858         return rc;
859 }
860 EXPORT_SYMBOL(lfsck_dump);
861
/*
 * Start the LFSCK master engine thread ("lfsck") for the instance found
 * for device "key".
 *
 * lsp->lsp_start == NULL is the auto-trigger case: the function returns 0
 * without starting anything when li_list_scan is empty or the
 * OBD_FAIL_LFSCK_NO_AUTO fail point fires, and otherwise jumps straight
 * to the "trigger" label without processing any user parameters.
 *
 * With a non-NULL lsp->lsp_start:
 *  - the speed limit, failout and dryrun settings selected by
 *    start->ls_valid are applied to the in-RAM bookmark, which is written
 *    out through lfsck_bookmark_store() only if something changed;
 *  - components on li_list_scan that are not named in start->ls_active
 *    have their lfsck_post() hook called, and components named in
 *    start->ls_active that sit on li_list_idle are moved to li_list_scan;
 *  - start->ls_version and start->ls_active are rewritten as outputs
 *    (bookmark version, and the types found on li_list_scan).
 *
 * li_mutex is held from the thread-state check until the "out" label;
 * li_lock (spinlock) only covers the thread-state check itself.
 *
 * Returns 0 on success (including the auto-trigger no-op), -ENODEV when
 * no instance is found for "key", -EALREADY when the thread is neither in
 * init nor in stopped state, -ENOTSUPP when start->ls_active names types
 * outside LFSCK_TYPES_SUPPORTED, or the negative error produced by
 * lfsck_bookmark_store(), a component's lfsck_post()/lfsck_reset() hook,
 * or kthread_run().
 */
862 int lfsck_start(const struct lu_env *env, struct dt_device *key,
863                 struct lfsck_start_param *lsp)
864 {
865         struct lfsck_start     *start  = lsp->lsp_start;
866         struct lfsck_instance  *lfsck;
867         struct lfsck_bookmark  *bk;
868         struct ptlrpc_thread   *thread;
869         struct lfsck_component *com;
870         struct l_wait_info      lwi    = { 0 };
871         bool                    dirty  = false;
872         long                    rc     = 0;
873         __u16                   valid  = 0;
874         __u16                   flags  = 0;
875         ENTRY;
876
            /* The instance obtained here is released again through
             * lfsck_instance_put() at the "put" label on every path. */
877         lfsck = lfsck_instance_find(key, true, false);
878         if (unlikely(lfsck == NULL))
879                 RETURN(-ENODEV);
880
881         /* start == NULL means auto trigger paused LFSCK. */
882         if ((start == NULL) &&
883             (cfs_list_empty(&lfsck->li_list_scan) ||
884              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
885                 GOTO(put, rc = 0);
886
887         bk = &lfsck->li_bookmark_ram;
888         thread = &lfsck->li_thread;
            /* lfsck_stop() takes the same mutex, so start and stop are
             * serialized against each other. */
889         mutex_lock(&lfsck->li_mutex);
890         spin_lock(&lfsck->li_lock);
            /* Only a thread that is still in init state or has fully
             * stopped may be started; anything else is reported as
             * already running. */
891         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
892                 spin_unlock(&lfsck->li_lock);
893                 GOTO(out, rc = -EALREADY);
894         }
895
896         spin_unlock(&lfsck->li_lock);
897
            /* Reset the per-run state of the instance. */
898         lfsck->li_namespace = lsp->lsp_namespace;
899         lfsck->li_paused = 0;
900         lfsck->li_oit_over = 0;
901         lfsck->li_drop_dryrun = 0;
902         lfsck->li_new_scanned = 0;
903
904         /* For auto trigger. */
905         if (start == NULL)
906                 goto trigger;
907
            /* Output: hand the bookmark version back to the caller. */
908         start->ls_version = bk->lb_version;
909         if (start->ls_valid & LSV_SPEED_LIMIT) {
910                 __lfsck_set_speed(lfsck, start->ls_speed_limit);
911                 dirty = true;
912         }
913
            /* Bring LPF_FAILOUT in the bookmark in line with the request;
             * "dirty" is raised only when the stored bit really flips. */
914         if (start->ls_valid & LSV_ERROR_HANDLE) {
915                 valid |= DOIV_ERROR_HANDLE;
916                 if (start->ls_flags & LPF_FAILOUT)
917                         flags |= DOIF_FAILOUT;
918
919                 if ((start->ls_flags & LPF_FAILOUT) &&
920                     !(bk->lb_param & LPF_FAILOUT)) {
921                         bk->lb_param |= LPF_FAILOUT;
922                         dirty = true;
923                 } else if (!(start->ls_flags & LPF_FAILOUT) &&
924                            (bk->lb_param & LPF_FAILOUT)) {
925                         bk->lb_param &= ~LPF_FAILOUT;
926                         dirty = true;
927                 }
928         }
929
            /* Same for LPF_DRYRUN; li_drop_dryrun additionally records
             * that this start switched a stored dryrun mode off. */
930         if (start->ls_valid & LSV_DRYRUN) {
931                 if ((start->ls_flags & LPF_DRYRUN) &&
932                     !(bk->lb_param & LPF_DRYRUN)) {
933                         bk->lb_param |= LPF_DRYRUN;
934                         dirty = true;
935                 } else if (!(start->ls_flags & LPF_DRYRUN) &&
936                            (bk->lb_param & LPF_DRYRUN)) {
937                         bk->lb_param &= ~LPF_DRYRUN;
938                         lfsck->li_drop_dryrun = 1;
939                         dirty = true;
940                 }
941         }
942
            /* NOTE(review): the bookmark is stored before ls_active is
             * validated below, so changed parameters are written out even
             * if the request is then rejected with -ENOTSUPP. */
943         if (dirty) {
944                 rc = lfsck_bookmark_store(env, lfsck);
945                 if (rc != 0)
946                         GOTO(out, rc);
947         }
948
949         if (start->ls_flags & LPF_RESET)
950                 flags |= DOIF_RESET;
951
            /* ls_active == 0 keeps the current scan list untouched. */
952         if (start->ls_active != 0) {
953                 struct lfsck_component *next;
954                 __u16 type = 1;
955
956                 if (start->ls_active == LFSCK_TYPES_ALL)
957                         start->ls_active = LFSCK_TYPES_SUPPORTED;
958
                    /* On an unsupported request ls_active is cut down to
                     * exactly the unsupported bits before failing.
                     * NOTE(review): presumably so the caller can report
                     * them - confirm against the user-space side. */
959                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
960                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
961                         GOTO(out, rc = -ENOTSUPP);
962                 }
963
                    /* Components on the scan list that were not requested
                     * get their lfsck_post() hook called.
                     * NOTE(review): the _safe iterator suggests the hook
                     * may unlink the entry from li_list_scan - confirm. */
964                 cfs_list_for_each_entry_safe(com, next,
965                                              &lfsck->li_list_scan, lc_link) {
966                         if (!(com->lc_type & start->ls_active)) {
967                                 rc = com->lc_ops->lfsck_post(env, com, 0,
968                                                              false);
969                                 if (rc != 0)
970                                         GOTO(out, rc);
971                         }
972                 }
973
                    /* Walk the type bits from the lowest upwards, moving
                     * each requested component found on the idle list to
                     * the scan list. ls_active is consumed bit by bit and
                     * is 0 when the loop ends.
                     * Assumes every LFSCK_TYPES_SUPPORTED bit fits into the
                     * 16-bit "type" - TODO confirm. */
974                 while (start->ls_active != 0) {
975                         if (type & start->ls_active) {
976                                 com = __lfsck_component_find(lfsck, type,
977                                                         &lfsck->li_list_idle);
978                                 if (com != NULL) {
979                                         /* The component status will be updated
980                                          * when its prep() is called later by
981                                          * the LFSCK main engine. */
982                                         cfs_list_del_init(&com->lc_link);
983                                         cfs_list_add_tail(&com->lc_link,
984                                                           &lfsck->li_list_scan);
985                                 }
986                                 start->ls_active &= ~type;
987                         }
988                         type <<= 1;
989                 }
990         }
991
            /* Output: rebuild ls_active from what is on the scan list now,
             * resetting each of those components when LPF_RESET was
             * requested. */
992         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
993                 start->ls_active |= com->lc_type;
994                 if (flags & DOIF_RESET) {
995                         rc = com->lc_ops->lfsck_reset(env, com, false);
996                         if (rc != 0)
997                                 GOTO(out, rc);
998                 }
999         }
1000
1001 trigger:
             /* Arguments for directory iteration; dryrun comes from the
              * bookmark, so it also applies to the auto-trigger path. */
1002         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
1003         if (bk->lb_param & LPF_DRYRUN)
1004                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
1005
             /* A failout setting stored in the bookmark is forwarded even
              * when the caller did not pass LSV_ERROR_HANDLE. */
1006         if (bk->lb_param & LPF_FAILOUT) {
1007                 valid |= DOIV_ERROR_HANDLE;
1008                 flags |= DOIF_FAILOUT;
1009         }
1010
1011         if (!cfs_list_empty(&lfsck->li_list_scan))
1012                 flags |= DOIF_OUTUSED;
1013
             /* Flags are packed above DT_OTABLE_IT_FLAGS_SHIFT, the valid
              * mask occupies the low bits. */
1014         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
1015         thread_set_flags(thread, 0);
             /* PTR_ERR() turns kthread_run()'s result into a long;
              * IS_ERR_VALUE() then tells an error code from a valid
              * task pointer. */
1016         rc = PTR_ERR(kthread_run(lfsck_master_engine, lfsck, "lfsck"));
1017         if (IS_ERR_VALUE(rc)) {
1018                 CERROR("%s: cannot start LFSCK thread, rc = %ld\n",
1019                        lfsck_lfsck2name(lfsck), rc);
1020         } else {
1021                 rc = 0;
                     /* Block until the thread state becomes running or
                      * stopped; rc stays 0 in either case. */
1022                 l_wait_event(thread->t_ctl_waitq,
1023                              thread_is_running(thread) ||
1024                              thread_is_stopped(thread),
1025                              &lwi);
1026         }
1027
1028         GOTO(out, rc);
1029
1030 out:
1031         mutex_unlock(&lfsck->li_mutex);
1032 put:
1033         lfsck_instance_put(env, lfsck);
             /* rc is a long; only negative values are returned as errors,
              * anything else is reported as 0. */
1034         return (rc < 0 ? rc : 0);
1035 }
1036 EXPORT_SYMBOL(lfsck_start);
1037
1038 int lfsck_stop(const struct lu_env *env, struct dt_device *key, bool pause)
1039 {
1040         struct lfsck_instance   *lfsck;
1041         struct ptlrpc_thread    *thread;
1042         struct l_wait_info       lwi    = { 0 };
1043         ENTRY;
1044
1045         lfsck = lfsck_instance_find(key, true, false);
1046         if (unlikely(lfsck == NULL))
1047                 RETURN(-ENODEV);
1048
1049         thread = &lfsck->li_thread;
1050         mutex_lock(&lfsck->li_mutex);
1051         spin_lock(&lfsck->li_lock);
1052         if (thread_is_init(thread) || thread_is_stopped(thread)) {
1053                 spin_unlock(&lfsck->li_lock);
1054                 mutex_unlock(&lfsck->li_mutex);
1055                 lfsck_instance_put(env, lfsck);
1056                 RETURN(-EALREADY);
1057         }
1058
1059         if (pause)
1060                 lfsck->li_paused = 1;
1061         thread_set_flags(thread, SVC_STOPPING);
1062         spin_unlock(&lfsck->li_lock);
1063
1064         cfs_waitq_broadcast(&thread->t_ctl_waitq);
1065         l_wait_event(thread->t_ctl_waitq,
1066                      thread_is_stopped(thread),
1067                      &lwi);
1068         mutex_unlock(&lfsck->li_mutex);
1069         lfsck_instance_put(env, lfsck);
1070
1071         RETURN(0);
1072 }
1073 EXPORT_SYMBOL(lfsck_stop);
1074
1075 int lfsck_register(const struct lu_env *env, struct dt_device *key,
1076                    struct dt_device *next, bool master)
1077 {
1078         struct lfsck_instance   *lfsck;
1079         struct dt_object        *root;
1080         struct dt_object        *obj;
1081         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
1082         int                      rc;
1083         ENTRY;
1084
1085         lfsck = lfsck_instance_find(key, false, false);
1086         if (unlikely(lfsck != NULL))
1087                 RETURN(-EEXIST);
1088
1089         OBD_ALLOC_PTR(lfsck);
1090         if (lfsck == NULL)
1091                 RETURN(-ENOMEM);
1092
1093         mutex_init(&lfsck->li_mutex);
1094         spin_lock_init(&lfsck->li_lock);
1095         CFS_INIT_LIST_HEAD(&lfsck->li_link);
1096         CFS_INIT_LIST_HEAD(&lfsck->li_list_scan);
1097         CFS_INIT_LIST_HEAD(&lfsck->li_list_dir);
1098         CFS_INIT_LIST_HEAD(&lfsck->li_list_double_scan);
1099         CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
1100         atomic_set(&lfsck->li_ref, 1);
1101         cfs_waitq_init(&lfsck->li_thread.t_ctl_waitq);
1102         lfsck->li_next = next;
1103         lfsck->li_bottom = key;
1104
1105         fid->f_seq = FID_SEQ_LOCAL_NAME;
1106         fid->f_oid = 1;
1107         fid->f_ver = 0;
1108         rc = local_oid_storage_init(env, lfsck->li_bottom, fid, &lfsck->li_los);
1109         if (rc != 0)
1110                 GOTO(out, rc);
1111
1112         rc = dt_root_get(env, key, fid);
1113         if (rc != 0)
1114                 GOTO(out, rc);
1115
1116         root = dt_locate(env, lfsck->li_bottom, fid);
1117         if (IS_ERR(root))
1118                 GOTO(out, rc = PTR_ERR(root));
1119
1120         lfsck->li_local_root = root;
1121         dt_try_as_dir(env, root);
1122         if (master) {
1123                 lfsck->li_master = 1;
1124                 if (lfsck_dev_idx(lfsck->li_bottom) == 0) {
1125                         rc = dt_lookup(env, root,
1126                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
1127                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
1128                         if (rc != 0)
1129                                 GOTO(out, rc);
1130                 }
1131         }
1132
1133         fid->f_seq = FID_SEQ_LOCAL_FILE;
1134         fid->f_oid = OTABLE_IT_OID;
1135         fid->f_ver = 0;
1136         obj = dt_locate(env, lfsck->li_bottom, fid);
1137         if (IS_ERR(obj))
1138                 GOTO(out, rc = PTR_ERR(obj));
1139
1140         lfsck->li_obj_oit = obj;
1141         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
1142         if (rc != 0) {
1143                 if (rc == -ENOTSUPP)
1144                         GOTO(add, rc = 0);
1145
1146                 GOTO(out, rc);
1147         }
1148
1149         rc = lfsck_bookmark_setup(env, lfsck);
1150         if (rc != 0)
1151                 GOTO(out, rc);
1152
1153         if (master) {
1154                 rc = lfsck_namespace_setup(env, lfsck);
1155                 if (rc < 0)
1156                         GOTO(out, rc);
1157         }
1158
1159         /* XXX: more LFSCK components initialization to be added here. */
1160
1161 add:
1162         rc = lfsck_instance_add(lfsck);
1163 out:
1164         if (rc != 0)
1165                 lfsck_instance_cleanup(env, lfsck);
1166         return rc;
1167 }
1168 EXPORT_SYMBOL(lfsck_register);
1169
1170 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
1171 {
1172         struct lfsck_instance *lfsck;
1173
1174         lfsck = lfsck_instance_find(key, false, true);
1175         if (lfsck != NULL)
1176                 lfsck_instance_put(env, lfsck);
1177 }
1178 EXPORT_SYMBOL(lfsck_degister);
1179
1180 static int __init lfsck_init(void)
1181 {
1182         int rc;
1183
1184         lfsck_key_init_generic(&lfsck_thread_key, NULL);
1185         rc = lu_context_key_register(&lfsck_thread_key);
1186         return rc;
1187 }
1188
/*
 * Module exit point: undo lfsck_init() by deregistering the lfsck thread
 * context key.
 */
1189 static void __exit lfsck_exit(void)
1190 {
             /* Every instance must have been removed from the list before
              * the module goes away. */
1191         LASSERT(cfs_list_empty(&lfsck_instance_list));
1192
1193         lu_context_key_degister(&lfsck_thread_key);
1194 }
1195
/* Module metadata. */
1196 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
1197 MODULE_DESCRIPTION("LFSCK");
1198 MODULE_LICENSE("GPL");
1199
/* Declares the "lfsck" module with lfsck_init()/lfsck_exit() as its
 * init and exit hooks. NOTE(review): exact expansion of cfs_module() is
 * not visible here - confirm against libcfs. */
1200 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);