Whamcloud - gitweb
028c584bd66112c7884708c052c5eb9258213c8d
[fs/lustre-release.git] / lustre / lfsck / lfsck_engine.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_engine.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lu_object.h>
34 #include <dt_object.h>
35 #include <lustre_net.h>
36 #include <lustre_fid.h>
37 #include <obd_support.h>
38 #include <lustre_lib.h>
39
40 #include "lfsck_internal.h"
41
42 static int lfsck_unpack_ent(struct lu_dirent *ent, __u64 *cookie, __u16 *type)
43 {
44         struct luda_type        *lt;
45         int                      align = sizeof(*lt) - 1;
46         int                      len;
47
48         fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
49         *cookie = le64_to_cpu(ent->lde_hash);
50         ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
51         ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
52         ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
53
54         if (unlikely(!(ent->lde_attrs & LUDA_TYPE)))
55                 return -EINVAL;
56
57         len = (ent->lde_namelen + align) & ~align;
58         lt = (struct luda_type *)(ent->lde_name + len);
59         *type = le16_to_cpu(lt->lt_type);
60
61         /* Make sure the name is terminated with '\0'. The data (object type)
62          * after ent::lde_name maybe broken, but we have stored such data in
63          * the output parameter @type as above. */
64         ent->lde_name[ent->lde_namelen] = '\0';
65
66         return 0;
67 }
68
69 static void lfsck_di_oit_put(const struct lu_env *env, struct lfsck_instance *lfsck)
70 {
71         const struct dt_it_ops  *iops;
72         struct dt_it            *di;
73
74         spin_lock(&lfsck->li_lock);
75         iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
76         di = lfsck->li_di_oit;
77         lfsck->li_di_oit = NULL;
78         spin_unlock(&lfsck->li_lock);
79         iops->put(env, di);
80 }
81
82 static void lfsck_di_dir_put(const struct lu_env *env, struct lfsck_instance *lfsck)
83 {
84         const struct dt_it_ops  *iops;
85         struct dt_it            *di;
86
87         spin_lock(&lfsck->li_lock);
88         iops = &lfsck->li_obj_dir->do_index_ops->dio_it;
89         di = lfsck->li_di_dir;
90         lfsck->li_di_dir = NULL;
91         lfsck->li_cookie_dir = 0;
92         spin_unlock(&lfsck->li_lock);
93         iops->put(env, di);
94 }
95
96 static void lfsck_close_dir(const struct lu_env *env,
97                             struct lfsck_instance *lfsck)
98 {
99         struct dt_object        *dir_obj  = lfsck->li_obj_dir;
100         const struct dt_it_ops  *dir_iops = &dir_obj->do_index_ops->dio_it;
101         struct dt_it            *dir_di   = lfsck->li_di_dir;
102
103         lfsck_di_dir_put(env, lfsck);
104         dir_iops->fini(env, dir_di);
105         lfsck->li_obj_dir = NULL;
106         lfsck_object_put(env, dir_obj);
107 }
108
109 static int lfsck_update_lma(const struct lu_env *env,
110                             struct lfsck_instance *lfsck, struct dt_object *obj)
111 {
112         struct lfsck_thread_info        *info   = lfsck_env_info(env);
113         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
114         struct dt_device                *dt     = lfsck->li_bottom;
115         struct lustre_mdt_attrs         *lma    = &info->lti_lma;
116         struct lu_buf                   *buf;
117         struct thandle                  *th;
118         int                              fl;
119         int                              rc;
120         ENTRY;
121
122         if (bk->lb_param & LPF_DRYRUN)
123                 RETURN(0);
124
125         buf = lfsck_buf_get(env, info->lti_lma_old, LMA_OLD_SIZE);
126         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LMA, BYPASS_CAPA);
127         if (rc < 0) {
128                 if (rc != -ENODATA)
129                         RETURN(rc);
130
131                 fl = LU_XATTR_CREATE;
132                 lustre_lma_init(lma, lfsck_dto2fid(obj), LMAC_FID_ON_OST, 0);
133         } else {
134                 if (rc != LMA_OLD_SIZE && rc != sizeof(struct lustre_mdt_attrs))
135                         RETURN(-EINVAL);
136
137                 fl = LU_XATTR_REPLACE;
138                 lustre_lma_swab(lma);
139                 lustre_lma_init(lma, lfsck_dto2fid(obj),
140                                 lma->lma_compat | LMAC_FID_ON_OST,
141                                 lma->lma_incompat);
142         }
143         lustre_lma_swab(lma);
144
145         th = dt_trans_create(env, dt);
146         if (IS_ERR(th))
147                 RETURN(PTR_ERR(th));
148
149         buf = lfsck_buf_get(env, lma, sizeof(*lma));
150         rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA, fl, th);
151         if (rc != 0)
152                 GOTO(stop, rc);
153
154         rc = dt_trans_start(env, dt, th);
155         if (rc != 0)
156                 GOTO(stop, rc);
157
158         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LMA, fl, th, BYPASS_CAPA);
159
160         GOTO(stop, rc);
161
162 stop:
163         dt_trans_stop(env, dt, th);
164         return rc;
165 }
166
167 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
168                             struct lu_fid *fid)
169 {
170         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
171                      !dt_try_as_dir(env, obj)))
172                 return -ENOTDIR;
173
174         return dt_lookup(env, obj, (struct dt_rec *)fid,
175                          (const struct dt_key *)"..", BYPASS_CAPA);
176 }
177
178 static int lfsck_needs_scan_dir(const struct lu_env *env,
179                                 struct lfsck_instance *lfsck,
180                                 struct dt_object *obj)
181 {
182         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
183         int            depth = 0;
184         int            rc;
185
186         if (list_empty(&lfsck->li_list_dir) || !S_ISDIR(lfsck_object_type(obj)))
187                 RETURN(0);
188
189         while (1) {
190                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
191                  *      which is the agent directory to manage the objects
192                  *      which name entries reside on remote MDTs. Related
193                  *      consistency verification will be processed in LFSCK
194                  *      phase III. */
195                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
196                         if (depth > 0)
197                                 lfsck_object_put(env, obj);
198                         return 1;
199                 }
200
201                 /* No need to check .lustre and its children. */
202                 if (fid_seq_is_dot(fid_seq(lfsck_dto2fid(obj)))) {
203                         if (depth > 0)
204                                 lfsck_object_put(env, obj);
205                         return 0;
206                 }
207
208                 dt_read_lock(env, obj, MOR_TGT_CHILD);
209                 if (unlikely(lfsck_is_dead_obj(obj))) {
210                         dt_read_unlock(env, obj);
211                         if (depth > 0)
212                                 lfsck_object_put(env, obj);
213                         return 0;
214                 }
215
216                 rc = dt_xattr_get(env, obj,
217                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
218                                   BYPASS_CAPA);
219                 dt_read_unlock(env, obj);
220                 if (rc >= 0) {
221                         if (depth > 0)
222                                 lfsck_object_put(env, obj);
223                         return 1;
224                 }
225
226                 if (rc < 0 && rc != -ENODATA) {
227                         if (depth > 0)
228                                 lfsck_object_put(env, obj);
229                         return rc;
230                 }
231
232                 rc = lfsck_parent_fid(env, obj, fid);
233                 if (depth > 0)
234                         lfsck_object_put(env, obj);
235                 if (rc != 0)
236                         return rc;
237
238                 if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
239                         return 0;
240
241                 obj = lfsck_object_find(env, lfsck, fid);
242                 if (IS_ERR(obj))
243                         return PTR_ERR(obj);
244
245                 if (!dt_object_exists(obj)) {
246                         lfsck_object_put(env, obj);
247                         return 0;
248                 }
249
250                 if (dt_object_remote(obj)) {
251                         /* .lustre/lost+found/MDTxxx can be remote directory. */
252                         if (fid_seq_is_dot(fid_seq(lfsck_dto2fid(obj))))
253                                 rc = 0;
254                         else
255                                 /* Other remote directory should be client
256                                  * visible and need to be checked. */
257                                 rc = 1;
258                         lfsck_object_put(env, obj);
259                         return rc;
260                 }
261
262                 depth++;
263         }
264         return 0;
265 }
266
267 /* LFSCK wrap functions */
268
269 static void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
270                        bool new_checked)
271 {
272         struct lfsck_component *com;
273
274         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
275                 com->lc_ops->lfsck_fail(env, com, new_checked);
276         }
277 }
278
279 static int lfsck_checkpoint(const struct lu_env *env,
280                             struct lfsck_instance *lfsck)
281 {
282         struct lfsck_component *com;
283         int                     rc  = 0;
284         int                     rc1 = 0;
285
286         if (likely(cfs_time_beforeq(cfs_time_current(),
287                                     lfsck->li_time_next_checkpoint)))
288                 return 0;
289
290         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_checkpoint, false);
291         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
292                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
293                 if (rc != 0)
294                         rc1 = rc;
295         }
296
297         lfsck->li_time_last_checkpoint = cfs_time_current();
298         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
299                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
300         return rc1 != 0 ? rc1 : rc;
301 }
302
303 static int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck,
304                       struct lfsck_start_param *lsp)
305 {
306         struct dt_object       *obj     = NULL;
307         struct lfsck_component *com;
308         struct lfsck_component *next;
309         struct lfsck_position  *pos     = NULL;
310         const struct dt_it_ops *iops    =
311                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
312         struct dt_it           *di;
313         int                     rc;
314         ENTRY;
315
316         LASSERT(lfsck->li_obj_dir == NULL);
317         LASSERT(lfsck->li_di_dir == NULL);
318
319         lfsck->li_current_oit_processed = 0;
320         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
321                 com->lc_new_checked = 0;
322                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
323                         com->lc_journal = 0;
324
325                 rc = com->lc_ops->lfsck_prep(env, com, lsp);
326                 if (rc != 0)
327                         GOTO(out, rc);
328
329                 if ((pos == NULL) ||
330                     (!lfsck_pos_is_zero(&com->lc_pos_start) &&
331                      lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
332                         pos = &com->lc_pos_start;
333         }
334
335         /* Init otable-based iterator. */
336         if (pos == NULL) {
337                 rc = iops->load(env, lfsck->li_di_oit, 0);
338                 if (rc > 0) {
339                         lfsck->li_oit_over = 1;
340                         rc = 0;
341                 }
342
343                 GOTO(out, rc);
344         }
345
346         rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
347         if (rc < 0)
348                 GOTO(out, rc);
349         else if (rc > 0)
350                 lfsck->li_oit_over = 1;
351
352         if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
353                 GOTO(out, rc = 0);
354
355         /* Find the directory for namespace-based traverse. */
356         obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
357         if (IS_ERR(obj))
358                 RETURN(PTR_ERR(obj));
359
360         /* XXX: Currently, skip remote object, the consistency for
361          *      remote object will be processed in LFSCK phase III. */
362         if (!dt_object_exists(obj) || dt_object_remote(obj) ||
363             unlikely(!S_ISDIR(lfsck_object_type(obj))))
364                 GOTO(out, rc = 0);
365
366         if (unlikely(!dt_try_as_dir(env, obj)))
367                 GOTO(out, rc = -ENOTDIR);
368
369         /* Init the namespace-based directory traverse. */
370         iops = &obj->do_index_ops->dio_it;
371         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
372         if (IS_ERR(di))
373                 GOTO(out, rc = PTR_ERR(di));
374
375         LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
376
377         rc = iops->load(env, di, pos->lp_dir_cookie);
378         if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
379                 rc = iops->next(env, di);
380         else if (rc > 0)
381                 rc = 0;
382
383         if (rc != 0) {
384                 iops->put(env, di);
385                 iops->fini(env, di);
386                 GOTO(out, rc);
387         }
388
389         lfsck->li_obj_dir = lfsck_object_get(obj);
390         lfsck->li_cookie_dir = iops->store(env, di);
391         spin_lock(&lfsck->li_lock);
392         lfsck->li_di_dir = di;
393         spin_unlock(&lfsck->li_lock);
394
395         GOTO(out, rc = 0);
396
397 out:
398         if (obj != NULL)
399                 lfsck_object_put(env, obj);
400
401         if (rc < 0) {
402                 list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
403                                          lc_link)
404                         com->lc_ops->lfsck_post(env, com, rc, true);
405
406                 return rc;
407         }
408
409         rc = 0;
410         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_checkpoint, true);
411         lfsck->li_pos_current = lfsck->li_pos_checkpoint;
412         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
413                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
414                 if (rc != 0)
415                         break;
416         }
417
418         lfsck->li_time_last_checkpoint = cfs_time_current();
419         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
420                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
421         return rc;
422 }
423
424 static int lfsck_exec_oit(const struct lu_env *env,
425                           struct lfsck_instance *lfsck, struct dt_object *obj)
426 {
427         struct lfsck_component *com;
428         const struct dt_it_ops *iops;
429         struct dt_it           *di;
430         int                     rc;
431         ENTRY;
432
433         LASSERT(lfsck->li_obj_dir == NULL);
434
435         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
436                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
437                 if (rc != 0)
438                         RETURN(rc);
439         }
440
441         rc = lfsck_needs_scan_dir(env, lfsck, obj);
442         if (rc <= 0)
443                 GOTO(out, rc);
444
445         if (unlikely(!dt_try_as_dir(env, obj)))
446                 GOTO(out, rc = -ENOTDIR);
447
448         iops = &obj->do_index_ops->dio_it;
449         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
450         if (IS_ERR(di))
451                 GOTO(out, rc = PTR_ERR(di));
452
453         rc = iops->load(env, di, 0);
454         if (rc == 0)
455                 rc = iops->next(env, di);
456         else if (rc > 0)
457                 rc = 0;
458
459         if (rc != 0) {
460                 iops->put(env, di);
461                 iops->fini(env, di);
462                 GOTO(out, rc);
463         }
464
465         lfsck->li_obj_dir = lfsck_object_get(obj);
466         lfsck->li_cookie_dir = iops->store(env, di);
467         spin_lock(&lfsck->li_lock);
468         lfsck->li_di_dir = di;
469         spin_unlock(&lfsck->li_lock);
470
471         GOTO(out, rc = 0);
472
473 out:
474         if (rc < 0)
475                 lfsck_fail(env, lfsck, false);
476         return (rc > 0 ? 0 : rc);
477 }
478
479 static int lfsck_exec_dir(const struct lu_env *env,
480                           struct lfsck_instance *lfsck,
481                           struct lu_dirent *ent, __u16 type)
482 {
483         struct lfsck_component *com;
484         int                     rc;
485
486         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
487                 rc = com->lc_ops->lfsck_exec_dir(env, com, ent, type);
488                 if (rc != 0)
489                         return rc;
490         }
491         return 0;
492 }
493
494 static int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
495                       int result)
496 {
497         struct lfsck_component *com;
498         struct lfsck_component *next;
499         int                     rc  = 0;
500         int                     rc1 = 0;
501
502         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_checkpoint, false);
503         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
504                 rc = com->lc_ops->lfsck_post(env, com, result, false);
505                 if (rc != 0)
506                         rc1 = rc;
507         }
508
509         lfsck->li_time_last_checkpoint = cfs_time_current();
510         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
511                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
512
513         /* Ignore some component post failure to make other can go ahead. */
514         return result;
515 }
516
517 static int lfsck_double_scan(const struct lu_env *env,
518                              struct lfsck_instance *lfsck)
519 {
520         struct lfsck_component *com;
521         struct lfsck_component *next;
522         struct l_wait_info      lwi = { 0 };
523         int                     rc  = 0;
524         int                     rc1 = 0;
525
526         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
527                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
528                         com->lc_journal = 0;
529
530                 rc = com->lc_ops->lfsck_double_scan(env, com);
531                 if (rc != 0)
532                         rc1 = rc;
533         }
534
535         l_wait_event(lfsck->li_thread.t_ctl_waitq,
536                      atomic_read(&lfsck->li_double_scan_count) == 0,
537                      &lwi);
538
539         if (lfsck->li_status != LS_PAUSED &&
540             lfsck->li_status != LS_CO_PAUSED) {
541                 list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
542                                          lc_link) {
543                         spin_lock(&lfsck->li_lock);
544                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
545                         spin_unlock(&lfsck->li_lock);
546                 }
547         }
548
549         return rc1 != 0 ? rc1 : rc;
550 }
551
552 static void lfsck_quit(const struct lu_env *env, struct lfsck_instance *lfsck)
553 {
554         struct lfsck_component *com;
555         struct lfsck_component *next;
556
557         list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
558                                  lc_link) {
559                 if (com->lc_ops->lfsck_quit != NULL)
560                         com->lc_ops->lfsck_quit(env, com);
561
562                 spin_lock(&lfsck->li_lock);
563                 list_del_init(&com->lc_link_dir);
564                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
565                 spin_unlock(&lfsck->li_lock);
566         }
567
568         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
569                                  lc_link) {
570                 if (com->lc_ops->lfsck_quit != NULL)
571                         com->lc_ops->lfsck_quit(env, com);
572
573                 spin_lock(&lfsck->li_lock);
574                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
575                 spin_unlock(&lfsck->li_lock);
576         }
577 }
578
579 /* LFSCK engines */
580
581 static int lfsck_master_dir_engine(const struct lu_env *env,
582                                    struct lfsck_instance *lfsck)
583 {
584         struct lfsck_thread_info        *info   = lfsck_env_info(env);
585         struct dt_object                *dir    = lfsck->li_obj_dir;
586         const struct dt_it_ops          *iops   = &dir->do_index_ops->dio_it;
587         struct dt_it                    *di     = lfsck->li_di_dir;
588         struct lu_dirent                *ent    =
589                         (struct lu_dirent *)info->lti_key;
590         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
591         struct ptlrpc_thread            *thread = &lfsck->li_thread;
592         int                              rc;
593         __u16                            type;
594         ENTRY;
595
596         do {
597                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY2) &&
598                     cfs_fail_val > 0) {
599                         struct l_wait_info lwi;
600
601                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
602                                           NULL, NULL);
603                         l_wait_event(thread->t_ctl_waitq,
604                                      !thread_is_running(thread),
605                                      &lwi);
606                 }
607
608                 lfsck->li_new_scanned++;
609                 rc = iops->rec(env, di, (struct dt_rec *)ent,
610                                lfsck->li_args_dir);
611                 if (rc == 0)
612                         rc = lfsck_unpack_ent(ent, &lfsck->li_cookie_dir,
613                                               &type);
614
615                 if (rc != 0) {
616                         CDEBUG(D_LFSCK, "%s: scan dir failed at rec(), "
617                                "parent "DFID", cookie "LPX64": rc = %d\n",
618                                lfsck_lfsck2name(lfsck),
619                                PFID(lfsck_dto2fid(dir)),
620                                lfsck->li_cookie_dir, rc);
621                         lfsck_fail(env, lfsck, true);
622                         if (bk->lb_param & LPF_FAILOUT)
623                                 RETURN(rc);
624                         else
625                                 goto checkpoint;
626                 }
627
628                 if (ent->lde_attrs & LUDA_IGNORE)
629                         goto checkpoint;
630
631                 /* The type in the @ent structure may has been overwritten,
632                  * so we need to pass the @type parameter independently. */
633                 rc = lfsck_exec_dir(env, lfsck, ent, type);
634                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
635                         RETURN(rc);
636
637 checkpoint:
638                 rc = lfsck_checkpoint(env, lfsck);
639                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
640                         RETURN(rc);
641
642                 /* Rate control. */
643                 lfsck_control_speed(lfsck);
644                 if (unlikely(!thread_is_running(thread))) {
645                         CDEBUG(D_LFSCK, "%s: scan dir exit for engine stop, "
646                                "parent "DFID", cookie "LPX64"\n",
647                                lfsck_lfsck2name(lfsck),
648                                PFID(lfsck_dto2fid(dir)),
649                                lfsck->li_cookie_dir);
650                         RETURN(0);
651                 }
652
653                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
654                         spin_lock(&lfsck->li_lock);
655                         thread_set_flags(thread, SVC_STOPPING);
656                         spin_unlock(&lfsck->li_lock);
657                         RETURN(-EINVAL);
658                 }
659
660                 rc = iops->next(env, di);
661         } while (rc == 0);
662
663         if (rc > 0 && !lfsck->li_oit_over)
664                 lfsck_close_dir(env, lfsck);
665
666         RETURN(rc);
667 }
668
669 static int lfsck_master_oit_engine(const struct lu_env *env,
670                                    struct lfsck_instance *lfsck)
671 {
672         struct lfsck_thread_info        *info   = lfsck_env_info(env);
673         const struct dt_it_ops          *iops   =
674                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
675         struct dt_it                    *di     = lfsck->li_di_oit;
676         struct lu_fid                   *fid    = &info->lti_fid;
677         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
678         struct ptlrpc_thread            *thread = &lfsck->li_thread;
679         __u32                            idx    =
680                                 lfsck_dev_idx(lfsck->li_bottom);
681         int                              rc;
682         ENTRY;
683
684         do {
685                 struct dt_object *target;
686                 bool              update_lma = false;
687
688                 if (lfsck->li_di_dir != NULL) {
689                         rc = lfsck_master_dir_engine(env, lfsck);
690                         if (rc <= 0)
691                                 RETURN(rc);
692                 }
693
694                 if (unlikely(lfsck->li_oit_over))
695                         RETURN(1);
696
697                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY1) &&
698                     cfs_fail_val > 0) {
699                         struct l_wait_info lwi;
700
701                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
702                                           NULL, NULL);
703                         l_wait_event(thread->t_ctl_waitq,
704                                      !thread_is_running(thread),
705                                      &lwi);
706                 }
707
708                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
709                         RETURN(0);
710
711                 lfsck->li_current_oit_processed = 1;
712                 lfsck->li_new_scanned++;
713                 lfsck->li_pos_current.lp_oit_cookie = iops->store(env, di);
714                 rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
715                 if (rc != 0) {
716                         CDEBUG(D_LFSCK, "%s: OIT scan failed at rec(): "
717                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
718                         lfsck_fail(env, lfsck, true);
719                         if (rc < 0 && bk->lb_param & LPF_FAILOUT)
720                                 RETURN(rc);
721                         else
722                                 goto checkpoint;
723                 }
724
725                 if (fid_is_idif(fid)) {
726                         __u32 idx1 = fid_idif_ost_idx(fid);
727
728                         LASSERT(!lfsck->li_master);
729
730                         /* It is an old format device, update the LMA. */
731                         if (idx != idx1) {
732                                 struct ost_id *oi = &info->lti_oi;
733
734                                 fid_to_ostid(fid, oi);
735                                 ostid_to_fid(fid, oi, idx);
736                                 update_lma = true;
737                         }
738                 } else if (!fid_is_norm(fid) && !fid_is_igif(fid) &&
739                            !fid_is_last_id(fid) && !fid_is_root(fid) &&
740                            !fid_seq_is_dot(fid_seq(fid))) {
741                         /* If the FID/object is only used locally and invisible
742                          * to external nodes, then LFSCK will not handle it. */
743                         goto checkpoint;
744                 }
745
746                 target = lfsck_object_find(env, lfsck, fid);
747                 if (IS_ERR(target)) {
748                         CDEBUG(D_LFSCK, "%s: OIT scan failed at find target "
749                                DFID", cookie "LPU64": rc = %d\n",
750                                lfsck_lfsck2name(lfsck), PFID(fid),
751                                iops->store(env, di), rc);
752                         lfsck_fail(env, lfsck, true);
753                         if (bk->lb_param & LPF_FAILOUT)
754                                 RETURN(PTR_ERR(target));
755                         else
756                                 goto checkpoint;
757                 }
758
759                 /* XXX: Currently, skip remote object, the consistency for
760                  *      remote object will be processed in LFSCK phase III. */
761                 if (dt_object_exists(target) && !dt_object_remote(target)) {
762                         if (update_lma) {
763                                 rc = lfsck_update_lma(env, lfsck, target);
764                                 if (rc != 0)
765                                         CDEBUG(D_LFSCK, "%s: fail to update "
766                                                "LMA for "DFID": rc = %d\n",
767                                                lfsck_lfsck2name(lfsck),
768                                                PFID(lfsck_dto2fid(target)), rc);
769                         }
770                         if (rc == 0)
771                                 rc = lfsck_exec_oit(env, lfsck, target);
772                 }
773                 lfsck_object_put(env, target);
774                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
775                         RETURN(rc);
776
777 checkpoint:
778                 rc = lfsck_checkpoint(env, lfsck);
779                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
780                         RETURN(rc);
781
782                 /* Rate control. */
783                 lfsck_control_speed(lfsck);
784
785                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL1)) {
786                         spin_lock(&lfsck->li_lock);
787                         thread_set_flags(thread, SVC_STOPPING);
788                         spin_unlock(&lfsck->li_lock);
789                         RETURN(-EINVAL);
790                 }
791
792                 rc = iops->next(env, di);
793                 if (unlikely(rc > 0))
794                         lfsck->li_oit_over = 1;
795                 else if (likely(rc == 0))
796                         lfsck->li_current_oit_processed = 0;
797
798                 if (unlikely(!thread_is_running(thread))) {
799                         CDEBUG(D_LFSCK, "%s: OIT scan exit for engine stop, "
800                                "cookie "LPU64"\n", lfsck_lfsck2name(lfsck),
801                                iops->store(env, di));
802                         RETURN(0);
803                 }
804         } while (rc == 0 || lfsck->li_di_dir != NULL);
805
806         RETURN(rc);
807 }
808
/**
 * Main function of the LFSCK master engine thread.
 *
 * Sets up the OIT iterator, calls lfsck_prep(), marks the thread as
 * SVC_RUNNING and then waits until lfsck->li_start_unplug is set (or the
 * thread is asked to stop). Afterwards it runs lfsck_master_oit_engine()
 * unless only double-scan components are registered, calls lfsck_post(),
 * and finally either starts lfsck_double_scan() or calls lfsck_quit().
 * Before returning, the thread is marked SVC_STOPPED, the waiters on
 * t_ctl_waitq are woken up and \a args is released via
 * lfsck_thread_args_fini().
 *
 * \param[in] args      pointer to the struct lfsck_thread_args of this run
 *
 * \retval              if the scan/post step ended with rc == 1: the result
 *                      of lfsck_double_scan(), or 0 when there is nothing
 *                      to double scan
 * \retval              otherwise the rc of the last executed step
 */
