Whamcloud - gitweb
LU-1267 lfsck: framework (1) for MDT-OST consistency
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 /* define lfsck thread key */
46 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
47
48 static void lfsck_key_fini(const struct lu_context *ctx,
49                            struct lu_context_key *key, void *data)
50 {
51         struct lfsck_thread_info *info = data;
52
53         lu_buf_free(&info->lti_linkea_buf);
54         OBD_FREE_PTR(info);
55 }
56
57 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
58 LU_KEY_INIT_GENERIC(lfsck);
59
/* Every registered LFSCK instance, linked through lfsck_instance::li_link. */
60 static CFS_LIST_HEAD(lfsck_instance_list);
/* Held while lfsck_instance_list is searched, extended or unlinked from. */
61 static DEFINE_SPINLOCK(lfsck_instance_lock);
62
63 static const char *lfsck_status_names[] = {
64         [LS_INIT]               = "init",
65         [LS_SCANNING_PHASE1]    = "scanning-phase1",
66         [LS_SCANNING_PHASE2]    = "scanning-phase2",
67         [LS_COMPLETED]          = "completed",
68         [LS_FAILED]             = "failed",
69         [LS_STOPPED]            = "stopped",
70         [LS_PAUSED]             = "paused",
71         [LS_CRASHED]            = "crashed",
72         [LS_PARTIAL]            = "partial"
73 };
74
/*
 * Printable names of the LFSCK flag bits; slot N names bit N.
 * The last slot is NULL.
 * NOTE(review): presumably handed to lfsck_bits_dump() as its name table;
 * confirm against the callers.
 */
const char *lfsck_flags_names[] = {
	[0] = "scanned-once",
	[1] = "inconsistent",
	[2] = "upgrade",
	[3] = "incomplete",
	[4] = NULL
};
82
/*
 * Printable names of the LFSCK parameter bits; slot N names bit N.
 * Slot 0 has no name and the last slot is NULL, so a NULL entry does not
 * by itself mark the end of the table.
 */
const char *lfsck_param_names[] = {
	[0] = NULL,
	[1] = "failout",
	[2] = "dryrun",
	[3] = NULL
};
89
90 const char *lfsck_status2names(enum lfsck_status status)
91 {
92         if (unlikely(status < 0 || status >= LS_MAX))
93                 return "unknown";
94
95         return lfsck_status_names[status];
96 }
97
98 static inline struct lfsck_component *
99 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type, cfs_list_t *list)
100 {
101         struct lfsck_component *com;
102
103         cfs_list_for_each_entry(com, list, lc_link) {
104                 if (com->lc_type == type)
105                         return com;
106         }
107         return NULL;
108 }
109
110 static struct lfsck_component *
111 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
112 {
113         struct lfsck_component *com;
114
115         spin_lock(&lfsck->li_lock);
116         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
117         if (com != NULL)
118                 goto unlock;
119
120         com = __lfsck_component_find(lfsck, type,
121                                      &lfsck->li_list_double_scan);
122         if (com != NULL)
123                 goto unlock;
124
125         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
126
127 unlock:
128         if (com != NULL)
129                 lfsck_component_get(com);
130         spin_unlock(&lfsck->li_lock);
131         return com;
132 }
133
134 void lfsck_component_cleanup(const struct lu_env *env,
135                              struct lfsck_component *com)
136 {
137         if (!cfs_list_empty(&com->lc_link))
138                 cfs_list_del_init(&com->lc_link);
139         if (!cfs_list_empty(&com->lc_link_dir))
140                 cfs_list_del_init(&com->lc_link_dir);
141
142         lfsck_component_put(env, com);
143 }
144
145 void lfsck_instance_cleanup(const struct lu_env *env,
146                             struct lfsck_instance *lfsck)
147 {
148         struct ptlrpc_thread    *thread = &lfsck->li_thread;
149         struct lfsck_component  *com;
150         ENTRY;
151
152         LASSERT(list_empty(&lfsck->li_link));
153         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
154
155         if (lfsck->li_obj_oit != NULL) {
156                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
157                 lfsck->li_obj_oit = NULL;
158         }
159
160         LASSERT(lfsck->li_obj_dir == NULL);
161
162         while (!cfs_list_empty(&lfsck->li_list_scan)) {
163                 com = cfs_list_entry(lfsck->li_list_scan.next,
164                                      struct lfsck_component,
165                                      lc_link);
166                 lfsck_component_cleanup(env, com);
167         }
168
169         LASSERT(cfs_list_empty(&lfsck->li_list_dir));
170
171         while (!cfs_list_empty(&lfsck->li_list_double_scan)) {
172                 com = cfs_list_entry(lfsck->li_list_double_scan.next,
173                                      struct lfsck_component,
174                                      lc_link);
175                 lfsck_component_cleanup(env, com);
176         }
177
178         while (!cfs_list_empty(&lfsck->li_list_idle)) {
179                 com = cfs_list_entry(lfsck->li_list_idle.next,
180                                      struct lfsck_component,
181                                      lc_link);
182                 lfsck_component_cleanup(env, com);
183         }
184
185         if (lfsck->li_bookmark_obj != NULL) {
186                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
187                 lfsck->li_bookmark_obj = NULL;
188         }
189
190         if (lfsck->li_los != NULL) {
191                 local_oid_storage_fini(env, lfsck->li_los);
192                 lfsck->li_los = NULL;
193         }
194
195         OBD_FREE_PTR(lfsck);
196 }
197
198 static inline struct lfsck_instance *lfsck_instance_find(struct dt_device *key,
199                                                          bool ref, bool unlink)
200 {
201         struct lfsck_instance *lfsck;
202
203         spin_lock(&lfsck_instance_lock);
204         cfs_list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
205                 if (lfsck->li_bottom == key) {
206                         if (ref)
207                                 lfsck_instance_get(lfsck);
208                         if (unlink)
209                                 list_del_init(&lfsck->li_link);
210                         spin_unlock(&lfsck_instance_lock);
211                         return lfsck;
212                 }
213         }
214         spin_unlock(&lfsck_instance_lock);
215         return NULL;
216 }
217
218 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
219 {
220         struct lfsck_instance *tmp;
221
222         spin_lock(&lfsck_instance_lock);
223         cfs_list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
224                 if (lfsck->li_bottom == tmp->li_bottom) {
225                         spin_unlock(&lfsck_instance_lock);
226                         return -EEXIST;
227                 }
228         }
229
230         cfs_list_add_tail(&lfsck->li_link, &lfsck_instance_list);
231         spin_unlock(&lfsck_instance_lock);
232         return 0;
233 }
234
/*
 * Append "<prefix>: name,name,...\n" for the bits set in \a bits.
 *
 * Bit N is printed as names[N]; bits whose name slot is NULL are consumed
 * silently. When \a bits is zero only "<prefix>:\n" is emitted.
 *
 * snprintf() returns the length the text would have had, so a result that
 * does not fit in the remaining space is reported as -ENOSPC instead of
 * advancing the cursor past the end of the buffer (which would also drive
 * *len negative and turn it into a huge size on the next call).
 *
 * \param[in,out] buf	output cursor, advanced past the text written
 * \param[in,out] len	space left at *buf, reduced by the text written
 * \param[in] bits	the bit mask to describe
 * \param[in] names	name table indexed by bit number; it must cover every
 *			bit that can be set in \a bits
 * \param[in] prefix	label printed before the names
 *
 * \retval		number of characters appended
 * \retval -ENOSPC	the text does not fit (the buffer may hold a partial,
 *			truncated line in that case)
 */
