Whamcloud - gitweb
a4c963cbb8016415af8bf46a87c3cef1e533d3b3
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 /* define lfsck thread key */
46 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
47
48 static void lfsck_key_fini(const struct lu_context *ctx,
49                            struct lu_context_key *key, void *data)
50 {
51         struct lfsck_thread_info *info = data;
52
53         lu_buf_free(&info->lti_linkea_buf);
54         OBD_FREE_PTR(info);
55 }
56
57 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
58 LU_KEY_INIT_GENERIC(lfsck);
59
60 static CFS_LIST_HEAD(lfsck_instance_list);
61 static DEFINE_SPINLOCK(lfsck_instance_lock);
62
63 const char *lfsck_status_names[] = {
64         "init",
65         "scanning-phase1",
66         "scanning-phase2",
67         "completed",
68         "failed",
69         "stopped",
70         "paused",
71         "crashed",
72         NULL
73 };
74
75 const char *lfsck_flags_names[] = {
76         "scanned-once",
77         "inconsistent",
78         "upgrade",
79         NULL
80 };
81
82 const char *lfsck_param_names[] = {
83         "failout",
84         "dryrun",
85         NULL
86 };
87
88 static inline void lfsck_component_get(struct lfsck_component *com)
89 {
90         atomic_inc(&com->lc_ref);
91 }
92
93 static inline void lfsck_component_put(const struct lu_env *env,
94                                        struct lfsck_component *com)
95 {
96         if (atomic_dec_and_test(&com->lc_ref)) {
97                 if (com->lc_obj != NULL)
98                         lu_object_put_nocache(env, &com->lc_obj->do_lu);
99                 if (com->lc_file_ram != NULL)
100                         OBD_FREE(com->lc_file_ram, com->lc_file_size);
101                 if (com->lc_file_disk != NULL)
102                         OBD_FREE(com->lc_file_disk, com->lc_file_size);
103                 OBD_FREE_PTR(com);
104         }
105 }
106
107 static inline struct lfsck_component *
108 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type, cfs_list_t *list)
109 {
110         struct lfsck_component *com;
111
112         cfs_list_for_each_entry(com, list, lc_link) {
113                 if (com->lc_type == type)
114                         return com;
115         }
116         return NULL;
117 }
118
119 static struct lfsck_component *
120 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
121 {
122         struct lfsck_component *com;
123
124         spin_lock(&lfsck->li_lock);
125         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
126         if (com != NULL)
127                 goto unlock;
128
129         com = __lfsck_component_find(lfsck, type,
130                                      &lfsck->li_list_double_scan);
131         if (com != NULL)
132                 goto unlock;
133
134         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
135
136 unlock:
137         if (com != NULL)
138                 lfsck_component_get(com);
139         spin_unlock(&lfsck->li_lock);
140         return com;
141 }
142
143 void lfsck_component_cleanup(const struct lu_env *env,
144                              struct lfsck_component *com)
145 {
146         if (!cfs_list_empty(&com->lc_link))
147                 cfs_list_del_init(&com->lc_link);
148         if (!cfs_list_empty(&com->lc_link_dir))
149                 cfs_list_del_init(&com->lc_link_dir);
150
151         lfsck_component_put(env, com);
152 }
153
154 static void lfsck_instance_cleanup(const struct lu_env *env,
155                                    struct lfsck_instance *lfsck)
156 {
157         struct ptlrpc_thread    *thread = &lfsck->li_thread;
158         struct lfsck_component  *com;
159         ENTRY;
160
161         LASSERT(list_empty(&lfsck->li_link));
162         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
163
164         if (lfsck->li_obj_oit != NULL) {
165                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
166                 lfsck->li_obj_oit = NULL;
167         }
168
169         LASSERT(lfsck->li_obj_dir == NULL);
170
171         while (!cfs_list_empty(&lfsck->li_list_scan)) {
172                 com = cfs_list_entry(lfsck->li_list_scan.next,
173                                      struct lfsck_component,
174                                      lc_link);
175                 lfsck_component_cleanup(env, com);
176         }
177
178         LASSERT(cfs_list_empty(&lfsck->li_list_dir));
179
180         while (!cfs_list_empty(&lfsck->li_list_double_scan)) {
181                 com = cfs_list_entry(lfsck->li_list_double_scan.next,
182                                      struct lfsck_component,
183                                      lc_link);
184                 lfsck_component_cleanup(env, com);
185         }
186
187         while (!cfs_list_empty(&lfsck->li_list_idle)) {
188                 com = cfs_list_entry(lfsck->li_list_idle.next,
189                                      struct lfsck_component,
190                                      lc_link);
191                 lfsck_component_cleanup(env, com);
192         }
193
194         if (lfsck->li_bookmark_obj != NULL) {
195                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
196                 lfsck->li_bookmark_obj = NULL;
197         }
198
199         if (lfsck->li_los != NULL) {
200                 local_oid_storage_fini(env, lfsck->li_los);
201                 lfsck->li_los = NULL;
202         }
203
204         OBD_FREE_PTR(lfsck);
205 }
206
207 static inline void lfsck_instance_get(struct lfsck_instance *lfsck)
208 {
209         atomic_inc(&lfsck->li_ref);
210 }
211
212 static inline void lfsck_instance_put(const struct lu_env *env,
213                                       struct lfsck_instance *lfsck)
214 {
215         if (atomic_dec_and_test(&lfsck->li_ref))
216                 lfsck_instance_cleanup(env, lfsck);
217 }
218
219 static inline struct lfsck_instance *lfsck_instance_find(struct dt_device *key,
220                                                          bool ref, bool unlink)
221 {
222         struct lfsck_instance *lfsck;
223
224         spin_lock(&lfsck_instance_lock);
225         cfs_list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
226                 if (lfsck->li_bottom == key) {
227                         if (ref)
228                                 lfsck_instance_get(lfsck);
229                         if (unlink)
230                                 list_del_init(&lfsck->li_link);
231                         spin_unlock(&lfsck_instance_lock);
232                         return lfsck;
233                 }
234         }
235         spin_unlock(&lfsck_instance_lock);
236         return NULL;
237 }
238
239 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
240 {
241         struct lfsck_instance *tmp;
242
243         spin_lock(&lfsck_instance_lock);
244         cfs_list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
245                 if (lfsck->li_bottom == tmp->li_bottom) {
246                         spin_unlock(&lfsck_instance_lock);
247                         return -EEXIST;
248                 }
249         }
250
251         cfs_list_add_tail(&lfsck->li_link, &lfsck_instance_list);
252         spin_unlock(&lfsck_instance_lock);
253         return 0;
254 }
255
256 int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
257                     const char *prefix)
258 {
259         int save = *len;
260         int flag;
261         int rc;
262         int i;
263
264         rc = snprintf(*buf, *len, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
265         if (rc <= 0)
266                 return -ENOSPC;
267
268         *buf += rc;
269         *len -= rc;
270         for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
271                 if (flag & bits) {
272                         bits &= ~flag;
273                         rc = snprintf(*buf, *len, "%s%c", names[i],
274                                       bits != 0 ? ',' : '\n');
275                         if (rc <= 0)
276                                 return -ENOSPC;
277
278                         *buf += rc;
279                         *len -= rc;
280                 }
281         }
282         return save - *len;
283 }
284
285 int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix)
286 {
287         int rc;
288
289         if (time != 0)
290                 rc = snprintf(*buf, *len, "%s: "LPU64" seconds\n", prefix,
291                               cfs_time_current_sec() - time);
292         else
293                 rc = snprintf(*buf, *len, "%s: N/A\n", prefix);
294         if (rc <= 0)
295                 return -ENOSPC;
296
297         *buf += rc;
298         *len -= rc;
299         return rc;
300 }
301
302 int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
303                    const char *prefix)
304 {
305         int rc;
306
307         if (fid_is_zero(&pos->lp_dir_parent)) {
308                 if (pos->lp_oit_cookie == 0)
309                         rc = snprintf(*buf, *len, "%s: N/A, N/A, N/A\n",
310                                       prefix);
311                 else
312                         rc = snprintf(*buf, *len, "%s: "LPU64", N/A, N/A\n",
313                                       prefix, pos->lp_oit_cookie);
314         } else {
315                 rc = snprintf(*buf, *len, "%s: "LPU64", "DFID", "LPU64"\n",
316                               prefix, pos->lp_oit_cookie,
317                               PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
318         }
319         if (rc <= 0)
320                 return -ENOSPC;
321
322         *buf += rc;
323         *len -= rc;
324         return rc;
325 }
326
327 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
328                     struct lfsck_position *pos, bool init)
329 {
330         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
331
332         if (unlikely(lfsck->li_di_oit == NULL)) {
333                 memset(pos, 0, sizeof(*pos));
334                 return;
335         }
336
337         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
338         if (!lfsck->li_current_oit_processed && !init)
339                 pos->lp_oit_cookie--;
340
341         LASSERT(pos->lp_oit_cookie > 0);
342
343         if (lfsck->li_di_dir != NULL) {
344                 struct dt_object *dto = lfsck->li_obj_dir;
345
346                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
347                                                         lfsck->li_di_dir);
348
349                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
350                         fid_zero(&pos->lp_dir_parent);
351                         pos->lp_dir_cookie = 0;
352                 } else {
353                         pos->lp_dir_parent = *lu_object_fid(&dto->do_lu);
354                 }
355         } else {
356                 fid_zero(&pos->lp_dir_parent);
357                 pos->lp_dir_cookie = 0;
358         }
359 }
360
361 static void __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
362 {
363         lfsck->li_bookmark_ram.lb_speed_limit = limit;
364         if (limit != LFSCK_SPEED_NO_LIMIT) {
365                 if (limit > HZ) {
366                         lfsck->li_sleep_rate = limit / HZ;
367                         lfsck->li_sleep_jif = 1;
368                 } else {
369                         lfsck->li_sleep_rate = 1;
370                         lfsck->li_sleep_jif = HZ / limit;
371                 }
372         } else {
373                 lfsck->li_sleep_jif = 0;
374                 lfsck->li_sleep_rate = 0;
375         }
376 }
377
378 void lfsck_control_speed(struct lfsck_instance *lfsck)
379 {
380         struct ptlrpc_thread *thread = &lfsck->li_thread;
381         struct l_wait_info    lwi;
382
383         if (lfsck->li_sleep_jif > 0 &&
384             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
385                 spin_lock(&lfsck->li_lock);
386                 if (likely(lfsck->li_sleep_jif > 0 &&
387                            lfsck->li_new_scanned >= lfsck->li_sleep_rate)) {
388                         lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
389                                                LWI_ON_SIGNAL_NOOP, NULL);
390                         spin_unlock(&lfsck->li_lock);
391
392                         l_wait_event(thread->t_ctl_waitq,
393                                      !thread_is_running(thread),
394                                      &lwi);
395                         lfsck->li_new_scanned = 0;
396                 } else {
397                         spin_unlock(&lfsck->li_lock);
398                 }
399         }
400 }
401
402 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
403                             struct lu_fid *fid)
404 {
405         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
406                      !dt_try_as_dir(env, obj)))
407                 return -ENOTDIR;
408
409         return dt_lookup(env, obj, (struct dt_rec *)fid,
410                          (const struct dt_key *)"..", BYPASS_CAPA);
411 }
412
413 static int lfsck_needs_scan_dir(const struct lu_env *env,
414                                 struct lfsck_instance *lfsck,
415                                 struct dt_object *obj)
416 {
417         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
418         int            depth = 0;
419         int            rc;
420
421         if (!lfsck->li_master || !S_ISDIR(lfsck_object_type(obj)) ||
422             cfs_list_empty(&lfsck->li_list_dir))
423                RETURN(0);
424
425         while (1) {
426                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
427                  *      which is the agent directory to manage the objects
428                  *      which name entries reside on remote MDTs. Related
429                  *      consistency verification will be processed in LFSCK
430                  *      phase III. */
431                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
432                         if (depth > 0)
433                                 lfsck_object_put(env, obj);
434                         return 1;
435                 }
436
437                 /* .lustre doesn't contain "real" user objects, no need lfsck */
438                 if (fid_is_dot_lustre(lfsck_dto2fid(obj))) {
439                         if (depth > 0)
440                                 lfsck_object_put(env, obj);
441                         return 0;
442                 }
443
444                 dt_read_lock(env, obj, MOR_TGT_CHILD);
445                 if (unlikely(lfsck_is_dead_obj(obj))) {
446                         dt_read_unlock(env, obj);
447                         if (depth > 0)
448                                 lfsck_object_put(env, obj);
449                         return 0;
450                 }
451
452                 rc = dt_xattr_get(env, obj,
453                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
454                                   BYPASS_CAPA);
455                 dt_read_unlock(env, obj);
456                 if (rc >= 0) {
457                         if (depth > 0)
458                                 lfsck_object_put(env, obj);
459                         return 1;
460                 }
461
462                 if (rc < 0 && rc != -ENODATA) {
463                         if (depth > 0)
464                                 lfsck_object_put(env, obj);
465                         return rc;
466                 }
467
468                 rc = lfsck_parent_fid(env, obj, fid);
469                 if (depth > 0)
470                         lfsck_object_put(env, obj);
471                 if (rc != 0)
472                         return rc;
473
474                 if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
475                         return 0;
476
477                 obj = lfsck_object_find(env, lfsck, fid);
478                 if (obj == NULL)
479                         return 0;
480                 else if (IS_ERR(obj))
481                         return PTR_ERR(obj);
482
483                 if (!dt_object_exists(obj)) {
484                         lfsck_object_put(env, obj);
485                         return 0;
486                 }
487
488                 /* Currently, only client visible directory can be remote. */
489                 if (dt_object_remote(obj)) {
490                         lfsck_object_put(env, obj);
491                         return 1;
492                 }
493
494                 depth++;
495         }
496         return 0;
497 }
498
499 /* LFSCK wrap functions */
500
501 void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
502                 bool new_checked)
503 {
504         struct lfsck_component *com;
505
506         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
507                 com->lc_ops->lfsck_fail(env, com, new_checked);
508         }
509 }
510
511 int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck)
512 {
513         struct lfsck_component *com;
514         int                     rc;
515
516         if (likely(cfs_time_beforeq(cfs_time_current(),
517                                     lfsck->li_time_next_checkpoint)))
518                 return 0;
519
520         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
521         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
522                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
523                 if (rc != 0)
524                         return rc;;
525         }
526
527         lfsck->li_time_last_checkpoint = cfs_time_current();
528         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
529                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
530         return 0;
531 }
532
533 int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck)
534 {
535         struct dt_object       *obj     = NULL;
536         struct lfsck_component *com;
537         struct lfsck_component *next;
538         struct lfsck_position  *pos     = NULL;
539         const struct dt_it_ops *iops    =
540                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
541         struct dt_it           *di;
542         int                     rc;
543         ENTRY;
544
545         LASSERT(lfsck->li_obj_dir == NULL);
546         LASSERT(lfsck->li_di_dir == NULL);
547
548         lfsck->li_current_oit_processed = 0;
549         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
550                 com->lc_new_checked = 0;
551                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
552                         com->lc_journal = 0;
553
554                 rc = com->lc_ops->lfsck_prep(env, com);
555                 if (rc != 0)
556                         RETURN(rc);
557
558                 if ((pos == NULL) ||
559                     (!lfsck_pos_is_zero(&com->lc_pos_start) &&
560                      lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
561                         pos = &com->lc_pos_start;
562         }
563
564         /* Init otable-based iterator. */
565         if (pos == NULL) {
566                 rc = iops->load(env, lfsck->li_di_oit, 0);
567                 if (rc > 0) {
568                         lfsck->li_oit_over = 1;
569                         rc = 0;
570                 }
571
572                 GOTO(out, rc);
573         }
574
575         rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
576         if (rc < 0)
577                 GOTO(out, rc);
578         else if (rc > 0)
579                 lfsck->li_oit_over = 1;
580
581         if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
582                 GOTO(out, rc = 0);
583
584         /* Find the directory for namespace-based traverse. */
585         obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
586         if (obj == NULL)
587                 GOTO(out, rc = 0);
588         else if (IS_ERR(obj))
589                 RETURN(PTR_ERR(obj));
590
591         /* XXX: Currently, skip remote object, the consistency for
592          *      remote object will be processed in LFSCK phase III. */
593         if (!dt_object_exists(obj) || dt_object_remote(obj) ||
594             unlikely(!S_ISDIR(lfsck_object_type(obj))))
595                 GOTO(out, rc = 0);
596
597         if (unlikely(!dt_try_as_dir(env, obj)))
598                 GOTO(out, rc = -ENOTDIR);
599
600         /* Init the namespace-based directory traverse. */
601         iops = &obj->do_index_ops->dio_it;
602         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
603         if (IS_ERR(di))
604                 GOTO(out, rc = PTR_ERR(di));
605
606         LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
607
608         rc = iops->load(env, di, pos->lp_dir_cookie);
609         if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
610                 rc = iops->next(env, di);
611         else if (rc > 0)
612                 rc = 0;
613
614         if (rc != 0) {
615                 iops->put(env, di);
616                 iops->fini(env, di);
617                 GOTO(out, rc);
618         }
619
620         lfsck->li_obj_dir = lfsck_object_get(obj);
621         lfsck->li_cookie_dir = iops->store(env, di);
622         spin_lock(&lfsck->li_lock);
623         lfsck->li_di_dir = di;
624         spin_unlock(&lfsck->li_lock);
625
626         GOTO(out, rc = 0);
627
628 out:
629         if (obj != NULL)
630                 lfsck_object_put(env, obj);
631
632         if (rc < 0) {
633                 cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
634                                              lc_link)
635                         com->lc_ops->lfsck_post(env, com, rc, true);
636
637                 return rc;
638         }
639
640         rc = 0;
641         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
642         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
643                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
644                 if (rc != 0)
645                         break;
646         }
647
648         lfsck->li_time_last_checkpoint = cfs_time_current();
649         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
650                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
651         return rc;
652 }
653
654 int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
655                    struct dt_object *obj)
656 {
657         struct lfsck_component *com;
658         const struct dt_it_ops *iops;
659         struct dt_it           *di;
660         int                     rc;
661         ENTRY;
662
663         LASSERT(lfsck->li_obj_dir == NULL);
664
665         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
666                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
667                 if (rc != 0)
668                         RETURN(rc);
669         }
670
671         rc = lfsck_needs_scan_dir(env, lfsck, obj);
672         if (rc <= 0)
673                 GOTO(out, rc);
674
675         if (unlikely(!dt_try_as_dir(env, obj)))
676                 GOTO(out, rc = -ENOTDIR);
677
678         iops = &obj->do_index_ops->dio_it;
679         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
680         if (IS_ERR(di))
681                 GOTO(out, rc = PTR_ERR(di));
682
683         rc = iops->load(env, di, 0);
684         if (rc == 0)
685                 rc = iops->next(env, di);
686         else if (rc > 0)
687                 rc = 0;
688
689         if (rc != 0) {
690                 iops->put(env, di);
691                 iops->fini(env, di);
692                 GOTO(out, rc);
693         }
694
695         lfsck->li_obj_dir = lfsck_object_get(obj);
696         lfsck->li_cookie_dir = iops->store(env, di);
697         spin_lock(&lfsck->li_lock);
698         lfsck->li_di_dir = di;
699         spin_unlock(&lfsck->li_lock);
700
701         GOTO(out, rc = 0);
702
703 out:
704         if (rc < 0)
705                 lfsck_fail(env, lfsck, false);
706         return (rc > 0 ? 0 : rc);
707 }
708
709 int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
710                    struct dt_object *obj, struct lu_dirent *ent)
711 {
712         struct lfsck_component *com;
713         int                     rc;
714
715         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
716                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
717                 if (rc != 0)
718                         return rc;
719         }
720         return 0;
721 }
722
723 int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
724                int result)
725 {
726         struct lfsck_component *com;
727         struct lfsck_component *next;
728         int                     rc;
729
730         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
731         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
732                 rc = com->lc_ops->lfsck_post(env, com, result, false);
733                 if (rc != 0)
734                         return rc;
735         }
736
737         lfsck->li_time_last_checkpoint = cfs_time_current();
738         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
739                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
740         return result;
741 }
742
743 int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck)
744 {
745         struct lfsck_component *com;
746         struct lfsck_component *next;
747         int                     rc;
748
749         cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
750                                      lc_link) {
751                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
752                         com->lc_journal = 0;
753
754                 rc = com->lc_ops->lfsck_double_scan(env, com);
755                 if (rc != 0)
756                         return rc;
757         }
758         return 0;
759 }
760
761 /* external interfaces */
762
763 int lfsck_get_speed(struct dt_device *key, void *buf, int len)
764 {
765         struct lu_env           env;
766         struct lfsck_instance  *lfsck;
767         int                     rc;
768         ENTRY;
769
770         lfsck = lfsck_instance_find(key, true, false);
771         if (unlikely(lfsck == NULL))
772                 RETURN(-ENODEV);
773
774         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
775         if (rc != 0)
776                 GOTO(out, rc);
777
778         rc = snprintf(buf, len, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
779         lu_env_fini(&env);
780
781         GOTO(out, rc);
782
783 out:
784         lfsck_instance_put(&env, lfsck);
785         return rc;
786 }
787 EXPORT_SYMBOL(lfsck_get_speed);
788
789 int lfsck_set_speed(struct dt_device *key, int val)
790 {
791         struct lu_env           env;
792         struct lfsck_instance  *lfsck;
793         int                     rc;
794         ENTRY;
795
796         lfsck = lfsck_instance_find(key, true, false);
797         if (unlikely(lfsck == NULL))
798                 RETURN(-ENODEV);
799
800         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
801         if (rc != 0)
802                 GOTO(out, rc);
803
804         mutex_lock(&lfsck->li_mutex);
805         __lfsck_set_speed(lfsck, val);
806         rc = lfsck_bookmark_store(&env, lfsck);
807         mutex_unlock(&lfsck->li_mutex);
808         lu_env_fini(&env);
809
810         GOTO(out, rc);
811
812 out:
813         lfsck_instance_put(&env, lfsck);
814         return rc;
815 }
816 EXPORT_SYMBOL(lfsck_set_speed);
817
818 int lfsck_dump(struct dt_device *key, void *buf, int len, __u16 type)
819 {
820         struct lu_env           env;
821         struct lfsck_instance  *lfsck;
822         struct lfsck_component *com   = NULL;
823         int                     rc;
824         ENTRY;
825
826         lfsck = lfsck_instance_find(key, true, false);
827         if (unlikely(lfsck == NULL))
828                 RETURN(-ENODEV);
829
830         com = lfsck_component_find(lfsck, type);
831         if (com == NULL)
832                 GOTO(out, rc = -ENOTSUPP);
833
834         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
835         if (rc != 0)
836                 GOTO(out, rc);
837
838         rc = com->lc_ops->lfsck_dump(&env, com, buf, len);
839         lu_env_fini(&env);
840
841         GOTO(out, rc);
842
843 out:
844         if (com != NULL)
845                 lfsck_component_put(&env, com);
846         lfsck_instance_put(&env, lfsck);
847         return rc;
848 }
849 EXPORT_SYMBOL(lfsck_dump);
850
851 int lfsck_start(const struct lu_env *env, struct dt_device *key,
852                 struct lfsck_start_param *lsp)
853 {
854         struct lfsck_start     *start  = lsp->lsp_start;
855         struct lfsck_instance  *lfsck;
856         struct lfsck_bookmark  *bk;
857         struct ptlrpc_thread   *thread;
858         struct lfsck_component *com;
859         struct l_wait_info      lwi    = { 0 };
860         bool                    dirty  = false;
861         long                    rc     = 0;
862         __u16                   valid  = 0;
863         __u16                   flags  = 0;
864         ENTRY;
865
866         lfsck = lfsck_instance_find(key, true, false);
867         if (unlikely(lfsck == NULL))
868                 RETURN(-ENODEV);
869
870         /* start == NULL means auto trigger paused LFSCK. */
871         if ((start == NULL) &&
872             (cfs_list_empty(&lfsck->li_list_scan) ||
873              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
874                 GOTO(put, rc = 0);
875
876         bk = &lfsck->li_bookmark_ram;
877         thread = &lfsck->li_thread;
878         mutex_lock(&lfsck->li_mutex);
879         spin_lock(&lfsck->li_lock);
880         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
881                 spin_unlock(&lfsck->li_lock);
882                 GOTO(out, rc = -EALREADY);
883         }
884
885         spin_unlock(&lfsck->li_lock);
886
887         lfsck->li_namespace = lsp->lsp_namespace;
888         lfsck->li_paused = 0;
889         lfsck->li_oit_over = 0;
890         lfsck->li_drop_dryrun = 0;
891         lfsck->li_new_scanned = 0;
892
893         /* For auto trigger. */
894         if (start == NULL)
895                 goto trigger;
896
897         start->ls_version = bk->lb_version;
898         if (start->ls_valid & LSV_SPEED_LIMIT) {
899                 __lfsck_set_speed(lfsck, start->ls_speed_limit);
900                 dirty = true;
901         }
902
903         if (start->ls_valid & LSV_ERROR_HANDLE) {
904                 valid |= DOIV_ERROR_HANDLE;
905                 if (start->ls_flags & LPF_FAILOUT)
906                         flags |= DOIF_FAILOUT;
907
908                 if ((start->ls_flags & LPF_FAILOUT) &&
909                     !(bk->lb_param & LPF_FAILOUT)) {
910                         bk->lb_param |= LPF_FAILOUT;
911                         dirty = true;
912                 } else if (!(start->ls_flags & LPF_FAILOUT) &&
913                            (bk->lb_param & LPF_FAILOUT)) {
914                         bk->lb_param &= ~LPF_FAILOUT;
915                         dirty = true;
916                 }
917         }
918
919         if (start->ls_valid & LSV_DRYRUN) {
920                 valid |= DOIV_DRYRUN;
921                 if (start->ls_flags & LPF_DRYRUN)
922                         flags |= DOIF_DRYRUN;
923
924                 if ((start->ls_flags & LPF_DRYRUN) &&
925                     !(bk->lb_param & LPF_DRYRUN)) {
926                         bk->lb_param |= LPF_DRYRUN;
927                         dirty = true;
928                 } else if (!(start->ls_flags & LPF_DRYRUN) &&
929                            (bk->lb_param & LPF_DRYRUN)) {
930                         bk->lb_param &= ~LPF_DRYRUN;
931                         lfsck->li_drop_dryrun = 1;
932                         dirty = true;
933                 }
934         }
935
936         if (dirty) {
937                 rc = lfsck_bookmark_store(env, lfsck);
938                 if (rc != 0)
939                         GOTO(out, rc);
940         }
941
942         if (start->ls_flags & LPF_RESET)
943                 flags |= DOIF_RESET;
944
945         if (start->ls_active != 0) {
946                 struct lfsck_component *next;
947                 __u16 type = 1;
948
949                 if (start->ls_active == LFSCK_TYPES_ALL)
950                         start->ls_active = LFSCK_TYPES_SUPPORTED;
951
952                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
953                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
954                         GOTO(out, rc = -ENOTSUPP);
955                 }
956
957                 cfs_list_for_each_entry_safe(com, next,
958                                              &lfsck->li_list_scan, lc_link) {
959                         if (!(com->lc_type & start->ls_active)) {
960                                 rc = com->lc_ops->lfsck_post(env, com, 0,
961                                                              false);
962                                 if (rc != 0)
963                                         GOTO(out, rc);
964                         }
965                 }
966
967                 while (start->ls_active != 0) {
968                         if (type & start->ls_active) {
969                                 com = __lfsck_component_find(lfsck, type,
970                                                         &lfsck->li_list_idle);
971                                 if (com != NULL) {
972                                         /* The component status will be updated
973                                          * when its prep() is called later by
974                                          * the LFSCK main engine. */
975                                         cfs_list_del_init(&com->lc_link);
976                                         cfs_list_add_tail(&com->lc_link,
977                                                           &lfsck->li_list_scan);
978                                 }
979                                 start->ls_active &= ~type;
980                         }
981                         type <<= 1;
982                 }
983         }
984
985         cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
986                 start->ls_active |= com->lc_type;
987                 if (flags & DOIF_RESET) {
988                         rc = com->lc_ops->lfsck_reset(env, com, false);
989                         if (rc != 0)
990                                 GOTO(out, rc);
991                 }
992         }
993
994 trigger:
995         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
996         if (bk->lb_param & LPF_DRYRUN) {
997                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
998                 valid |= DOIV_DRYRUN;
999                 flags |= DOIF_DRYRUN;
1000         }
1001
1002         if (bk->lb_param & LPF_FAILOUT) {
1003                 valid |= DOIV_ERROR_HANDLE;
1004                 flags |= DOIF_FAILOUT;
1005         }
1006
1007         if (!cfs_list_empty(&lfsck->li_list_scan))
1008                 flags |= DOIF_OUTUSED;
1009
1010         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
1011         thread_set_flags(thread, 0);
1012         rc = PTR_ERR(kthread_run(lfsck_master_engine, lfsck, "lfsck"));
1013         if (IS_ERR_VALUE(rc)) {
1014                 CERROR("%s: cannot start LFSCK thread, rc = %ld\n",
1015                        lfsck_lfsck2name(lfsck), rc);
1016         } else {
1017                 rc = 0;
1018                 l_wait_event(thread->t_ctl_waitq,
1019                              thread_is_running(thread) ||
1020                              thread_is_stopped(thread),
1021                              &lwi);
1022         }
1023
1024         GOTO(out, rc);
1025
1026 out:
1027         mutex_unlock(&lfsck->li_mutex);
1028 put:
1029         lfsck_instance_put(env, lfsck);
1030         return (rc < 0 ? rc : 0);
1031 }
1032 EXPORT_SYMBOL(lfsck_start);
1033
1034 int lfsck_stop(const struct lu_env *env, struct dt_device *key, bool pause)
1035 {
1036         struct lfsck_instance   *lfsck;
1037         struct ptlrpc_thread    *thread;
1038         struct l_wait_info       lwi    = { 0 };
1039         ENTRY;
1040
1041         lfsck = lfsck_instance_find(key, true, false);
1042         if (unlikely(lfsck == NULL))
1043                 RETURN(-ENODEV);
1044
1045         thread = &lfsck->li_thread;
1046         mutex_lock(&lfsck->li_mutex);
1047         spin_lock(&lfsck->li_lock);
1048         if (thread_is_init(thread) || thread_is_stopped(thread)) {
1049                 spin_unlock(&lfsck->li_lock);
1050                 mutex_unlock(&lfsck->li_mutex);
1051                 lfsck_instance_put(env, lfsck);
1052                 RETURN(-EALREADY);
1053         }
1054
1055         if (pause)
1056                 lfsck->li_paused = 1;
1057         thread_set_flags(thread, SVC_STOPPING);
1058         spin_unlock(&lfsck->li_lock);
1059
1060         wake_up_all(&thread->t_ctl_waitq);
1061         l_wait_event(thread->t_ctl_waitq,
1062                      thread_is_stopped(thread),
1063                      &lwi);
1064         mutex_unlock(&lfsck->li_mutex);
1065         lfsck_instance_put(env, lfsck);
1066
1067         RETURN(0);
1068 }
1069 EXPORT_SYMBOL(lfsck_stop);
1070
1071 int lfsck_register(const struct lu_env *env, struct dt_device *key,
1072                    struct dt_device *next, bool master)
1073 {
1074         struct lfsck_instance   *lfsck;
1075         struct dt_object        *root  = NULL;
1076         struct dt_object        *obj;
1077         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
1078         int                      rc;
1079         ENTRY;
1080
1081         lfsck = lfsck_instance_find(key, false, false);
1082         if (unlikely(lfsck != NULL))
1083                 RETURN(-EEXIST);
1084
1085         OBD_ALLOC_PTR(lfsck);
1086         if (lfsck == NULL)
1087                 RETURN(-ENOMEM);
1088
1089         mutex_init(&lfsck->li_mutex);
1090         spin_lock_init(&lfsck->li_lock);
1091         CFS_INIT_LIST_HEAD(&lfsck->li_link);
1092         CFS_INIT_LIST_HEAD(&lfsck->li_list_scan);
1093         CFS_INIT_LIST_HEAD(&lfsck->li_list_dir);
1094         CFS_INIT_LIST_HEAD(&lfsck->li_list_double_scan);
1095         CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
1096         atomic_set(&lfsck->li_ref, 1);
1097         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
1098         lfsck->li_next = next;
1099         lfsck->li_bottom = key;
1100
1101         fid->f_seq = FID_SEQ_LOCAL_NAME;
1102         fid->f_oid = 1;
1103         fid->f_ver = 0;
1104         rc = local_oid_storage_init(env, lfsck->li_bottom, fid, &lfsck->li_los);
1105         if (rc != 0)
1106                 GOTO(out, rc);
1107
1108         rc = dt_root_get(env, key, fid);
1109         if (rc != 0)
1110                 GOTO(out, rc);
1111
1112         root = dt_locate(env, lfsck->li_bottom, fid);
1113         if (IS_ERR(root))
1114                 GOTO(out, rc = PTR_ERR(root));
1115
1116         lfsck->li_local_root_fid = *fid;
1117         dt_try_as_dir(env, root);
1118         if (master) {
1119                 lfsck->li_master = 1;
1120                 if (lfsck_dev_idx(lfsck->li_bottom) == 0) {
1121                         rc = dt_lookup(env, root,
1122                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
1123                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
1124                         if (rc != 0)
1125                                 GOTO(out, rc);
1126                 }
1127         }
1128
1129         fid->f_seq = FID_SEQ_LOCAL_FILE;
1130         fid->f_oid = OTABLE_IT_OID;
1131         fid->f_ver = 0;
1132         obj = dt_locate(env, lfsck->li_bottom, fid);
1133         if (IS_ERR(obj))
1134                 GOTO(out, rc = PTR_ERR(obj));
1135
1136         lfsck->li_obj_oit = obj;
1137         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
1138         if (rc != 0) {
1139                 if (rc == -ENOTSUPP)
1140                         GOTO(add, rc = 0);
1141
1142                 GOTO(out, rc);
1143         }
1144
1145         rc = lfsck_bookmark_setup(env, lfsck);
1146         if (rc != 0)
1147                 GOTO(out, rc);
1148
1149         if (master) {
1150                 rc = lfsck_namespace_setup(env, lfsck);
1151                 if (rc < 0)
1152                         GOTO(out, rc);
1153         }
1154
1155         /* XXX: more LFSCK components initialization to be added here. */
1156
1157 add:
1158         rc = lfsck_instance_add(lfsck);
1159 out:
1160         if (root != NULL && !IS_ERR(root))
1161                 lu_object_put(env, &root->do_lu);
1162         if (rc != 0)
1163                 lfsck_instance_cleanup(env, lfsck);
1164         return rc;
1165 }
1166 EXPORT_SYMBOL(lfsck_register);
1167
1168 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
1169 {
1170         struct lfsck_instance *lfsck;
1171
1172         lfsck = lfsck_instance_find(key, false, true);
1173         if (lfsck != NULL)
1174                 lfsck_instance_put(env, lfsck);
1175 }
1176 EXPORT_SYMBOL(lfsck_degister);
1177
1178 static int __init lfsck_init(void)
1179 {
1180         int rc;
1181
1182         lfsck_key_init_generic(&lfsck_thread_key, NULL);
1183         rc = lu_context_key_register(&lfsck_thread_key);
1184         return rc;
1185 }
1186
1187 static void __exit lfsck_exit(void)
1188 {
1189         LASSERT(cfs_list_empty(&lfsck_instance_list));
1190
1191         lu_context_key_degister(&lfsck_thread_key);
1192 }
1193
1194 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
1195 MODULE_DESCRIPTION("LFSCK");
1196 MODULE_LICENSE("GPL");
1197
1198 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);