809 int lfsck_master_engine(void *args)
810 {
811         struct lfsck_thread_args *lta      = args;
812         struct lu_env            *env      = &lta->lta_env;
813         struct lfsck_instance    *lfsck    = lta->lta_lfsck;
814         struct ptlrpc_thread     *thread   = &lfsck->li_thread;
815         struct dt_object         *oit_obj  = lfsck->li_obj_oit;
816         const struct dt_it_ops   *oit_iops = &oit_obj->do_index_ops->dio_it;
817         struct dt_it             *oit_di;
818         struct l_wait_info        lwi      = { 0 };
819         int                       rc;
820         ENTRY;
821
            /* Only verify the lost+found directory on the master when at
             * least one component is registered for scan or double scan. */
822         if (lfsck->li_master &&
823             (!list_empty(&lfsck->li_list_scan) ||
824              !list_empty(&lfsck->li_list_double_scan))) {
825                 rc = lfsck_verify_lpf(env, lfsck);
826                 /* Fail to verify the .lustre/lost+found/MDTxxxx/ may be not
827                  * fatal, because the .lustre/lost+found/ maybe not accessed
828                  * by the LFSCK if it does not add orphans or others to such
829                  * directory. So go ahead until hit failure when really uses
830                  * the directory. */
831                 if (rc != 0)
832                         CDEBUG(D_LFSCK, "%s: master engine fail to verify the "
833                                ".lustre/lost+found/, go ahead: rc = %d\n",
834                                lfsck_lfsck2name(lfsck), rc);
835         }
836
            /* Create the OIT iterator. On failure nothing else has been set
             * up yet, so jump straight to the final thread-state cleanup. */
837         oit_di = oit_iops->init(env, oit_obj, lfsck->li_args_oit, BYPASS_CAPA);
838         if (IS_ERR(oit_di)) {
839                 rc = PTR_ERR(oit_di);
840                 CDEBUG(D_LFSCK, "%s: master engine fail to init iteration: "
841                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
842
843                 GOTO(fini_args, rc);
844         }
845
            /* Record the iterator in the instance under li_lock. */
846         spin_lock(&lfsck->li_lock);
847         lfsck->li_di_oit = oit_di;
848         spin_unlock(&lfsck->li_lock);
849         rc = lfsck_prep(env, lfsck, lta->lta_lsp);
850         if (rc != 0)
851                 GOTO(fini_oit, rc);
852
853         CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = %#x, dir_flags = %#x, "
854                "oit_cookie = "LPU64", dir_cookie = "LPX64", parent = "DFID
855                ", pid = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
856                lfsck->li_pos_checkpoint.lp_oit_cookie,
857                lfsck->li_pos_checkpoint.lp_dir_cookie,
858                PFID(&lfsck->li_pos_checkpoint.lp_dir_parent),
859                current_pid());
860
            /* Mark the thread as running and wake up whoever waits on
             * t_ctl_waitq for that state change. */
861         spin_lock(&lfsck->li_lock);
862         thread_set_flags(thread, SVC_RUNNING);
863         spin_unlock(&lfsck->li_lock);
864         wake_up_all(&thread->t_ctl_waitq);
865
            /* Block until li_start_unplug is set or the thread is no longer
             * in the running state. */
866         l_wait_event(thread->t_ctl_waitq,
867                      lfsck->li_start_unplug ||
868                      !thread_is_running(thread),
869                      &lwi);
870         if (!thread_is_running(thread))
871                 GOTO(fini_oit, rc = 0);
872
            /* Skip the OIT scan only when no component is registered for
             * scan while some are registered for double scan; rc = 1 then
             * makes the code after fini_oit call lfsck_double_scan(). */
873         if (!list_empty(&lfsck->li_list_scan) ||
874             list_empty(&lfsck->li_list_double_scan))
875                 rc = lfsck_master_oit_engine(env, lfsck);
876         else
877                 rc = 1;
878
879         CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = %#x, dir_flags = %#x, "
880                "oit_cookie = "LPU64", dir_cookie = "LPX64", parent = "DFID
881                ", pid = %d, rc = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
882                lfsck->li_pos_checkpoint.lp_oit_cookie,
883                lfsck->li_pos_checkpoint.lp_dir_cookie,
884                PFID(&lfsck->li_pos_checkpoint.lp_dir_parent),
885                current_pid(), rc);
886
            /* lfsck_post() is skipped when the OBD_FAIL_LFSCK_CRASH fail
             * check triggers; rc then keeps the value from the scan. */
