Whamcloud - gitweb
e459c449ad9640a6183ebd10aea387e2d92a4eb0
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <libcfs/list.h>
37 #include <lu_object.h>
38 #include <dt_object.h>
39 #include <lustre_lib.h>
40 #include <lustre_net.h>
41 #include <lustre_lfsck.h>
42 #include <lustre/lustre_lfsck_user.h>
43
44 #include "lfsck_internal.h"
45
/* define lfsck thread key */
LU_KEY_INIT(lfsck, struct lfsck_thread_info);

/*
 * Key destructor: \a data is the struct lfsck_thread_info attached to the
 * context.  The buffer held in lti_linkea_buf is released first, then the
 * structure itself is freed.  \a ctx and \a key are not used here.
 */
static void lfsck_key_fini(const struct lu_context *ctx,
                           struct lu_context_key *key, void *data)
{
        struct lfsck_thread_info *info = data;

        lu_buf_free(&info->lti_linkea_buf);
        OBD_FREE_PTR(info);
}

/* The key is defined with both the MD-thread and DT-thread context tags. */
LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
LU_KEY_INIT_GENERIC(lfsck);
60
/* All registered LFSCK instances, linked through lfsck_instance::li_link. */
static CFS_LIST_HEAD(lfsck_instance_list);
/* Taken by lfsck_instance_find()/lfsck_instance_add() around list access. */
static DEFINE_SPINLOCK(lfsck_instance_lock);
63
/*
 * Printable names of the LFSCK states; the table is NULL-terminated.
 * NOTE(review): presumably indexed by the numeric status value - confirm
 * that the order matches the status enumeration.
 */
const char *lfsck_status_names[] = {
        "init",
        "scanning-phase1",
        "scanning-phase2",
        "completed",
        "failed",
        "stopped",
        "paused",
        "crashed",
        NULL
};
75
/*
 * Printable names of the LFSCK flags; the table is NULL-terminated and has
 * the layout lfsck_bits_dump() expects (entry i names bit i of the mask).
 * NOTE(review): presumably the order follows the flag bit values - confirm.
 */
const char *lfsck_flags_names[] = {
        "scanned-once",
        "inconsistent",
        "upgrade",
        NULL
};
82
/*
 * Printable names of the LFSCK parameters; the table is NULL-terminated.
 * NOTE(review): presumably entry 0 corresponds to LPF_FAILOUT and entry 1
 * to LPF_DRYRUN - confirm against the LPF_* bit definitions.
 */
