Whamcloud - gitweb
5437f71448f5dc0067ee15b0488f83b632266fd2
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <libcfs/list.h>
37 #include <lu_object.h>
38 #include <dt_object.h>
39 #include <md_object.h>
40 #include <lustre_fld.h>
41 #include <lustre_lib.h>
42 #include <lustre_net.h>
43 #include <lustre_lfsck.h>
44 #include <lustre/lustre_lfsck_user.h>
45
46 #include "lfsck_internal.h"
47
48 /* define lfsck thread key */
49 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
50
51 static void lfsck_key_fini(const struct lu_context *ctx,
52                            struct lu_context_key *key, void *data)
53 {
54         struct lfsck_thread_info *info = data;
55
56         lu_buf_free(&info->lti_linkea_buf);
57         OBD_FREE_PTR(info);
58 }
59
60 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
61 LU_KEY_INIT_GENERIC(lfsck);
62
/* Every registered LFSCK instance, chained through lfsck_instance::li_link. */
static CFS_LIST_HEAD(lfsck_instance_list);
/* Held around every walk, insertion and removal on lfsck_instance_list. */
static DEFINE_SPINLOCK(lfsck_instance_lock);
65
/*
 * Printable LFSCK status names, NULL-terminated.  The slot number is
 * significant: presumably the table is indexed by the numeric status
 * value (TODO confirm against the status enum), so never reorder entries.
 */
const char *lfsck_status_names[] = {
        [0] = "init",
        [1] = "scanning-phase1",
        [2] = "scanning-phase2",
        [3] = "completed",
        [4] = "failed",
        [5] = "stopped",
        [6] = "paused",
        [7] = "crashed",
        [8] = NULL
};
77
/*
 * Printable names for the LFSCK flag bits, NULL-terminated.  Slot i names
 * bit i when the table is rendered with lfsck_bits_dump() (assumed usage;
 * verify against the callers), so the order must not change.
 */
const char *lfsck_flags_names[] = {
        [0] = "scanned-once",
        [1] = "inconsistent",
        [2] = "upgrade",
        [3] = NULL
};
84
/*
 * Printable names for the LFSCK parameter bits, NULL-terminated.  Slot i
 * names bit i when the table is rendered with lfsck_bits_dump() (assumed
 * usage; verify against the callers), so the order must not change.
 */