887         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
888                 rc = lfsck_post(env, lfsck, rc);
889
            /* Close the directory iteration if one is still open. */
890         if (lfsck->li_di_dir != NULL)
891                 lfsck_close_dir(env, lfsck);
892
893 fini_oit:
            /* Release the OIT iterator: lfsck_di_oit_put() first, then the
             * iterator itself through the index operations. */
894         lfsck_di_oit_put(env, lfsck);
895         oit_iops->fini(env, oit_di);
            /* rc == 1: go on with the double scan if any component is
             * registered for it, otherwise report success. Any other rc
             * ends this LFSCK run via lfsck_quit(). */
896         if (rc == 1) {
897                 if (!list_empty(&lfsck->li_list_double_scan))
898                         rc = lfsck_double_scan(env, lfsck);
899                 else
900                         rc = 0;
901         } else {
902                 lfsck_quit(env, lfsck);
903         }
904
905         /* XXX: Purge the pinned objects in the future. */
906
907 fini_args:
            /* Mark the thread as stopped, wake up the waiters, and release
             * the thread arguments. */
908         spin_lock(&lfsck->li_lock);
909         thread_set_flags(thread, SVC_STOPPED);
910         spin_unlock(&lfsck->li_lock);
911         wake_up_all(&thread->t_ctl_waitq);
912         lfsck_thread_args_fini(lta);
913         return rc;
914 }
915
916 static inline bool lfsck_assistant_req_empty(struct lfsck_assistant_data *lad)
917 {
918         bool empty = false;
919
920         spin_lock(&lad->lad_lock);
921         if (list_empty(&lad->lad_req_list))
922                 empty = true;
923         spin_unlock(&lad->lad_lock);
924
925         return empty;
926 }
927
928 /**
929  * Query the LFSCK status from the instances on remote servers.
930  *
931  * The LFSCK assistant thread queries the LFSCK instances on other
932  * servers (MDT/OST) about their status, such as whether they have
933  * finished the phase1/phase2 scanning or not, and so on.
934  *
935  * \param[in] env       pointer to the thread context
936  * \param[in] com       pointer to the lfsck component
937  *
938  * \retval              0 for success
939  * \retval              negative error number on failure
940  */
941 static int lfsck_assistant_query_others(const struct lu_env *env,
942                                         struct lfsck_component *com)
943 {
944         struct lfsck_thread_info          *info  = lfsck_env_info(env);
945         struct lfsck_request              *lr    = &info->lti_lr;
946         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
947         struct lfsck_instance             *lfsck = com->lc_lfsck;
948         struct lfsck_assistant_data       *lad   = com->lc_data;
949         struct ptlrpc_request_set         *set;
950         struct lfsck_tgt_descs            *ltds;
951         struct lfsck_tgt_desc             *ltd;
952         struct list_head                  *phase_head;
953         int                                rc    = 0;
954         int                                rc1   = 0;
955         ENTRY;
956
957         set = ptlrpc_prep_set();
958         if (set == NULL)
959                 RETURN(-ENOMEM);
960
            /* Start a new round: targets are stamped with this generation
             * in the loop below so that each one is queried only once. */
961         lad->lad_touch_gen++;
962         memset(lr, 0, sizeof(*lr));
963         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
964         lr->lr_event = LE_QUERY;
965         lr->lr_active = com->lc_type;
966         laia->laia_com = com;
967         laia->laia_lr = lr;
968         laia->laia_shared = 0;
969
            /* Query the MDTs in the phase1 list first if there are any.
             * Otherwise only the layout LFSCK goes on to query the OSTs;
             * other component types have nothing to query. */
970         if (!list_empty(&lad->lad_mdt_phase1_list)) {
971                 ltds = &lfsck->li_mdt_descs;
972                 lr->lr_flags = 0;
973                 phase_head = &lad->lad_mdt_phase1_list;
974         } else if (com->lc_type != LFSCK_TYPE_LAYOUT) {
975                 goto out;
976         } else {
977
                    /* Also reached by "goto again" after the MDT pass of
                     * the layout LFSCK, to switch over to the OSTs. */
978 again:
979                 ltds = &lfsck->li_ost_descs;
980                 lr->lr_flags = LEF_TO_OST;
981                 phase_head = &lad->lad_ost_phase1_list;
982         }
983
984         laia->laia_ltds = ltds;
985         spin_lock(&ltds->ltd_lock);
986         while (!list_empty(phase_head)) {
987                 struct list_head *phase_list;
988                 __u32            *gen;
989
                    /* Pick the list linkage and generation field that
                     * belong to this component type. */
990                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
991                         ltd = list_entry(phase_head->next,
992                                          struct lfsck_tgt_desc,
993                                          ltd_layout_phase_list);
994                         phase_list = &ltd->ltd_layout_phase_list;
995                         gen = &ltd->ltd_layout_gen;
996                 } else {
997                         ltd = list_entry(phase_head->next,
998                                          struct lfsck_tgt_desc,
999                                          ltd_namespace_phase_list);
1000                         phase_list = &ltd->ltd_namespace_phase_list;
1001                         gen = &ltd->ltd_namespace_gen;
1002                 }
1003
                     /* The head entry already carries the current
                      * generation: it was handled in this round, so stop. */
1004                 if (*gen == lad->lad_touch_gen)
1005                         break;
1006
                     /* Stamp the target, rotate it to the list tail and take
                      * a reference before dropping ltd_lock for the request. */
1007                 *gen = lad->lad_touch_gen;
1008                 list_move_tail(phase_list, phase_head);
1009                 atomic_inc(&ltd->ltd_ref);
1010                 laia->laia_ltd = ltd;
1011                 spin_unlock(&ltds->ltd_lock);
1012                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1013                                          lfsck_async_interpret_common,
1014                                          laia, LFSCK_QUERY);
1015                 if (rc != 0) {
1016                         CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to query "
1017                                "%s %x for %s: rc = %d\n",
1018                                lfsck_lfsck2name(lfsck),
1019                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1020                                ltd->ltd_index, lad->lad_name, rc);
                             /* The request was not sent: drop the reference
                              * taken above and remember the failure in rc1,
                              * since rc is overwritten later on.
                              * NOTE(review): on success the reference is
                              * presumably released by the interpret
                              * callback - confirm. */
