Whamcloud - gitweb
LU-1267 lfsck: rebuild LAST_ID
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 /* define lfsck thread key */
46 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
47
48 static void lfsck_key_fini(const struct lu_context *ctx,
49                            struct lu_context_key *key, void *data)
50 {
51         struct lfsck_thread_info *info = data;
52
53         lu_buf_free(&info->lti_linkea_buf);
54         OBD_FREE_PTR(info);
55 }
56
57 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
58 LU_KEY_INIT_GENERIC(lfsck);
59
60 static CFS_LIST_HEAD(lfsck_instance_list);
61 static DEFINE_SPINLOCK(lfsck_instance_lock);
62
63 static const char *lfsck_status_names[] = {
64         [LS_INIT]               = "init",
65         [LS_SCANNING_PHASE1]    = "scanning-phase1",
66         [LS_SCANNING_PHASE2]    = "scanning-phase2",
67         [LS_COMPLETED]          = "completed",
68         [LS_FAILED]             = "failed",
69         [LS_STOPPED]            = "stopped",
70         [LS_PAUSED]             = "paused",
71         [LS_CRASHED]            = "crashed",
72         [LS_PARTIAL]            = "partial"
73 };
74
75 const char *lfsck_flags_names[] = {
76         "scanned-once",
77         "inconsistent",
78         "upgrade",
79         "incomplete",
80         "crashed_lastid",
81         NULL
82 };
83
84 const char *lfsck_param_names[] = {
85         NULL,
86         "failout",
87         "dryrun",
88         NULL
89 };
90
91 const char *lfsck_status2names(enum lfsck_status status)
92 {
93         if (unlikely(status < 0 || status >= LS_MAX))
94                 return "unknown";
95
96         return lfsck_status_names[status];
97 }
98
99 static inline struct lfsck_component *
100 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type, cfs_list_t *list)
101 {
102         struct lfsck_component *com;
103
104         cfs_list_for_each_entry(com, list, lc_link) {
105                 if (com->lc_type == type)
106                         return com;
107         }
108         return NULL;
109 }
110
111 static struct lfsck_component *
112 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
113 {
114         struct lfsck_component *com;
115
116         spin_lock(&lfsck->li_lock);
117         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
118         if (com != NULL)
119                 goto unlock;
120
121         com = __lfsck_component_find(lfsck, type,
122                                      &lfsck->li_list_double_scan);
123         if (com != NULL)
124                 goto unlock;
125
126         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
127
128 unlock:
129         if (com != NULL)
130                 lfsck_component_get(com);
131         spin_unlock(&lfsck->li_lock);
132         return com;
133 }
134
135 void lfsck_component_cleanup(const struct lu_env *env,
136                              struct lfsck_component *com)
137 {
138         if (!cfs_list_empty(&com->lc_link))
139                 cfs_list_del_init(&com->lc_link);
140         if (!cfs_list_empty(&com->lc_link_dir))
141                 cfs_list_del_init(&com->lc_link_dir);
142
143         lfsck_component_put(env, com);
144 }
145
146 void lfsck_instance_cleanup(const struct lu_env *env,
147                             struct lfsck_instance *lfsck)
148 {
149         struct ptlrpc_thread    *thread = &lfsck->li_thread;
150         struct lfsck_component  *com;
151         ENTRY;
152
153         LASSERT(list_empty(&lfsck->li_link));
154         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
155
156         if (lfsck->li_obj_oit != NULL) {
157                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
158                 lfsck->li_obj_oit = NULL;
159         }
160
161         LASSERT(lfsck->li_obj_dir == NULL);
162
163         while (!cfs_list_empty(&lfsck->li_list_scan)) {
164                 com = cfs_list_entry(lfsck->li_list_scan.next,
165                                      struct lfsck_component,
166                                      lc_link);
167                 lfsck_component_cleanup(env, com);
168         }
169
170         LASSERT(cfs_list_empty(&lfsck->li_list_dir));
171
172         while (!cfs_list_empty(&lfsck->li_list_double_scan)) {
173                 com = cfs_list_entry(lfsck->li_list_double_scan.next,
174                                      struct lfsck_component,
175                                      lc_link);
176                 lfsck_component_cleanup(env, com);
177         }
178
179         while (!cfs_list_empty(&lfsck->li_list_idle)) {
180                 com = cfs_list_entry(lfsck->li_list_idle.next,
181                                      struct lfsck_component,
182                                      lc_link);
183                 lfsck_component_cleanup(env, com);
184         }
185
186         if (lfsck->li_bookmark_obj != NULL) {
187                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
188                 lfsck->li_bookmark_obj = NULL;
189         }
190
191         if (lfsck->li_los != NULL) {
192                 local_oid_storage_fini(env, lfsck->li_los);
193                 lfsck->li_los = NULL;
194         }
195
196         OBD_FREE_PTR(lfsck);
197 }
198
199 static inline struct lfsck_instance *lfsck_instance_find(struct dt_device *key,
200                                                          bool ref, bool unlink)
201 {
202         struct lfsck_instance *lfsck;
203
204         spin_lock(&lfsck_instance_lock);
205         cfs_list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
206                 if (lfsck->li_bottom == key) {
207                         if (ref)
208                                 lfsck_instance_get(lfsck);
209                         if (unlink)
210                                 list_del_init(&lfsck->li_link);
211                         spin_unlock(&lfsck_instance_lock);
212                         return lfsck;
213                 }
214         }
215         spin_unlock(&lfsck_instance_lock);
216         return NULL;
217 }
218
219 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
220 {
221         struct lfsck_instance *tmp;
222
223         spin_lock(&lfsck_instance_lock);
224         cfs_list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
225                 if (lfsck->li_bottom == tmp->li_bottom) {
226                         spin_unlock(&lfsck_instance_lock);
227                         return -EEXIST;
228                 }
229         }
230
231         cfs_list_add_tail(&lfsck->li_link, &lfsck_instance_list);
232         spin_unlock(&lfsck_instance_lock);
233         return 0;
234 }
235
236 int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
237                     const char *prefix)
238 {
239         int save = *len;
240         int flag;
241         int rc;
242         int i;
243
244         rc = snprintf(*buf, *len, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
245         if (rc <= 0)
246                 return -ENOSPC;
247
248         *buf += rc;
249         *len -= rc;
250         for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
251                 if (flag & bits) {
252                         bits &= ~flag;
253                         if (names[i] != NULL) {
254                                 rc = snprintf(*buf, *len, "%s%c", names[i],
255                                               bits != 0 ? ',' : '\n');
256                                 if (rc <= 0)
257                                         return -ENOSPC;
258
259                                 *buf += rc;
260                                 *len -= rc;
261                         }
262                 }
263         }
264         return save - *len;
265 }
266
267 int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix)
268 {
269         int rc;
270
271         if (time != 0)
272                 rc = snprintf(*buf, *len, "%s: "LPU64" seconds\n", prefix,
273                               cfs_time_current_sec() - time);
274         else
275                 rc = snprintf(*buf, *len, "%s: N/A\n", prefix);
276         if (rc <= 0)
277                 return -ENOSPC;
278
279         *buf += rc;
280         *len -= rc;
281         return rc;
282 }
283
284 int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
285                    const char *prefix)
286 {
287         int rc;
288
289         if (fid_is_zero(&pos->lp_dir_parent)) {
290                 if (pos->lp_oit_cookie == 0)
291                         rc = snprintf(*buf, *len, "%s: N/A, N/A, N/A\n",
292                                       prefix);
293                 else
294                         rc = snprintf(*buf, *len, "%s: "LPU64", N/A, N/A\n",
295                                       prefix, pos->lp_oit_cookie);
296         } else {
297                 rc = snprintf(*buf, *len, "%s: "LPU64", "DFID", "LPU64"\n",
298                               prefix, pos->lp_oit_cookie,
299                               PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
300         }
301         if (rc <= 0)
302                 return -ENOSPC;
303
304         *buf += rc;
305         *len -= rc;
306         return rc;
307 }
308
309 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
310                     struct lfsck_position *pos, bool init)
311 {
312         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
313
314         if (unlikely(lfsck->li_di_oit == NULL)) {
315                 memset(pos, 0, sizeof(*pos));
316                 return;
317         }
318
319         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
320         if (!lfsck->li_current_oit_processed && !init)
321                 pos->lp_oit_cookie--;
322
323         LASSERT(pos->lp_oit_cookie > 0);
324
325         if (lfsck->li_di_dir != NULL) {
326                 struct dt_object *dto = lfsck->li_obj_dir;
327
328                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
329                                                         lfsck->li_di_dir);
330
331                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
332                         fid_zero(&pos->lp_dir_parent);
333                         pos->lp_dir_cookie = 0;
334                 } else {
335                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
336                 }
337         } else {
338                 fid_zero(&pos->lp_dir_parent);
339                 pos->lp_dir_cookie = 0;
340         }
341 }
342
343 static void __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
344 {
345         lfsck->li_bookmark_ram.lb_speed_limit = limit;
346         if (limit != LFSCK_SPEED_NO_LIMIT) {
347                 if (limit > HZ) {
348                         lfsck->li_sleep_rate = limit / HZ;
349                         lfsck->li_sleep_jif = 1;
350                 } else {
351                         lfsck->li_sleep_rate = 1;
352                         lfsck->li_sleep_jif = HZ / limit;
353                 }
354         } else {
355                 lfsck->li_sleep_jif = 0;
356                 lfsck->li_sleep_rate = 0;
357         }
358 }
359
360 void lfsck_control_speed(struct lfsck_instance *lfsck)
361 {
362         struct ptlrpc_thread *thread = &lfsck->li_thread;
363         struct l_wait_info    lwi;
364
365         if (lfsck->li_sleep_jif > 0 &&
366             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
367                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
368                                        LWI_ON_SIGNAL_NOOP, NULL);
369
370                 l_wait_event(thread->t_ctl_waitq,
371                              !thread_is_running(thread),
372                              &lwi);
373                 lfsck->li_new_scanned = 0;
374         }
375 }
376
377 void lfsck_control_speed_by_self(struct lfsck_component *com)
378 {
379         struct lfsck_instance   *lfsck  = com->lc_lfsck;
380         struct ptlrpc_thread    *thread = &lfsck->li_thread;
381         struct l_wait_info       lwi;
382
383         if (lfsck->li_sleep_jif > 0 &&
384             com->lc_new_scanned >= lfsck->li_sleep_rate) {
385                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
386                                        LWI_ON_SIGNAL_NOOP, NULL);
387
388                 l_wait_event(thread->t_ctl_waitq,
389                              !thread_is_running(thread),
390                              &lwi);
391                 com->lc_new_scanned = 0;
392         }
393 }
394
395 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
396                             struct lu_fid *fid)
397 {
398         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
399                      !dt_try_as_dir(env, obj)))
400                 return -ENOTDIR;
401
402         return dt_lookup(env, obj, (struct dt_rec *)fid,
403                          (const struct dt_key *)"..", BYPASS_CAPA);
404 }
405
406 static int lfsck_needs_scan_dir(const struct lu_env *env,
407                                 struct lfsck_instance *lfsck,
408                                 struct dt_object *obj)
409 {
410         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
411         int            depth = 0;
412         int            rc;
413
414         if (!lfsck->li_master || !S_ISDIR(lfsck_object_type(obj)) ||
415             cfs_list_empty(&lfsck->li_list_dir))
416                RETURN(0);
417
418         while (1) {
419                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
420                  *      which is the agent directory to manage the objects
421                  *      which name entries reside on remote MDTs. Related
422                  *      consistency verification will be processed in LFSCK
423                  *      phase III. */
424                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
425                         if (depth > 0)
426                                 lfsck_object_put(env, obj);
427                         return 1;
428                 }
429
430                 /* .lustre doesn't contain "real" user objects, no need lfsck */
431                 if (fid_is_dot_lustre(lfsck_dto2fid(obj))) {
432                         if (depth > 0)
433                                 lfsck_object_put(env, obj);
434                         return 0;
435                 }
436
437                 dt_read_lock(env, obj, MOR_TGT_CHILD);
438                 if (unlikely(lfsck_is_dead_obj(obj))) {
439                         dt_read_unlock(env, obj);
440                         if (depth > 0)
441                                 lfsck_object_put(env, obj);
442                         return 0;
443                 }
444
445                 rc = dt_xattr_get(env, obj,
446                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
447                                   BYPASS_CAPA);
448                 dt_read_unlock(env, obj);
449                 if (rc >= 0) {
450                         if (depth > 0)
451                                 lfsck_object_put(env, obj);
452                         return 1;
453                 }
454
455                 if (rc < 0 && rc != -ENODATA) {
456                         if (depth > 0)
457                                 lfsck_object_put(env, obj);
458                         return rc;
459                 }
460
461                 rc = lfsck_parent_fid(env, obj, fid);
462                 if (depth > 0)
463                         lfsck_object_put(env, obj);
464                 if (rc != 0)
465                         return rc;
466
467                 if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
468                         return 0;
469
470                 obj = lfsck_object_find(env, lfsck, fid);
471                 if (obj == NULL)
472                         return 0;
473                 else if (IS_ERR(obj))
474                         return PTR_ERR(obj);
475
476                 if (!dt_object_exists(obj)) {
477                         lfsck_object_put(env, obj);
478                         return 0;
479                 }
480
481                 /* Currently, only client visible directory can be remote. */
482                 if (dt_object_remote(obj)) {
483                         lfsck_object_put(env, obj);
484                         return 1;
485                 }
486
487                 depth++;
488         }
489         return 0;
490 }
491
492 struct lfsck_thread_args *lfsck_thread_args_init(struct lfsck_instance *lfsck,
493                                                  struct lfsck_component *com)
494 {
495         struct lfsck_thread_args *lta;
496         int                       rc;
497
498         OBD_ALLOC_PTR(lta);
499         if (lta == NULL)
500                 return ERR_PTR(-ENOMEM);
501
502         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
503         if (rc != 0) {
504                 OBD_FREE_PTR(lta);
505                 return ERR_PTR(rc);
506         }
507
508         lta->lta_lfsck = lfsck_instance_get(lfsck);
509         if (com != NULL)
510                 lta->lta_com = lfsck_component_get(com);
511
512         return lta;
513 }
514
515 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
516 {
517         if (lta->lta_com != NULL)
518                 lfsck_component_put(&lta->lta_env, lta->lta_com);
519         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
520         lu_env_fini(&lta->lta_env);
521         OBD_FREE_PTR(lta);
522 }
523
524 /* LFSCK wrap functions */
525
526 void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
527                 bool new_checked)
528 {
529         struct lfsck_component *com;
530
531         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
532                 com->lc_ops->lfsck_fail(env, com, new_checked);
533         }
534 }
535
536 int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck)
537 {
538         struct lfsck_component *com;
539         int                     rc  = 0;
540         int                     rc1 = 0;
541
542         if (likely(cfs_time_beforeq(cfs_time_current(),
543                                     lfsck->li_time_next_checkpoint)))
544                 return 0;
545
546         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
547         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
548                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
549                 if (rc != 0)
550                         rc1 = rc;
551         }
552
553         lfsck->li_time_last_checkpoint = cfs_time_current();
554         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
555                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
556         return rc1 != 0 ? rc1 : rc;
557 }
558
559 int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck)
560 {
561         struct dt_object       *obj     = NULL;
562         struct lfsck_component *com;
563         struct lfsck_component *next;
564         struct lfsck_position  *pos     = NULL;
565         const struct dt_it_ops *iops    =
566                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
567         struct dt_it           *di;
568         int                     rc;
569         ENTRY;
570
571         LASSERT(lfsck->li_obj_dir == NULL);
572         LASSERT(lfsck->li_di_dir == NULL);
573
574         lfsck->li_current_oit_processed = 0;
575         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
576                 com->lc_new_checked = 0;
577                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
578                         com->lc_journal = 0;
579
580                 rc = com->lc_ops->lfsck_prep(env, com);
581                 if (rc != 0)
582                         GOTO(out, rc);
583
584                 if ((pos == NULL) ||
585                     (!lfsck_pos_is_zero(&com->lc_pos_start) &&
586                      lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
587                         pos = &com->lc_pos_start;
588         }
589
590         /* Init otable-based iterator. */
591         if (pos == NULL) {
592                 rc = iops->load(env, lfsck->li_di_oit, 0);
593                 if (rc > 0) {
594                         lfsck->li_oit_over = 1;
595                         rc = 0;
596                 }
597
598                 GOTO(out, rc);
599         }
600
601         rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
602         if (rc < 0)
603                 GOTO(out, rc);
604         else if (rc > 0)
605                 lfsck->li_oit_over = 1;
606
607         if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
608                 GOTO(out, rc = 0);
609
610         /* Find the directory for namespace-based traverse. */
611         obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
612         if (obj == NULL)
613                 GOTO(out, rc = 0);
614         else if (IS_ERR(obj))
615                 RETURN(PTR_ERR(obj));
616
617         /* XXX: Currently, skip remote object, the consistency for
618          *      remote object will be processed in LFSCK phase III. */
619         if (!dt_object_exists(obj) || dt_object_remote(obj) ||
620             unlikely(!S_ISDIR(lfsck_object_type(obj))))
621                 GOTO(out, rc = 0);
622
623         if (unlikely(!dt_try_as_dir(env, obj)))
624                 GOTO(out, rc = -ENOTDIR);
625
626         /* Init the namespace-based directory traverse. */
627         iops = &obj->do_index_ops->dio_it;
628         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
629         if (IS_ERR(di))
630                 GOTO(out, rc = PTR_ERR(di));
631
632         LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
633
634         rc = iops->load(env, di, pos->lp_dir_cookie);
635         if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
636                 rc = iops->next(env, di);
637         else if (rc > 0)
638                 rc = 0;
639
640         if (rc != 0) {
641                 iops->put(env, di);
642                 iops->fini(env, di);
643                 GOTO(out, rc);
644         }
645
646         lfsck->li_obj_dir = lfsck_object_get(obj);
647         lfsck->li_cookie_dir = iops->store(env, di);
648         spin_lock(&lfsck->li_lock);
649         lfsck->li_di_dir = di;
650         spin_unlock(&lfsck->li_lock);
651
652         GOTO(out, rc = 0);
653
654 out:
655         if (obj != NULL)
656                 lfsck_object_put(env, obj);
657
658         if (rc < 0) {
659                 cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
660                                              lc_link)
661                         com->lc_ops->lfsck_post(env, com, rc, true);
662
663                 return rc;
664         }
665
666         rc = 0;
667         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
668         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
669                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
670                 if (rc != 0)
671                         break;
672         }
673
674         lfsck->li_time_last_checkpoint = cfs_time_current();
675         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
676                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
677         return rc;
678 }
679
680 int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
681                    struct dt_object *obj)
682 {
683         struct lfsck_component *com;
684         const struct dt_it_ops *iops;
685         struct dt_it           *di;
686         int                     rc;
687         ENTRY;
688
689         LASSERT(lfsck->li_obj_dir == NULL);
690
691         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
692                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
693                 if (rc != 0)
694                         RETURN(rc);
695         }
696
697         rc = lfsck_needs_scan_dir(env, lfsck, obj);
698         if (rc <= 0)
699                 GOTO(out, rc);
700
701         if (unlikely(!dt_try_as_dir(env, obj)))
702                 GOTO(out, rc = -ENOTDIR);
703
704         iops = &obj->do_index_ops->dio_it;
705         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
706         if (IS_ERR(di))
707                 GOTO(out, rc = PTR_ERR(di));
708
709         rc = iops->load(env, di, 0);
710         if (rc == 0)
711                 rc = iops->next(env, di);
712         else if (rc > 0)
713                 rc = 0;
714
715         if (rc != 0) {
716                 iops->put(env, di);
717                 iops->fini(env, di);
718                 GOTO(out, rc);
719         }
720
721         lfsck->li_obj_dir = lfsck_object_get(obj);
722         lfsck->li_cookie_dir = iops->store(env, di);
723         spin_lock(&lfsck->li_lock);
724         lfsck->li_di_dir = di;
725         spin_unlock(&lfsck->li_lock);
726
727         GOTO(out, rc = 0);
728
729 out:
730         if (rc < 0)
731                 lfsck_fail(env, lfsck, false);
732         return (rc > 0 ? 0 : rc);
733 }
734
735 int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
736                    struct dt_object *obj, struct lu_dirent *ent)
737 {
738         struct lfsck_component *com;
739         int                     rc;
740
741         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
742                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
743                 if (rc != 0)
744                         return rc;
745         }
746         return 0;
747 }
748
749 int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
750                int result)
751 {
752         struct lfsck_component *com;
753         struct lfsck_component *next;
754         int                     rc  = 0;
755         int                     rc1 = 0;
756
757         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
758         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
759                 rc = com->lc_ops->lfsck_post(env, com, result, false);
760                 if (rc != 0)
761                         rc1 = rc;
762         }
763
764         lfsck->li_time_last_checkpoint = cfs_time_current();
765         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
766                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
767
768         /* Ignore some component post failure to make other can go ahead. */
769         return result;
770 }
771
772 int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck)
773 {
774         struct lfsck_component *com;
775         struct lfsck_component *next;
776         int                     rc;
777
778         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
779                                      lc_link) {
780                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
781                         com->lc_journal = 0;
782
783                 rc = com->lc_ops->lfsck_double_scan(env, com);
784                 if (rc != 0)
785                         return rc;
786         }
787         return 0;
788 }
789
790 /* external interfaces */
791
792 int lfsck_get_speed(struct dt_device *key, void *buf, int len)
793 {
794         struct lu_env           env;
795         struct lfsck_instance  *lfsck;
796         int                     rc;
797         ENTRY;
798
799         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
800         if (rc != 0)
801                 RETURN(rc);
802
803         lfsck = lfsck_instance_find(key, true, false);
804         if (likely(lfsck != NULL)) {
805                 rc = snprintf(buf, len, "%u\n",
806                               lfsck->li_bookmark_ram.lb_speed_limit);
807                 lfsck_instance_put(&env, lfsck);
808         } else {
809                 rc = -ENODEV;
810         }
811
812         lu_env_fini(&env);
813
814         RETURN(rc);
815 }
816 EXPORT_SYMBOL(lfsck_get_speed);
817
818 int lfsck_set_speed(struct dt_device *key, int val)
819 {
820         struct lu_env           env;
821         struct lfsck_instance  *lfsck;
822         int                     rc;
823         ENTRY;
824
825         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
826         if (rc != 0)
827                 RETURN(rc);
828
829         lfsck = lfsck_instance_find(key, true, false);
830         if (likely(lfsck != NULL)) {
831                 mutex_lock(&lfsck->li_mutex);
832                 __lfsck_set_speed(lfsck, val);
833                 rc = lfsck_bookmark_store(&env, lfsck);
834                 mutex_unlock(&lfsck->li_mutex);
835                 lfsck_instance_put(&env, lfsck);
836         } else {
837                 rc = -ENODEV;
838         }
839
840         lu_env_fini(&env);
841
842         RETURN(rc);
843 }
844 EXPORT_SYMBOL(lfsck_set_speed);
845
846 int lfsck_dump(struct dt_device *key, void *buf, int len, enum lfsck_type type)
847 {
848         struct lu_env           env;
849         struct lfsck_instance  *lfsck;
850         struct lfsck_component *com;
851         int                     rc;
852         ENTRY;
853
854         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
855         if (rc != 0)
856                 RETURN(rc);
857
858         lfsck = lfsck_instance_find(key, true, false);
859         if (likely(lfsck != NULL)) {
860                 com = lfsck_component_find(lfsck, type);
861                 if (likely(com != NULL)) {
862                         rc = com->lc_ops->lfsck_dump(&env, com, buf, len);
863                         lfsck_component_put(&env, com);
864                 } else {
865                         rc = -ENOTSUPP;
866                 }
867
868                 lfsck_instance_put(&env, lfsck);
869         } else {
870                 rc = -ENODEV;
871         }
872
873         lu_env_fini(&env);
874
875         RETURN(rc);
876 }
877 EXPORT_SYMBOL(lfsck_dump);
878
879 int lfsck_start(const struct lu_env *env, struct dt_device *key,
880                 struct lfsck_start_param *lsp)
881 {
882         struct lfsck_start              *start  = lsp->lsp_start;
883         struct lfsck_instance           *lfsck;
884         struct lfsck_bookmark           *bk;
885         struct ptlrpc_thread            *thread;
886         struct lfsck_component          *com;
887         struct l_wait_info               lwi    = { 0 };
888         struct lfsck_thread_args        *lta;
889         bool                             dirty  = false;
890         long                             rc     = 0;
891         __u16                            valid  = 0;
892         __u16                            flags  = 0;
893         __u16                            type   = 1;
894         ENTRY;
895
896         lfsck = lfsck_instance_find(key, true, false);
897         if (unlikely(lfsck == NULL))
898                 RETURN(-ENODEV);
899
900         /* start == NULL means auto trigger paused LFSCK. */
901         if ((start == NULL) &&
902             (cfs_list_empty(&lfsck->li_list_scan) ||
903              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
904                 GOTO(put, rc = 0);
905
906         bk = &lfsck->li_bookmark_ram;
907         thread = &lfsck->li_thread;
908         mutex_lock(&lfsck->li_mutex);
909         spin_lock(&lfsck->li_lock);
910         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
911                 rc = -EALREADY;
912                 while (start->ls_active != 0) {
913                         if (type & start->ls_active) {
914                                 com = __lfsck_component_find(lfsck, type,
915                                                         &lfsck->li_list_scan);
916                                 if (com == NULL)
917                                         com = __lfsck_component_find(lfsck,
918                                                 type,
919                                                 &lfsck->li_list_double_scan);
920                                 if (com == NULL) {
921                                         rc = -EBUSY;
922                                         break;
923                                 } else {
924                                         start->ls_active &= ~type;
925                                 }
926                         }
927                         type <<= 1;
928                 }
929                 spin_unlock(&lfsck->li_lock);
930                 GOTO(out, rc);
931         }
932         spin_unlock(&lfsck->li_lock);
933
934         lfsck->li_namespace = lsp->lsp_namespace;
935         lfsck->li_paused = 0;
936         lfsck->li_oit_over = 0;
937         lfsck->li_drop_dryrun = 0;
938         lfsck->li_new_scanned = 0;
939
940         /* For auto trigger. */
941         if (start == NULL)
942                 goto trigger;
943
944         start->ls_version = bk->lb_version;
945         if (start->ls_valid & LSV_SPEED_LIMIT) {
946                 __lfsck_set_speed(lfsck, start->ls_speed_limit);
947                 dirty = true;
948         }
949
950         if (start->ls_valid & LSV_ERROR_HANDLE) {
951                 valid |= DOIV_ERROR_HANDLE;
952                 if (start->ls_flags & LPF_FAILOUT)
953                         flags |= DOIF_FAILOUT;
954
955                 if ((start->ls_flags & LPF_FAILOUT) &&
956                     !(bk->lb_param & LPF_FAILOUT)) {
957                         bk->lb_param |= LPF_FAILOUT;
958                         dirty = true;
959                 } else if (!(start->ls_flags & LPF_FAILOUT) &&
960                            (bk->lb_param & LPF_FAILOUT)) {
961                         bk->lb_param &= ~LPF_FAILOUT;
962                         dirty = true;
963                 }
964         }
965
966         if (start->ls_valid & LSV_DRYRUN) {
967                 valid |= DOIV_DRYRUN;
968                 if (start->ls_flags & LPF_DRYRUN)
969                         flags |= DOIF_DRYRUN;
970
971                 if ((start->ls_flags & LPF_DRYRUN) &&
972                     !(bk->lb_param & LPF_DRYRUN)) {
973                         bk->lb_param |= LPF_DRYRUN;
974                         dirty = true;
975                 } else if (!(start->ls_flags & LPF_DRYRUN) &&
976                            (bk->lb_param & LPF_DRYRUN)) {
977                         bk->lb_param &= ~LPF_DRYRUN;
978                         lfsck->li_drop_dryrun = 1;
979                         dirty = true;
980                 }
981         }
982
983         if (dirty) {
984                 rc = lfsck_bookmark_store(env, lfsck);
985                 if (rc != 0)
986                         GOTO(out, rc);
987         }
988
989         if (start->ls_flags & LPF_RESET)
990                 flags |= DOIF_RESET;
991
992         if (start->ls_active != 0) {
993                 struct lfsck_component *next;
994
995                 if (start->ls_active == LFSCK_TYPES_ALL)
996                         start->ls_active = LFSCK_TYPES_SUPPORTED;
997
998                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
999                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
1000                         GOTO(out, rc = -ENOTSUPP);
1001                 }
1002
1003                 cfs_list_for_each_entry_safe(com, next,
1004                                              &lfsck->li_list_scan, lc_link) {
1005                         if (!(com->lc_type & start->ls_active)) {
1006                                 rc = com->lc_ops->lfsck_post(env, com, 0,
1007                                                              false);
1008                                 if (rc != 0)
1009                                         GOTO(out, rc);
1010                         }
1011                 }
1012
1013                 while (start->ls_active != 0) {
1014                         if (type & start->ls_active) {
1015                                 com = __lfsck_component_find(lfsck, type,
1016                                                         &lfsck->li_list_idle);
1017                                 if (com != NULL) {
1018                                         /* The component status will be updated
1019                                          * when its prep() is called later by
1020                                          * the LFSCK main engine. */
1021                                         cfs_list_del_init(&com->lc_link);
1022                                         cfs_list_add_tail(&com->lc_link,
1023                                                           &lfsck->li_list_scan);
1024                                 }
1025                                 start->ls_active &= ~type;
1026                         }
1027                         type <<= 1;
1028                 }
1029         }
1030
1031         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1032                 start->ls_active |= com->lc_type;
1033                 if (flags & DOIF_RESET) {
1034                         rc = com->lc_ops->lfsck_reset(env, com, false);
1035                         if (rc != 0)
1036                                 GOTO(out, rc);
1037                 }
1038         }
1039
1040 trigger:
1041         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
1042         if (bk->lb_param & LPF_DRYRUN) {
1043                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
1044                 valid |= DOIV_DRYRUN;
1045                 flags |= DOIF_DRYRUN;
1046         }
1047
1048         if (bk->lb_param & LPF_FAILOUT) {
1049                 valid |= DOIV_ERROR_HANDLE;
1050                 flags |= DOIF_FAILOUT;
1051         }
1052
1053         if (!cfs_list_empty(&lfsck->li_list_scan))
1054                 flags |= DOIF_OUTUSED;
1055
1056         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
1057         thread_set_flags(thread, 0);
1058         lta = lfsck_thread_args_init(lfsck, NULL);
1059         if (IS_ERR(lta))
1060                 GOTO(out, rc = PTR_ERR(lta));
1061
1062         rc = PTR_ERR(kthread_run(lfsck_master_engine, lta, "lfsck"));
1063         if (IS_ERR_VALUE(rc)) {
1064                 CERROR("%s: cannot start LFSCK thread: rc = %ld\n",
1065                        lfsck_lfsck2name(lfsck), rc);
1066                 lfsck_thread_args_fini(lta);
1067         } else {
1068                 rc = 0;
1069                 l_wait_event(thread->t_ctl_waitq,
1070                              thread_is_running(thread) ||
1071                              thread_is_stopped(thread),
1072                              &lwi);
1073         }
1074
1075         GOTO(out, rc);
1076
1077 out:
1078         mutex_unlock(&lfsck->li_mutex);
1079 put:
1080         lfsck_instance_put(env, lfsck);
1081         return (rc < 0 ? rc : 0);
1082 }
1083 EXPORT_SYMBOL(lfsck_start);
1084
1085 int lfsck_stop(const struct lu_env *env, struct dt_device *key, bool pause)
1086 {
1087         struct lfsck_instance   *lfsck;
1088         struct ptlrpc_thread    *thread;
1089         struct l_wait_info       lwi    = { 0 };
1090         ENTRY;
1091
1092         lfsck = lfsck_instance_find(key, true, false);
1093         if (unlikely(lfsck == NULL))
1094                 RETURN(-ENODEV);
1095
1096         thread = &lfsck->li_thread;
1097         mutex_lock(&lfsck->li_mutex);
1098         spin_lock(&lfsck->li_lock);
1099         if (thread_is_init(thread) || thread_is_stopped(thread)) {
1100                 spin_unlock(&lfsck->li_lock);
1101                 mutex_unlock(&lfsck->li_mutex);
1102                 lfsck_instance_put(env, lfsck);
1103                 RETURN(-EALREADY);
1104         }
1105
1106         if (pause)
1107                 lfsck->li_paused = 1;
1108         thread_set_flags(thread, SVC_STOPPING);
1109         spin_unlock(&lfsck->li_lock);
1110
1111         wake_up_all(&thread->t_ctl_waitq);
1112         l_wait_event(thread->t_ctl_waitq,
1113                      thread_is_stopped(thread),
1114                      &lwi);
1115         mutex_unlock(&lfsck->li_mutex);
1116         lfsck_instance_put(env, lfsck);
1117
1118         RETURN(0);
1119 }
1120 EXPORT_SYMBOL(lfsck_stop);
1121
1122 int lfsck_register(const struct lu_env *env, struct dt_device *key,
1123                    struct dt_device *next, lfsck_out_notify notify,
1124                    void *notify_data, bool master)
1125 {
1126         struct lfsck_instance   *lfsck;
1127         struct dt_object        *root  = NULL;
1128         struct dt_object        *obj;
1129         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
1130         int                      rc;
1131         ENTRY;
1132
1133         lfsck = lfsck_instance_find(key, false, false);
1134         if (unlikely(lfsck != NULL))
1135                 RETURN(-EEXIST);
1136
1137         OBD_ALLOC_PTR(lfsck);
1138         if (lfsck == NULL)
1139                 RETURN(-ENOMEM);
1140
1141         mutex_init(&lfsck->li_mutex);
1142         spin_lock_init(&lfsck->li_lock);
1143         CFS_INIT_LIST_HEAD(&lfsck->li_link);
1144         CFS_INIT_LIST_HEAD(&lfsck->li_list_scan);
1145         CFS_INIT_LIST_HEAD(&lfsck->li_list_dir);
1146         CFS_INIT_LIST_HEAD(&lfsck->li_list_double_scan);
1147         CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
1148         atomic_set(&lfsck->li_ref, 1);
1149         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
1150         lfsck->li_out_notify = notify;
1151         lfsck->li_out_notify_data = notify_data;
1152         lfsck->li_next = next;
1153         lfsck->li_bottom = key;
1154
1155         fid->f_seq = FID_SEQ_LOCAL_NAME;
1156         fid->f_oid = 1;
1157         fid->f_ver = 0;
1158         rc = local_oid_storage_init(env, lfsck->li_bottom, fid, &lfsck->li_los);
1159         if (rc != 0)
1160                 GOTO(out, rc);
1161
1162         rc = dt_root_get(env, key, fid);
1163         if (rc != 0)
1164                 GOTO(out, rc);
1165
1166         root = dt_locate(env, lfsck->li_bottom, fid);
1167         if (IS_ERR(root))
1168                 GOTO(out, rc = PTR_ERR(root));
1169
1170         if (unlikely(!dt_try_as_dir(env, root)))
1171                 GOTO(out, rc = -ENOTDIR);
1172
1173         lfsck->li_local_root_fid = *fid;
1174         if (master) {
1175                 lfsck->li_master = 1;
1176                 if (lfsck_dev_idx(lfsck->li_bottom) == 0) {
1177                         rc = dt_lookup(env, root,
1178                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
1179                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
1180                         if (rc != 0)
1181                                 GOTO(out, rc);
1182                 }
1183         }
1184
1185         fid->f_seq = FID_SEQ_LOCAL_FILE;
1186         fid->f_oid = OTABLE_IT_OID;
1187         fid->f_ver = 0;
1188         obj = dt_locate(env, lfsck->li_bottom, fid);
1189         if (IS_ERR(obj))
1190                 GOTO(out, rc = PTR_ERR(obj));
1191
1192         lfsck->li_obj_oit = obj;
1193         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
1194         if (rc != 0) {
1195                 if (rc == -ENOTSUPP)
1196                         GOTO(add, rc = 0);
1197
1198                 GOTO(out, rc);
1199         }
1200
1201         rc = lfsck_bookmark_setup(env, lfsck);
1202         if (rc != 0)
1203                 GOTO(out, rc);
1204
1205         if (master) {
1206                 rc = lfsck_namespace_setup(env, lfsck);
1207                 if (rc < 0)
1208                         GOTO(out, rc);
1209         }
1210
1211         rc = lfsck_layout_setup(env, lfsck);
1212         if (rc < 0)
1213                 GOTO(out, rc);
1214
1215         /* XXX: more LFSCK components initialization to be added here. */
1216
1217 add:
1218         rc = lfsck_instance_add(lfsck);
1219 out:
1220         if (root != NULL && !IS_ERR(root))
1221                 lu_object_put(env, &root->do_lu);
1222         if (rc != 0)
1223                 lfsck_instance_cleanup(env, lfsck);
1224         return rc;
1225 }
1226 EXPORT_SYMBOL(lfsck_register);
1227
1228 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
1229 {
1230         struct lfsck_instance *lfsck;
1231
1232         lfsck = lfsck_instance_find(key, false, true);
1233         if (lfsck != NULL)
1234                 lfsck_instance_put(env, lfsck);
1235 }
1236 EXPORT_SYMBOL(lfsck_degister);
1237
1238 static int __init lfsck_init(void)
1239 {
1240         int rc;
1241
1242         lfsck_key_init_generic(&lfsck_thread_key, NULL);
1243         rc = lu_context_key_register(&lfsck_thread_key);
1244         return rc;
1245 }
1246
1247 static void __exit lfsck_exit(void)
1248 {
1249         LASSERT(cfs_list_empty(&lfsck_instance_list));
1250
1251         lu_context_key_degister(&lfsck_thread_key);
1252 }
1253
1254 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
1255 MODULE_DESCRIPTION("LFSCK");
1256 MODULE_LICENSE("GPL");
1257
1258 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);