const char *lfsck_param_names[] = {
        [0] = "failout",
        [1] = "dryrun",
        [2] = NULL
};
90
91 static inline mdsno_t lfsck_dev_idx(struct dt_device *dev)
92 {
93         return dev->dd_lu_dev.ld_site->ld_seq_site->ss_node_id;
94 }
95
96 static inline void lfsck_component_get(struct lfsck_component *com)
97 {
98         atomic_inc(&com->lc_ref);
99 }
100
101 static inline void lfsck_component_put(const struct lu_env *env,
102                                        struct lfsck_component *com)
103 {
104         if (atomic_dec_and_test(&com->lc_ref)) {
105                 if (com->lc_obj != NULL)
106                         lu_object_put(env, &com->lc_obj->do_lu);
107                 if (com->lc_file_ram != NULL)
108                         OBD_FREE(com->lc_file_ram, com->lc_file_size);
109                 if (com->lc_file_disk != NULL)
110                         OBD_FREE(com->lc_file_disk, com->lc_file_size);
111                 OBD_FREE_PTR(com);
112         }
113 }
114
115 static inline struct lfsck_component *
116 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type, cfs_list_t *list)
117 {
118         struct lfsck_component *com;
119
120         cfs_list_for_each_entry(com, list, lc_link) {
121                 if (com->lc_type == type)
122                         return com;
123         }
124         return NULL;
125 }
126
127 static struct lfsck_component *
128 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
129 {
130         struct lfsck_component *com;
131
132         spin_lock(&lfsck->li_lock);
133         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
134         if (com != NULL)
135                 goto unlock;
136
137         com = __lfsck_component_find(lfsck, type,
138                                      &lfsck->li_list_double_scan);
139         if (com != NULL)
140                 goto unlock;
141
142         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
143
144 unlock:
145         if (com != NULL)
146                 lfsck_component_get(com);
147         spin_unlock(&lfsck->li_lock);
148         return com;
149 }
150
151 void lfsck_component_cleanup(const struct lu_env *env,
152                              struct lfsck_component *com)
153 {
154         if (!cfs_list_empty(&com->lc_link))
155                 cfs_list_del_init(&com->lc_link);
156         if (!cfs_list_empty(&com->lc_link_dir))
157                 cfs_list_del_init(&com->lc_link_dir);
158
159         lfsck_component_put(env, com);
160 }
161
162 static void lfsck_instance_cleanup(const struct lu_env *env,
163                                    struct lfsck_instance *lfsck)
164 {
165         struct ptlrpc_thread    *thread = &lfsck->li_thread;
166         struct lfsck_component  *com;
167         ENTRY;
168
169         LASSERT(list_empty(&lfsck->li_link));
170         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
171
172         if (lfsck->li_obj_oit != NULL) {
173                 lu_object_put(env, &lfsck->li_obj_oit->do_lu);
174                 lfsck->li_obj_oit = NULL;
175         }
176
177         LASSERT(lfsck->li_obj_dir == NULL);
178
179         while (!cfs_list_empty(&lfsck->li_list_scan)) {
180                 com = cfs_list_entry(lfsck->li_list_scan.next,
181                                      struct lfsck_component,
182                                      lc_link);
183                 lfsck_component_cleanup(env, com);
184         }
185
186         LASSERT(cfs_list_empty(&lfsck->li_list_dir));
187
188         while (!cfs_list_empty(&lfsck->li_list_double_scan)) {
189                 com = cfs_list_entry(lfsck->li_list_double_scan.next,
190                                      struct lfsck_component,
191                                      lc_link);
192                 lfsck_component_cleanup(env, com);
193         }
194
195         while (!cfs_list_empty(&lfsck->li_list_idle)) {
196                 com = cfs_list_entry(lfsck->li_list_idle.next,
197                                      struct lfsck_component,
198                                      lc_link);
199                 lfsck_component_cleanup(env, com);
200         }
201
202         if (lfsck->li_bookmark_obj != NULL) {
203                 lu_object_put(env, &lfsck->li_bookmark_obj->do_lu);
204                 lfsck->li_bookmark_obj = NULL;
205         }
206
207         if (lfsck->li_los != NULL) {
208                 local_oid_storage_fini(env, lfsck->li_los);
209                 lfsck->li_los = NULL;
210         }
211
212         if (lfsck->li_local_root != NULL) {
213                 lu_object_put(env, &lfsck->li_local_root->do_lu);
214                 lfsck->li_local_root = NULL;
215         }
216
217         OBD_FREE_PTR(lfsck);
218 }
219
220 static inline void lfsck_instance_get(struct lfsck_instance *lfsck)
221 {
222         atomic_inc(&lfsck->li_ref);
223 }
224
225 static inline void lfsck_instance_put(const struct lu_env *env,
226                                       struct lfsck_instance *lfsck)
227 {
228         if (atomic_dec_and_test(&lfsck->li_ref))
229                 lfsck_instance_cleanup(env, lfsck);
230 }
231
232 static inline struct lfsck_instance *lfsck_instance_find(struct dt_device *key,
233                                                          bool ref, bool unlink)
234 {
235         struct lfsck_instance *lfsck;
236
237         spin_lock(&lfsck_instance_lock);
238         cfs_list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
239                 if (lfsck->li_bottom == key) {
240                         if (ref)
241                                 lfsck_instance_get(lfsck);
242                         if (unlink)
243                                 list_del_init(&lfsck->li_link);
244                         spin_unlock(&lfsck_instance_lock);
245                         return lfsck;
246                 }
247         }
248         spin_unlock(&lfsck_instance_lock);
249         return NULL;
250 }
251
252 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
253 {
254         struct lfsck_instance *tmp;
255
256         spin_lock(&lfsck_instance_lock);
257         cfs_list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
258                 if (lfsck->li_bottom == tmp->li_bottom) {
259                         spin_unlock(&lfsck_instance_lock);
260                         return -EEXIST;
261                 }
262         }
263
264         cfs_list_add_tail(&lfsck->li_link, &lfsck_instance_list);
265         spin_unlock(&lfsck_instance_lock);
266         return 0;
267 }
268
/*
 * Render a bit mask as "<prefix>: name[,name...]\n" (or "<prefix>:\n" when
 * no named bit is set) into a caller-supplied buffer.
 *
 * @buf:    in/out cursor into the output buffer, advanced past the text
 * @len:    in/out count of bytes left at *buf, reduced by the text length
 * @bits:   mask to print; bit i is printed as names[i]
 * @names:  NULL-terminated table of bit names
 * @prefix: label written before the colon
 *
 * Bits that have no entry in @names are ignored, so the table is never
 * read past its NULL terminator.
 *
 * Returns the number of bytes written, or -ENOSPC when the text does not
 * fit.  On -ENOSPC *buf and *len still reflect the pieces that did fit
 * completely; they never move past the end of the buffer.
 */