1021                         lfsck_tgt_put(ltd);
1022                         rc1 = rc;
1023                 }
1024                 spin_lock(&ltds->ltd_lock);
1025         }
1026         spin_unlock(&ltds->ltd_lock);
1027
             /* Wait for the requests queued in the set; a negative result
              * aborts the query immediately. */
1028         rc = ptlrpc_set_wait(set);
1029         if (rc < 0) {
1030                 ptlrpc_set_destroy(set);
1031                 RETURN(rc);
1032         }
1033
             /* Layout LFSCK: if this was the MDT pass and no MDT is left in
              * the phase1 list, run a second pass against the OSTs. */
1034         if (com->lc_type == LFSCK_TYPE_LAYOUT && !(lr->lr_flags & LEF_TO_OST) &&
1035             list_empty(&lad->lad_mdt_phase1_list))
1036                 goto again;
1037
1038 out:
1039         ptlrpc_set_destroy(set);
1040
             /* A failure to send a request (rc1) takes precedence over the
              * result of waiting for the set. */
1041         RETURN(rc1 != 0 ? rc1 : rc);
1042 }
1043
1044 /**
1045  * Notify the LFSCK event to the instances on remote servers.
1046  *
1047  * The LFSCK assistant thread notifies the LFSCK instances on other
1048  * servers (MDT/OST) about some events, such as start new scanning,
1049  * stop the scanning, this LFSCK instance will exit, and so on.
1050  *
1051  * \param[in] env       pointer to the thread context
1052  * \param[in] com       pointer to the lfsck component
1053  * \param[in] lr        pointer to the LFSCK event request
1054  *
1055  * \retval              0 for success
1056  * \retval              negative error number on failure
1057  */
1058 static int lfsck_assistant_notify_others(const struct lu_env *env,
1059                                          struct lfsck_component *com,
1060                                          struct lfsck_request *lr)
1061 {
1062         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1063         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1064         struct lfsck_instance             *lfsck = com->lc_lfsck;
1065         struct lfsck_assistant_data       *lad   = com->lc_data;
1066         struct lfsck_bookmark             *bk    = &lfsck->li_bookmark_ram;
1067         struct ptlrpc_request_set         *set;
1068         struct lfsck_tgt_descs            *ltds;
1069         struct lfsck_tgt_desc             *ltd;
1070         struct lfsck_tgt_desc             *next;
1071         __u32                              idx;
1072         int                                rc    = 0;
1073         int                                rc1   = 0;
1074         ENTRY;
1075
1076         set = ptlrpc_prep_set();
1077         if (set == NULL)
1078                 RETURN(-ENOMEM);
1079
             /* Fields common to every event; lr->lr_event itself is set by
              * the caller. */
1080         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1081         lr->lr_active = com->lc_type;
1082         laia->laia_com = com;
1083         laia->laia_lr = lr;
1084         laia->laia_shared = 0;
1085
1086         switch (lr->lr_event) {
1087         case LE_START:
                     /* Only the layout LFSCK notifies the OSTs on start;
                      * other types go straight to the MDT linking below. */
1088                 if (com->lc_type != LFSCK_TYPE_LAYOUT)
1089                         goto next;
1090
                     /* Pass the scan parameters from the in-RAM bookmark. */
1091                 lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
1092                                LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ;
1093                 lr->lr_speed = bk->lb_speed_limit;
1094                 lr->lr_version = bk->lb_version;
1095                 lr->lr_param |= bk->lb_param;
1096                 lr->lr_async_windows = bk->lb_async_windows;
1097                 lr->lr_flags = LEF_TO_OST;
1098
1099                 /* Notify OSTs firstly, then handle other MDTs if needed. */
1100                 ltds = &lfsck->li_ost_descs;
1101                 laia->laia_ltds = ltds;
1102                 down_read(&ltds->ltd_rw_sem);
1103                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1104                         ltd = lfsck_tgt_get(ltds, idx);
1105                         LASSERT(ltd != NULL);
1106
1107                         laia->laia_ltd = ltd;
1108                         ltd->ltd_layout_done = 0;
1109                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1110                                         lfsck_async_interpret_common,
1111                                         laia, LFSCK_NOTIFY);
1112                         if (rc != 0) {
                                     /* The failure is recorded through
                                      * lfsck_lad_set_bitmap() for this index;
                                      * rc itself is overwritten by the
                                      * ptlrpc_set_wait() call below. */
1113                                 lfsck_lad_set_bitmap(env, com, idx);
1114                                 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
1115                                        "notify OST %x for %s start: rc = %d\n",
1116                                        lfsck_lfsck2name(lfsck), idx,
1117                                        lad->lad_name, rc);
1118                                 lfsck_tgt_put(ltd);
1119                         }
1120                 }
1121                 up_read(&ltds->ltd_rw_sem);
1122
1123                 /* Sync up */
1124                 rc = ptlrpc_set_wait(set);
1125                 if (rc < 0) {
1126                         ptlrpc_set_destroy(set);
1127                         RETURN(rc);
1128                 }
1129
1130 next:
                     /* Other MDTs are only involved when LPF_ALL_TGT is set. */