const char *lfsck_param_names[] = {
        "failout",
        "dryrun",
        NULL
};
88
89 static inline mdsno_t lfsck_dev_idx(struct dt_device *dev)
90 {
91         return dev->dd_lu_dev.ld_site->ld_seq_site->ss_node_id;
92 }
93
94 static inline void lfsck_component_get(struct lfsck_component *com)
95 {
96         atomic_inc(&com->lc_ref);
97 }
98
99 static inline void lfsck_component_put(const struct lu_env *env,
100                                        struct lfsck_component *com)
101 {
102         if (atomic_dec_and_test(&com->lc_ref)) {
103                 if (com->lc_obj != NULL)
104                         lu_object_put(env, &com->lc_obj->do_lu);
105                 if (com->lc_file_ram != NULL)
106                         OBD_FREE(com->lc_file_ram, com->lc_file_size);
107                 if (com->lc_file_disk != NULL)
108                         OBD_FREE(com->lc_file_disk, com->lc_file_size);
109                 OBD_FREE_PTR(com);
110         }
111 }
112
113 static inline struct lfsck_component *
114 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type, cfs_list_t *list)
115 {
116         struct lfsck_component *com;
117
118         cfs_list_for_each_entry(com, list, lc_link) {
119                 if (com->lc_type == type)
120                         return com;
121         }
122         return NULL;
123 }
124
125 static struct lfsck_component *
126 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
127 {
128         struct lfsck_component *com;
129
130         spin_lock(&lfsck->li_lock);
131         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
132         if (com != NULL)
133                 goto unlock;
134
135         com = __lfsck_component_find(lfsck, type,
136                                      &lfsck->li_list_double_scan);
137         if (com != NULL)
138                 goto unlock;
139
140         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
141
142 unlock:
143         if (com != NULL)
144                 lfsck_component_get(com);
145         spin_unlock(&lfsck->li_lock);
146         return com;
147 }
148
149 void lfsck_component_cleanup(const struct lu_env *env,
150                              struct lfsck_component *com)
151 {
152         if (!cfs_list_empty(&com->lc_link))
153                 cfs_list_del_init(&com->lc_link);
154         if (!cfs_list_empty(&com->lc_link_dir))
155                 cfs_list_del_init(&com->lc_link_dir);
156
157         lfsck_component_put(env, com);
158 }
159
160 static void lfsck_instance_cleanup(const struct lu_env *env,
161                                    struct lfsck_instance *lfsck)
162 {
163         struct ptlrpc_thread    *thread = &lfsck->li_thread;
164         struct lfsck_component  *com;
165         ENTRY;
166
167         LASSERT(list_empty(&lfsck->li_link));
168         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
169
170         if (lfsck->li_obj_oit != NULL) {
171                 lu_object_put(env, &lfsck->li_obj_oit->do_lu);
172                 lfsck->li_obj_oit = NULL;
173         }
174
175         LASSERT(lfsck->li_obj_dir == NULL);
176
177         while (!cfs_list_empty(&lfsck->li_list_scan)) {
178                 com = cfs_list_entry(lfsck->li_list_scan.next,
179                                      struct lfsck_component,
180                                      lc_link);
181                 lfsck_component_cleanup(env, com);
182         }
183
184         LASSERT(cfs_list_empty(&lfsck->li_list_dir));
185
186         while (!cfs_list_empty(&lfsck->li_list_double_scan)) {
187                 com = cfs_list_entry(lfsck->li_list_double_scan.next,
188                                      struct lfsck_component,
189                                      lc_link);
190                 lfsck_component_cleanup(env, com);
191         }
192
193         while (!cfs_list_empty(&lfsck->li_list_idle)) {
194                 com = cfs_list_entry(lfsck->li_list_idle.next,
195                                      struct lfsck_component,
196                                      lc_link);
197                 lfsck_component_cleanup(env, com);
198         }
199
200         if (lfsck->li_bookmark_obj != NULL) {
201                 lu_object_put(env, &lfsck->li_bookmark_obj->do_lu);
202                 lfsck->li_bookmark_obj = NULL;
203         }
204
205         if (lfsck->li_los != NULL) {
206                 local_oid_storage_fini(env, lfsck->li_los);
207                 lfsck->li_los = NULL;
208         }
209
210         if (lfsck->li_local_root != NULL) {
211                 lu_object_put(env, &lfsck->li_local_root->do_lu);
212                 lfsck->li_local_root = NULL;
213         }
214
215         OBD_FREE_PTR(lfsck);
216 }
217
218 static inline void lfsck_instance_get(struct lfsck_instance *lfsck)
219 {
220         atomic_inc(&lfsck->li_ref);
221 }
222
223 static inline void lfsck_instance_put(const struct lu_env *env,
224                                       struct lfsck_instance *lfsck)
225 {
226         if (atomic_dec_and_test(&lfsck->li_ref))
227                 lfsck_instance_cleanup(env, lfsck);
228 }
229
230 static inline struct lfsck_instance *lfsck_instance_find(struct dt_device *key,
231                                                          bool ref, bool unlink)
232 {
233         struct lfsck_instance *lfsck;
234
235         spin_lock(&lfsck_instance_lock);
236         cfs_list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
237                 if (lfsck->li_bottom == key) {
238                         if (ref)
239                                 lfsck_instance_get(lfsck);
240                         if (unlink)
241                                 list_del_init(&lfsck->li_link);
242                         spin_unlock(&lfsck_instance_lock);
243                         return lfsck;
244                 }
245         }
246         spin_unlock(&lfsck_instance_lock);
247         return NULL;
248 }
249
250 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
251 {
252         struct lfsck_instance *tmp;
253
254         spin_lock(&lfsck_instance_lock);
255         cfs_list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
256                 if (lfsck->li_bottom == tmp->li_bottom) {
257                         spin_unlock(&lfsck_instance_lock);
258                         return -EEXIST;
259                 }
260         }
261
262         cfs_list_add_tail(&lfsck->li_link, &lfsck_instance_list);
263         spin_unlock(&lfsck_instance_lock);
264         return 0;
265 }
266
/*
 * Format a bit mask as "<prefix>: name,name,...\n" ("<prefix>:\n" when no
 * named bit is set).
 *
 * \param buf     in/out: write cursor, advanced past the text on success
 * \param len     in/out: space left at *buf, reduced by the text length
 * \param bits    mask to print; bit i is printed as names[i]
 * \param names   NULL-terminated table of bit names
 * \param prefix  label printed in front of the list
 *
 * Bits that have no entry in \a names (at or beyond the NULL terminator) are
 * ignored instead of indexing past the end of the table.
 *
 * Returns the number of characters written, or -ENOSPC when the text does
 * not fit completely.  On failure *buf and *len are left unchanged, although
 * the buffer content itself may have been partially overwritten.
 */
