Whamcloud - gitweb
LU-4788 lfsck: verify .lustre/lost+found at the LFSCK start
[fs/lustre-release.git] / lustre / lfsck / lfsck_engine.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_engine.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lu_object.h>
34 #include <dt_object.h>
35 #include <lustre_net.h>
36 #include <lustre_fid.h>
37 #include <obd_support.h>
38 #include <lustre_lib.h>
39
40 #include "lfsck_internal.h"
41
42 static void lfsck_unpack_ent(struct lu_dirent *ent, __u64 *cookie)
43 {
44         fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
45         *cookie = le64_to_cpu(ent->lde_hash);
46         ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
47         ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
48         ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
49
50         /* Make sure the name is terminated with '0'.
51          * The data (type) after ent::lde_name maybe
52          * broken, but we do not care. */
53         ent->lde_name[ent->lde_namelen] = 0;
54 }
55
56 static void lfsck_di_oit_put(const struct lu_env *env, struct lfsck_instance *lfsck)
57 {
58         const struct dt_it_ops  *iops;
59         struct dt_it            *di;
60
61         spin_lock(&lfsck->li_lock);
62         iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
63         di = lfsck->li_di_oit;
64         lfsck->li_di_oit = NULL;
65         spin_unlock(&lfsck->li_lock);
66         iops->put(env, di);
67 }
68
69 static void lfsck_di_dir_put(const struct lu_env *env, struct lfsck_instance *lfsck)
70 {
71         const struct dt_it_ops  *iops;
72         struct dt_it            *di;
73
74         spin_lock(&lfsck->li_lock);
75         iops = &lfsck->li_obj_dir->do_index_ops->dio_it;
76         di = lfsck->li_di_dir;
77         lfsck->li_di_dir = NULL;
78         lfsck->li_cookie_dir = 0;
79         spin_unlock(&lfsck->li_lock);
80         iops->put(env, di);
81 }
82
83 static void lfsck_close_dir(const struct lu_env *env,
84                             struct lfsck_instance *lfsck)
85 {
86         struct dt_object        *dir_obj  = lfsck->li_obj_dir;
87         const struct dt_it_ops  *dir_iops = &dir_obj->do_index_ops->dio_it;
88         struct dt_it            *dir_di   = lfsck->li_di_dir;
89
90         lfsck_di_dir_put(env, lfsck);
91         dir_iops->fini(env, dir_di);
92         lfsck->li_obj_dir = NULL;
93         lfsck_object_put(env, dir_obj);
94 }
95
96 static int lfsck_update_lma(const struct lu_env *env,
97                             struct lfsck_instance *lfsck, struct dt_object *obj)
98 {
99         struct lfsck_thread_info        *info   = lfsck_env_info(env);
100         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
101         struct dt_device                *dt     = lfsck->li_bottom;
102         struct lustre_mdt_attrs         *lma    = &info->lti_lma;
103         struct lu_buf                   *buf;
104         struct thandle                  *th;
105         int                              fl;
106         int                              rc;
107         ENTRY;
108
109         if (bk->lb_param & LPF_DRYRUN)
110                 RETURN(0);
111
112         buf = lfsck_buf_get(env, info->lti_lma_old, LMA_OLD_SIZE);
113         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LMA, BYPASS_CAPA);
114         if (rc < 0) {
115                 if (rc != -ENODATA)
116                         RETURN(rc);
117
118                 fl = LU_XATTR_CREATE;
119                 lustre_lma_init(lma, lfsck_dto2fid(obj), LMAC_FID_ON_OST, 0);
120         } else {
121                 if (rc != LMA_OLD_SIZE && rc != sizeof(struct lustre_mdt_attrs))
122                         RETURN(-EINVAL);
123
124                 fl = LU_XATTR_REPLACE;
125                 lustre_lma_swab(lma);
126                 lustre_lma_init(lma, lfsck_dto2fid(obj),
127                                 lma->lma_compat | LMAC_FID_ON_OST,
128                                 lma->lma_incompat);
129         }
130         lustre_lma_swab(lma);
131
132         th = dt_trans_create(env, dt);
133         if (IS_ERR(th))
134                 RETURN(PTR_ERR(th));
135
136         buf = lfsck_buf_get(env, lma, sizeof(*lma));
137         rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA, fl, th);
138         if (rc != 0)
139                 GOTO(stop, rc);
140
141         rc = dt_trans_start(env, dt, th);
142         if (rc != 0)
143                 GOTO(stop, rc);
144
145         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LMA, fl, th, BYPASS_CAPA);
146
147         GOTO(stop, rc);
148
149 stop:
150         dt_trans_stop(env, dt, th);
151         return rc;
152 }
153
154 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
155                             struct lu_fid *fid)
156 {
157         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
158                      !dt_try_as_dir(env, obj)))
159                 return -ENOTDIR;
160
161         return dt_lookup(env, obj, (struct dt_rec *)fid,
162                          (const struct dt_key *)"..", BYPASS_CAPA);
163 }
164
165 static int lfsck_needs_scan_dir(const struct lu_env *env,
166                                 struct lfsck_instance *lfsck,
167                                 struct dt_object *obj)
168 {
169         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
170         int            depth = 0;
171         int            rc;
172
173         if (list_empty(&lfsck->li_list_dir) || !S_ISDIR(lfsck_object_type(obj)))
174                 RETURN(0);
175
176         while (1) {
177                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
178                  *      which is the agent directory to manage the objects
179                  *      which name entries reside on remote MDTs. Related
180                  *      consistency verification will be processed in LFSCK
181                  *      phase III. */
182                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
183                         if (depth > 0)
184                                 lfsck_object_put(env, obj);
185                         return 1;
186                 }
187
188                 /* No need to check .lustre and its children. */
189                 if (fid_seq_is_dot(fid_seq(lfsck_dto2fid(obj)))) {
190                         if (depth > 0)
191                                 lfsck_object_put(env, obj);
192                         return 0;
193                 }
194
195                 dt_read_lock(env, obj, MOR_TGT_CHILD);
196                 if (unlikely(lfsck_is_dead_obj(obj))) {
197                         dt_read_unlock(env, obj);
198                         if (depth > 0)
199                                 lfsck_object_put(env, obj);
200                         return 0;
201                 }
202
203                 rc = dt_xattr_get(env, obj,
204                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
205                                   BYPASS_CAPA);
206                 dt_read_unlock(env, obj);
207                 if (rc >= 0) {
208                         if (depth > 0)
209                                 lfsck_object_put(env, obj);
210                         return 1;
211                 }
212
213                 if (rc < 0 && rc != -ENODATA) {
214                         if (depth > 0)
215                                 lfsck_object_put(env, obj);
216                         return rc;
217                 }
218
219                 rc = lfsck_parent_fid(env, obj, fid);
220                 if (depth > 0)
221                         lfsck_object_put(env, obj);
222                 if (rc != 0)
223                         return rc;
224
225                 if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
226                         return 0;
227
228                 obj = lfsck_object_find(env, lfsck, fid);
229                 if (IS_ERR(obj))
230                         return PTR_ERR(obj);
231
232                 if (!dt_object_exists(obj)) {
233                         lfsck_object_put(env, obj);
234                         return 0;
235                 }
236
237                 if (dt_object_remote(obj)) {
238                         /* .lustre/lost+found/MDTxxx can be remote directory. */
239                         if (fid_seq_is_dot(fid_seq(lfsck_dto2fid(obj))))
240                                 rc = 0;
241                         else
242                                 /* Other remote directory should be client
243                                  * visible and need to be checked. */
244                                 rc = 1;
245                         lfsck_object_put(env, obj);
246                         return rc;
247                 }
248
249                 depth++;
250         }
251         return 0;
252 }
253
254 /* LFSCK wrap functions */
255
256 static void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
257                        bool new_checked)
258 {
259         struct lfsck_component *com;
260
261         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
262                 com->lc_ops->lfsck_fail(env, com, new_checked);
263         }
264 }
265
266 static int lfsck_checkpoint(const struct lu_env *env,
267                             struct lfsck_instance *lfsck)
268 {
269         struct lfsck_component *com;
270         int                     rc  = 0;
271         int                     rc1 = 0;
272
273         if (likely(cfs_time_beforeq(cfs_time_current(),
274                                     lfsck->li_time_next_checkpoint)))
275                 return 0;
276
277         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
278         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
279                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
280                 if (rc != 0)
281                         rc1 = rc;
282         }
283
284         lfsck->li_time_last_checkpoint = cfs_time_current();
285         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
286                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
287         return rc1 != 0 ? rc1 : rc;
288 }
289
290 static int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck,
291                       struct lfsck_start_param *lsp)
292 {
293         struct dt_object       *obj     = NULL;
294         struct lfsck_component *com;
295         struct lfsck_component *next;
296         struct lfsck_position  *pos     = NULL;
297         const struct dt_it_ops *iops    =
298                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
299         struct dt_it           *di;
300         int                     rc;
301         ENTRY;
302
303         LASSERT(lfsck->li_obj_dir == NULL);
304         LASSERT(lfsck->li_di_dir == NULL);
305
306         lfsck->li_current_oit_processed = 0;
307         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
308                 com->lc_new_checked = 0;
309                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
310                         com->lc_journal = 0;
311
312                 rc = com->lc_ops->lfsck_prep(env, com, lsp);
313                 if (rc != 0)
314                         GOTO(out, rc);
315
316                 if ((pos == NULL) ||
317                     (!lfsck_pos_is_zero(&com->lc_pos_start) &&
318                      lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
319                         pos = &com->lc_pos_start;
320         }
321
322         /* Init otable-based iterator. */
323         if (pos == NULL) {
324                 rc = iops->load(env, lfsck->li_di_oit, 0);
325                 if (rc > 0) {
326                         lfsck->li_oit_over = 1;
327                         rc = 0;
328                 }
329
330                 GOTO(out, rc);
331         }
332
333         rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
334         if (rc < 0)
335                 GOTO(out, rc);
336         else if (rc > 0)
337                 lfsck->li_oit_over = 1;
338
339         if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
340                 GOTO(out, rc = 0);
341
342         /* Find the directory for namespace-based traverse. */
343         obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
344         if (IS_ERR(obj))
345                 RETURN(PTR_ERR(obj));
346
347         /* XXX: Currently, skip remote object, the consistency for
348          *      remote object will be processed in LFSCK phase III. */
349         if (!dt_object_exists(obj) || dt_object_remote(obj) ||
350             unlikely(!S_ISDIR(lfsck_object_type(obj))))
351                 GOTO(out, rc = 0);
352
353         if (unlikely(!dt_try_as_dir(env, obj)))
354                 GOTO(out, rc = -ENOTDIR);
355
356         /* Init the namespace-based directory traverse. */
357         iops = &obj->do_index_ops->dio_it;
358         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
359         if (IS_ERR(di))
360                 GOTO(out, rc = PTR_ERR(di));
361
362         LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
363
364         rc = iops->load(env, di, pos->lp_dir_cookie);
365         if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
366                 rc = iops->next(env, di);
367         else if (rc > 0)
368                 rc = 0;
369
370         if (rc != 0) {
371                 iops->put(env, di);
372                 iops->fini(env, di);
373                 GOTO(out, rc);
374         }
375
376         lfsck->li_obj_dir = lfsck_object_get(obj);
377         lfsck->li_cookie_dir = iops->store(env, di);
378         spin_lock(&lfsck->li_lock);
379         lfsck->li_di_dir = di;
380         spin_unlock(&lfsck->li_lock);
381
382         GOTO(out, rc = 0);
383
384 out:
385         if (obj != NULL)
386                 lfsck_object_put(env, obj);
387
388         if (rc < 0) {
389                 list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
390                                          lc_link)
391                         com->lc_ops->lfsck_post(env, com, rc, true);
392
393                 return rc;
394         }
395
396         rc = 0;
397         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
398         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
399                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
400                 if (rc != 0)
401                         break;
402         }
403
404         lfsck->li_time_last_checkpoint = cfs_time_current();
405         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
406                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
407         return rc;
408 }
409
410 static int lfsck_exec_oit(const struct lu_env *env,
411                           struct lfsck_instance *lfsck, struct dt_object *obj)
412 {
413         struct lfsck_component *com;
414         const struct dt_it_ops *iops;
415         struct dt_it           *di;
416         int                     rc;
417         ENTRY;
418
419         LASSERT(lfsck->li_obj_dir == NULL);
420
421         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
422                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
423                 if (rc != 0)
424                         RETURN(rc);
425         }
426
427         rc = lfsck_needs_scan_dir(env, lfsck, obj);
428         if (rc <= 0)
429                 GOTO(out, rc);
430
431         if (unlikely(!dt_try_as_dir(env, obj)))
432                 GOTO(out, rc = -ENOTDIR);
433
434         iops = &obj->do_index_ops->dio_it;
435         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
436         if (IS_ERR(di))
437                 GOTO(out, rc = PTR_ERR(di));
438
439         rc = iops->load(env, di, 0);
440         if (rc == 0)
441                 rc = iops->next(env, di);
442         else if (rc > 0)
443                 rc = 0;
444
445         if (rc != 0) {
446                 iops->put(env, di);
447                 iops->fini(env, di);
448                 GOTO(out, rc);
449         }
450
451         lfsck->li_obj_dir = lfsck_object_get(obj);
452         lfsck->li_cookie_dir = iops->store(env, di);
453         spin_lock(&lfsck->li_lock);
454         lfsck->li_di_dir = di;
455         spin_unlock(&lfsck->li_lock);
456
457         GOTO(out, rc = 0);
458
459 out:
460         if (rc < 0)
461                 lfsck_fail(env, lfsck, false);
462         return (rc > 0 ? 0 : rc);
463 }
464
465 static int lfsck_exec_dir(const struct lu_env *env,
466                           struct lfsck_instance *lfsck,
467                           struct dt_object *obj, struct lu_dirent *ent)
468 {
469         struct lfsck_component *com;
470         int                     rc;
471
472         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
473                 rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
474                 if (rc != 0)
475                         return rc;
476         }
477         return 0;
478 }
479
480 static int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
481                       int result)
482 {
483         struct lfsck_component *com;
484         struct lfsck_component *next;
485         int                     rc  = 0;
486         int                     rc1 = 0;
487
488         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
489         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
490                 rc = com->lc_ops->lfsck_post(env, com, result, false);
491                 if (rc != 0)
492                         rc1 = rc;
493         }
494
495         lfsck->li_time_last_checkpoint = cfs_time_current();
496         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
497                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
498
499         /* Ignore some component post failure to make other can go ahead. */
500         return result;
501 }
502
503 static int lfsck_double_scan(const struct lu_env *env,
504                              struct lfsck_instance *lfsck)
505 {
506         struct lfsck_component *com;
507         struct lfsck_component *next;
508         struct l_wait_info      lwi = { 0 };
509         int                     rc  = 0;
510         int                     rc1 = 0;
511
512         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
513                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
514                         com->lc_journal = 0;
515
516                 rc = com->lc_ops->lfsck_double_scan(env, com);
517                 if (rc != 0)
518                         rc1 = rc;
519         }
520
521         l_wait_event(lfsck->li_thread.t_ctl_waitq,
522                      atomic_read(&lfsck->li_double_scan_count) == 0,
523                      &lwi);
524
525         if (lfsck->li_status != LS_PAUSED &&
526             lfsck->li_status != LS_CO_PAUSED) {
527                 list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
528                                          lc_link) {
529                         spin_lock(&lfsck->li_lock);
530                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
531                         spin_unlock(&lfsck->li_lock);
532                 }
533         }
534
535         return rc1 != 0 ? rc1 : rc;
536 }
537
538 static void lfsck_quit(const struct lu_env *env, struct lfsck_instance *lfsck)
539 {
540         struct lfsck_component *com;
541         struct lfsck_component *next;
542
543         list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
544                                  lc_link) {
545                 if (com->lc_ops->lfsck_quit != NULL)
546                         com->lc_ops->lfsck_quit(env, com);
547
548                 spin_lock(&lfsck->li_lock);
549                 list_del_init(&com->lc_link_dir);
550                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
551                 spin_unlock(&lfsck->li_lock);
552         }
553
554         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
555                                  lc_link) {
556                 if (com->lc_ops->lfsck_quit != NULL)
557                         com->lc_ops->lfsck_quit(env, com);
558
559                 spin_lock(&lfsck->li_lock);
560                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
561                 spin_unlock(&lfsck->li_lock);
562         }
563 }
564
565 /* LFSCK engines */
566
567 static int lfsck_master_dir_engine(const struct lu_env *env,
568                                    struct lfsck_instance *lfsck)
569 {
570         struct lfsck_thread_info        *info   = lfsck_env_info(env);
571         struct dt_object                *dir    = lfsck->li_obj_dir;
572         const struct dt_it_ops          *iops   = &dir->do_index_ops->dio_it;
573         struct dt_it                    *di     = lfsck->li_di_dir;
574         struct lu_dirent                *ent    =
575                         (struct lu_dirent *)info->lti_key;
576         struct lu_fid                   *fid    = &info->lti_fid;
577         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
578         struct ptlrpc_thread            *thread = &lfsck->li_thread;
579         int                              rc;
580         ENTRY;
581
582         do {
583                 struct dt_object *child;
584
585                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY2) &&
586                     cfs_fail_val > 0) {
587                         struct l_wait_info lwi;
588
589                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
590                                           NULL, NULL);
591                         l_wait_event(thread->t_ctl_waitq,
592                                      !thread_is_running(thread),
593                                      &lwi);
594                 }
595
596                 lfsck->li_new_scanned++;
597                 rc = iops->rec(env, di, (struct dt_rec *)ent,
598                                lfsck->li_args_dir);
599                 lfsck_unpack_ent(ent, &lfsck->li_cookie_dir);
600                 if (rc != 0) {
601                         CDEBUG(D_LFSCK, "%s: scan dir failed at rec(), "
602                                "parent "DFID", cookie "LPX64": rc = %d\n",
603                                lfsck_lfsck2name(lfsck),
604                                PFID(lfsck_dto2fid(dir)),
605                                lfsck->li_cookie_dir, rc);
606                         lfsck_fail(env, lfsck, true);
607                         if (bk->lb_param & LPF_FAILOUT)
608                                 RETURN(rc);
609                         else
610                                 goto checkpoint;
611                 }
612
613                 if (ent->lde_attrs & LUDA_IGNORE)
614                         goto checkpoint;
615
616                 *fid = ent->lde_fid;
617                 child = lfsck_object_find(env, lfsck, fid);
618                 if (IS_ERR(child)) {
619                         CDEBUG(D_LFSCK, "%s: scan dir failed at find target, "
620                                "parent "DFID", child %.*s "DFID": rc = %d\n",
621                                lfsck_lfsck2name(lfsck),
622                                PFID(lfsck_dto2fid(dir)),
623                                ent->lde_namelen, ent->lde_name,
624                                PFID(&ent->lde_fid), rc);
625                         lfsck_fail(env, lfsck, true);
626                         if (bk->lb_param & LPF_FAILOUT)
627                                 RETURN(PTR_ERR(child));
628                         else
629                                 goto checkpoint;
630                 }
631
632                 /* XXX: Currently, skip remote object, the consistency for
633                  *      remote object will be processed in LFSCK phase III. */
634                 if (dt_object_exists(child) && !dt_object_remote(child))
635                         rc = lfsck_exec_dir(env, lfsck, child, ent);
636                 lfsck_object_put(env, child);
637                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
638                         RETURN(rc);
639
640 checkpoint:
641                 rc = lfsck_checkpoint(env, lfsck);
642                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
643                         RETURN(rc);
644
645                 /* Rate control. */
646                 lfsck_control_speed(lfsck);
647                 if (unlikely(!thread_is_running(thread))) {
648                         CDEBUG(D_LFSCK, "%s: scan dir exit for engine stop, "
649                                "parent "DFID", cookie "LPX64"\n",
650                                lfsck_lfsck2name(lfsck),
651                                PFID(lfsck_dto2fid(dir)),
652                                lfsck->li_cookie_dir);
653                         RETURN(0);
654                 }
655
656                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
657                         spin_lock(&lfsck->li_lock);
658                         thread_set_flags(thread, SVC_STOPPING);
659                         spin_unlock(&lfsck->li_lock);
660                         RETURN(-EINVAL);
661                 }
662
663                 rc = iops->next(env, di);
664         } while (rc == 0);
665
666         if (rc > 0 && !lfsck->li_oit_over)
667                 lfsck_close_dir(env, lfsck);
668
669         RETURN(rc);
670 }
671
672 static int lfsck_master_oit_engine(const struct lu_env *env,
673                                    struct lfsck_instance *lfsck)
674 {
675         struct lfsck_thread_info        *info   = lfsck_env_info(env);
676         const struct dt_it_ops          *iops   =
677                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
678         struct dt_it                    *di     = lfsck->li_di_oit;
679         struct lu_fid                   *fid    = &info->lti_fid;
680         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
681         struct ptlrpc_thread            *thread = &lfsck->li_thread;
682         __u32                            idx    =
683                                 lfsck_dev_idx(lfsck->li_bottom);
684         int                              rc;
685         ENTRY;
686
687         do {
688                 struct dt_object *target;
689                 bool              update_lma = false;
690
691                 if (lfsck->li_di_dir != NULL) {
692                         rc = lfsck_master_dir_engine(env, lfsck);
693                         if (rc <= 0)
694                                 RETURN(rc);
695                 }
696
697                 if (unlikely(lfsck->li_oit_over))
698                         RETURN(1);
699
700                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY1) &&
701                     cfs_fail_val > 0) {
702                         struct l_wait_info lwi;
703
704                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
705                                           NULL, NULL);
706                         l_wait_event(thread->t_ctl_waitq,
707                                      !thread_is_running(thread),
708                                      &lwi);
709                 }
710
711                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
712                         RETURN(0);
713
714                 lfsck->li_current_oit_processed = 1;
715                 lfsck->li_new_scanned++;
716                 rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
717                 if (rc != 0) {
718                         CDEBUG(D_LFSCK, "%s: OIT scan failed at rec(): "
719                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
720                         lfsck_fail(env, lfsck, true);
721                         if (rc < 0 && bk->lb_param & LPF_FAILOUT)
722                                 RETURN(rc);
723                         else
724                                 goto checkpoint;
725                 }
726
727                 if (fid_is_idif(fid)) {
728                         __u32 idx1 = fid_idif_ost_idx(fid);
729
730                         LASSERT(!lfsck->li_master);
731
732                         /* It is an old format device, update the LMA. */
733                         if (idx != idx1) {
734                                 struct ost_id *oi = &info->lti_oi;
735
736                                 fid_to_ostid(fid, oi);
737                                 ostid_to_fid(fid, oi, idx);
738                                 update_lma = true;
739                         }
740                 } else if (!fid_is_norm(fid) && !fid_is_igif(fid) &&
741                            !fid_is_last_id(fid) && !fid_is_root(fid) &&
742                            !fid_seq_is_dot(fid_seq(fid))) {
743                         /* If the FID/object is only used locally and invisible
744                          * to external nodes, then LFSCK will not handle it. */
745                         goto checkpoint;
746                 }
747
748                 target = lfsck_object_find(env, lfsck, fid);
749                 if (IS_ERR(target)) {
750                         CDEBUG(D_LFSCK, "%s: OIT scan failed at find target "
751                                DFID", cookie "LPU64": rc = %d\n",
752                                lfsck_lfsck2name(lfsck), PFID(fid),
753                                iops->store(env, di), rc);
754                         lfsck_fail(env, lfsck, true);
755                         if (bk->lb_param & LPF_FAILOUT)
756                                 RETURN(PTR_ERR(target));
757                         else
758                                 goto checkpoint;
759                 }
760
761                 /* XXX: Currently, skip remote object, the consistency for
762                  *      remote object will be processed in LFSCK phase III. */
763                 if (dt_object_exists(target) && !dt_object_remote(target)) {
764                         if (update_lma) {
765                                 rc = lfsck_update_lma(env, lfsck, target);
766                                 if (rc != 0)
767                                         CDEBUG(D_LFSCK, "%s: fail to update "
768                                                "LMA for "DFID": rc = %d\n",
769                                                lfsck_lfsck2name(lfsck),
770                                                PFID(lfsck_dto2fid(target)), rc);
771                         }
772                         if (rc == 0)
773                                 rc = lfsck_exec_oit(env, lfsck, target);
774                 }
775                 lfsck_object_put(env, target);
776                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
777                         RETURN(rc);
778
779 checkpoint:
780                 rc = lfsck_checkpoint(env, lfsck);
781                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
782                         RETURN(rc);
783
784                 /* Rate control. */
785                 lfsck_control_speed(lfsck);
786
787                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL1)) {
788                         spin_lock(&lfsck->li_lock);
789                         thread_set_flags(thread, SVC_STOPPING);
790                         spin_unlock(&lfsck->li_lock);
791                         RETURN(-EINVAL);
792                 }
793
794                 rc = iops->next(env, di);
795                 if (unlikely(rc > 0))
796                         lfsck->li_oit_over = 1;
797                 else if (likely(rc == 0))
798                         lfsck->li_current_oit_processed = 0;
799
800                 if (unlikely(!thread_is_running(thread))) {
801                         CDEBUG(D_LFSCK, "%s: OIT scan exit for engine stop, "
802                                "cookie "LPU64"\n", lfsck_lfsck2name(lfsck),
803                                iops->store(env, di));
804                         RETURN(0);
805                 }
806         } while (rc == 0 || lfsck->li_di_dir != NULL);
807
808         RETURN(rc);
809 }
810
811 int lfsck_master_engine(void *args)
812 {
813         struct lfsck_thread_args *lta      = args;
814         struct lu_env            *env      = &lta->lta_env;
815         struct lfsck_instance    *lfsck    = lta->lta_lfsck;
816         struct ptlrpc_thread     *thread   = &lfsck->li_thread;
817         struct dt_object         *oit_obj  = lfsck->li_obj_oit;
818         const struct dt_it_ops   *oit_iops = &oit_obj->do_index_ops->dio_it;
819         struct dt_it             *oit_di;
820         struct l_wait_info        lwi      = { 0 };
821         int                       rc;
822         ENTRY;
823
824         if (lfsck->li_master &&
825             (!list_empty(&lfsck->li_list_scan) ||
826              !list_empty(&lfsck->li_list_double_scan))) {
827                 rc = lfsck_verify_lpf(env, lfsck);
828                 /* Fail to verify the .lustre/lost+found/MDTxxxx/ may be not
829                  * fatal, because the .lustre/lost+found/ maybe not accessed
830                  * by the LFSCK if it does not add orphans or others to such
831                  * directory. So go ahead until hit failure when really uses
832                  * the directory. */
833                 if (rc != 0)
834                         CDEBUG(D_LFSCK, "%s: master engine fail to verify the "
835                                ".lustre/lost+found/, go ahead: rc = %d\n",
836                                lfsck_lfsck2name(lfsck), rc);
837         }
838
839         oit_di = oit_iops->init(env, oit_obj, lfsck->li_args_oit, BYPASS_CAPA);
840         if (IS_ERR(oit_di)) {
841                 rc = PTR_ERR(oit_di);
842                 CDEBUG(D_LFSCK, "%s: master engine fail to init iteration: "
843                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
844
845                 GOTO(fini_args, rc);
846         }
847
848         spin_lock(&lfsck->li_lock);
849         lfsck->li_di_oit = oit_di;
850         spin_unlock(&lfsck->li_lock);
851         rc = lfsck_prep(env, lfsck, lta->lta_lsp);
852         if (rc != 0)
853                 GOTO(fini_oit, rc);
854
855         CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = %#x, dir_flags = %#x, "
856                "oit_cookie = "LPU64", dir_cookie = "LPX64", parent = "DFID
857                ", pid = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
858                lfsck->li_pos_current.lp_oit_cookie,
859                lfsck->li_pos_current.lp_dir_cookie,
860                PFID(&lfsck->li_pos_current.lp_dir_parent),
861                current_pid());
862
863         spin_lock(&lfsck->li_lock);
864         thread_set_flags(thread, SVC_RUNNING);
865         spin_unlock(&lfsck->li_lock);
866         wake_up_all(&thread->t_ctl_waitq);
867
868         l_wait_event(thread->t_ctl_waitq,
869                      lfsck->li_start_unplug ||
870                      !thread_is_running(thread),
871                      &lwi);
872         if (!thread_is_running(thread))
873                 GOTO(fini_oit, rc = 0);
874
875         if (!list_empty(&lfsck->li_list_scan) ||
876             list_empty(&lfsck->li_list_double_scan))
877                 rc = lfsck_master_oit_engine(env, lfsck);
878         else
879                 rc = 1;
880
881         CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = %#x, dir_flags = %#x, "
882                "oit_cookie = "LPU64", dir_cookie = "LPX64", parent = "DFID
883                ", pid = %d, rc = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
884                lfsck->li_pos_current.lp_oit_cookie,
885                lfsck->li_pos_current.lp_dir_cookie,
886                PFID(&lfsck->li_pos_current.lp_dir_parent),
887                current_pid(), rc);
888
889         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
890                 rc = lfsck_post(env, lfsck, rc);
891
892         if (lfsck->li_di_dir != NULL)
893                 lfsck_close_dir(env, lfsck);
894
895 fini_oit:
896         lfsck_di_oit_put(env, lfsck);
897         oit_iops->fini(env, oit_di);
898         if (rc == 1) {
899                 if (!list_empty(&lfsck->li_list_double_scan))
900                         rc = lfsck_double_scan(env, lfsck);
901                 else
902                         rc = 0;
903         } else {
904                 lfsck_quit(env, lfsck);
905         }
906
907         /* XXX: Purge the pinned objects in the future. */
908
909 fini_args:
910         spin_lock(&lfsck->li_lock);
911         thread_set_flags(thread, SVC_STOPPED);
912         spin_unlock(&lfsck->li_lock);
913         wake_up_all(&thread->t_ctl_waitq);
914         lfsck_thread_args_fini(lta);
915         return rc;
916 }