1131                 if (!(bk->lb_param & LPF_ALL_TGT))
1132                         break;
1133
1134                 /* link other MDT targets locally. */
1135                 ltds = &lfsck->li_mdt_descs;
1136                 spin_lock(&ltds->ltd_lock);
1137                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1138                         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1139                                 ltd = LTD_TGT(ltds, idx);
1140                                 LASSERT(ltd != NULL);
1141
                                     /* Already linked: skip. */
1142                                 if (!list_empty(&ltd->ltd_layout_list))
1143                                         continue;
1144
1145                                 list_add_tail(&ltd->ltd_layout_list,
1146                                               &lad->lad_mdt_list);
1147                                 list_add_tail(&ltd->ltd_layout_phase_list,
1148                                               &lad->lad_mdt_phase1_list);
1149                         }
1150                 } else {
1151                         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1152                                 ltd = LTD_TGT(ltds, idx);
1153                                 LASSERT(ltd != NULL);
1154
                                     /* Already linked: skip. */
1155                                 if (!list_empty(&ltd->ltd_namespace_list))
1156                                         continue;
1157
1158                                 list_add_tail(&ltd->ltd_namespace_list,
1159                                               &lad->lad_mdt_list);
1160                                 list_add_tail(&ltd->ltd_namespace_phase_list,
1161                                               &lad->lad_mdt_phase1_list);
1162                         }
1163                 }
1164                 spin_unlock(&ltds->ltd_lock);
1165                 break;
1166         case LE_STOP:
1167         case LE_PHASE2_DONE:
1168         case LE_PEER_EXIT: {
1169                 struct list_head *phase_head;
1170
1171                 /* Handle other MDTs firstly if needed, then notify the OSTs. */
1172                 if (bk->lb_param & LPF_ALL_TGT) {
1173                         phase_head = &lad->lad_mdt_list;
1174                         ltds = &lfsck->li_mdt_descs;
1175                         if (lr->lr_event == LE_STOP) {
1176                                 /* unlink other MDT targets locally. */
1177                                 spin_lock(&ltds->ltd_lock);
1178                                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1179                                         list_for_each_entry_safe(ltd, next,
1180                                                 phase_head, ltd_layout_list) {
1181                                                 list_del_init(
1182                                                 &ltd->ltd_layout_phase_list);
1183                                                 list_del_init(
1184                                                 &ltd->ltd_layout_list);
1185                                         }
1186                                 } else {
1187                                         list_for_each_entry_safe(ltd, next,
1188                                                         phase_head,
1189                                                         ltd_namespace_list) {
1190                                                 list_del_init(
1191                                                 &ltd->ltd_namespace_phase_list);
1192                                                 list_del_init(
1193                                                 &ltd->ltd_namespace_list);
1194                                         }
1195                                 }
1196                                 spin_unlock(&ltds->ltd_lock);
1197
                                     /* For LE_STOP no request is sent to the
                                      * MDTs here; only the layout LFSCK goes
                                      * on to notify the OSTs. */
1198                                 if (com->lc_type != LFSCK_TYPE_LAYOUT)
1199                                         break;
1200
1201                                 lr->lr_flags |= LEF_TO_OST;
1202                                 phase_head = &lad->lad_ost_list;
1203                                 ltds = &lfsck->li_ost_descs;
1204                         } else {
                                     /* First pass goes to the MDTs linked
                                      * on lad_mdt_list. */
1205                                 lr->lr_flags &= ~LEF_TO_OST;
1206                         }
1207                 } else if (com->lc_type != LFSCK_TYPE_LAYOUT) {
1208                         break;
1209                 } else {
1210                         lr->lr_flags |= LEF_TO_OST;
1211                         phase_head = &lad->lad_ost_list;
1212                         ltds = &lfsck->li_ost_descs;
1213                 }
1214
                     /* Drain phase_head: every target is unlinked from both
                      * of its lists and then notified. Re-entered via
                      * "goto again" for the OST pass of the layout LFSCK. */