int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
                    const char *prefix)
{
        unsigned int    pending = (unsigned int)bits;
        unsigned int    known   = 0;
        int             save    = *len;
        int             rc;
        int             i;

        /* A non-positive length would turn into a huge size_t below. */
        if (*len <= 0)
                return -ENOSPC;

        /* Only keep the bits that actually have a name. */
        for (i = 0; i < (int)(8 * sizeof(known)) && names[i] != NULL; i++)
                known |= 1U << i;
        pending &= known;

        rc = snprintf(*buf, *len, "%s:%c", prefix, pending != 0 ? ' ' : '\n');
        /* snprintf() reports the untruncated length: rc >= *len means the
         * output was cut short. */
        if (rc <= 0 || rc >= *len)
                return -ENOSPC;

        *buf += rc;
        *len -= rc;
        for (i = 0; pending != 0; i++) {
                unsigned int flag = 1U << i;

                if (!(pending & flag))
                        continue;

                pending &= ~flag;
                /* A comma follows every name except the last one. */
                rc = snprintf(*buf, *len, "%s%c", names[i],
                              pending != 0 ? ',' : '\n');
                if (rc <= 0 || rc >= *len)
                        return -ENOSPC;

                *buf += rc;
                *len -= rc;
        }
        return save - *len;
}
297
298 int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix)
299 {
300         int rc;
301
302         if (time != 0)
303                 rc = snprintf(*buf, *len, "%s: "LPU64" seconds\n", prefix,
304                               cfs_time_current_sec() - time);
305         else
306                 rc = snprintf(*buf, *len, "%s: N/A\n", prefix);
307         if (rc <= 0)
308                 return -ENOSPC;
309
310         *buf += rc;
311         *len -= rc;
312         return rc;
313 }
314
315 int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
316                    const char *prefix)
317 {
318         int rc;
319
320         if (fid_is_zero(&pos->lp_dir_parent)) {
321                 if (pos->lp_oit_cookie == 0)
322                         rc = snprintf(*buf, *len, "%s: N/A, N/A, N/A\n",
323                                       prefix);
324                 else
325                         rc = snprintf(*buf, *len, "%s: "LPU64", N/A, N/A\n",
326                                       prefix, pos->lp_oit_cookie);
327         } else {
328                 rc = snprintf(*buf, *len, "%s: "LPU64", "DFID", "LPU64"\n",
329                               prefix, pos->lp_oit_cookie,
330                               PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
331         }
332         if (rc <= 0)
333                 return -ENOSPC;
334
335         *buf += rc;
336         *len -= rc;
337         return rc;
338 }
339
340 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
341                     struct lfsck_position *pos, bool init)
342 {
343         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
344
345         if (unlikely(lfsck->li_di_oit == NULL)) {
346                 memset(pos, 0, sizeof(*pos));
347                 return;
348         }
349
350         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
351         if (!lfsck->li_current_oit_processed && !init)
352                 pos->lp_oit_cookie--;
353
354         LASSERT(pos->lp_oit_cookie > 0);
355
356         if (lfsck->li_di_dir != NULL) {
357                 struct dt_object *dto = lfsck->li_obj_dir;
358
359                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
360                                                         lfsck->li_di_dir);
361
362                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
363                         fid_zero(&pos->lp_dir_parent);
364                         pos->lp_dir_cookie = 0;
365                 } else {
366                         pos->lp_dir_parent = *lu_object_fid(&dto->do_lu);
367                 }
368         } else {
369                 fid_zero(&pos->lp_dir_parent);
370                 pos->lp_dir_cookie = 0;
371         }
372 }
373
374 static void __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
375 {
376         lfsck->li_bookmark_ram.lb_speed_limit = limit;
377         if (limit != LFSCK_SPEED_NO_LIMIT) {
378                 if (limit > CFS_HZ) {
379                         lfsck->li_sleep_rate = limit / CFS_HZ;
380                         lfsck->li_sleep_jif = 1;
381                 } else {
382                         lfsck->li_sleep_rate = 1;
383                         lfsck->li_sleep_jif = CFS_HZ / limit;
384                 }
385         } else {
386                 lfsck->li_sleep_jif = 0;
387                 lfsck->li_sleep_rate = 0;
388         }
389 }
390
391 void lfsck_control_speed(struct lfsck_instance *lfsck)
392 {
393         struct ptlrpc_thread *thread = &lfsck->li_thread;
394         struct l_wait_info    lwi;
395
396         if (lfsck->li_sleep_jif > 0 &&
397             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
398                 spin_lock(&lfsck->li_lock);
399                 if (likely(lfsck->li_sleep_jif > 0 &&
400                            lfsck->li_new_scanned >= lfsck->li_sleep_rate)) {
401                         lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
402                                                LWI_ON_SIGNAL_NOOP, NULL);
403                         spin_unlock(&lfsck->li_lock);
404
405                         l_wait_event(thread->t_ctl_waitq,
406                                      !thread_is_running(thread),
407                                      &lwi);
408                         lfsck->li_new_scanned = 0;
409                 } else {
410                         spin_unlock(&lfsck->li_lock);
411                 }
412         }
413 }
414
415 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
416                             struct lu_fid *fid)
417 {
418         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
419                      !dt_try_as_dir(env, obj)))
420                 return -ENOTDIR;
421
422         return dt_lookup(env, obj, (struct dt_rec *)fid,
423                          (const struct dt_key *)"..", BYPASS_CAPA);
424 }
425
426 static int lfsck_needs_scan_dir(const struct lu_env *env,
427                                 struct lfsck_instance *lfsck,
428                                 struct dt_object *obj)
429 {
430         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
431         int            depth = 0;
432         int            rc;
433
434         if (!lfsck->li_master || !S_ISDIR(lfsck_object_type(obj)) ||
435             cfs_list_empty(&lfsck->li_list_dir))
436                RETURN(0);
437
438         while (1) {
439                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
440                  *      which is the agent directory to manage the objects
441                  *      which name entries reside on remote MDTs. Related
442                  *      consistency verification will be processed in LFSCK
443                  *      phase III. */
444                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
445                         if (depth > 0)
446                                 lfsck_object_put(env, obj);
447                         return 1;
448                 }
449
450                 /* .lustre doesn't contain "real" user objects, no need lfsck */
451                 if (fid_is_dot_lustre(lfsck_dto2fid(obj))) {
452                         if (depth > 0)
453                                 lfsck_object_put(env, obj);
454                         return 0;
455                 }
456
457                 dt_read_lock(env, obj, MOR_TGT_CHILD);
458                 if (unlikely(lfsck_is_dead_obj(obj))) {
459                         dt_read_unlock(env, obj);
460                         if (depth > 0)
461                                 lfsck_object_put(env, obj);
462                         return 0;
463                 }
464
465                 rc = dt_xattr_get(env, obj,
466                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
467                                   BYPASS_CAPA);
468                 dt_read_unlock(env, obj);
469                 if (rc >= 0) {
470                         if (depth > 0)
471                                 lfsck_object_put(env, obj);
472                         return 1;
473                 }
474
475                 if (rc < 0 && rc != -ENODATA) {
476                         if (depth > 0)
477                                 lfsck_object_put(env, obj);
478                         return rc;
479                 }
480
481                 rc = lfsck_parent_fid(env, obj, fid);
482                 if (depth > 0)
483                         lfsck_object_put(env, obj);
484                 if (rc != 0)
485                         return rc;
486
487                 if (unlikely(lu_fid_eq(fid,
488                                        lfsck_dto2fid(lfsck->li_local_root))))
489                         return 0;
490
491                 obj = lfsck_object_find(env, lfsck, fid);
492                 if (obj == NULL)
493                         return 0;
494                 else if (IS_ERR(obj))
495                         return PTR_ERR(obj);
496
497                 if (!dt_object_exists(obj)) {
498                         lfsck_object_put(env, obj);
499                         return 0;
500                 }
501
502                 /* Currently, only client visible directory can be remote. */
503                 if (dt_object_remote(obj)) {
504                         lfsck_object_put(env, obj);
505                         return 1;
506                 }
507
508                 depth++;
509         }
510         return 0;
511 }
512
513 /* LFSCK wrap functions */
514
515 void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
516                 bool new_checked)
517 {
518         struct lfsck_component *com;
519
520         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
521                 com->lc_ops->lfsck_fail(env, com, new_checked);
522         }
523 }
524
525 int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck)
526 {
527         struct lfsck_component *com;
528         int                     rc;
529
530         if (likely(cfs_time_beforeq(cfs_time_current(),
531                                     lfsck->li_time_next_checkpoint)))
532                 return 0;
533
534         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
535         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
536                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
537                 if (rc != 0)
538                         return rc;;
539         }
540
541         lfsck->li_time_last_checkpoint = cfs_time_current();
542         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
543                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
544         return 0;
545 }
546
547 int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck)
548 {
549         struct dt_object       *obj     = NULL;
550         struct lfsck_component *com;
551         struct lfsck_component *next;
552         struct lfsck_position  *pos     = NULL;
553         const struct dt_it_ops *iops    =
554                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
555         struct dt_it           *di;
556         int                     rc;
557         ENTRY;
558
559         LASSERT(lfsck->li_obj_dir == NULL);
560         LASSERT(lfsck->li_di_dir == NULL);
561
562         lfsck->li_current_oit_processed = 0;
563         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
564                 com->lc_new_checked = 0;
565                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
566                         com->lc_journal = 0;
567
568                 rc = com->lc_ops->lfsck_prep(env, com);
569                 if (rc != 0)
570                         RETURN(rc);
571
572                 if ((pos == NULL) ||
573                     (!lfsck_pos_is_zero(&com->lc_pos_start) &&
574                      lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
575                         pos = &com->lc_pos_start;
576         }
577
578         /* Init otable-based iterator. */
579         if (pos == NULL) {
580                 rc = iops->load(env, lfsck->li_di_oit, 0);
581                 if (rc > 0) {
582                         lfsck->li_oit_over = 1;
583                         rc = 0;
584                 }
585
586                 GOTO(out, rc);
587         }
588
589         rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
590         if (rc < 0)
591                 GOTO(out, rc);
592         else if (rc > 0)
593                 lfsck->li_oit_over = 1;
594
595         if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
596                 GOTO(out, rc = 0);
597
598         /* Find the directory for namespace-based traverse. */
599         obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
600         if (obj == NULL)
601                 GOTO(out, rc = 0);
602         else if (IS_ERR(obj))
603                 RETURN(PTR_ERR(obj));
604
605         /* XXX: Currently, skip remote object, the consistency for
606          *      remote object will be processed in LFSCK phase III. */
607         if (!dt_object_exists(obj) || dt_object_remote(obj) ||
608             unlikely(!S_ISDIR(lfsck_object_type(obj))))
609                 GOTO(out, rc = 0);
610
611         if (unlikely(!dt_try_as_dir(env, obj)))
612                 GOTO(out, rc = -ENOTDIR);
613
614         /* Init the namespace-based directory traverse. */
615         iops = &obj->do_index_ops->dio_it;
616         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
617         if (IS_ERR(di))
618                 GOTO(out, rc = PTR_ERR(di));
619
620         LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
621
622         rc = iops->load(env, di, pos->lp_dir_cookie);
623         if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
624                 rc = iops->next(env, di);
625         else if (rc > 0)
626                 rc = 0;
627
628         if (rc != 0) {
629                 iops->put(env, di);
630                 iops->fini(env, di);
631                 GOTO(out, rc);
632         }
633
634         lfsck->li_obj_dir = lfsck_object_get(obj);
635         lfsck->li_cookie_dir = iops->store(env, di);
636         spin_lock(&lfsck->li_lock);
637         lfsck->li_di_dir = di;
638         spin_unlock(&lfsck->li_lock);
639
640         GOTO(out, rc = 0);
641
642 out:
643         if (obj != NULL)
644                 lfsck_object_put(env, obj);
645
646         if (rc < 0) {
647                 cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
648                                              lc_link)
649                         com->lc_ops->lfsck_post(env, com, rc, true);
650
651                 return rc;
652         }
653
654         rc = 0;
655         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
656         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
657                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
658                 if (rc != 0)
659                         break;
660         }
661
662         lfsck->li_time_last_checkpoint = cfs_time_current();
663         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
664                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
665         return rc;
666 }
667
668 int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
669                    struct dt_object *obj)
670 {
671         struct lfsck_component *com;
672         const struct dt_it_ops *iops;
673         struct dt_it           *di;
674         int                     rc;
675         ENTRY;
676
677         LASSERT(lfsck->li_obj_dir == NULL);
678
679         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
680                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
681                 if (rc != 0)
682                         RETURN(rc);
683         }
684
685         rc = lfsck_needs_scan_dir(env, lfsck, obj);
686         if (rc <= 0)
687                 GOTO(out, rc);
688
689         if (unlikely(!dt_try_as_dir(env, obj)))
690                 GOTO(out, rc = -ENOTDIR);
691
692         iops = &obj->do_index_ops->dio_it;
693         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
694         if (IS_ERR(di))
695                 GOTO(out, rc = PTR_ERR(di));
696
697         rc = iops->load(env, di, 0);
698         if (rc == 0)
699                 rc = iops->next(env, di);
700         else if (rc > 0)
701                 rc = 0;
702
703         if (rc != 0) {
704                 iops->put(env, di);
705                 iops->fini(env, di);
706                 GOTO(out, rc);
707         }
708
709         lfsck->li_obj_dir = lfsck_object_get(obj);
710         lfsck->li_cookie_dir = iops->store(env, di);
711         spin_lock(&lfsck->li_lock);
712         lfsck->li_di_dir = di;
713         spin_unlock(&lfsck->li_lock);
714
715         GOTO(out, rc = 0);
716
717 out:
718         if (rc < 0)
719                 lfsck_fail(env, lfsck, false);
720         return (rc > 0 ? 0 : rc);
721 }
722
723 int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
724                    struct dt_object *obj, struct lu_dirent *ent)
725 {
726         struct lfsck_component *com;
727         int                     rc;
728
729         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
730                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
731                 if (rc != 0)
732                         return rc;
733         }
734         return 0;
735 }
736
737 int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
738                int result)
739 {
740         struct lfsck_component *com;
741         struct lfsck_component *next;
742         int                     rc;
743
744         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
745         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
746                 rc = com->lc_ops->lfsck_post(env, com, result, false);
747                 if (rc != 0)
748                         return rc;
749         }
750
751         lfsck->li_time_last_checkpoint = cfs_time_current();
752         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
753                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
754         return result;
755 }
756
757 int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck)
758 {
759         struct lfsck_component *com;
760         struct lfsck_component *next;
761         int                     rc;
762
763         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
764                                      lc_link) {
765                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
766                         com->lc_journal = 0;
767
768                 rc = com->lc_ops->lfsck_double_scan(env, com);
769                 if (rc != 0)
770                         return rc;
771         }
772         return 0;
773 }
774
775 /* external interfaces */
776
777 int lfsck_get_speed(struct dt_device *key, void *buf, int len)
778 {
779         struct lu_env           env;
780         struct lfsck_instance  *lfsck;
781         int                     rc;
782         ENTRY;
783
784         lfsck = lfsck_instance_find(key, true, false);
785         if (unlikely(lfsck == NULL))
786                 RETURN(-ENODEV);
787
788         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
789         if (rc != 0)
790                 GOTO(out, rc);
791
792         rc = snprintf(buf, len, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
793         lu_env_fini(&env);
794
795         GOTO(out, rc);
796
797 out:
798         lfsck_instance_put(&env, lfsck);
799         return rc;
800 }
801 EXPORT_SYMBOL(lfsck_get_speed);
802
803 int lfsck_set_speed(struct dt_device *key, int val)
804 {
805         struct lu_env           env;
806         struct lfsck_instance  *lfsck;
807         int                     rc;
808         ENTRY;
809
810         lfsck = lfsck_instance_find(key, true, false);
811         if (unlikely(lfsck == NULL))
812                 RETURN(-ENODEV);
813
814         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
815         if (rc != 0)
816                 GOTO(out, rc);
817
818         mutex_lock(&lfsck->li_mutex);
819         __lfsck_set_speed(lfsck, val);
820         rc = lfsck_bookmark_store(&env, lfsck);
821         mutex_unlock(&lfsck->li_mutex);
822         lu_env_fini(&env);
823
824         GOTO(out, rc);
825
826 out:
827         lfsck_instance_put(&env, lfsck);
828         return rc;
829 }
830 EXPORT_SYMBOL(lfsck_set_speed);
831
832 int lfsck_dump(struct dt_device *key, void *buf, int len, __u16 type)
833 {
834         struct lu_env           env;
835         struct lfsck_instance  *lfsck;
836         struct lfsck_component *com   = NULL;
837         int                     rc;
838         ENTRY;
839
840         lfsck = lfsck_instance_find(key, true, false);
841         if (unlikely(lfsck == NULL))
842                 RETURN(-ENODEV);
843
844         com = lfsck_component_find(lfsck, type);
845         if (com == NULL)
846                 GOTO(out, rc = -ENOTSUPP);
847
848         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
849         if (rc != 0)
850                 GOTO(out, rc);
851
852         rc = com->lc_ops->lfsck_dump(&env, com, buf, len);
853         lu_env_fini(&env);
854
855         GOTO(out, rc);
856
857 out:
858         if (com != NULL)
859                 lfsck_component_put(&env, com);
860         lfsck_instance_put(&env, lfsck);
861         return rc;
862 }
863 EXPORT_SYMBOL(lfsck_dump);
864
865 int lfsck_start(const struct lu_env *env, struct dt_device *key,
866                 struct lfsck_start_param *lsp)
867 {
868         struct lfsck_start     *start  = lsp->lsp_start;
869         struct lfsck_instance  *lfsck;
870         struct lfsck_bookmark  *bk;
871         struct ptlrpc_thread   *thread;
872         struct lfsck_component *com;
873         struct l_wait_info      lwi    = { 0 };
874         bool                    dirty  = false;
875         int                     rc     = 0;
876         __u16                   valid  = 0;
877         __u16                   flags  = 0;
878         ENTRY;
879
880         lfsck = lfsck_instance_find(key, true, false);
881         if (unlikely(lfsck == NULL))
882                 RETURN(-ENODEV);
883
884         /* start == NULL means auto trigger paused LFSCK. */
885         if ((start == NULL) &&
886             (cfs_list_empty(&lfsck->li_list_scan) ||
887              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
888                 GOTO(put, rc = 0);
889
890         bk = &lfsck->li_bookmark_ram;
891         thread = &lfsck->li_thread;
892         mutex_lock(&lfsck->li_mutex);
893         spin_lock(&lfsck->li_lock);
894         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
895                 spin_unlock(&lfsck->li_lock);
896                 GOTO(out, rc = -EALREADY);
897         }
898
899         spin_unlock(&lfsck->li_lock);
900
901         lfsck->li_namespace = lsp->lsp_namespace;
902         lfsck->li_paused = 0;
903         lfsck->li_oit_over = 0;
904         lfsck->li_drop_dryrun = 0;
905         lfsck->li_new_scanned = 0;
906
907         /* For auto trigger. */
908         if (start == NULL)
909                 goto trigger;
910
911         start->ls_version = bk->lb_version;
912         if (start->ls_valid & LSV_SPEED_LIMIT) {
913                 __lfsck_set_speed(lfsck, start->ls_speed_limit);
914                 dirty = true;
915         }
916
917         if (start->ls_valid & LSV_ERROR_HANDLE) {
918                 valid |= DOIV_ERROR_HANDLE;
919                 if (start->ls_flags & LPF_FAILOUT)
920                         flags |= DOIF_FAILOUT;
921
922                 if ((start->ls_flags & LPF_FAILOUT) &&
923                     !(bk->lb_param & LPF_FAILOUT)) {
924                         bk->lb_param |= LPF_FAILOUT;
925                         dirty = true;
926                 } else if (!(start->ls_flags & LPF_FAILOUT) &&
927                            (bk->lb_param & LPF_FAILOUT)) {
928                         bk->lb_param &= ~LPF_FAILOUT;
929                         dirty = true;
930                 }
931         }
932
933         if (start->ls_valid & LSV_DRYRUN) {
934                 if ((start->ls_flags & LPF_DRYRUN) &&
935                     !(bk->lb_param & LPF_DRYRUN)) {
936                         bk->lb_param |= LPF_DRYRUN;
937                         dirty = true;
938                 } else if (!(start->ls_flags & LPF_DRYRUN) &&
939                            (bk->lb_param & LPF_DRYRUN)) {
940                         bk->lb_param &= ~LPF_DRYRUN;
941                         lfsck->li_drop_dryrun = 1;
942                         dirty = true;
943                 }
944         }
945
946         if (dirty) {
947                 rc = lfsck_bookmark_store(env, lfsck);
948                 if (rc != 0)
949                         GOTO(out, rc);
950         }
951
952         if (start->ls_flags & LPF_RESET)
953                 flags |= DOIF_RESET;
954
955         if (start->ls_active != 0) {
956                 struct lfsck_component *next;
957                 __u16 type = 1;
958
959                 if (start->ls_active == LFSCK_TYPES_ALL)
960                         start->ls_active = LFSCK_TYPES_SUPPORTED;
961
962                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
963                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
964                         GOTO(out, rc = -ENOTSUPP);
965                 }
966
967                 cfs_list_for_each_entry_safe(com, next,
968                                              &lfsck->li_list_scan, lc_link) {
969                         if (!(com->lc_type & start->ls_active)) {
970                                 rc = com->lc_ops->lfsck_post(env, com, 0,
971                                                              false);
972                                 if (rc != 0)
973                                         GOTO(out, rc);
974                         }
975                 }
976
977                 while (start->ls_active != 0) {
978                         if (type & start->ls_active) {
979                                 com = __lfsck_component_find(lfsck, type,
980                                                         &lfsck->li_list_idle);
981                                 if (com != NULL) {
982                                         /* The component status will be updated
983                                          * when its prep() is called later by
984                                          * the LFSCK main engine. */
985                                         cfs_list_del_init(&com->lc_link);
986                                         cfs_list_add_tail(&com->lc_link,
987                                                           &lfsck->li_list_scan);
988                                 }
989                                 start->ls_active &= ~type;
990                         }
991                         type <<= 1;
992                 }
993         }
994
995         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
996                 start->ls_active |= com->lc_type;
997                 if (flags & DOIF_RESET) {
998                         rc = com->lc_ops->lfsck_reset(env, com, false);
999                         if (rc != 0)
1000                                 GOTO(out, rc);
1001                 }
1002         }
1003
1004 trigger:
1005         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
1006         if (bk->lb_param & LPF_DRYRUN)
1007                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
1008
1009         if (bk->lb_param & LPF_FAILOUT) {
1010                 valid |= DOIV_ERROR_HANDLE;
1011                 flags |= DOIF_FAILOUT;
1012         }
1013
1014         if (!cfs_list_empty(&lfsck->li_list_scan))
1015                 flags |= DOIF_OUTUSED;
1016
1017         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
1018         thread_set_flags(thread, 0);
1019         if (lfsck->li_master)
1020                 rc = PTR_ERR(kthread_run(lfsck_master_engine, lfsck, "lfsck"));
1021         if (rc < 0)
1022                 CERROR("%s: cannot start LFSCK thread, rc = %d\n",
1023                        lfsck_lfsck2name(lfsck), rc);
1024         else
1025                 l_wait_event(thread->t_ctl_waitq,
1026                              thread_is_running(thread) ||
1027                              thread_is_stopped(thread),
1028                              &lwi);
1029
1030         GOTO(out, rc = 0);
1031
1032 out:
1033         mutex_unlock(&lfsck->li_mutex);
1034 put:
1035         lfsck_instance_put(env, lfsck);
1036         return (rc < 0 ? rc : 0);
1037 }
1038 EXPORT_SYMBOL(lfsck_start);
1039
1040 int lfsck_stop(const struct lu_env *env, struct dt_device *key, bool pause)
1041 {
1042         struct lfsck_instance   *lfsck;
1043         struct ptlrpc_thread    *thread;
1044         struct l_wait_info       lwi    = { 0 };
1045         ENTRY;
1046
1047         lfsck = lfsck_instance_find(key, true, false);
1048         if (unlikely(lfsck == NULL))
1049                 RETURN(-ENODEV);
1050
1051         thread = &lfsck->li_thread;
1052         mutex_lock(&lfsck->li_mutex);
1053         spin_lock(&lfsck->li_lock);
1054         if (thread_is_init(thread) || thread_is_stopped(thread)) {
1055                 spin_unlock(&lfsck->li_lock);
1056                 mutex_unlock(&lfsck->li_mutex);
1057                 lfsck_instance_put(env, lfsck);
1058                 RETURN(-EALREADY);
1059         }
1060
1061         if (pause)
1062                 lfsck->li_paused = 1;
1063         thread_set_flags(thread, SVC_STOPPING);
1064         spin_unlock(&lfsck->li_lock);
1065
1066         cfs_waitq_broadcast(&thread->t_ctl_waitq);
1067         l_wait_event(thread->t_ctl_waitq,
1068                      thread_is_stopped(thread),
1069                      &lwi);
1070         mutex_unlock(&lfsck->li_mutex);
1071         lfsck_instance_put(env, lfsck);
1072
1073         RETURN(0);
1074 }
1075 EXPORT_SYMBOL(lfsck_stop);
1076
1077 int lfsck_register(const struct lu_env *env, struct dt_device *key,
1078                    struct dt_device *next, bool master)
1079 {
1080         struct lfsck_instance   *lfsck;
1081         struct dt_object        *root;
1082         struct dt_object        *obj;
1083         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
1084         int                      rc;
1085         ENTRY;
1086
1087         lfsck = lfsck_instance_find(key, false, false);
1088         if (unlikely(lfsck != NULL))
1089                 RETURN(-EEXIST);
1090
1091         OBD_ALLOC_PTR(lfsck);
1092         if (lfsck == NULL)
1093                 RETURN(-ENOMEM);
1094
1095         mutex_init(&lfsck->li_mutex);
1096         spin_lock_init(&lfsck->li_lock);
1097         CFS_INIT_LIST_HEAD(&lfsck->li_link);
1098         CFS_INIT_LIST_HEAD(&lfsck->li_list_scan);
1099         CFS_INIT_LIST_HEAD(&lfsck->li_list_dir);
1100         CFS_INIT_LIST_HEAD(&lfsck->li_list_double_scan);
1101         CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
1102         atomic_set(&lfsck->li_ref, 1);
1103         cfs_waitq_init(&lfsck->li_thread.t_ctl_waitq);
1104         lfsck->li_next = next;
1105         lfsck->li_bottom = key;
1106
1107         fid->f_seq = FID_SEQ_LOCAL_NAME;
1108         fid->f_oid = 1;
1109         fid->f_ver = 0;
1110         rc = local_oid_storage_init(env, lfsck->li_bottom, fid, &lfsck->li_los);
1111         if (rc != 0)
1112                 GOTO(out, rc);
1113
1114         rc = dt_root_get(env, key, fid);
1115         if (rc != 0)
1116                 GOTO(out, rc);
1117
1118         root = dt_locate(env, lfsck->li_bottom, fid);
1119         if (IS_ERR(root))
1120                 GOTO(out, rc = PTR_ERR(root));
1121
1122         lfsck->li_local_root = root;
1123         dt_try_as_dir(env, root);
1124         if (master) {
1125                 lfsck->li_master = 1;
1126                 if (lfsck_dev_idx(lfsck->li_bottom) == 0) {
1127                         rc = dt_lookup(env, root,
1128                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
1129                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
1130                         if (rc != 0)
1131                                 GOTO(out, rc);
1132                 }
1133         }
1134
1135         fid->f_seq = FID_SEQ_LOCAL_FILE;
1136         fid->f_oid = OTABLE_IT_OID;
1137         fid->f_ver = 0;
1138         obj = dt_locate(env, lfsck->li_bottom, fid);
1139         if (IS_ERR(obj))
1140                 GOTO(out, rc = PTR_ERR(obj));
1141
1142         lfsck->li_obj_oit = obj;
1143         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
1144         if (rc != 0) {
1145                 if (rc == -ENOTSUPP)
1146                         GOTO(add, rc = 0);
1147
1148                 GOTO(out, rc);
1149         }
1150
1151         rc = lfsck_bookmark_setup(env, lfsck);
1152         if (rc != 0)
1153                 GOTO(out, rc);
1154
1155         if (master) {
1156                 rc = lfsck_namespace_setup(env, lfsck);
1157                 if (rc < 0)
1158                         GOTO(out, rc);
1159         }
1160
1161         /* XXX: more LFSCK components initialization to be added here. */
1162
1163 add:
1164         rc = lfsck_instance_add(lfsck);
1165 out:
1166         if (rc != 0)
1167                 lfsck_instance_cleanup(env, lfsck);
1168         return rc;
1169 }
1170 EXPORT_SYMBOL(lfsck_register);
1171
1172 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
1173 {
1174         struct lfsck_instance *lfsck;
1175
1176         lfsck = lfsck_instance_find(key, false, true);
1177         if (lfsck != NULL)
1178                 lfsck_instance_put(env, lfsck);
1179 }
1180 EXPORT_SYMBOL(lfsck_degister);
1181
1182 static int __init lfsck_init(void)
1183 {
1184         int rc;
1185
1186         lfsck_key_init_generic(&lfsck_thread_key, NULL);
1187         rc = lu_context_key_register(&lfsck_thread_key);
1188         return rc;
1189 }
1190
1191 static void __exit lfsck_exit(void)
1192 {
1193         LASSERT(cfs_list_empty(&lfsck_instance_list));
1194
1195         lu_context_key_degister(&lfsck_thread_key);
1196 }
1197
1198 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
1199 MODULE_DESCRIPTION("LFSCK");
1200 MODULE_LICENSE("GPL");
1201
1202 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);