int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
                    const char *prefix)
{
        unsigned int  todo = 0;
        unsigned int  flag;
        char         *pos  = *buf;
        int           left = *len;
        int           rc;
        int           i;

        if (left <= 0)
                return -ENOSPC;

        /* Keep only the bits we are able to name. */
        for (i = 0; i < (int)(sizeof(todo) * 8) && names[i] != NULL; i++) {
                flag = 1U << i;
                if ((unsigned int)bits & flag)
                        todo |= flag;
        }

        rc = snprintf(pos, left, "%s:%c", prefix, todo != 0 ? ' ' : '\n');
        /* snprintf() reports the untruncated length: rc >= left means cut. */
        if (rc <= 0 || rc >= left)
                return -ENOSPC;

        pos += rc;
        left -= rc;
        for (i = 0; todo != 0; i++) {
                flag = 1U << i;
                if (!(todo & flag))
                        continue;

                todo &= ~flag;
                rc = snprintf(pos, left, "%s%c", names[i],
                              todo != 0 ? ',' : '\n');
                if (rc <= 0 || rc >= left)
                        return -ENOSPC;

                pos += rc;
                left -= rc;
        }

        rc = *len - left;
        *buf = pos;
        *len = left;
        return rc;
}
295
296 int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix)
297 {
298         int rc;
299
300         if (time != 0)
301                 rc = snprintf(*buf, *len, "%s: "LPU64" seconds\n", prefix,
302                               cfs_time_current_sec() - time);
303         else
304                 rc = snprintf(*buf, *len, "%s: N/A\n", prefix);
305         if (rc <= 0)
306                 return -ENOSPC;
307
308         *buf += rc;
309         *len -= rc;
310         return rc;
311 }
312
313 int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
314                    const char *prefix)
315 {
316         int rc;
317
318         if (fid_is_zero(&pos->lp_dir_parent)) {
319                 if (pos->lp_oit_cookie == 0)
320                         rc = snprintf(*buf, *len, "%s: N/A, N/A, N/A\n",
321                                       prefix);
322                 else
323                         rc = snprintf(*buf, *len, "%s: "LPU64", N/A, N/A\n",
324                                       prefix, pos->lp_oit_cookie);
325         } else {
326                 rc = snprintf(*buf, *len, "%s: "LPU64", "DFID", "LPU64"\n",
327                               prefix, pos->lp_oit_cookie,
328                               PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
329         }
330         if (rc <= 0)
331                 return -ENOSPC;
332
333         *buf += rc;
334         *len -= rc;
335         return rc;
336 }
337
338 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
339                     struct lfsck_position *pos, bool init)
340 {
341         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
342
343         if (unlikely(lfsck->li_di_oit == NULL)) {
344                 memset(pos, 0, sizeof(*pos));
345                 return;
346         }
347
348         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
349         if (!lfsck->li_current_oit_processed && !init)
350                 pos->lp_oit_cookie--;
351
352         LASSERT(pos->lp_oit_cookie > 0);
353
354         if (lfsck->li_di_dir != NULL) {
355                 struct dt_object *dto = lfsck->li_obj_dir;
356
357                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
358                                                         lfsck->li_di_dir);
359
360                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
361                         fid_zero(&pos->lp_dir_parent);
362                         pos->lp_dir_cookie = 0;
363                 } else {
364                         pos->lp_dir_parent = *lu_object_fid(&dto->do_lu);
365                 }
366         } else {
367                 fid_zero(&pos->lp_dir_parent);
368                 pos->lp_dir_cookie = 0;
369         }
370 }
371
372 static void __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
373 {
374         lfsck->li_bookmark_ram.lb_speed_limit = limit;
375         if (limit != LFSCK_SPEED_NO_LIMIT) {
376                 if (limit > CFS_HZ) {
377                         lfsck->li_sleep_rate = limit / CFS_HZ;
378                         lfsck->li_sleep_jif = 1;
379                 } else {
380                         lfsck->li_sleep_rate = 1;
381                         lfsck->li_sleep_jif = CFS_HZ / limit;
382                 }
383         } else {
384                 lfsck->li_sleep_jif = 0;
385                 lfsck->li_sleep_rate = 0;
386         }
387 }
388
389 void lfsck_control_speed(struct lfsck_instance *lfsck)
390 {
391         struct ptlrpc_thread *thread = &lfsck->li_thread;
392         struct l_wait_info    lwi;
393
394         if (lfsck->li_sleep_jif > 0 &&
395             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
396                 spin_lock(&lfsck->li_lock);
397                 if (likely(lfsck->li_sleep_jif > 0 &&
398                            lfsck->li_new_scanned >= lfsck->li_sleep_rate)) {
399                         lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
400                                                LWI_ON_SIGNAL_NOOP, NULL);
401                         spin_unlock(&lfsck->li_lock);
402
403                         l_wait_event(thread->t_ctl_waitq,
404                                      !thread_is_running(thread),
405                                      &lwi);
406                         lfsck->li_new_scanned = 0;
407                 } else {
408                         spin_unlock(&lfsck->li_lock);
409                 }
410         }
411 }
412
413 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
414                             struct lu_fid *fid)
415 {
416         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
417                      !dt_try_as_dir(env, obj)))
418                 return -ENOTDIR;
419
420         return dt_lookup(env, obj, (struct dt_rec *)fid,
421                          (const struct dt_key *)"..", BYPASS_CAPA);
422 }
423
424 static int lfsck_needs_scan_dir(const struct lu_env *env,
425                                 struct lfsck_instance *lfsck,
426                                 struct dt_object *obj)
427 {
428         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
429         int            depth = 0;
430         int            rc;
431
432         if (!lfsck->li_master || !S_ISDIR(lfsck_object_type(obj)) ||
433             cfs_list_empty(&lfsck->li_list_dir))
434                RETURN(0);
435
436         while (1) {
437                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
438                  *      which is the agent directory to manage the objects
439                  *      which name entries reside on remote MDTs. Related
440                  *      consistency verification will be processed in LFSCK
441                  *      phase III. */
442                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
443                         if (depth > 0)
444                                 lfsck_object_put(env, obj);
445                         return 1;
446                 }
447
448                 /* .lustre doesn't contain "real" user objects, no need lfsck */
449                 if (fid_is_dot_lustre(lfsck_dto2fid(obj))) {
450                         if (depth > 0)
451                                 lfsck_object_put(env, obj);
452                         return 0;
453                 }
454
455                 dt_read_lock(env, obj, MOR_TGT_CHILD);
456                 if (unlikely(lfsck_is_dead_obj(obj))) {
457                         dt_read_unlock(env, obj);
458                         if (depth > 0)
459                                 lfsck_object_put(env, obj);
460                         return 0;
461                 }
462
463                 rc = dt_xattr_get(env, obj,
464                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
465                                   BYPASS_CAPA);
466                 dt_read_unlock(env, obj);
467                 if (rc >= 0) {
468                         if (depth > 0)
469                                 lfsck_object_put(env, obj);
470                         return 1;
471                 }
472
473                 if (rc < 0 && rc != -ENODATA) {
474                         if (depth > 0)
475                                 lfsck_object_put(env, obj);
476                         return rc;
477                 }
478
479                 rc = lfsck_parent_fid(env, obj, fid);
480                 if (depth > 0)
481                         lfsck_object_put(env, obj);
482                 if (rc != 0)
483                         return rc;
484
485                 if (unlikely(lu_fid_eq(fid,
486                                        lfsck_dto2fid(lfsck->li_local_root))))
487                         return 0;
488
489                 obj = lfsck_object_find(env, lfsck, fid);
490                 if (obj == NULL)
491                         return 0;
492                 else if (IS_ERR(obj))
493                         return PTR_ERR(obj);
494
495                 if (!dt_object_exists(obj)) {
496                         lfsck_object_put(env, obj);
497                         return 0;
498                 }
499
500                 /* Currently, only client visible directory can be remote. */
501                 if (dt_object_remote(obj)) {
502                         lfsck_object_put(env, obj);
503                         return 1;
504                 }
505
506                 depth++;
507         }
508         return 0;
509 }
510
511 /* LFSCK wrap functions */
512
513 void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
514                 bool new_checked)
515 {
516         struct lfsck_component *com;
517
518         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
519                 com->lc_ops->lfsck_fail(env, com, new_checked);
520         }
521 }
522
523 int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck)
524 {
525         struct lfsck_component *com;
526         int                     rc;
527
528         if (likely(cfs_time_beforeq(cfs_time_current(),
529                                     lfsck->li_time_next_checkpoint)))
530                 return 0;
531
532         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
533         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
534                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
535                 if (rc != 0)
536                         return rc;;
537         }
538
539         lfsck->li_time_last_checkpoint = cfs_time_current();
540         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
541                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
542         return 0;
543 }
544
/*
 * Prepare a scan: let every component on the scan list prepare itself, pick
 * the position to start from, load the otable-based iterator there and, on
 * the master instance, re-open the directory iterator recorded in that
 * position.  On success an initial checkpoint is taken.
 *
 * Returns 0 on success; otherwise a non-zero value from a component's
 * lfsck_prep()/lfsck_checkpoint() operation or a negative errno from the
 * iterator/object setup.  When the failure is detected at the "out" label
 * every component's lfsck_post() operation is called with init == true.
 */