1215 again:
1216                 laia->laia_ltds = ltds;
1217                 spin_lock(&ltds->ltd_lock);
1218                 while (!list_empty(phase_head)) {
1219                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1220                                 ltd = list_entry(phase_head->next,
1221                                                  struct lfsck_tgt_desc,
1222                                                  ltd_layout_list);
1223                                 if (!list_empty(&ltd->ltd_layout_phase_list))
1224                                         list_del_init(
1225                                                 &ltd->ltd_layout_phase_list);
1226                                 list_del_init(&ltd->ltd_layout_list);
1227                         } else {
1228                                 ltd = list_entry(phase_head->next,
1229                                                  struct lfsck_tgt_desc,
1230                                                  ltd_namespace_list);
1231                                 if (!list_empty(&ltd->ltd_namespace_phase_list))
1232                                         list_del_init(
1233                                                 &ltd->ltd_namespace_phase_list);
1234                                 list_del_init(&ltd->ltd_namespace_list);
1235                         }
                             /* Hold a reference on the target while ltd_lock
                              * is dropped to send the request. */
1236                         atomic_inc(&ltd->ltd_ref);
1237                         laia->laia_ltd = ltd;
1238                         spin_unlock(&ltds->ltd_lock);
1239                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1240                                         lfsck_async_interpret_common,
1241                                         laia, LFSCK_NOTIFY);
1242                         if (rc != 0) {
1243                                 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
1244                                        "notify %s %x for %s stop/phase2_done/"
1245                                        "peer_exit: rc = %d\n",
1246                                        lfsck_lfsck2name(lfsck),
1247                                        (lr->lr_flags & LEF_TO_OST) ?
1248                                        "OST" : "MDT", ltd->ltd_index,
1249                                        lad->lad_name, rc);
                                     /* Not sent: drop the reference taken
                                      * above. rc is overwritten by the
                                      * ptlrpc_set_wait() call below. */
1250                                 lfsck_tgt_put(ltd);
1251                         }
1252                         spin_lock(&ltds->ltd_lock);
1253                 }
1254                 spin_unlock(&ltds->ltd_lock);
1255
1256                 rc = ptlrpc_set_wait(set);
1257                 if (rc < 0) {
1258                         ptlrpc_set_destroy(set);
1259                         RETURN(rc);
1260                 }
1261
                     /* Layout LFSCK: the MDT pass is done, now run the same
                      * loop against the OSTs. */
1262                 if (com->lc_type == LFSCK_TYPE_LAYOUT &&
1263                     !(lr->lr_flags & LEF_TO_OST)) {
1264                         lr->lr_flags |= LEF_TO_OST;
1265                         phase_head = &lad->lad_ost_list;
1266                         ltds = &lfsck->li_ost_descs;
1267                         goto again;
1268                 }
1269                 break;
1270         }
1271         case LE_PHASE1_DONE:
                     /* Let the component sync its failures first, then start
                      * a new generation so that each MDT on lad_mdt_list is
                      * notified once; the targets stay linked. */