int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
		    const char *prefix)
{
	/* Work on an unsigned copy so that testing bit 31 is well defined. */
	unsigned int	left = (unsigned int)bits;
	unsigned int	flag;
	int		save = *len;
	int		rc;
	int		i;

	if (*len <= 0)
		return -ENOSPC;

	rc = snprintf(*buf, *len, "%s:%c", prefix, left != 0 ? ' ' : '\n');
	if (rc <= 0 || rc >= *len)
		return -ENOSPC;

	*buf += rc;
	*len -= rc;
	for (i = 0; left != 0; i++) {
		flag = 1U << i;
		if (!(left & flag))
			continue;

		left &= ~flag;
		if (names[i] == NULL)
			continue;

		/* ',' while more bits are pending, '\n' after the last. */
		rc = snprintf(*buf, *len, "%s%c", names[i],
			      left != 0 ? ',' : '\n');
		if (rc <= 0 || rc >= *len)
			return -ENOSPC;

		*buf += rc;
		*len -= rc;
	}

	return save - *len;
}
265
266 int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix)
267 {
268         int rc;
269
270         if (time != 0)
271                 rc = snprintf(*buf, *len, "%s: "LPU64" seconds\n", prefix,
272                               cfs_time_current_sec() - time);
273         else
274                 rc = snprintf(*buf, *len, "%s: N/A\n", prefix);
275         if (rc <= 0)
276                 return -ENOSPC;
277
278         *buf += rc;
279         *len -= rc;
280         return rc;
281 }
282
283 int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
284                    const char *prefix)
285 {
286         int rc;
287
288         if (fid_is_zero(&pos->lp_dir_parent)) {
289                 if (pos->lp_oit_cookie == 0)
290                         rc = snprintf(*buf, *len, "%s: N/A, N/A, N/A\n",
291                                       prefix);
292                 else
293                         rc = snprintf(*buf, *len, "%s: "LPU64", N/A, N/A\n",
294                                       prefix, pos->lp_oit_cookie);
295         } else {
296                 rc = snprintf(*buf, *len, "%s: "LPU64", "DFID", "LPU64"\n",
297                               prefix, pos->lp_oit_cookie,
298                               PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
299         }
300         if (rc <= 0)
301                 return -ENOSPC;
302
303         *buf += rc;
304         *len -= rc;
305         return rc;
306 }
307
308 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
309                     struct lfsck_position *pos, bool init)
310 {
311         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
312
313         if (unlikely(lfsck->li_di_oit == NULL)) {
314                 memset(pos, 0, sizeof(*pos));
315                 return;
316         }
317
318         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
319         if (!lfsck->li_current_oit_processed && !init)
320                 pos->lp_oit_cookie--;
321
322         LASSERT(pos->lp_oit_cookie > 0);
323
324         if (lfsck->li_di_dir != NULL) {
325                 struct dt_object *dto = lfsck->li_obj_dir;
326
327                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
328                                                         lfsck->li_di_dir);
329
330                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
331                         fid_zero(&pos->lp_dir_parent);
332                         pos->lp_dir_cookie = 0;
333                 } else {
334                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
335                 }
336         } else {
337                 fid_zero(&pos->lp_dir_parent);
338                 pos->lp_dir_cookie = 0;
339         }
340 }
341
342 static void __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
343 {
344         lfsck->li_bookmark_ram.lb_speed_limit = limit;
345         if (limit != LFSCK_SPEED_NO_LIMIT) {
346                 if (limit > HZ) {
347                         lfsck->li_sleep_rate = limit / HZ;
348                         lfsck->li_sleep_jif = 1;
349                 } else {
350                         lfsck->li_sleep_rate = 1;
351                         lfsck->li_sleep_jif = HZ / limit;
352                 }
353         } else {
354                 lfsck->li_sleep_jif = 0;
355                 lfsck->li_sleep_rate = 0;
356         }
357 }
358
359 void lfsck_control_speed(struct lfsck_instance *lfsck)
360 {
361         struct ptlrpc_thread *thread = &lfsck->li_thread;
362         struct l_wait_info    lwi;
363
364         if (lfsck->li_sleep_jif > 0 &&
365             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
366                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
367                                        LWI_ON_SIGNAL_NOOP, NULL);
368
369                 l_wait_event(thread->t_ctl_waitq,
370                              !thread_is_running(thread),
371                              &lwi);
372                 lfsck->li_new_scanned = 0;
373         }
374 }
375
376 void lfsck_control_speed_by_self(struct lfsck_component *com)
377 {
378         struct lfsck_instance   *lfsck  = com->lc_lfsck;
379         struct ptlrpc_thread    *thread = &lfsck->li_thread;
380         struct l_wait_info       lwi;
381
382         if (lfsck->li_sleep_jif > 0 &&
383             com->lc_new_scanned >= lfsck->li_sleep_rate) {
384                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
385                                        LWI_ON_SIGNAL_NOOP, NULL);
386
387                 l_wait_event(thread->t_ctl_waitq,
388                              !thread_is_running(thread),
389                              &lwi);
390                 com->lc_new_scanned = 0;
391         }
392 }
393
394 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
395                             struct lu_fid *fid)
396 {
397         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
398                      !dt_try_as_dir(env, obj)))
399                 return -ENOTDIR;
400
401         return dt_lookup(env, obj, (struct dt_rec *)fid,
402                          (const struct dt_key *)"..", BYPASS_CAPA);
403 }
404
405 static int lfsck_needs_scan_dir(const struct lu_env *env,
406                                 struct lfsck_instance *lfsck,
407                                 struct dt_object *obj)
408 {
409         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
410         int            depth = 0;
411         int            rc;
412
413         if (!lfsck->li_master || !S_ISDIR(lfsck_object_type(obj)) ||
414             cfs_list_empty(&lfsck->li_list_dir))
415                RETURN(0);
416
417         while (1) {
418                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
419                  *      which is the agent directory to manage the objects
420                  *      which name entries reside on remote MDTs. Related
421                  *      consistency verification will be processed in LFSCK
422                  *      phase III. */
423                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
424                         if (depth > 0)
425                                 lfsck_object_put(env, obj);
426                         return 1;
427                 }
428
429                 /* .lustre doesn't contain "real" user objects, no need lfsck */
430                 if (fid_is_dot_lustre(lfsck_dto2fid(obj))) {
431                         if (depth > 0)
432                                 lfsck_object_put(env, obj);
433                         return 0;
434                 }
435
436                 dt_read_lock(env, obj, MOR_TGT_CHILD);
437                 if (unlikely(lfsck_is_dead_obj(obj))) {
438                         dt_read_unlock(env, obj);
439                         if (depth > 0)
440                                 lfsck_object_put(env, obj);
441                         return 0;
442                 }
443
444                 rc = dt_xattr_get(env, obj,
445                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
446                                   BYPASS_CAPA);
447                 dt_read_unlock(env, obj);
448                 if (rc >= 0) {
449                         if (depth > 0)
450                                 lfsck_object_put(env, obj);
451                         return 1;
452                 }
453
454                 if (rc < 0 && rc != -ENODATA) {
455                         if (depth > 0)
456                                 lfsck_object_put(env, obj);
457                         return rc;
458                 }
459
460                 rc = lfsck_parent_fid(env, obj, fid);
461                 if (depth > 0)
462                         lfsck_object_put(env, obj);
463                 if (rc != 0)
464                         return rc;
465
466                 if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
467                         return 0;
468
469                 obj = lfsck_object_find(env, lfsck, fid);
470                 if (obj == NULL)
471                         return 0;
472                 else if (IS_ERR(obj))
473                         return PTR_ERR(obj);
474
475                 if (!dt_object_exists(obj)) {
476                         lfsck_object_put(env, obj);
477                         return 0;
478                 }
479
480                 /* Currently, only client visible directory can be remote. */
481                 if (dt_object_remote(obj)) {
482                         lfsck_object_put(env, obj);
483                         return 1;
484                 }
485
486                 depth++;
487         }
488         return 0;
489 }
490
491 struct lfsck_thread_args *lfsck_thread_args_init(struct lfsck_instance *lfsck,
492                                                  struct lfsck_component *com)
493 {
494         struct lfsck_thread_args *lta;
495         int                       rc;
496
497         OBD_ALLOC_PTR(lta);
498         if (lta == NULL)
499                 return ERR_PTR(-ENOMEM);
500
501         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
502         if (rc != 0) {
503                 OBD_FREE_PTR(lta);
504                 return ERR_PTR(rc);
505         }
506
507         lta->lta_lfsck = lfsck_instance_get(lfsck);
508         if (com != NULL)
509                 lta->lta_com = lfsck_component_get(com);
510
511         return lta;
512 }
513
514 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
515 {
516         if (lta->lta_com != NULL)
517                 lfsck_component_put(&lta->lta_env, lta->lta_com);
518         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
519         lu_env_fini(&lta->lta_env);
520         OBD_FREE_PTR(lta);
521 }
522
523 /* LFSCK wrap functions */
524
525 void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
526                 bool new_checked)
527 {
528         struct lfsck_component *com;
529
530         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
531                 com->lc_ops->lfsck_fail(env, com, new_checked);
532         }
533 }
534
535 int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck)
536 {
537         struct lfsck_component *com;
538         int                     rc  = 0;
539         int                     rc1 = 0;
540
541         if (likely(cfs_time_beforeq(cfs_time_current(),
542                                     lfsck->li_time_next_checkpoint)))
543                 return 0;
544
545         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
546         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
547                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
548                 if (rc != 0)
549                         rc1 = rc;
550         }
551
552         lfsck->li_time_last_checkpoint = cfs_time_current();
553         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
554                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
555         return rc1 != 0 ? rc1 : rc;
556 }
557
558 int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck)
559 {
560         struct dt_object       *obj     = NULL;
561         struct lfsck_component *com;
562         struct lfsck_component *next;
563         struct lfsck_position  *pos     = NULL;
564         const struct dt_it_ops *iops    =
565                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
566         struct dt_it           *di;
567         int                     rc;
568         ENTRY;
569
570         LASSERT(lfsck->li_obj_dir == NULL);
571         LASSERT(lfsck->li_di_dir == NULL);
572
573         lfsck->li_current_oit_processed = 0;
574         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
575                 com->lc_new_checked = 0;
576                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
577                         com->lc_journal = 0;
578
579                 rc = com->lc_ops->lfsck_prep(env, com);
580                 if (rc != 0)
581                         GOTO(out, rc);
582
583                 if ((pos == NULL) ||
584                     (!lfsck_pos_is_zero(&com->lc_pos_start) &&
585                      lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
586                         pos = &com->lc_pos_start;
587         }
588
589         /* Init otable-based iterator. */
590         if (pos == NULL) {
591                 rc = iops->load(env, lfsck->li_di_oit, 0);
592                 if (rc > 0) {
593                         lfsck->li_oit_over = 1;
594                         rc = 0;
595                 }
596
597                 GOTO(out, rc);
598         }
599
600         rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
601         if (rc < 0)
602                 GOTO(out, rc);
603         else if (rc > 0)
604                 lfsck->li_oit_over = 1;
605
606         if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
607                 GOTO(out, rc = 0);
608
609         /* Find the directory for namespace-based traverse. */
610         obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
611         if (obj == NULL)
612                 GOTO(out, rc = 0);
613         else if (IS_ERR(obj))
614                 RETURN(PTR_ERR(obj));
615
616         /* XXX: Currently, skip remote object, the consistency for
617          *      remote object will be processed in LFSCK phase III. */
618         if (!dt_object_exists(obj) || dt_object_remote(obj) ||
619             unlikely(!S_ISDIR(lfsck_object_type(obj))))
620                 GOTO(out, rc = 0);
621
622         if (unlikely(!dt_try_as_dir(env, obj)))
623                 GOTO(out, rc = -ENOTDIR);
624
625         /* Init the namespace-based directory traverse. */
626         iops = &obj->do_index_ops->dio_it;
627         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
628         if (IS_ERR(di))
629                 GOTO(out, rc = PTR_ERR(di));
630
631         LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
632
633         rc = iops->load(env, di, pos->lp_dir_cookie);
634         if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
635                 rc = iops->next(env, di);
636         else if (rc > 0)
637                 rc = 0;
638
639         if (rc != 0) {
640                 iops->put(env, di);
641                 iops->fini(env, di);
642                 GOTO(out, rc);
643         }
644
645         lfsck->li_obj_dir = lfsck_object_get(obj);
646         lfsck->li_cookie_dir = iops->store(env, di);
647         spin_lock(&lfsck->li_lock);
648         lfsck->li_di_dir = di;
649         spin_unlock(&lfsck->li_lock);
650
651         GOTO(out, rc = 0);
652
653 out:
654         if (obj != NULL)
655                 lfsck_object_put(env, obj);
656
657         if (rc < 0) {
658                 cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
659                                              lc_link)
660                         com->lc_ops->lfsck_post(env, com, rc, true);
661
662                 return rc;
663         }
664
665         rc = 0;
666         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
667         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
668                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
669                 if (rc != 0)
670                         break;
671         }
672
673         lfsck->li_time_last_checkpoint = cfs_time_current();
674         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
675                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
676         return rc;
677 }
678
679 int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
680                    struct dt_object *obj)
681 {
682         struct lfsck_component *com;
683         const struct dt_it_ops *iops;
684         struct dt_it           *di;
685         int                     rc;
686         ENTRY;
687
688         LASSERT(lfsck->li_obj_dir == NULL);
689
690         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
691                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
692                 if (rc != 0)
693                         RETURN(rc);
694         }
695
696         rc = lfsck_needs_scan_dir(env, lfsck, obj);
697         if (rc <= 0)
698                 GOTO(out, rc);
699
700         if (unlikely(!dt_try_as_dir(env, obj)))
701                 GOTO(out, rc = -ENOTDIR);
702
703         iops = &obj->do_index_ops->dio_it;
704         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
705         if (IS_ERR(di))
706                 GOTO(out, rc = PTR_ERR(di));
707
708         rc = iops->load(env, di, 0);
709         if (rc == 0)
710                 rc = iops->next(env, di);
711         else if (rc > 0)
712                 rc = 0;
713
714         if (rc != 0) {
715                 iops->put(env, di);
716                 iops->fini(env, di);
717                 GOTO(out, rc);
718         }
719
720         lfsck->li_obj_dir = lfsck_object_get(obj);
721         lfsck->li_cookie_dir = iops->store(env, di);
722         spin_lock(&lfsck->li_lock);
723         lfsck->li_di_dir = di;
724         spin_unlock(&lfsck->li_lock);
725
726         GOTO(out, rc = 0);
727
728 out:
729         if (rc < 0)
730                 lfsck_fail(env, lfsck, false);
731         return (rc > 0 ? 0 : rc);
732 }
733
734 int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
735                    struct dt_object *obj, struct lu_dirent *ent)
736 {
737         struct lfsck_component *com;
738         int                     rc;
739
740         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
741                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
742                 if (rc != 0)
743                         return rc;
744         }
745         return 0;
746 }
747
748 int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
749                int result)
750 {
751         struct lfsck_component *com;
752         struct lfsck_component *next;
753         int                     rc  = 0;
754         int                     rc1 = 0;
755
756         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
757         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
758                 rc = com->lc_ops->lfsck_post(env, com, result, false);
759                 if (rc != 0)
760                         rc1 = rc;
761         }
762
763         lfsck->li_time_last_checkpoint = cfs_time_current();
764         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
765                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
766
767         /* Ignore some component post failure to make other can go ahead. */
768         return result;
769 }
770
771 int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck)
772 {
773         struct lfsck_component *com;
774         struct lfsck_component *next;
775         int                     rc;
776
777         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
778                                      lc_link) {
779                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
780                         com->lc_journal = 0;
781
782                 rc = com->lc_ops->lfsck_double_scan(env, com);
783                 if (rc != 0)
784                         return rc;
785         }
786         return 0;
787 }
788
789 /* external interfaces */
790
791 int lfsck_get_speed(struct dt_device *key, void *buf, int len)
792 {
793         struct lu_env           env;
794         struct lfsck_instance  *lfsck;
795         int                     rc;
796         ENTRY;
797
798         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
799         if (rc != 0)
800                 RETURN(rc);
801
802         lfsck = lfsck_instance_find(key, true, false);
803         if (likely(lfsck != NULL)) {
804                 rc = snprintf(buf, len, "%u\n",
805                               lfsck->li_bookmark_ram.lb_speed_limit);
806                 lfsck_instance_put(&env, lfsck);
807         } else {
808                 rc = -ENODEV;
809         }
810
811         lu_env_fini(&env);
812
813         RETURN(rc);
814 }
815 EXPORT_SYMBOL(lfsck_get_speed);
816
817 int lfsck_set_speed(struct dt_device *key, int val)
818 {
819         struct lu_env           env;
820         struct lfsck_instance  *lfsck;
821         int                     rc;
822         ENTRY;
823
824         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
825         if (rc != 0)
826                 RETURN(rc);
827
828         lfsck = lfsck_instance_find(key, true, false);
829         if (likely(lfsck != NULL)) {
830                 mutex_lock(&lfsck->li_mutex);
831                 __lfsck_set_speed(lfsck, val);
832                 rc = lfsck_bookmark_store(&env, lfsck);
833                 mutex_unlock(&lfsck->li_mutex);
834                 lfsck_instance_put(&env, lfsck);
835         } else {
836                 rc = -ENODEV;
837         }
838
839         lu_env_fini(&env);
840
841         RETURN(rc);
842 }
843 EXPORT_SYMBOL(lfsck_set_speed);
844
845 int lfsck_dump(struct dt_device *key, void *buf, int len, enum lfsck_type type)
846 {
847         struct lu_env           env;
848         struct lfsck_instance  *lfsck;
849         struct lfsck_component *com;
850         int                     rc;
851         ENTRY;
852
853         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
854         if (rc != 0)
855                 RETURN(rc);
856
857         lfsck = lfsck_instance_find(key, true, false);
858         if (likely(lfsck != NULL)) {
859                 com = lfsck_component_find(lfsck, type);
860                 if (likely(com != NULL)) {
861                         rc = com->lc_ops->lfsck_dump(&env, com, buf, len);
862                         lfsck_component_put(&env, com);
863                 } else {
864                         rc = -ENOTSUPP;
865                 }
866
867                 lfsck_instance_put(&env, lfsck);
868         } else {
869                 rc = -ENODEV;
870         }
871
872         lu_env_fini(&env);
873
874         RETURN(rc);
875 }
876 EXPORT_SYMBOL(lfsck_dump);
877
878 int lfsck_start(const struct lu_env *env, struct dt_device *key,
879                 struct lfsck_start_param *lsp)
880 {
881         struct lfsck_start              *start  = lsp->lsp_start;
882         struct lfsck_instance           *lfsck;
883         struct lfsck_bookmark           *bk;
884         struct ptlrpc_thread            *thread;
885         struct lfsck_component          *com;
886         struct l_wait_info               lwi    = { 0 };
887         struct lfsck_thread_args        *lta;
888         bool                             dirty  = false;
889         long                             rc     = 0;
890         __u16                            valid  = 0;
891         __u16                            flags  = 0;
892         __u16                            type   = 1;
893         ENTRY;
894
895         lfsck = lfsck_instance_find(key, true, false);
896         if (unlikely(lfsck == NULL))
897                 RETURN(-ENODEV);
898
899         /* start == NULL means auto trigger paused LFSCK. */
900         if ((start == NULL) &&
901             (cfs_list_empty(&lfsck->li_list_scan) ||
902              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
903                 GOTO(put, rc = 0);
904
905         bk = &lfsck->li_bookmark_ram;
906         thread = &lfsck->li_thread;
907         mutex_lock(&lfsck->li_mutex);
908         spin_lock(&lfsck->li_lock);
909         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
910                 rc = -EALREADY;
911                 while (start->ls_active != 0) {
912                         if (type & start->ls_active) {
913                                 com = __lfsck_component_find(lfsck, type,
914                                                         &lfsck->li_list_scan);
915                                 if (com == NULL)
916                                         com = __lfsck_component_find(lfsck,
917                                                 type,
918                                                 &lfsck->li_list_double_scan);
919                                 if (com == NULL) {
920                                         rc = -EBUSY;
921                                         break;
922                                 } else {
923                                         start->ls_active &= ~type;
924                                 }
925                         }
926                         type <<= 1;
927                 }
928                 spin_unlock(&lfsck->li_lock);
929                 GOTO(out, rc);
930         }
931         spin_unlock(&lfsck->li_lock);
932
933         lfsck->li_namespace = lsp->lsp_namespace;
934         lfsck->li_paused = 0;
935         lfsck->li_oit_over = 0;
936         lfsck->li_drop_dryrun = 0;
937         lfsck->li_new_scanned = 0;
938
939         /* For auto trigger. */
940         if (start == NULL)
941                 goto trigger;
942
943         start->ls_version = bk->lb_version;
944         if (start->ls_valid & LSV_SPEED_LIMIT) {
945                 __lfsck_set_speed(lfsck, start->ls_speed_limit);
946                 dirty = true;
947         }
948
949         if (start->ls_valid & LSV_ERROR_HANDLE) {
950                 valid |= DOIV_ERROR_HANDLE;
951                 if (start->ls_flags & LPF_FAILOUT)
952                         flags |= DOIF_FAILOUT;
953
954                 if ((start->ls_flags & LPF_FAILOUT) &&
955                     !(bk->lb_param & LPF_FAILOUT)) {
956                         bk->lb_param |= LPF_FAILOUT;
957                         dirty = true;
958                 } else if (!(start->ls_flags & LPF_FAILOUT) &&
959                            (bk->lb_param & LPF_FAILOUT)) {
960                         bk->lb_param &= ~LPF_FAILOUT;
961                         dirty = true;
962                 }
963         }
964
965         if (start->ls_valid & LSV_DRYRUN) {
966                 valid |= DOIV_DRYRUN;
967                 if (start->ls_flags & LPF_DRYRUN)
968                         flags |= DOIF_DRYRUN;
969
970                 if ((start->ls_flags & LPF_DRYRUN) &&
971                     !(bk->lb_param & LPF_DRYRUN)) {
972                         bk->lb_param |= LPF_DRYRUN;
973                         dirty = true;
974                 } else if (!(start->ls_flags & LPF_DRYRUN) &&
975                            (bk->lb_param & LPF_DRYRUN)) {
976                         bk->lb_param &= ~LPF_DRYRUN;
977                         lfsck->li_drop_dryrun = 1;
978                         dirty = true;
979                 }
980         }
981
982         if (dirty) {
983                 rc = lfsck_bookmark_store(env, lfsck);
984                 if (rc != 0)
985                         GOTO(out, rc);
986         }
987
988         if (start->ls_flags & LPF_RESET)
989                 flags |= DOIF_RESET;
990
991         if (start->ls_active != 0) {
992                 struct lfsck_component *next;
993
994                 if (start->ls_active == LFSCK_TYPES_ALL)
995                         start->ls_active = LFSCK_TYPES_SUPPORTED;
996
997                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
998                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
999                         GOTO(out, rc = -ENOTSUPP);
1000                 }
1001
1002                 cfs_list_for_each_entry_safe(com, next,
1003                                              &lfsck->li_list_scan, lc_link) {
1004                         if (!(com->lc_type & start->ls_active)) {
1005                                 rc = com->lc_ops->lfsck_post(env, com, 0,
1006                                                              false);
1007                                 if (rc != 0)
1008                                         GOTO(out, rc);
1009                         }
1010                 }
1011
1012                 while (start->ls_active != 0) {
1013                         if (type & start->ls_active) {
1014                                 com = __lfsck_component_find(lfsck, type,
1015                                                         &lfsck->li_list_idle);
1016                                 if (com != NULL) {
1017                                         /* The component status will be updated
1018                                          * when its prep() is called later by
1019                                          * the LFSCK main engine. */
1020                                         cfs_list_del_init(&com->lc_link);
1021                                         cfs_list_add_tail(&com->lc_link,
1022                                                           &lfsck->li_list_scan);
1023                                 }
1024                                 start->ls_active &= ~type;
1025                         }
1026                         type <<= 1;
1027                 }
1028         }
1029
1030         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1031                 start->ls_active |= com->lc_type;
1032                 if (flags & DOIF_RESET) {
1033                         rc = com->lc_ops->lfsck_reset(env, com, false);
1034                         if (rc != 0)
1035                                 GOTO(out, rc);
1036                 }
1037         }
1038
1039 trigger:
1040         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
1041         if (bk->lb_param & LPF_DRYRUN) {
1042                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
1043                 valid |= DOIV_DRYRUN;
1044                 flags |= DOIF_DRYRUN;
1045         }
1046
1047         if (bk->lb_param & LPF_FAILOUT) {
1048                 valid |= DOIV_ERROR_HANDLE;
1049                 flags |= DOIF_FAILOUT;
1050         }
1051
1052         if (!cfs_list_empty(&lfsck->li_list_scan))
1053                 flags |= DOIF_OUTUSED;
1054
1055         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
1056         thread_set_flags(thread, 0);
1057         lta = lfsck_thread_args_init(lfsck, NULL);
1058         if (IS_ERR(lta))
1059                 GOTO(out, rc = PTR_ERR(lta));
1060
1061         rc = PTR_ERR(kthread_run(lfsck_master_engine, lta, "lfsck"));
1062         if (IS_ERR_VALUE(rc)) {
1063                 CERROR("%s: cannot start LFSCK thread: rc = %ld\n",
1064                        lfsck_lfsck2name(lfsck), rc);
1065                 lfsck_thread_args_fini(lta);
1066         } else {
1067                 rc = 0;
1068                 l_wait_event(thread->t_ctl_waitq,
1069                              thread_is_running(thread) ||
1070                              thread_is_stopped(thread),
1071                              &lwi);
1072         }
1073
1074         GOTO(out, rc);
1075
1076 out:
1077         mutex_unlock(&lfsck->li_mutex);
1078 put:
1079         lfsck_instance_put(env, lfsck);
1080         return (rc < 0 ? rc : 0);
1081 }
1082 EXPORT_SYMBOL(lfsck_start);
1083
1084 int lfsck_stop(const struct lu_env *env, struct dt_device *key, bool pause)
1085 {
1086         struct lfsck_instance   *lfsck;
1087         struct ptlrpc_thread    *thread;
1088         struct l_wait_info       lwi    = { 0 };
1089         ENTRY;
1090
1091         lfsck = lfsck_instance_find(key, true, false);
1092         if (unlikely(lfsck == NULL))
1093                 RETURN(-ENODEV);
1094
1095         thread = &lfsck->li_thread;
1096         mutex_lock(&lfsck->li_mutex);
1097         spin_lock(&lfsck->li_lock);
1098         if (thread_is_init(thread) || thread_is_stopped(thread)) {
1099                 spin_unlock(&lfsck->li_lock);
1100                 mutex_unlock(&lfsck->li_mutex);
1101                 lfsck_instance_put(env, lfsck);
1102                 RETURN(-EALREADY);
1103         }
1104
1105         if (pause)
1106                 lfsck->li_paused = 1;
1107         thread_set_flags(thread, SVC_STOPPING);
1108         spin_unlock(&lfsck->li_lock);
1109
1110         wake_up_all(&thread->t_ctl_waitq);
1111         l_wait_event(thread->t_ctl_waitq,
1112                      thread_is_stopped(thread),
1113                      &lwi);
1114         mutex_unlock(&lfsck->li_mutex);
1115         lfsck_instance_put(env, lfsck);
1116
1117         RETURN(0);
1118 }
1119 EXPORT_SYMBOL(lfsck_stop);
1120
1121 int lfsck_register(const struct lu_env *env, struct dt_device *key,
1122                    struct dt_device *next, bool master)
1123 {
1124         struct lfsck_instance   *lfsck;
1125         struct dt_object        *root  = NULL;
1126         struct dt_object        *obj;
1127         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
1128         int                      rc;
1129         ENTRY;
1130
1131         lfsck = lfsck_instance_find(key, false, false);
1132         if (unlikely(lfsck != NULL))
1133                 RETURN(-EEXIST);
1134
1135         OBD_ALLOC_PTR(lfsck);
1136         if (lfsck == NULL)
1137                 RETURN(-ENOMEM);
1138
1139         mutex_init(&lfsck->li_mutex);
1140         spin_lock_init(&lfsck->li_lock);
1141         CFS_INIT_LIST_HEAD(&lfsck->li_link);
1142         CFS_INIT_LIST_HEAD(&lfsck->li_list_scan);
1143         CFS_INIT_LIST_HEAD(&lfsck->li_list_dir);
1144         CFS_INIT_LIST_HEAD(&lfsck->li_list_double_scan);
1145         CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
1146         atomic_set(&lfsck->li_ref, 1);
1147         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
1148         lfsck->li_next = next;
1149         lfsck->li_bottom = key;
1150
1151         fid->f_seq = FID_SEQ_LOCAL_NAME;
1152         fid->f_oid = 1;
1153         fid->f_ver = 0;
1154         rc = local_oid_storage_init(env, lfsck->li_bottom, fid, &lfsck->li_los);
1155         if (rc != 0)
1156                 GOTO(out, rc);
1157
1158         rc = dt_root_get(env, key, fid);
1159         if (rc != 0)
1160                 GOTO(out, rc);
1161
1162         root = dt_locate(env, lfsck->li_bottom, fid);
1163         if (IS_ERR(root))
1164                 GOTO(out, rc = PTR_ERR(root));
1165
1166         if (unlikely(!dt_try_as_dir(env, root)))
1167                 GOTO(out, rc = -ENOTDIR);
1168
1169         lfsck->li_local_root_fid = *fid;
1170         if (master) {
1171                 lfsck->li_master = 1;
1172                 if (lfsck_dev_idx(lfsck->li_bottom) == 0) {
1173                         rc = dt_lookup(env, root,
1174                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
1175                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
1176                         if (rc != 0)
1177                                 GOTO(out, rc);
1178                 }
1179         }
1180
1181         fid->f_seq = FID_SEQ_LOCAL_FILE;
1182         fid->f_oid = OTABLE_IT_OID;
1183         fid->f_ver = 0;
1184         obj = dt_locate(env, lfsck->li_bottom, fid);
1185         if (IS_ERR(obj))
1186                 GOTO(out, rc = PTR_ERR(obj));
1187
1188         lfsck->li_obj_oit = obj;
1189         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
1190         if (rc != 0) {
1191                 if (rc == -ENOTSUPP)
1192                         GOTO(add, rc = 0);
1193
1194                 GOTO(out, rc);
1195         }
1196
1197         rc = lfsck_bookmark_setup(env, lfsck);
1198         if (rc != 0)
1199                 GOTO(out, rc);
1200
1201         if (master) {
1202                 rc = lfsck_namespace_setup(env, lfsck);
1203                 if (rc < 0)
1204                         GOTO(out, rc);
1205         }
1206
1207         rc = lfsck_layout_setup(env, lfsck);
1208         if (rc < 0)
1209                 GOTO(out, rc);
1210
1211         /* XXX: more LFSCK components initialization to be added here. */
1212
1213 add:
1214         rc = lfsck_instance_add(lfsck);
1215 out:
1216         if (root != NULL && !IS_ERR(root))
1217                 lu_object_put(env, &root->do_lu);
1218         if (rc != 0)
1219                 lfsck_instance_cleanup(env, lfsck);
1220         return rc;
1221 }
1222 EXPORT_SYMBOL(lfsck_register);
1223
1224 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
1225 {
1226         struct lfsck_instance *lfsck;
1227
1228         lfsck = lfsck_instance_find(key, false, true);
1229         if (lfsck != NULL)
1230                 lfsck_instance_put(env, lfsck);
1231 }
1232 EXPORT_SYMBOL(lfsck_degister);
1233
1234 static int __init lfsck_init(void)
1235 {
1236         int rc;
1237
1238         lfsck_key_init_generic(&lfsck_thread_key, NULL);
1239         rc = lu_context_key_register(&lfsck_thread_key);
1240         return rc;
1241 }
1242
1243 static void __exit lfsck_exit(void)
1244 {
1245         LASSERT(cfs_list_empty(&lfsck_instance_list));
1246
1247         lu_context_key_degister(&lfsck_thread_key);
1248 }
1249
1250 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
1251 MODULE_DESCRIPTION("LFSCK");
1252 MODULE_LICENSE("GPL");
1253
1254 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);