int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck)
{
        struct dt_object       *obj     = NULL;
        struct lfsck_component *com;
        struct lfsck_component *next;
        struct lfsck_position  *pos     = NULL;
        const struct dt_it_ops *iops    =
                                &lfsck->li_obj_oit->do_index_ops->dio_it;
        struct dt_it           *di;
        int                     rc;
        ENTRY;

        LASSERT(lfsck->li_obj_dir == NULL);
        LASSERT(lfsck->li_di_dir == NULL);

        lfsck->li_current_oit_processed = 0;
        cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
                com->lc_new_checked = 0;
                if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
                        com->lc_journal = 0;

                /* NOTE: a failure here returns directly, bypassing "out". */
                rc = com->lc_ops->lfsck_prep(env, com);
                if (rc != 0)
                        RETURN(rc);

                /* The first component seeds pos; a later one replaces it when
                 * its start position is set and lfsck_pos_is_eq() reports
                 * pos as greater (presumably: the later one starts earlier -
                 * TODO confirm against lfsck_pos_is_eq()). */
                if ((pos == NULL) ||
                    (!lfsck_pos_is_zero(&com->lc_pos_start) &&
                     lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
                        pos = &com->lc_pos_start;
        }

        /* Init otable-based iterator. */
        /* pos stays NULL only when the scan list is empty: start at 0. */
        if (pos == NULL) {
                rc = iops->load(env, lfsck->li_di_oit, 0);
                if (rc > 0) {
                        lfsck->li_oit_over = 1;
                        rc = 0;
                }

                GOTO(out, rc);
        }

        rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
        if (rc < 0)
                GOTO(out, rc);
        else if (rc > 0)
                lfsck->li_oit_over = 1;

        /* No directory position to restore. */
        if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
                GOTO(out, rc = 0);

        /* Find the directory for namespace-based traverse. */
        obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
        if (obj == NULL)
                GOTO(out, rc = 0);
        else if (IS_ERR(obj))
                /* Direct return: "out" would try to put the error pointer. */
                RETURN(PTR_ERR(obj));

        /* XXX: Currently, skip remote object, the consistency for
         *      remote object will be processed in LFSCK phase III. */
        if (!dt_object_exists(obj) || dt_object_remote(obj) ||
            unlikely(!S_ISDIR(lfsck_object_type(obj))))
                GOTO(out, rc = 0);

        if (unlikely(!dt_try_as_dir(env, obj)))
                GOTO(out, rc = -ENOTDIR);

        /* Init the namespace-based directory traverse. */
        /* From here on iops refers to the directory's iterator operations. */
        iops = &obj->do_index_ops->dio_it;
        di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
        if (IS_ERR(di))
                GOTO(out, rc = PTR_ERR(di));

        LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);

        /* Step past the loaded entry when load() returned 0, or returned a
         * positive value for a non-zero cookie; a positive result for
         * cookie 0 is simply treated as success. */
        rc = iops->load(env, di, pos->lp_dir_cookie);
        if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
                rc = iops->next(env, di);
        else if (rc > 0)
                rc = 0;

        if (rc != 0) {
                iops->put(env, di);
                iops->fini(env, di);
                GOTO(out, rc);
        }

        /* Publish the directory state; li_di_dir is set under li_lock. */
        lfsck->li_obj_dir = lfsck_object_get(obj);
        lfsck->li_cookie_dir = iops->store(env, di);
        spin_lock(&lfsck->li_lock);
        lfsck->li_di_dir = di;
        spin_unlock(&lfsck->li_lock);

        GOTO(out, rc = 0);

out:
        /* Drop the lookup reference; li_obj_dir holds its own, if set. */
        if (obj != NULL)
                lfsck_object_put(env, obj);

        if (rc < 0) {
                cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
                                             lc_link)
                        com->lc_ops->lfsck_post(env, com, rc, true);

                return rc;
        }

        /* Initial checkpoint; stops at the first component that fails. */
        rc = 0;
        lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
        cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
                rc = com->lc_ops->lfsck_checkpoint(env, com, true);
                if (rc != 0)
                        break;
        }

        lfsck->li_time_last_checkpoint = cfs_time_current();
        lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
                                cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
        return rc;
}
665
666 int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
667                    struct dt_object *obj)
668 {
669         struct lfsck_component *com;
670         const struct dt_it_ops *iops;
671         struct dt_it           *di;
672         int                     rc;
673         ENTRY;
674
675         LASSERT(lfsck->li_obj_dir == NULL);
676
677         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
678                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
679                 if (rc != 0)
680                         RETURN(rc);
681         }
682
683         rc = lfsck_needs_scan_dir(env, lfsck, obj);
684         if (rc <= 0)
685                 GOTO(out, rc);
686
687         if (unlikely(!dt_try_as_dir(env, obj)))
688                 GOTO(out, rc = -ENOTDIR);
689
690         iops = &obj->do_index_ops->dio_it;
691         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
692         if (IS_ERR(di))
693                 GOTO(out, rc = PTR_ERR(di));
694
695         rc = iops->load(env, di, 0);
696         if (rc == 0)
697                 rc = iops->next(env, di);
698         else if (rc > 0)
699                 rc = 0;
700
701         if (rc != 0) {
702                 iops->put(env, di);
703                 iops->fini(env, di);
704                 GOTO(out, rc);
705         }
706
707         lfsck->li_obj_dir = lfsck_object_get(obj);
708         lfsck->li_cookie_dir = iops->store(env, di);
709         spin_lock(&lfsck->li_lock);
710         lfsck->li_di_dir = di;
711         spin_unlock(&lfsck->li_lock);
712
713         GOTO(out, rc = 0);
714
715 out:
716         if (rc < 0)
717                 lfsck_fail(env, lfsck, false);
718         return (rc > 0 ? 0 : rc);
719 }
720
721 int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
722                    struct dt_object *obj, struct lu_dirent *ent)
723 {
724         struct lfsck_component *com;
725         int                     rc;
726
727         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
728                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
729                 if (rc != 0)
730                         return rc;
731         }
732         return 0;
733 }
734
735 int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
736                int result)
737 {
738         struct lfsck_component *com;
739         struct lfsck_component *next;
740         int                     rc;
741
742         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
743         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
744                 rc = com->lc_ops->lfsck_post(env, com, result, false);
745                 if (rc != 0)
746                         return rc;
747         }
748
749         lfsck->li_time_last_checkpoint = cfs_time_current();
750         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
751                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
752         return result;
753 }
754
755 int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck)
756 {
757         struct lfsck_component *com;
758         struct lfsck_component *next;
759         int                     rc;
760
761         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
762                                      lc_link) {
763                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
764                         com->lc_journal = 0;
765
766                 rc = com->lc_ops->lfsck_double_scan(env, com);
767                 if (rc != 0)
768                         return rc;
769         }
770         return 0;
771 }
772
773 /* external interfaces */
774
775 int lfsck_get_speed(struct dt_device *key, void *buf, int len)
776 {
777         struct lu_env           env;
778         struct lfsck_instance  *lfsck;
779         int                     rc;
780         ENTRY;
781
782         lfsck = lfsck_instance_find(key, true, false);
783         if (unlikely(lfsck == NULL))
784                 RETURN(-ENODEV);
785
786         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
787         if (rc != 0)
788                 GOTO(out, rc);
789
790         rc = snprintf(buf, len, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
791         lu_env_fini(&env);
792
793         GOTO(out, rc);
794
795 out:
796         lfsck_instance_put(&env, lfsck);
797         return rc;
798 }
799 EXPORT_SYMBOL(lfsck_get_speed);
800
801 int lfsck_set_speed(struct dt_device *key, int val)
802 {
803         struct lu_env           env;
804         struct lfsck_instance  *lfsck;
805         int                     rc;
806         ENTRY;
807
808         lfsck = lfsck_instance_find(key, true, false);
809         if (unlikely(lfsck == NULL))
810                 RETURN(-ENODEV);
811
812         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
813         if (rc != 0)
814                 GOTO(out, rc);
815
816         mutex_lock(&lfsck->li_mutex);
817         __lfsck_set_speed(lfsck, val);
818         rc = lfsck_bookmark_store(&env, lfsck);
819         mutex_unlock(&lfsck->li_mutex);
820         lu_env_fini(&env);
821
822         GOTO(out, rc);
823
824 out:
825         lfsck_instance_put(&env, lfsck);
826         return rc;
827 }
828 EXPORT_SYMBOL(lfsck_set_speed);
829
830 int lfsck_dump(struct dt_device *key, void *buf, int len, __u16 type)
831 {
832         struct lu_env           env;
833         struct lfsck_instance  *lfsck;
834         struct lfsck_component *com   = NULL;
835         int                     rc;
836         ENTRY;
837
838         lfsck = lfsck_instance_find(key, true, false);
839         if (unlikely(lfsck == NULL))
840                 RETURN(-ENODEV);
841
842         com = lfsck_component_find(lfsck, type);
843         if (com == NULL)
844                 GOTO(out, rc = -ENOTSUPP);
845
846         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
847         if (rc != 0)
848                 GOTO(out, rc);
849
850         rc = com->lc_ops->lfsck_dump(&env, com, buf, len);
851         lu_env_fini(&env);
852
853         GOTO(out, rc);
854
855 out:
856         if (com != NULL)
857                 lfsck_component_put(&env, com);
858         lfsck_instance_put(&env, lfsck);
859         return rc;
860 }
861 EXPORT_SYMBOL(lfsck_dump);
862
863 int lfsck_start(const struct lu_env *env, struct dt_device *key,
864                 struct lfsck_start_param *lsp)
865 {
866         struct lfsck_start     *start  = lsp->lsp_start;
867         struct lfsck_instance  *lfsck;
868         struct lfsck_bookmark  *bk;
869         struct ptlrpc_thread   *thread;
870         struct lfsck_component *com;
871         struct l_wait_info      lwi    = { 0 };
872         bool                    dirty  = false;
873         int                     rc     = 0;
874         __u16                   valid  = 0;
875         __u16                   flags  = 0;
876         ENTRY;
877
878         lfsck = lfsck_instance_find(key, true, false);
879         if (unlikely(lfsck == NULL))
880                 RETURN(-ENODEV);
881
882         /* start == NULL means auto trigger paused LFSCK. */
883         if ((start == NULL) &&
884             (cfs_list_empty(&lfsck->li_list_scan) ||
885              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
886                 GOTO(put, rc = 0);
887
888         bk = &lfsck->li_bookmark_ram;
889         thread = &lfsck->li_thread;
890         mutex_lock(&lfsck->li_mutex);
891         spin_lock(&lfsck->li_lock);
892         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
893                 spin_unlock(&lfsck->li_lock);
894                 GOTO(out, rc = -EALREADY);
895         }
896
897         spin_unlock(&lfsck->li_lock);
898
899         lfsck->li_namespace = lsp->lsp_namespace;
900         lfsck->li_paused = 0;
901         lfsck->li_oit_over = 0;
902         lfsck->li_drop_dryrun = 0;
903         lfsck->li_new_scanned = 0;
904
905         /* For auto trigger. */
906         if (start == NULL)
907                 goto trigger;
908
909         start->ls_version = bk->lb_version;
910         if (start->ls_valid & LSV_SPEED_LIMIT) {
911                 __lfsck_set_speed(lfsck, start->ls_speed_limit);
912                 dirty = true;
913         }
914
915         if (start->ls_valid & LSV_ERROR_HANDLE) {
916                 valid |= DOIV_ERROR_HANDLE;
917                 if (start->ls_flags & LPF_FAILOUT)
918                         flags |= DOIF_FAILOUT;
919
920                 if ((start->ls_flags & LPF_FAILOUT) &&
921                     !(bk->lb_param & LPF_FAILOUT)) {
922                         bk->lb_param |= LPF_FAILOUT;
923                         dirty = true;
924                 } else if (!(start->ls_flags & LPF_FAILOUT) &&
925                            (bk->lb_param & LPF_FAILOUT)) {
926                         bk->lb_param &= ~LPF_FAILOUT;
927                         dirty = true;
928                 }
929         }
930
931         if (start->ls_valid & LSV_DRYRUN) {
932                 if ((start->ls_flags & LPF_DRYRUN) &&
933                     !(bk->lb_param & LPF_DRYRUN)) {
934                         bk->lb_param |= LPF_DRYRUN;
935                         dirty = true;
936                 } else if (!(start->ls_flags & LPF_DRYRUN) &&
937                            (bk->lb_param & LPF_DRYRUN)) {
938                         bk->lb_param &= ~LPF_DRYRUN;
939                         lfsck->li_drop_dryrun = 1;
940                         dirty = true;
941                 }
942         }
943
944         if (dirty) {
945                 rc = lfsck_bookmark_store(env, lfsck);
946                 if (rc != 0)
947                         GOTO(out, rc);
948         }
949
950         if (start->ls_flags & LPF_RESET)
951                 flags |= DOIF_RESET;
952
953         if (start->ls_active != 0) {
954                 struct lfsck_component *next;
955                 __u16 type = 1;
956
957                 if (start->ls_active == LFSCK_TYPES_ALL)
958                         start->ls_active = LFSCK_TYPES_SUPPORTED;
959
960                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
961                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
962                         GOTO(out, rc = -ENOTSUPP);
963                 }
964
965                 cfs_list_for_each_entry_safe(com, next,
966                                              &lfsck->li_list_scan, lc_link) {
967                         if (!(com->lc_type & start->ls_active)) {
968                                 rc = com->lc_ops->lfsck_post(env, com, 0,
969                                                              false);
970                                 if (rc != 0)
971                                         GOTO(out, rc);
972                         }
973                 }
974
975                 while (start->ls_active != 0) {
976                         if (type & start->ls_active) {
977                                 com = __lfsck_component_find(lfsck, type,
978                                                         &lfsck->li_list_idle);
979                                 if (com != NULL) {
980                                         /* The component status will be updated
981                                          * when its prep() is called later by
982                                          * the LFSCK main engine. */
983                                         cfs_list_del_init(&com->lc_link);
984                                         cfs_list_add_tail(&com->lc_link,
985                                                           &lfsck->li_list_scan);
986                                 }
987                                 start->ls_active &= ~type;
988                         }
989                         type <<= 1;
990                 }
991         }
992
993         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
994                 start->ls_active |= com->lc_type;
995                 if (flags & DOIF_RESET) {
996                         rc = com->lc_ops->lfsck_reset(env, com, false);
997                         if (rc != 0)
998                                 GOTO(out, rc);
999                 }
1000         }
1001
1002 trigger:
1003         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
1004         if (bk->lb_param & LPF_DRYRUN)
1005                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
1006
1007         if (bk->lb_param & LPF_FAILOUT) {
1008                 valid |= DOIV_ERROR_HANDLE;
1009                 flags |= DOIF_FAILOUT;
1010         }
1011
1012         if (!cfs_list_empty(&lfsck->li_list_scan))
1013                 flags |= DOIF_OUTUSED;
1014
1015         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
1016         thread_set_flags(thread, 0);
1017         if (lfsck->li_master)
1018                 rc = PTR_ERR(kthread_run(lfsck_master_engine, lfsck, "lfsck"));
1019         if (rc < 0)
1020                 CERROR("%s: cannot start LFSCK thread, rc = %d\n",
1021                        lfsck_lfsck2name(lfsck), rc);
1022         else
1023                 l_wait_event(thread->t_ctl_waitq,
1024                              thread_is_running(thread) ||
1025                              thread_is_stopped(thread),
1026                              &lwi);
1027
1028         GOTO(out, rc = 0);
1029
1030 out:
1031         mutex_unlock(&lfsck->li_mutex);
1032 put:
1033         lfsck_instance_put(env, lfsck);
1034         return (rc < 0 ? rc : 0);
1035 }
1036 EXPORT_SYMBOL(lfsck_start);
1037
1038 int lfsck_stop(const struct lu_env *env, struct dt_device *key, bool pause)
1039 {
1040         struct lfsck_instance   *lfsck;
1041         struct ptlrpc_thread    *thread;
1042         struct l_wait_info       lwi    = { 0 };
1043         ENTRY;
1044
1045         lfsck = lfsck_instance_find(key, true, false);
1046         if (unlikely(lfsck == NULL))
1047                 RETURN(-ENODEV);
1048
1049         thread = &lfsck->li_thread;
1050         mutex_lock(&lfsck->li_mutex);
1051         spin_lock(&lfsck->li_lock);
1052         if (thread_is_init(thread) || thread_is_stopped(thread)) {
1053                 spin_unlock(&lfsck->li_lock);
1054                 mutex_unlock(&lfsck->li_mutex);
1055                 lfsck_instance_put(env, lfsck);
1056                 RETURN(-EALREADY);
1057         }
1058
1059         if (pause)
1060                 lfsck->li_paused = 1;
1061         thread_set_flags(thread, SVC_STOPPING);
1062         spin_unlock(&lfsck->li_lock);
1063
1064         cfs_waitq_broadcast(&thread->t_ctl_waitq);
1065         l_wait_event(thread->t_ctl_waitq,
1066                      thread_is_stopped(thread),
1067                      &lwi);
1068         mutex_unlock(&lfsck->li_mutex);
1069         lfsck_instance_put(env, lfsck);
1070
1071         RETURN(0);
1072 }
1073 EXPORT_SYMBOL(lfsck_stop);
1074
1075 int lfsck_register(const struct lu_env *env, struct dt_device *key,
1076                    struct dt_device *next, bool master)
1077 {
1078         struct lfsck_instance   *lfsck;
1079         struct dt_object        *root;
1080         struct dt_object        *obj;
1081         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
1082         int                      rc;
1083         ENTRY;
1084
1085         lfsck = lfsck_instance_find(key, false, false);
1086         if (unlikely(lfsck != NULL))
1087                 RETURN(-EEXIST);
1088
1089         OBD_ALLOC_PTR(lfsck);
1090         if (lfsck == NULL)
1091                 RETURN(-ENOMEM);
1092
1093         mutex_init(&lfsck->li_mutex);
1094         spin_lock_init(&lfsck->li_lock);
1095         CFS_INIT_LIST_HEAD(&lfsck->li_link);
1096         CFS_INIT_LIST_HEAD(&lfsck->li_list_scan);
1097         CFS_INIT_LIST_HEAD(&lfsck->li_list_dir);
1098         CFS_INIT_LIST_HEAD(&lfsck->li_list_double_scan);
1099         CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
1100         atomic_set(&lfsck->li_ref, 1);
1101         cfs_waitq_init(&lfsck->li_thread.t_ctl_waitq);
1102         lfsck->li_next = next;
1103         lfsck->li_bottom = key;
1104
1105         fid->f_seq = FID_SEQ_LOCAL_NAME;
1106         fid->f_oid = 1;
1107         fid->f_ver = 0;
1108         rc = local_oid_storage_init(env, lfsck->li_bottom, fid, &lfsck->li_los);
1109         if (rc != 0)
1110                 GOTO(out, rc);
1111
1112         rc = dt_root_get(env, key, fid);
1113         if (rc != 0)
1114                 GOTO(out, rc);
1115
1116         root = dt_locate(env, lfsck->li_bottom, fid);
1117         if (IS_ERR(root))
1118                 GOTO(out, rc = PTR_ERR(root));
1119
1120         lfsck->li_local_root = root;
1121         dt_try_as_dir(env, root);
1122         if (master) {
1123                 lfsck->li_master = 1;
1124                 if (lfsck_dev_idx(lfsck->li_bottom) == 0) {
1125                         rc = dt_lookup(env, root,
1126                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
1127                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
1128                         if (rc != 0)
1129                                 GOTO(out, rc);
1130                 }
1131         }
1132
1133         fid->f_seq = FID_SEQ_LOCAL_FILE;
1134         fid->f_oid = OTABLE_IT_OID;
1135         fid->f_ver = 0;
1136         obj = dt_locate(env, lfsck->li_bottom, fid);
1137         if (IS_ERR(obj))
1138                 GOTO(out, rc = PTR_ERR(obj));
1139
1140         lfsck->li_obj_oit = obj;
1141         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
1142         if (rc != 0) {
1143                 if (rc == -ENOTSUPP)
1144                         GOTO(add, rc = 0);
1145
1146                 GOTO(out, rc);
1147         }
1148
1149         rc = lfsck_bookmark_setup(env, lfsck);
1150         if (rc != 0)
1151                 GOTO(out, rc);
1152
1153         if (master) {
1154                 rc = lfsck_namespace_setup(env, lfsck);
1155                 if (rc < 0)
1156                         GOTO(out, rc);
1157         }
1158
1159         /* XXX: more LFSCK components initialization to be added here. */
1160
1161 add:
1162         rc = lfsck_instance_add(lfsck);
1163 out:
1164         if (rc != 0)
1165                 lfsck_instance_cleanup(env, lfsck);
1166         return rc;
1167 }
1168 EXPORT_SYMBOL(lfsck_register);
1169
1170 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
1171 {
1172         struct lfsck_instance *lfsck;
1173
1174         lfsck = lfsck_instance_find(key, false, true);
1175         if (lfsck != NULL)
1176                 lfsck_instance_put(env, lfsck);
1177 }
1178 EXPORT_SYMBOL(lfsck_degister);
1179
1180 static int __init lfsck_init(void)
1181 {
1182         int rc;
1183
1184         lfsck_key_init_generic(&lfsck_thread_key, NULL);
1185         rc = lu_context_key_register(&lfsck_thread_key);
1186         return rc;
1187 }
1188
1189 static void __exit lfsck_exit(void)
1190 {
1191         LASSERT(cfs_list_empty(&lfsck_instance_list));
1192
1193         lu_context_key_degister(&lfsck_thread_key);
1194 }
1195
1196 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
1197 MODULE_DESCRIPTION("LFSCK");
1198 MODULE_LICENSE("GPL");
1199
1200 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);