1272                 lad->lad_ops->la_sync_failures(env, com, lr);
1273                 lad->lad_touch_gen++;
1274                 ltds = &lfsck->li_mdt_descs;
1275                 laia->laia_ltds = ltds;
1276                 spin_lock(&ltds->ltd_lock);
1277                 while (!list_empty(&lad->lad_mdt_list)) {
1278                         struct list_head *list;
1279                         __u32            *gen;
1280
1281                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1282                                 ltd = list_entry(lad->lad_mdt_list.next,
1283                                                  struct lfsck_tgt_desc,
1284                                                  ltd_layout_list);
1285                                 list = &ltd->ltd_layout_list;
1286                                 gen = &ltd->ltd_layout_gen;
1287                         } else {
1288                                 ltd = list_entry(lad->lad_mdt_list.next,
1289                                                  struct lfsck_tgt_desc,
1290                                                  ltd_namespace_list);
1291                                 list = &ltd->ltd_namespace_list;
1292                                 gen = &ltd->ltd_namespace_gen;
1293                         }
1294
                             /* Head already stamped with the current
                              * generation: every entry has been handled. */
1295                         if (*gen == lad->lad_touch_gen)
1296                                 break;
1297
1298                         *gen = lad->lad_touch_gen;
1299                         list_move_tail(list, &lad->lad_mdt_list);
1300                         atomic_inc(&ltd->ltd_ref);
1301                         laia->laia_ltd = ltd;
1302                         spin_unlock(&ltds->ltd_lock);
1303                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1304                                         lfsck_async_interpret_common,
1305                                         laia, LFSCK_NOTIFY);
1306                         if (rc != 0) {
1307                                 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
1308                                        "notify MDT %x for %s phase1 done: "
1309                                        "rc = %d\n", lfsck_lfsck2name(lfsck),
1310                                        ltd->ltd_index, lad->lad_name, rc);
1311                                 lfsck_tgt_put(ltd);
1312                         }
1313                         spin_lock(&ltds->ltd_lock);
1314                 }
1315                 spin_unlock(&ltds->ltd_lock);
                     /* NOTE(review): rc reaching the final RETURN from this
                      * case only reflects the last lfsck_async_request()
                      * call of the loop; earlier failures are only logged. */
1316                 break;
1317         default:
                     /* NOTE(review): the value printed after "rc = " is
                      * lr->lr_event, not a return code. */
1318                 CDEBUG(D_LFSCK, "%s: LFSCK assistant unexpected LFSCK event: "
1319                        "rc = %d\n", lfsck_lfsck2name(lfsck), lr->lr_event);
1320                 rc = -EINVAL;
1321                 break;
1322         }
1323
             /* Wait for whatever is still queued in the set, then release
              * it. An error already held in rc takes precedence over the
              * result of this final wait. */
1324         rc1 = ptlrpc_set_wait(set);
1325         ptlrpc_set_destroy(set);
1326
1327         RETURN(rc != 0 ? rc : rc1);
1328 }
1329
1330 /**
1331  * The LFSCK assistant thread is triggered by the LFSCK main engine.
1332  * They co-work together as an asynchronous pipeline: the LFSCK main
1333  * engine scans the system and pre-fetches the objects, attributes,
1334  * or name entries, etc, and pushes them into the pipeline as input
1335  * requests for the LFSCK assistant thread; on the other end of the
1336  * pipeline, the LFSCK assistant thread performs the real check and
1337  * repair for every request from the main engine.
1338  *
1339  * Generally, the assistant engine may block while checking/repairing
1340  * something, so the LFSCK main engine will run somewhat faster. On the
1341  * other hand, the LFSCK main engine drives multiple assistant threads
1342  * in parallel: for each LFSCK component on the master (such as layout
1343  * LFSCK, namespace LFSCK), there is an independent LFSCK assistant
1344  * thread. So in such a 1:N multiple asynchronous pipelines mode, the
1345  * whole LFSCK performance will be much better than having the LFSCK
1346  * main engine check/repair everything by itself.
1347  */
1348 int lfsck_assistant_engine(void *args)
1349 {
1350         struct lfsck_thread_args          *lta     = args;
1351         struct lu_env                     *env     = &lta->lta_env;
1352         struct lfsck_component            *com     = lta->lta_com;
1353         struct lfsck_instance             *lfsck   = lta->lta_lfsck;
1354         struct lfsck_bookmark             *bk      = &lfsck->li_bookmark_ram;
1355         struct lfsck_position             *pos     = &com->lc_pos_start;
1356         struct lfsck_thread_info          *info    = lfsck_env_info(env);
1357         struct lfsck_request              *lr      = &info->lti_lr;
1358         struct lfsck_assistant_data       *lad     = com->lc_data;
1359         struct ptlrpc_thread              *mthread = &lfsck->li_thread;
1360         struct ptlrpc_thread              *athread = &lad->lad_thread;
1361         struct lfsck_assistant_operations *lao     = lad->lad_ops;
1362         struct lfsck_assistant_req        *lar;
1363         struct l_wait_info                 lwi     = { 0 };
1364         int                                rc      = 0;
1365         int                                rc1     = 0;
1366         ENTRY;
1367
1368         CDEBUG(D_LFSCK, "%s: %s LFSCK assistant thread start\n",
1369                lfsck_lfsck2name(lfsck), lad->lad_name);
1370
1371         memset(lr, 0, sizeof(*lr));
1372         lr->lr_event = LE_START;
1373         if (pos->lp_oit_cookie <= 1)
1374                 lr->lr_param = LPF_RESET;
1375         rc = lfsck_assistant_notify_others(env, com, lr);
1376         if (rc != 0) {
1377                 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to notify others "
1378                        "to start %s: rc = %d\n",
1379                        lfsck_lfsck2name(lfsck), lad->lad_name, rc);
1380                 GOTO(fini, rc);
1381         }
1382
1383         spin_lock(&lad->lad_lock);
1384         thread_set_flags(athread, SVC_RUNNING);
1385         spin_unlock(&lad->lad_lock);
1386         wake_up_all(&mthread->t_ctl_waitq);
1387
1388         while (1) {
1389                 while (!list_empty(&lad->lad_req_list)) {
1390                         bool wakeup = false;
1391
1392                         if (unlikely(lad->lad_exit ||
1393                                      !thread_is_running(mthread)))
1394                                 GOTO(cleanup1, rc = lad->lad_post_result);
1395
1396                         lar = list_entry(lad->lad_req_list.next,
1397                                          struct lfsck_assistant_req,
1398                                          lar_list);
1399                         /* Only the lfsck_assistant_engine thread itself can
1400                          * remove the "lar" from the head of the list, LFSCK
1401                          * engine thread only inserts other new "lar" at the
1402                          * end of the list. So it is safe to handle current
1403                          * "lar" without the spin_lock. */
1404                         rc = lao->la_handler_p1(env, com, lar);
1405                         spin_lock(&lad->lad_lock);
1406                         list_del_init(&lar->lar_list);
1407                         lad->lad_prefetched--;
1408                         /* Wake up the main engine thread only when the list
1409                          * is empty or half of the prefetched items have been
1410                          * handled to avoid too frequent thread schedule. */
1411                         if (lad->lad_prefetched == 0 ||
1412                             (bk->lb_async_windows != 0 &&
1413                              bk->lb_async_windows / 2 ==
1414                              lad->lad_prefetched))
1415                                 wakeup = true;
1416                         spin_unlock(&lad->lad_lock);
1417                         if (wakeup)
1418                                 wake_up_all(&mthread->t_ctl_waitq);
1419
1420                         lao->la_req_fini(env, lar);
1421                         if (rc < 0 && bk->lb_param & LPF_FAILOUT)
1422                                 GOTO(cleanup1, rc);
1423                 }
1424
1425                 l_wait_event(athread->t_ctl_waitq,
1426                              !lfsck_assistant_req_empty(lad) ||
1427                              lad->lad_exit ||
1428                              lad->lad_to_post ||
1429                              lad->lad_to_double_scan,
1430                              &lwi);
1431
1432                 if (unlikely(lad->lad_exit))
1433                         GOTO(cleanup1, rc = lad->lad_post_result);
1434
1435                 if (!list_empty(&lad->lad_req_list))
1436                         continue;
1437
1438                 if (lad->lad_to_post) {
1439                         CDEBUG(D_LFSCK, "%s: %s LFSCK assistant thread post\n",
1440                                lfsck_lfsck2name(lfsck), lad->lad_name);
1441
1442                         if (unlikely(lad->lad_exit))
1443                                 GOTO(cleanup1, rc = lad->lad_post_result);
1444
1445                         lad->lad_to_post = 0;
1446                         LASSERT(lad->lad_post_result > 0);
1447
1448                         memset(lr, 0, sizeof(*lr));
1449                         lr->lr_event = LE_PHASE1_DONE;
1450                         lr->lr_status = lad->lad_post_result;
1451                         rc = lfsck_assistant_notify_others(env, com, lr);
1452                         if (rc != 0)
1453                                 CDEBUG(D_LFSCK, "%s: LFSCK assistant failed to "
1454                                        "notify others for %s post: rc = %d\n",
1455                                        lfsck_lfsck2name(lfsck),
1456                                        lad->lad_name, rc);
1457
1458                         /* Wakeup the master engine to go ahead. */
1459                         wake_up_all(&mthread->t_ctl_waitq);
1460                 }
1461
1462                 if (lad->lad_to_double_scan) {
1463                         lad->lad_to_double_scan = 0;
1464                         atomic_inc(&lfsck->li_double_scan_count);
1465                         lad->lad_in_double_scan = 1;
1466                         wake_up_all(&mthread->t_ctl_waitq);
1467
1468                         com->lc_new_checked = 0;
1469                         com->lc_new_scanned = 0;
1470                         com->lc_time_last_checkpoint = cfs_time_current();
1471                         com->lc_time_next_checkpoint =
1472                                 com->lc_time_last_checkpoint +
1473                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1474
1475                         /* Flush async updates before handling orphan. */
1476                         dt_sync(env, lfsck->li_next);
1477
1478                         CDEBUG(D_LFSCK, "%s: LFSCK assistant phase2 "
1479                                "scan start\n", lfsck_lfsck2name(lfsck));
1480
1481                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_DOUBLESCAN))
1482                                 GOTO(cleanup2, rc = 0);
1483
1484                         while (lad->lad_in_double_scan) {
1485                                 rc = lfsck_assistant_query_others(env, com);
1486                                 if (lfsck_phase2_next_ready(lad))
1487                                         goto p2_next;
1488
1489                                 if (rc < 0)
1490                                         GOTO(cleanup2, rc);
1491
1492                                 /* Pull LFSCK status on related targets once
1493                                  * per 30 seconds if we are not notified. */
1494                                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(30),
1495                                                            cfs_time_seconds(1),
1496                                                            NULL, NULL);
1497                                 rc = l_wait_event(athread->t_ctl_waitq,
1498                                         lfsck_phase2_next_ready(lad) ||
1499                                         lad->lad_exit ||
1500                                         !thread_is_running(mthread),
1501                                         &lwi);
1502
1503                                 if (unlikely(lad->lad_exit ||
1504                                              !thread_is_running(mthread)))
1505                                         GOTO(cleanup2, rc = 0);
1506
1507                                 if (rc == -ETIMEDOUT)
1508                                         continue;
1509
1510                                 if (rc < 0)
1511                                         GOTO(cleanup2, rc);
1512
1513 p2_next:
1514                                 rc = lao->la_handler_p2(env, com);
1515                                 if (rc != 0)
1516                                         GOTO(cleanup2, rc);
1517
1518                                 if (unlikely(lad->lad_exit ||
1519                                              !thread_is_running(mthread)))
1520                                         GOTO(cleanup2, rc = 0);
1521                         }
1522                 }
1523         }
1524
1525 cleanup1:
1526         /* Cleanup the unfinished requests. */
1527         spin_lock(&lad->lad_lock);
1528         if (rc < 0)
1529                 lad->lad_assistant_status = rc;
1530
1531         if (lad->lad_exit && lad->lad_post_result <= 0)
1532                 lao->la_fill_pos(env, com, &lfsck->li_pos_checkpoint);
1533
1534         while (!list_empty(&lad->lad_req_list)) {
1535                 lar = list_entry(lad->lad_req_list.next,
1536                                  struct lfsck_assistant_req,
1537                                  lar_list);
1538                 list_del_init(&lar->lar_list);
1539                 lad->lad_prefetched--;
1540                 spin_unlock(&lad->lad_lock);
1541                 lao->la_req_fini(env, lar);
1542                 spin_lock(&lad->lad_lock);
1543         }
1544         spin_unlock(&lad->lad_lock);
1545
1546         LASSERTF(lad->lad_prefetched == 0, "unmatched prefeteched objs %d\n",
1547                  lad->lad_prefetched);
1548
1549 cleanup2:
1550         memset(lr, 0, sizeof(*lr));
1551         if (rc > 0) {
1552                 lr->lr_event = LE_PHASE2_DONE;
1553                 lr->lr_status = rc;
1554         } else if (rc == 0) {
1555                 if (lfsck->li_flags & LPF_ALL_TGT) {
1556                         lr->lr_event = LE_STOP;
1557                         lr->lr_status = LS_STOPPED;
1558                 } else {
1559                         lr->lr_event = LE_PEER_EXIT;
1560                         switch (lfsck->li_status) {
1561                         case LS_PAUSED:
1562                         case LS_CO_PAUSED:
1563                                 lr->lr_status = LS_CO_PAUSED;
1564                                 break;
1565                         case LS_STOPPED:
1566                         case LS_CO_STOPPED:
1567                                 lr->lr_status = LS_CO_STOPPED;
1568                                 break;
1569                         default:
1570                                 CDEBUG(D_LFSCK, "%s: LFSCK assistant unknown "
1571                                        "status: rc = %d\n",
1572                                        lfsck_lfsck2name(lfsck),
1573                                        lfsck->li_status);
1574                                 lr->lr_status = LS_CO_FAILED;
1575                                 break;
1576                         }
1577                 }
1578         } else {
1579                 if (lfsck->li_flags & LPF_ALL_TGT) {
1580                         lr->lr_event = LE_STOP;
1581                         lr->lr_status = LS_FAILED;
1582                 } else {
1583                         lr->lr_event = LE_PEER_EXIT;
1584                         lr->lr_status = LS_CO_FAILED;
1585                 }
1586         }
1587
1588         rc1 = lfsck_assistant_notify_others(env, com, lr);
1589         if (rc1 != 0) {
1590                 CDEBUG(D_LFSCK, "%s: LFSCK assistant failed to notify "
1591                        "others for %s quit: rc = %d\n",
1592                        lfsck_lfsck2name(lfsck), lad->lad_name, rc1);
1593                 rc = rc1;
1594         }
1595
1596         /* Flush async updates before exit. */
1597         dt_sync(env, lfsck->li_next);
1598
1599         /* In the forced-exit case, some requests may be freed without
1600          * verification; those objects should be re-handled on the next
1601          * run, so do not update the on-disk tracing file in that case. */
1602         if (lad->lad_in_double_scan) {
1603                 if (!lad->lad_exit)
1604                         rc1 = lao->la_double_scan_result(env, com, rc);
1605
1606                 CDEBUG(D_LFSCK, "%s: LFSCK assistant phase2 scan "
1607                        "finished: rc = %d\n",
1608                        lfsck_lfsck2name(lfsck), rc1 != 0 ? rc1 : rc);
1609         }
1610
1611 fini:
1612         if (lad->lad_in_double_scan)
1613                 atomic_dec(&lfsck->li_double_scan_count);
1614
1615         spin_lock(&lad->lad_lock);
1616         lad->lad_assistant_status = (rc1 != 0 ? rc1 : rc);
1617         thread_set_flags(athread, SVC_STOPPED);
1618         wake_up_all(&mthread->t_ctl_waitq);
1619         spin_unlock(&lad->lad_lock);
1620
1621         CDEBUG(D_LFSCK, "%s: %s LFSCK assistant thread exit: rc = %d\n",
1622                lfsck_lfsck2name(lfsck), lad->lad_name,
1623                lad->lad_assistant_status);
1624
1625         lfsck_thread_args_fini(lta);
1626
1627         return rc;
1628 }