Whamcloud - gitweb
LU-4788 lfsck: namespace LFSCK uses assistant thread
[fs/lustre-release.git] / lustre / lfsck / lfsck_engine.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_engine.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lu_object.h>
34 #include <dt_object.h>
35 #include <lustre_net.h>
36 #include <lustre_fid.h>
37 #include <obd_support.h>
38 #include <lustre_lib.h>
39
40 #include "lfsck_internal.h"
41
42 static int lfsck_unpack_ent(struct lu_dirent *ent, __u64 *cookie, __u16 *type)
43 {
44         struct luda_type        *lt;
45         int                      align = sizeof(*lt) - 1;
46         int                      len;
47
48         fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
49         *cookie = le64_to_cpu(ent->lde_hash);
50         ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
51         ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
52         ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
53
54         if (unlikely(!(ent->lde_attrs & LUDA_TYPE)))
55                 return -EINVAL;
56
57         len = (ent->lde_namelen + align) & ~align;
58         lt = (struct luda_type *)(ent->lde_name + len);
59         *type = le16_to_cpu(lt->lt_type);
60
61         /* Make sure the name is terminated with '\0'. The data (object type)
62          * after ent::lde_name maybe broken, but we have stored such data in
63          * the output parameter @type as above. */
64         ent->lde_name[ent->lde_namelen] = '\0';
65
66         return 0;
67 }
68
69 static void lfsck_di_oit_put(const struct lu_env *env, struct lfsck_instance *lfsck)
70 {
71         const struct dt_it_ops  *iops;
72         struct dt_it            *di;
73
74         spin_lock(&lfsck->li_lock);
75         iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
76         di = lfsck->li_di_oit;
77         lfsck->li_di_oit = NULL;
78         spin_unlock(&lfsck->li_lock);
79         iops->put(env, di);
80 }
81
82 static void lfsck_di_dir_put(const struct lu_env *env, struct lfsck_instance *lfsck)
83 {
84         const struct dt_it_ops  *iops;
85         struct dt_it            *di;
86
87         spin_lock(&lfsck->li_lock);
88         iops = &lfsck->li_obj_dir->do_index_ops->dio_it;
89         di = lfsck->li_di_dir;
90         lfsck->li_di_dir = NULL;
91         lfsck->li_cookie_dir = 0;
92         spin_unlock(&lfsck->li_lock);
93         iops->put(env, di);
94 }
95
96 static void lfsck_close_dir(const struct lu_env *env,
97                             struct lfsck_instance *lfsck)
98 {
99         struct dt_object        *dir_obj  = lfsck->li_obj_dir;
100         const struct dt_it_ops  *dir_iops = &dir_obj->do_index_ops->dio_it;
101         struct dt_it            *dir_di   = lfsck->li_di_dir;
102
103         lfsck_di_dir_put(env, lfsck);
104         dir_iops->fini(env, dir_di);
105         lfsck->li_obj_dir = NULL;
106         lfsck_object_put(env, dir_obj);
107 }
108
109 static int lfsck_update_lma(const struct lu_env *env,
110                             struct lfsck_instance *lfsck, struct dt_object *obj)
111 {
112         struct lfsck_thread_info        *info   = lfsck_env_info(env);
113         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
114         struct dt_device                *dt     = lfsck->li_bottom;
115         struct lustre_mdt_attrs         *lma    = &info->lti_lma;
116         struct lu_buf                   *buf;
117         struct thandle                  *th;
118         int                              fl;
119         int                              rc;
120         ENTRY;
121
122         if (bk->lb_param & LPF_DRYRUN)
123                 RETURN(0);
124
125         buf = lfsck_buf_get(env, info->lti_lma_old, LMA_OLD_SIZE);
126         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LMA, BYPASS_CAPA);
127         if (rc < 0) {
128                 if (rc != -ENODATA)
129                         RETURN(rc);
130
131                 fl = LU_XATTR_CREATE;
132                 lustre_lma_init(lma, lfsck_dto2fid(obj), LMAC_FID_ON_OST, 0);
133         } else {
134                 if (rc != LMA_OLD_SIZE && rc != sizeof(struct lustre_mdt_attrs))
135                         RETURN(-EINVAL);
136
137                 fl = LU_XATTR_REPLACE;
138                 lustre_lma_swab(lma);
139                 lustre_lma_init(lma, lfsck_dto2fid(obj),
140                                 lma->lma_compat | LMAC_FID_ON_OST,
141                                 lma->lma_incompat);
142         }
143         lustre_lma_swab(lma);
144
145         th = dt_trans_create(env, dt);
146         if (IS_ERR(th))
147                 RETURN(PTR_ERR(th));
148
149         buf = lfsck_buf_get(env, lma, sizeof(*lma));
150         rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA, fl, th);
151         if (rc != 0)
152                 GOTO(stop, rc);
153
154         rc = dt_trans_start(env, dt, th);
155         if (rc != 0)
156                 GOTO(stop, rc);
157
158         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LMA, fl, th, BYPASS_CAPA);
159
160         GOTO(stop, rc);
161
162 stop:
163         dt_trans_stop(env, dt, th);
164         return rc;
165 }
166
167 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
168                             struct lu_fid *fid)
169 {
170         if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
171                      !dt_try_as_dir(env, obj)))
172                 return -ENOTDIR;
173
174         return dt_lookup(env, obj, (struct dt_rec *)fid,
175                          (const struct dt_key *)"..", BYPASS_CAPA);
176 }
177
178 static int lfsck_needs_scan_dir(const struct lu_env *env,
179                                 struct lfsck_instance *lfsck,
180                                 struct dt_object *obj)
181 {
182         struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
183         int            depth = 0;
184         int            rc;
185
186         if (list_empty(&lfsck->li_list_dir) || !S_ISDIR(lfsck_object_type(obj)))
187                 RETURN(0);
188
189         while (1) {
190                 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
191                  *      which is the agent directory to manage the objects
192                  *      which name entries reside on remote MDTs. Related
193                  *      consistency verification will be processed in LFSCK
194                  *      phase III. */
195                 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
196                         if (depth > 0)
197                                 lfsck_object_put(env, obj);
198                         return 1;
199                 }
200
201                 /* No need to check .lustre and its children. */
202                 if (fid_seq_is_dot(fid_seq(lfsck_dto2fid(obj)))) {
203                         if (depth > 0)
204                                 lfsck_object_put(env, obj);
205                         return 0;
206                 }
207
208                 dt_read_lock(env, obj, MOR_TGT_CHILD);
209                 if (unlikely(lfsck_is_dead_obj(obj))) {
210                         dt_read_unlock(env, obj);
211                         if (depth > 0)
212                                 lfsck_object_put(env, obj);
213                         return 0;
214                 }
215
216                 rc = dt_xattr_get(env, obj,
217                                   lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
218                                   BYPASS_CAPA);
219                 dt_read_unlock(env, obj);
220                 if (rc >= 0) {
221                         if (depth > 0)
222                                 lfsck_object_put(env, obj);
223                         return 1;
224                 }
225
226                 if (rc < 0 && rc != -ENODATA) {
227                         if (depth > 0)
228                                 lfsck_object_put(env, obj);
229                         return rc;
230                 }
231
232                 rc = lfsck_parent_fid(env, obj, fid);
233                 if (depth > 0)
234                         lfsck_object_put(env, obj);
235                 if (rc != 0)
236                         return rc;
237
238                 if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
239                         return 0;
240
241                 obj = lfsck_object_find(env, lfsck, fid);
242                 if (IS_ERR(obj))
243                         return PTR_ERR(obj);
244
245                 if (!dt_object_exists(obj)) {
246                         lfsck_object_put(env, obj);
247                         return 0;
248                 }
249
250                 if (dt_object_remote(obj)) {
251                         /* .lustre/lost+found/MDTxxx can be remote directory. */
252                         if (fid_seq_is_dot(fid_seq(lfsck_dto2fid(obj))))
253                                 rc = 0;
254                         else
255                                 /* Other remote directory should be client
256                                  * visible and need to be checked. */
257                                 rc = 1;
258                         lfsck_object_put(env, obj);
259                         return rc;
260                 }
261
262                 depth++;
263         }
264         return 0;
265 }
266
267 /* LFSCK wrap functions */
268
269 static void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
270                        bool new_checked)
271 {
272         struct lfsck_component *com;
273
274         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
275                 com->lc_ops->lfsck_fail(env, com, new_checked);
276         }
277 }
278
279 static int lfsck_checkpoint(const struct lu_env *env,
280                             struct lfsck_instance *lfsck)
281 {
282         struct lfsck_component *com;
283         int                     rc  = 0;
284         int                     rc1 = 0;
285
286         if (likely(cfs_time_beforeq(cfs_time_current(),
287                                     lfsck->li_time_next_checkpoint)))
288                 return 0;
289
290         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_checkpoint, false);
291         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
292                 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
293                 if (rc != 0)
294                         rc1 = rc;
295         }
296
297         lfsck->li_time_last_checkpoint = cfs_time_current();
298         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
299                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
300         return rc1 != 0 ? rc1 : rc;
301 }
302
303 static int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck,
304                       struct lfsck_start_param *lsp)
305 {
306         struct dt_object       *obj     = NULL;
307         struct lfsck_component *com;
308         struct lfsck_component *next;
309         struct lfsck_position  *pos     = NULL;
310         const struct dt_it_ops *iops    =
311                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
312         struct dt_it           *di;
313         int                     rc;
314         ENTRY;
315
316         LASSERT(lfsck->li_obj_dir == NULL);
317         LASSERT(lfsck->li_di_dir == NULL);
318
319         lfsck->li_current_oit_processed = 0;
320         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
321                 com->lc_new_checked = 0;
322                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
323                         com->lc_journal = 0;
324
325                 rc = com->lc_ops->lfsck_prep(env, com, lsp);
326                 if (rc != 0)
327                         GOTO(out, rc);
328
329                 if ((pos == NULL) ||
330                     (!lfsck_pos_is_zero(&com->lc_pos_start) &&
331                      lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
332                         pos = &com->lc_pos_start;
333         }
334
335         /* Init otable-based iterator. */
336         if (pos == NULL) {
337                 rc = iops->load(env, lfsck->li_di_oit, 0);
338                 if (rc > 0) {
339                         lfsck->li_oit_over = 1;
340                         rc = 0;
341                 }
342
343                 GOTO(out, rc);
344         }
345
346         rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
347         if (rc < 0)
348                 GOTO(out, rc);
349         else if (rc > 0)
350                 lfsck->li_oit_over = 1;
351
352         if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
353                 GOTO(out, rc = 0);
354
355         /* Find the directory for namespace-based traverse. */
356         obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
357         if (IS_ERR(obj))
358                 RETURN(PTR_ERR(obj));
359
360         /* XXX: Currently, skip remote object, the consistency for
361          *      remote object will be processed in LFSCK phase III. */
362         if (!dt_object_exists(obj) || dt_object_remote(obj) ||
363             unlikely(!S_ISDIR(lfsck_object_type(obj))))
364                 GOTO(out, rc = 0);
365
366         if (unlikely(!dt_try_as_dir(env, obj)))
367                 GOTO(out, rc = -ENOTDIR);
368
369         /* Init the namespace-based directory traverse. */
370         iops = &obj->do_index_ops->dio_it;
371         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
372         if (IS_ERR(di))
373                 GOTO(out, rc = PTR_ERR(di));
374
375         LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
376
377         rc = iops->load(env, di, pos->lp_dir_cookie);
378         if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
379                 rc = iops->next(env, di);
380         else if (rc > 0)
381                 rc = 0;
382
383         if (rc != 0) {
384                 iops->put(env, di);
385                 iops->fini(env, di);
386                 GOTO(out, rc);
387         }
388
389         lfsck->li_obj_dir = lfsck_object_get(obj);
390         lfsck->li_cookie_dir = iops->store(env, di);
391         spin_lock(&lfsck->li_lock);
392         lfsck->li_di_dir = di;
393         spin_unlock(&lfsck->li_lock);
394
395         GOTO(out, rc = 0);
396
397 out:
398         if (obj != NULL)
399                 lfsck_object_put(env, obj);
400
401         if (rc < 0) {
402                 list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
403                                          lc_link)
404                         com->lc_ops->lfsck_post(env, com, rc, true);
405
406                 return rc;
407         }
408
409         rc = 0;
410         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_checkpoint, true);
411         lfsck->li_pos_current = lfsck->li_pos_checkpoint;
412         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
413                 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
414                 if (rc != 0)
415                         break;
416         }
417
418         lfsck->li_time_last_checkpoint = cfs_time_current();
419         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
420                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
421         return rc;
422 }
423
424 static int lfsck_exec_oit(const struct lu_env *env,
425                           struct lfsck_instance *lfsck, struct dt_object *obj)
426 {
427         struct lfsck_component *com;
428         const struct dt_it_ops *iops;
429         struct dt_it           *di;
430         int                     rc;
431         ENTRY;
432
433         LASSERT(lfsck->li_obj_dir == NULL);
434
435         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
436                 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
437                 if (rc != 0)
438                         RETURN(rc);
439         }
440
441         rc = lfsck_needs_scan_dir(env, lfsck, obj);
442         if (rc <= 0)
443                 GOTO(out, rc);
444
445         if (unlikely(!dt_try_as_dir(env, obj)))
446                 GOTO(out, rc = -ENOTDIR);
447
448         iops = &obj->do_index_ops->dio_it;
449         di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
450         if (IS_ERR(di))
451                 GOTO(out, rc = PTR_ERR(di));
452
453         rc = iops->load(env, di, 0);
454         if (rc == 0)
455                 rc = iops->next(env, di);
456         else if (rc > 0)
457                 rc = 0;
458
459         if (rc != 0) {
460                 iops->put(env, di);
461                 iops->fini(env, di);
462                 GOTO(out, rc);
463         }
464
465         lfsck->li_obj_dir = lfsck_object_get(obj);
466         lfsck->li_cookie_dir = iops->store(env, di);
467         spin_lock(&lfsck->li_lock);
468         lfsck->li_di_dir = di;
469         spin_unlock(&lfsck->li_lock);
470
471         GOTO(out, rc = 0);
472
473 out:
474         if (rc < 0)
475                 lfsck_fail(env, lfsck, false);
476         return (rc > 0 ? 0 : rc);
477 }
478
479 static int lfsck_exec_dir(const struct lu_env *env,
480                           struct lfsck_instance *lfsck,
481                           struct lu_dirent *ent, __u16 type)
482 {
483         struct lfsck_component *com;
484         int                     rc;
485
486         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
487                 rc = com->lc_ops->lfsck_exec_dir(env, com, ent, type);
488                 if (rc != 0)
489                         return rc;
490         }
491         return 0;
492 }
493
494 static int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
495                       int result)
496 {
497         struct lfsck_component *com;
498         struct lfsck_component *next;
499         int                     rc  = 0;
500         int                     rc1 = 0;
501
502         lfsck_pos_fill(env, lfsck, &lfsck->li_pos_checkpoint, false);
503         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
504                 rc = com->lc_ops->lfsck_post(env, com, result, false);
505                 if (rc != 0)
506                         rc1 = rc;
507         }
508
509         lfsck->li_time_last_checkpoint = cfs_time_current();
510         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
511                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
512
513         /* Ignore some component post failure to make other can go ahead. */
514         return result;
515 }
516
517 static int lfsck_double_scan(const struct lu_env *env,
518                              struct lfsck_instance *lfsck)
519 {
520         struct lfsck_component *com;
521         struct lfsck_component *next;
522         struct l_wait_info      lwi = { 0 };
523         int                     rc  = 0;
524         int                     rc1 = 0;
525
526         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
527                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
528                         com->lc_journal = 0;
529
530                 rc = com->lc_ops->lfsck_double_scan(env, com);
531                 if (rc != 0)
532                         rc1 = rc;
533         }
534
535         l_wait_event(lfsck->li_thread.t_ctl_waitq,
536                      atomic_read(&lfsck->li_double_scan_count) == 0,
537                      &lwi);
538
539         if (lfsck->li_status != LS_PAUSED &&
540             lfsck->li_status != LS_CO_PAUSED) {
541                 list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
542                                          lc_link) {
543                         spin_lock(&lfsck->li_lock);
544                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
545                         spin_unlock(&lfsck->li_lock);
546                 }
547         }
548
549         return rc1 != 0 ? rc1 : rc;
550 }
551
552 static void lfsck_quit(const struct lu_env *env, struct lfsck_instance *lfsck)
553 {
554         struct lfsck_component *com;
555         struct lfsck_component *next;
556
557         list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
558                                  lc_link) {
559                 if (com->lc_ops->lfsck_quit != NULL)
560                         com->lc_ops->lfsck_quit(env, com);
561
562                 spin_lock(&lfsck->li_lock);
563                 list_del_init(&com->lc_link_dir);
564                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
565                 spin_unlock(&lfsck->li_lock);
566         }
567
568         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
569                                  lc_link) {
570                 if (com->lc_ops->lfsck_quit != NULL)
571                         com->lc_ops->lfsck_quit(env, com);
572
573                 spin_lock(&lfsck->li_lock);
574                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
575                 spin_unlock(&lfsck->li_lock);
576         }
577 }
578
579 /* LFSCK engines */
580
581 static int lfsck_master_dir_engine(const struct lu_env *env,
582                                    struct lfsck_instance *lfsck)
583 {
584         struct lfsck_thread_info        *info   = lfsck_env_info(env);
585         struct dt_object                *dir    = lfsck->li_obj_dir;
586         const struct dt_it_ops          *iops   = &dir->do_index_ops->dio_it;
587         struct dt_it                    *di     = lfsck->li_di_dir;
588         struct lu_dirent                *ent    =
589                         (struct lu_dirent *)info->lti_key;
590         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
591         struct ptlrpc_thread            *thread = &lfsck->li_thread;
592         int                              rc;
593         __u16                            type;
594         ENTRY;
595
596         do {
597                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY2) &&
598                     cfs_fail_val > 0) {
599                         struct l_wait_info lwi;
600
601                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
602                                           NULL, NULL);
603                         l_wait_event(thread->t_ctl_waitq,
604                                      !thread_is_running(thread),
605                                      &lwi);
606                 }
607
608                 lfsck->li_new_scanned++;
609                 rc = iops->rec(env, di, (struct dt_rec *)ent,
610                                lfsck->li_args_dir);
611                 if (rc == 0)
612                         rc = lfsck_unpack_ent(ent, &lfsck->li_cookie_dir,
613                                               &type);
614
615                 if (rc != 0) {
616                         CDEBUG(D_LFSCK, "%s: scan dir failed at rec(), "
617                                "parent "DFID", cookie "LPX64": rc = %d\n",
618                                lfsck_lfsck2name(lfsck),
619                                PFID(lfsck_dto2fid(dir)),
620                                lfsck->li_cookie_dir, rc);
621                         lfsck_fail(env, lfsck, true);
622                         if (bk->lb_param & LPF_FAILOUT)
623                                 RETURN(rc);
624                         else
625                                 goto checkpoint;
626                 }
627
628                 if (ent->lde_attrs & LUDA_IGNORE)
629                         goto checkpoint;
630
631                 /* The type in the @ent structure may has been overwritten,
632                  * so we need to pass the @type parameter independently. */
633                 rc = lfsck_exec_dir(env, lfsck, ent, type);
634                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
635                         RETURN(rc);
636
637 checkpoint:
638                 rc = lfsck_checkpoint(env, lfsck);
639                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
640                         RETURN(rc);
641
642                 /* Rate control. */
643                 lfsck_control_speed(lfsck);
644                 if (unlikely(!thread_is_running(thread))) {
645                         CDEBUG(D_LFSCK, "%s: scan dir exit for engine stop, "
646                                "parent "DFID", cookie "LPX64"\n",
647                                lfsck_lfsck2name(lfsck),
648                                PFID(lfsck_dto2fid(dir)),
649                                lfsck->li_cookie_dir);
650                         RETURN(0);
651                 }
652
653                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
654                         spin_lock(&lfsck->li_lock);
655                         thread_set_flags(thread, SVC_STOPPING);
656                         spin_unlock(&lfsck->li_lock);
657                         RETURN(-EINVAL);
658                 }
659
660                 rc = iops->next(env, di);
661         } while (rc == 0);
662
663         if (rc > 0 && !lfsck->li_oit_over)
664                 lfsck_close_dir(env, lfsck);
665
666         RETURN(rc);
667 }
668
669 static int lfsck_master_oit_engine(const struct lu_env *env,
670                                    struct lfsck_instance *lfsck)
671 {
672         struct lfsck_thread_info        *info   = lfsck_env_info(env);
673         const struct dt_it_ops          *iops   =
674                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
675         struct dt_it                    *di     = lfsck->li_di_oit;
676         struct lu_fid                   *fid    = &info->lti_fid;
677         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
678         struct ptlrpc_thread            *thread = &lfsck->li_thread;
679         __u32                            idx    =
680                                 lfsck_dev_idx(lfsck->li_bottom);
681         int                              rc;
682         ENTRY;
683
684         do {
685                 struct dt_object *target;
686                 bool              update_lma = false;
687
688                 if (lfsck->li_di_dir != NULL) {
689                         rc = lfsck_master_dir_engine(env, lfsck);
690                         if (rc <= 0)
691                                 RETURN(rc);
692                 }
693
694                 if (unlikely(lfsck->li_oit_over))
695                         RETURN(1);
696
697                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY1) &&
698                     cfs_fail_val > 0) {
699                         struct l_wait_info lwi;
700
701                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
702                                           NULL, NULL);
703                         l_wait_event(thread->t_ctl_waitq,
704                                      !thread_is_running(thread),
705                                      &lwi);
706                 }
707
708                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
709                         RETURN(0);
710
711                 lfsck->li_current_oit_processed = 1;
712                 lfsck->li_new_scanned++;
713                 lfsck->li_pos_current.lp_oit_cookie = iops->store(env, di);
714                 rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
715                 if (rc != 0) {
716                         CDEBUG(D_LFSCK, "%s: OIT scan failed at rec(): "
717                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
718                         lfsck_fail(env, lfsck, true);
719                         if (rc < 0 && bk->lb_param & LPF_FAILOUT)
720                                 RETURN(rc);
721                         else
722                                 goto checkpoint;
723                 }
724
725                 if (fid_is_idif(fid)) {
726                         __u32 idx1 = fid_idif_ost_idx(fid);
727
728                         LASSERT(!lfsck->li_master);
729
730                         /* It is an old format device, update the LMA. */
731                         if (idx != idx1) {
732                                 struct ost_id *oi = &info->lti_oi;
733
734                                 fid_to_ostid(fid, oi);
735                                 ostid_to_fid(fid, oi, idx);
736                                 update_lma = true;
737                         }
738                 } else if (!fid_is_norm(fid) && !fid_is_igif(fid) &&
739                            !fid_is_last_id(fid) && !fid_is_root(fid) &&
740                            !fid_seq_is_dot(fid_seq(fid))) {
741                         /* If the FID/object is only used locally and invisible
742                          * to external nodes, then LFSCK will not handle it. */
743                         goto checkpoint;
744                 }
745
746                 target = lfsck_object_find(env, lfsck, fid);
747                 if (IS_ERR(target)) {
748                         CDEBUG(D_LFSCK, "%s: OIT scan failed at find target "
749                                DFID", cookie "LPU64": rc = %d\n",
750                                lfsck_lfsck2name(lfsck), PFID(fid),
751                                iops->store(env, di), rc);
752                         lfsck_fail(env, lfsck, true);
753                         if (bk->lb_param & LPF_FAILOUT)
754                                 RETURN(PTR_ERR(target));
755                         else
756                                 goto checkpoint;
757                 }
758
759                 /* XXX: Currently, skip remote object, the consistency for
760                  *      remote object will be processed in LFSCK phase III. */
761                 if (dt_object_exists(target) && !dt_object_remote(target)) {
762                         if (update_lma) {
763                                 rc = lfsck_update_lma(env, lfsck, target);
764                                 if (rc != 0)
765                                         CDEBUG(D_LFSCK, "%s: fail to update "
766                                                "LMA for "DFID": rc = %d\n",
767                                                lfsck_lfsck2name(lfsck),
768                                                PFID(lfsck_dto2fid(target)), rc);
769                         }
770                         if (rc == 0)
771                                 rc = lfsck_exec_oit(env, lfsck, target);
772                 }
773                 lfsck_object_put(env, target);
774                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
775                         RETURN(rc);
776
777 checkpoint:
778                 rc = lfsck_checkpoint(env, lfsck);
779                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
780                         RETURN(rc);
781
782                 /* Rate control. */
783                 lfsck_control_speed(lfsck);
784
785                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL1)) {
786                         spin_lock(&lfsck->li_lock);
787                         thread_set_flags(thread, SVC_STOPPING);
788                         spin_unlock(&lfsck->li_lock);
789                         RETURN(-EINVAL);
790                 }
791
792                 rc = iops->next(env, di);
793                 if (unlikely(rc > 0))
794                         lfsck->li_oit_over = 1;
795                 else if (likely(rc == 0))
796                         lfsck->li_current_oit_processed = 0;
797
798                 if (unlikely(!thread_is_running(thread))) {
799                         CDEBUG(D_LFSCK, "%s: OIT scan exit for engine stop, "
800                                "cookie "LPU64"\n", lfsck_lfsck2name(lfsck),
801                                iops->store(env, di));
802                         RETURN(0);
803                 }
804         } while (rc == 0 || lfsck->li_di_dir != NULL);
805
806         RETURN(rc);
807 }
808
/**
 * Thread body of the LFSCK master engine.
 *
 * Sets up the OIT iterator, calls lfsck_prep(), reports SVC_RUNNING and
 * then sleeps until li_start_unplug is set or the thread stops running.
 * Once unplugged it runs lfsck_master_oit_engine() (or goes straight to
 * rc = 1 when only double-scan components are registered), calls
 * lfsck_post() unless the OBD_FAIL_LFSCK_CRASH fail point fires, and
 * finishes with either lfsck_double_scan() or lfsck_quit(). On every path
 * the thread is marked SVC_STOPPED and \a args is released through
 * lfsck_thread_args_fini() before returning.
 *
 * \param[in] args      the struct lfsck_thread_args prepared for this thread
 *
 * \retval              rc as left by the last stage that ran (see below)
 */
809 int lfsck_master_engine(void *args)
810 {
811         struct lfsck_thread_args *lta      = args;
812         struct lu_env            *env      = &lta->lta_env;
813         struct lfsck_instance    *lfsck    = lta->lta_lfsck;
814         struct ptlrpc_thread     *thread   = &lfsck->li_thread;
815         struct dt_object         *oit_obj  = lfsck->li_obj_oit;
816         const struct dt_it_ops   *oit_iops = &oit_obj->do_index_ops->dio_it;
817         struct dt_it             *oit_di;
818         struct l_wait_info        lwi      = { 0 };
819         int                       rc;
820         ENTRY;
821
            /* Only a master instance with at least one registered component
             * (scan or double-scan list non-empty) verifies lost+found. */
822         if (lfsck->li_master &&
823             (!list_empty(&lfsck->li_list_scan) ||
824              !list_empty(&lfsck->li_list_double_scan))) {
825                 rc = lfsck_verify_lpf(env, lfsck);
826                 /* Fail to verify the .lustre/lost+found/MDTxxxx/ may be not
827                  * fatal, because the .lustre/lost+found/ maybe not accessed
828                  * by the LFSCK if it does not add orphans or others to such
829                  * directory. So go ahead until hit failure when really uses
830                  * the directory. */
831                 if (rc != 0)
832                         CDEBUG(D_LFSCK, "%s: master engine fail to verify the "
833                                ".lustre/lost+found/, go ahead: rc = %d\n",
834                                lfsck_lfsck2name(lfsck), rc);
835         }
836
            /* Create the iterator over the OIT object. On failure nothing
             * else has been set up yet, so skip directly to fini_args. */
837         oit_di = oit_iops->init(env, oit_obj, lfsck->li_args_oit, BYPASS_CAPA);
838         if (IS_ERR(oit_di)) {
839                 rc = PTR_ERR(oit_di);
840                 CDEBUG(D_LFSCK, "%s: master engine fail to init iteration: "
841                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
842
843                 GOTO(fini_args, rc);
844         }
845
            /* Record the iterator in the instance under li_lock. */
846         spin_lock(&lfsck->li_lock);
847         lfsck->li_di_oit = oit_di;
848         spin_unlock(&lfsck->li_lock);
849         rc = lfsck_prep(env, lfsck, lta->lta_lsp);
850         if (rc != 0)
851                 GOTO(fini_oit, rc);
852
853         CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = %#x, dir_flags = %#x, "
854                "oit_cookie = "LPU64", dir_cookie = "LPX64", parent = "DFID
855                ", pid = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
856                lfsck->li_pos_checkpoint.lp_oit_cookie,
857                lfsck->li_pos_checkpoint.lp_dir_cookie,
858                PFID(&lfsck->li_pos_checkpoint.lp_dir_parent),
859                current_pid());
860
            /* Report SVC_RUNNING and wake whoever waits on t_ctl_waitq. */
861         spin_lock(&lfsck->li_lock);
862         thread_set_flags(thread, SVC_RUNNING);
863         spin_unlock(&lfsck->li_lock);
864         wake_up_all(&thread->t_ctl_waitq);
865
            /* Hold here until li_start_unplug is set, or until the thread
             * is no longer in the running state. */
866         l_wait_event(thread->t_ctl_waitq,
867                      lfsck->li_start_unplug ||
868                      !thread_is_running(thread),
869                      &lwi);
870         if (!thread_is_running(thread))
871                 GOTO(fini_oit, rc = 0);
872
            /* Skip the OIT scan only when the scan list is empty while the
             * double-scan list is not; rc = 1 then selects the double-scan
             * branch after fini_oit. */
873         if (!list_empty(&lfsck->li_list_scan) ||
874             list_empty(&lfsck->li_list_double_scan))
875                 rc = lfsck_master_oit_engine(env, lfsck);
876         else
877                 rc = 1;
878
879         CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = %#x, dir_flags = %#x, "
880                "oit_cookie = "LPU64", dir_cookie = "LPX64", parent = "DFID
881                ", pid = %d, rc = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
882                lfsck->li_pos_checkpoint.lp_oit_cookie,
883                lfsck->li_pos_checkpoint.lp_dir_cookie,
884                PFID(&lfsck->li_pos_checkpoint.lp_dir_parent),
885                current_pid(), rc);
886
            /* lfsck_post() is bypassed when the OBD_FAIL_LFSCK_CRASH fail
             * point is active; rc then keeps the scan result. */
887         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
888                 rc = lfsck_post(env, lfsck, rc);
889
890         if (lfsck->li_di_dir != NULL)
891                 lfsck_close_dir(env, lfsck);
892
893 fini_oit:
894         lfsck_di_oit_put(env, lfsck);
895         oit_iops->fini(env, oit_di);
            /* rc == 1 leads to the double scan when a component is queued
             * for it and to plain success otherwise; any other rc (including
             * 0) goes through lfsck_quit(). */
896         if (rc == 1) {
897                 if (!list_empty(&lfsck->li_list_double_scan))
898                         rc = lfsck_double_scan(env, lfsck);
899                 else
900                         rc = 0;
901         } else {
902                 lfsck_quit(env, lfsck);
903         }
904
905         /* XXX: Purge the pinned objects in the future. */
906
907 fini_args:
            /* Common exit: mark the thread stopped, wake waiters on
             * t_ctl_waitq, and release the thread arguments. */
908         spin_lock(&lfsck->li_lock);
909         thread_set_flags(thread, SVC_STOPPED);
910         spin_unlock(&lfsck->li_lock);
911         wake_up_all(&thread->t_ctl_waitq);
912         lfsck_thread_args_fini(lta);
913         return rc;
914 }
915
916 static inline bool lfsck_assistant_req_empty(struct lfsck_assistant_data *lad)
917 {
918         bool no_pending;
919
920         /* Sample lad_req_list under lad_lock; true means nothing is queued. */
921         spin_lock(&lad->lad_lock);
922         no_pending = list_empty(&lad->lad_req_list) ? true : false;
923         spin_unlock(&lad->lad_lock);
924
925         return no_pending;
926 }
927
928 /**
929  * Query the LFSCK status from the instances on remote servers.
930  *
931  * The LFSCK assistant thread queries the LFSCK instances on other
932  * servers (MDT/OST) about their status, such as whether they have
933  * finished the phase1/phase2 scanning or not, and so on.
934  *
     * One asynchronous LE_QUERY request is queued for each target on the
     * phase1 list being walked: the MDT phase1 list when it is non-empty,
     * and, for layout LFSCK only, the OST phase1 list once the MDT phase1
     * list is empty. A failure to queue one request is remembered and
     * reported, but does not stop the remaining targets from being queried.
     *
935  * \param[in] env       pointer to the thread context
936  * \param[in] com       pointer to the lfsck component
937  *
938  * \retval              0 for success
939  * \retval              negative error number on failure
940  */
941 static int lfsck_assistant_query_others(const struct lu_env *env,
942                                         struct lfsck_component *com)
943 {
944         struct lfsck_thread_info          *info  = lfsck_env_info(env);
945         struct lfsck_request              *lr    = &info->lti_lr;
946         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
947         struct lfsck_instance             *lfsck = com->lc_lfsck;
948         struct lfsck_assistant_data       *lad   = com->lc_data;
949         struct ptlrpc_request_set         *set;
950         struct lfsck_tgt_descs            *ltds;
951         struct lfsck_tgt_desc             *ltd;
952         struct list_head                  *phase_head;
953         int                                rc    = 0;
954         int                                rc1   = 0;
955         ENTRY;
956
957         set = ptlrpc_prep_set();
958         if (set == NULL)
959                 RETURN(-ENOMEM);
960
            /* Start a new generation: a target stamped with this value has
             * already been visited during this call. */
961         lad->lad_touch_gen++;
962         memset(lr, 0, sizeof(*lr));
963         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
964         lr->lr_event = LE_QUERY;
965         lr->lr_active = com->lc_type;
966         laia->laia_com = com;
967         laia->laia_lr = lr;
968         laia->laia_shared = 0;
969
            /* MDTs still in phase1 are queried first. With none left, only
             * layout LFSCK has OSTs to query; other types are done. */
970         if (!list_empty(&lad->lad_mdt_phase1_list)) {
971                 ltds = &lfsck->li_mdt_descs;
972                 lr->lr_flags = 0;
973                 phase_head = &lad->lad_mdt_phase1_list;
974         } else if (com->lc_type != LFSCK_TYPE_LAYOUT) {
975                 goto out;
976         } else {
977
                    /* Also entered by "goto again" after the MDT pass. */
978 again:
979                 ltds = &lfsck->li_ost_descs;
980                 lr->lr_flags = LEF_TO_OST;
981                 phase_head = &lad->lad_ost_phase1_list;
982         }
983
984         laia->laia_ltds = ltds;
985         spin_lock(&ltds->ltd_lock);
            /* Take the head entry, stamp it, rotate it to the tail and send
             * the query; stop when the head already carries the current
             * generation, i.e. the whole list has been visited once. */
986         while (!list_empty(phase_head)) {
987                 struct list_head *phase_list;
988                 __u32            *gen;
989
990                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
991                         ltd = list_entry(phase_head->next,
992                                          struct lfsck_tgt_desc,
993                                          ltd_layout_phase_list);
994                         phase_list = &ltd->ltd_layout_phase_list;
995                         gen = &ltd->ltd_layout_gen;
996                 } else {
997                         ltd = list_entry(phase_head->next,
998                                          struct lfsck_tgt_desc,
999                                          ltd_namespace_phase_list);
1000                         phase_list = &ltd->ltd_namespace_phase_list;
1001                         gen = &ltd->ltd_namespace_gen;
1002                 }
1003
1004                 if (*gen == lad->lad_touch_gen)
1005                         break;
1006
1007                 *gen = lad->lad_touch_gen;
1008                 list_move_tail(phase_list, phase_head);
                     /* Take a reference on the target before dropping the
                      * lock; it is released right here if queuing fails.
                      * NOTE(review): presumably the interpret callback drops
                      * it on the success path - confirm. */
1009                 atomic_inc(&ltd->ltd_ref);
1010                 laia->laia_ltd = ltd;
1011                 spin_unlock(&ltds->ltd_lock);
1012                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1013                                          lfsck_async_interpret_common,
1014                                          laia, LFSCK_QUERY);
1015                 if (rc != 0) {
1016                         CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to query "
1017                                "%s %x for %s: rc = %d\n",
1018                                lfsck_lfsck2name(lfsck),
1019                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1020                                ltd->ltd_index, lad->lad_name, rc);
1021                         lfsck_tgt_put(ltd);
                             /* Remember the failure, keep going. */
1022                         rc1 = rc;
1023                 }
1024                 spin_lock(&ltds->ltd_lock);
1025         }
1026         spin_unlock(&ltds->ltd_lock);
1027
             /* Wait on the request set; a negative result aborts the call
              * and takes precedence over any queuing error kept in rc1. */
1028         rc = ptlrpc_set_wait(set);
1029         if (rc < 0) {
1030                 ptlrpc_set_destroy(set);
1031                 RETURN(rc);
1032         }
1033
             /* Layout LFSCK: the pass just finished was the MDT one and no
              * MDT is left in phase1, so go on to query the OSTs. */
1034         if (com->lc_type == LFSCK_TYPE_LAYOUT && !(lr->lr_flags & LEF_TO_OST) &&
1035             list_empty(&lad->lad_mdt_phase1_list))
1036                 goto again;
1037
1038 out:
1039         ptlrpc_set_destroy(set);
1040
1041         RETURN(rc1 != 0 ? rc1 : rc);
1042 }
1043
1044 /**
1045  * Notify the LFSCK event to the instances on remote servers.
1046  *
1047  * The LFSCK assistant thread notifies the LFSCK instances on other
1048  * servers (MDT/OST) about some events, such as start new scanning,
1049  * stop the scanning, this LFSCK instance will exit, and so on.
1050  *
      * The caller selects the event through lr->lr_event. Handled events are
      * LE_START, LE_STOP, LE_PHASE2_DONE, LE_PEER_EXIT and LE_PHASE1_DONE;
      * any other value yields -EINVAL.
      *
1051  * \param[in] env       pointer to the thread context
1052  * \param[in] com       pointer to the lfsck component
1053  * \param[in] lr        pointer to the LFSCK event request
1054  *
1055  * \retval              0 for success
1056  * \retval              negative error number on failure
1057  */
1058 static int lfsck_assistant_notify_others(const struct lu_env *env,
1059                                          struct lfsck_component *com,
1060                                          struct lfsck_request *lr)
1061 {
1062         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1063         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1064         struct lfsck_instance             *lfsck = com->lc_lfsck;
1065         struct lfsck_assistant_data       *lad   = com->lc_data;
1066         struct lfsck_bookmark             *bk    = &lfsck->li_bookmark_ram;
1067         struct ptlrpc_request_set         *set;
1068         struct lfsck_tgt_descs            *ltds;
1069         struct lfsck_tgt_desc             *ltd;
1070         struct lfsck_tgt_desc             *next;
1071         __u32                              idx;
1072         int                                rc    = 0;
1073         int                                rc1   = 0;
1074         ENTRY;
1075
1076         set = ptlrpc_prep_set();
1077         if (set == NULL)
1078                 RETURN(-ENOMEM);
1079
1080         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1081         lr->lr_active = com->lc_type;
1082         laia->laia_com = com;
1083         laia->laia_lr = lr;
1084         laia->laia_shared = 0;
1085
1086         switch (lr->lr_event) {
1087         case LE_START:
                     /* Only layout LFSCK sends the start event to OSTs;
                      * other types go straight to linking the MDTs. */
1088                 if (com->lc_type != LFSCK_TYPE_LAYOUT)
1089                         goto next;
1090
1091                 lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
1092                                LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ;
1093                 lr->lr_speed = bk->lb_speed_limit;
1094                 lr->lr_version = bk->lb_version;
                     /* OR-ed in, so bits already set by the caller in
                      * lr_param are kept. */
1095                 lr->lr_param |= bk->lb_param;
1096                 lr->lr_async_windows = bk->lb_async_windows;
1097                 lr->lr_flags = LEF_TO_OST;
1098
1099                 /* Notify OSTs firstly, then handle other MDTs if needed. */
1100                 ltds = &lfsck->li_ost_descs;
1101                 laia->laia_ltds = ltds;
1102                 down_read(&ltds->ltd_rw_sem);
1103                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1104                         ltd = lfsck_tgt_get(ltds, idx);
1105                         LASSERT(ltd != NULL);
1106
1107                         laia->laia_ltd = ltd;
1108                         ltd->ltd_layout_done = 0;
1109                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1110                                         lfsck_async_interpret_common,
1111                                         laia, LFSCK_NOTIFY);
1112                         if (rc != 0) {
1113                                 struct lfsck_layout *lo = com->lc_file_ram;
1114
                                     /* A failed notify marks the layout
                                      * result incomplete; the loop goes on
                                      * with the remaining OSTs. */
1115                                 lo->ll_flags |= LF_INCOMPLETE;
1116                                 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
1117                                        "notify OST %x for %s start: rc = %d\n",
1118                                        lfsck_lfsck2name(lfsck), idx,
1119                                        lad->lad_name, rc);
1120                                 lfsck_tgt_put(ltd);
1121                         }
1122                 }
1123                 up_read(&ltds->ltd_rw_sem);
1124
1125                 /* Sync up */
1126                 rc = ptlrpc_set_wait(set);
1127                 if (rc < 0) {
1128                         ptlrpc_set_destroy(set);
1129                         RETURN(rc);
1130                 }
1131
1132 next:
                     /* Other MDTs are only involved with LPF_ALL_TGT. */
1133                 if (!(bk->lb_param & LPF_ALL_TGT))
1134                         break;
1135
1136                 /* link other MDT targets locally. */
1137                 ltds = &lfsck->li_mdt_descs;
1138                 spin_lock(&ltds->ltd_lock);
1139                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1140                         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1141                                 ltd = LTD_TGT(ltds, idx);
1142                                 LASSERT(ltd != NULL);
1143
                                     /* Already linked, leave it alone. */
1144                                 if (!list_empty(&ltd->ltd_layout_list))
1145                                         continue;
1146
1147                                 list_add_tail(&ltd->ltd_layout_list,
1148                                               &lad->lad_mdt_list);
1149                                 list_add_tail(&ltd->ltd_layout_phase_list,
1150                                               &lad->lad_mdt_phase1_list);
1151                         }
1152                 } else {
1153                         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1154                                 ltd = LTD_TGT(ltds, idx);
1155                                 LASSERT(ltd != NULL);
1156
                                     /* Already linked, leave it alone. */
1157                                 if (!list_empty(&ltd->ltd_namespace_list))
1158                                         continue;
1159
1160                                 list_add_tail(&ltd->ltd_namespace_list,
1161                                               &lad->lad_mdt_list);
1162                                 list_add_tail(&ltd->ltd_namespace_phase_list,
1163                                               &lad->lad_mdt_phase1_list);
1164                         }
1165                 }
1166                 spin_unlock(&ltds->ltd_lock);
1167                 break;
1168         case LE_STOP:
1169         case LE_PHASE2_DONE:
1170         case LE_PEER_EXIT: {
1171                 struct list_head *phase_head;
1172
1173                 /* Handle other MDTs firstly if needed, then notify the OSTs. */
1174                 if (bk->lb_param & LPF_ALL_TGT) {
1175                         phase_head = &lad->lad_mdt_list;
1176                         ltds = &lfsck->li_mdt_descs;
1177                         if (lr->lr_event == LE_STOP) {
1178                                 /* unlink other MDT targets locally. */
1179                                 spin_lock(&ltds->ltd_lock);
1180                                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1181                                         list_for_each_entry_safe(ltd, next,
1182                                                 phase_head, ltd_layout_list) {
1183                                                 list_del_init(
1184                                                 &ltd->ltd_layout_phase_list);
1185                                                 list_del_init(
1186                                                 &ltd->ltd_layout_list);
1187                                         }
1188                                 } else {
1189                                         list_for_each_entry_safe(ltd, next,
1190                                                         phase_head,
1191                                                         ltd_namespace_list) {
1192                                                 list_del_init(
1193                                                 &ltd->ltd_namespace_phase_list);
1194                                                 list_del_init(
1195                                                 &ltd->ltd_namespace_list);
1196                                         }
1197                                 }
1198                                 spin_unlock(&ltds->ltd_lock);
1199
                                     /* On LE_STOP the MDTs are only unlinked
                                      * here, no request is sent to them.
                                      * Non-layout LFSCK is done; layout
                                      * LFSCK still has to notify the OSTs. */
1200                                 if (com->lc_type != LFSCK_TYPE_LAYOUT)
1201                                         break;
1202
1203                                 lr->lr_flags |= LEF_TO_OST;
1204                                 phase_head = &lad->lad_ost_list;
1205                                 ltds = &lfsck->li_ost_descs;
1206                         } else {
                                     /* First pass targets the MDTs. */
1207                                 lr->lr_flags &= ~LEF_TO_OST;
1208                         }
1209                 } else if (com->lc_type != LFSCK_TYPE_LAYOUT) {
1210                         break;
1211                 } else {
1212                         lr->lr_flags |= LEF_TO_OST;
1213                         phase_head = &lad->lad_ost_list;
1214                         ltds = &lfsck->li_ost_descs;
1215                 }
1216
1217 again:
1218                 laia->laia_ltds = ltds;
1219                 spin_lock(&ltds->ltd_lock);
                     /* Drain the list: each target is unlinked from both the
                      * phase list and the main list before it is notified. */
1220                 while (!list_empty(phase_head)) {
1221                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1222                                 ltd = list_entry(phase_head->next,
1223                                                  struct lfsck_tgt_desc,
1224                                                  ltd_layout_list);
1225                                 if (!list_empty(&ltd->ltd_layout_phase_list))
1226                                         list_del_init(
1227                                                 &ltd->ltd_layout_phase_list);
1228                                 list_del_init(&ltd->ltd_layout_list);
1229                         } else {
1230                                 ltd = list_entry(phase_head->next,
1231                                                  struct lfsck_tgt_desc,
1232                                                  ltd_namespace_list);
1233                                 if (!list_empty(&ltd->ltd_namespace_phase_list))
1234                                         list_del_init(
1235                                                 &ltd->ltd_namespace_phase_list);
1236                                 list_del_init(&ltd->ltd_namespace_list);
1237                         }
                             /* Reference taken under the lock; released here
                              * if queuing the request fails. */
1238                         atomic_inc(&ltd->ltd_ref);
1239                         laia->laia_ltd = ltd;
1240                         spin_unlock(&ltds->ltd_lock);
1241                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1242                                         lfsck_async_interpret_common,
1243                                         laia, LFSCK_NOTIFY);
1244                         if (rc != 0) {
1245                                 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
1246                                        "notify %s %x for %s stop/phase2_done/"
1247                                        "peer_exit: rc = %d\n",
1248                                        lfsck_lfsck2name(lfsck),
1249                                        (lr->lr_flags & LEF_TO_OST) ?
1250                                        "OST" : "MDT", ltd->ltd_index,
1251                                        lad->lad_name, rc);
1252                                 lfsck_tgt_put(ltd);
1253                         }
1254                         spin_lock(&ltds->ltd_lock);
1255                 }
1256                 spin_unlock(&ltds->ltd_lock);
1257
                     /* This overwrites rc, so an individual queuing failure
                      * above is logged but not returned. */
1258                 rc = ptlrpc_set_wait(set);
1259                 if (rc < 0) {
1260                         ptlrpc_set_destroy(set);
1261                         RETURN(rc);
1262                 }
1263
                     /* Layout LFSCK: after the MDT pass, repeat for OSTs. */
1264                 if (com->lc_type == LFSCK_TYPE_LAYOUT &&
1265                     !(lr->lr_flags & LEF_TO_OST)) {
1266                         lr->lr_flags |= LEF_TO_OST;
1267                         phase_head = &lad->lad_ost_list;
1268                         ltds = &lfsck->li_ost_descs;
1269                         goto again;
1270                 }
1271                 break;
1272         }
1273         case LE_PHASE1_DONE:
                     /* Only the MDTs on lad_mdt_list are notified here. The
                      * generation stamp lets every entry be visited once
                      * while the entries stay on the list. */
1274                 lad->lad_touch_gen++;
1275                 ltds = &lfsck->li_mdt_descs;
1276                 laia->laia_ltds = ltds;
1277                 spin_lock(&ltds->ltd_lock);
1278                 while (!list_empty(&lad->lad_mdt_list)) {
1279                         struct list_head *list;
1280                         __u32            *gen;
1281
1282                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1283                                 ltd = list_entry(lad->lad_mdt_list.next,
1284                                                  struct lfsck_tgt_desc,
1285                                                  ltd_layout_list);
1286                                 list = &ltd->ltd_layout_list;
1287                                 gen = &ltd->ltd_layout_gen;
1288                         } else {
1289                                 ltd = list_entry(lad->lad_mdt_list.next,
1290                                                  struct lfsck_tgt_desc,
1291                                                  ltd_namespace_list);
1292                                 list = &ltd->ltd_namespace_list;
1293                                 gen = &ltd->ltd_namespace_gen;
1294                         }
1295
                             /* Head already stamped: full round completed. */
1296                         if (*gen == lad->lad_touch_gen)
1297                                 break;
1298
1299                         *gen = lad->lad_touch_gen;
1300                         list_move_tail(list, &lad->lad_mdt_list);
1301                         atomic_inc(&ltd->ltd_ref);
1302                         laia->laia_ltd = ltd;
1303                         spin_unlock(&ltds->ltd_lock);
1304                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1305                                         lfsck_async_interpret_common,
1306                                         laia, LFSCK_NOTIFY);
1307                         if (rc != 0) {
1308                                 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
1309                                        "notify MDT %x for %s phase1 done: "
1310                                        "rc = %d\n", lfsck_lfsck2name(lfsck),
1311                                        ltd->ltd_index, lad->lad_name, rc);
1312                                 lfsck_tgt_put(ltd);
1313                         }
1314                         spin_lock(&ltds->ltd_lock);
1315                 }
1316                 spin_unlock(&ltds->ltd_lock);
1317                 break;
1318         default:
                     /* NOTE(review): the message labels the value as "rc",
                      * but the argument printed is lr->lr_event. */
1319                 CDEBUG(D_LFSCK, "%s: LFSCK assistant unexpected LFSCK event: "
1320                        "rc = %d\n", lfsck_lfsck2name(lfsck), lr->lr_event);
1321                 rc = -EINVAL;
1322                 break;
1323         }
1324
             /* Final wait on the set, then release it. A non-zero rc left by
              * the switch above takes precedence over the wait result. */
1325         rc1 = ptlrpc_set_wait(set);
1326         ptlrpc_set_destroy(set);
1327
1328         RETURN(rc != 0 ? rc : rc1);
1329 }
1330
1331 /**
1332  * The LFSCK assistant thread is triggered by the LFSCK main engine.
1333  * They co-work together as an asynchronous pipeline: the LFSCK main
1334  * engine scans the system and pre-fetches the objects, attributes,
1335  * or name entries, etc, and pushes them into the pipeline as input
1336  * requests for the LFSCK assistant thread; on the other end of the
1337  * pipeline, the LFSCK assistant thread performs the real check and
1338  * repair for every request from the main engine.
1339  *
1340  * Generally, the assistant engine may block while checking/repairing
1341  * something, so the LFSCK main engine will run somewhat faster. On the
1342  * other hand, the LFSCK main engine drives multiple assistant threads
1343  * in parallel: for each LFSCK component on the master (such as layout
1344  * LFSCK, namespace LFSCK) there is an independent LFSCK assistant
1345  * thread. So under such a 1:N multiple asynchronous pipelines mode,
1346  * the whole LFSCK performance will be much better than having the
1347  * LFSCK main engine check/repair everything by itself.
1348  */
1349 int lfsck_assistant_engine(void *args)
1350 {
1351         struct lfsck_thread_args          *lta     = args;
1352         struct lu_env                     *env     = &lta->lta_env;
1353         struct lfsck_component            *com     = lta->lta_com;
1354         struct lfsck_instance             *lfsck   = lta->lta_lfsck;
1355         struct lfsck_bookmark             *bk      = &lfsck->li_bookmark_ram;
1356         struct lfsck_position             *pos     = &com->lc_pos_start;
1357         struct lfsck_thread_info          *info    = lfsck_env_info(env);
1358         struct lfsck_request              *lr      = &info->lti_lr;
1359         struct lfsck_assistant_data       *lad     = com->lc_data;
1360         struct ptlrpc_thread              *mthread = &lfsck->li_thread;
1361         struct ptlrpc_thread              *athread = &lad->lad_thread;
1362         struct lfsck_assistant_operations *lao     = lad->lad_ops;
1363         struct lfsck_assistant_req        *lar;
1364         struct l_wait_info                 lwi     = { 0 };
1365         int                                rc      = 0;
1366         int                                rc1     = 0;
1367         ENTRY;
1368
1369         CDEBUG(D_LFSCK, "%s: %s LFSCK assistant thread start\n",
1370                lfsck_lfsck2name(lfsck), lad->lad_name);
1371
1372         memset(lr, 0, sizeof(*lr));
1373         lr->lr_event = LE_START;
1374         if (pos->lp_oit_cookie <= 1)
1375                 lr->lr_param = LPF_RESET;
1376         rc = lfsck_assistant_notify_others(env, com, lr);
1377         if (rc != 0) {
1378                 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to notify others "
1379                        "to start %s: rc = %d\n",
1380                        lfsck_lfsck2name(lfsck), lad->lad_name, rc);
1381                 GOTO(fini, rc);
1382         }
1383
1384         spin_lock(&lad->lad_lock);
1385         thread_set_flags(athread, SVC_RUNNING);
1386         spin_unlock(&lad->lad_lock);
1387         wake_up_all(&mthread->t_ctl_waitq);
1388
1389         while (1) {
1390                 while (!list_empty(&lad->lad_req_list)) {
1391                         bool wakeup = false;
1392
1393                         if (unlikely(lad->lad_exit ||
1394                                      !thread_is_running(mthread)))
1395                                 GOTO(cleanup1, rc = lad->lad_post_result);
1396
1397                         lar = list_entry(lad->lad_req_list.next,
1398                                          struct lfsck_assistant_req,
1399                                          lar_list);
1400                         /* Only the lfsck_assistant_engine thread itself can
1401                          * remove the "lar" from the head of the list, LFSCK
1402                          * engine thread only inserts other new "lar" at the
1403                          * end of the list. So it is safe to handle current
1404                          * "lar" without the spin_lock. */
1405                         rc = lao->la_handler_p1(env, com, lar);
1406                         spin_lock(&lad->lad_lock);
1407                         list_del_init(&lar->lar_list);
1408                         lad->lad_prefetched--;
1409                         /* Wake up the main engine thread only when the list
1410                          * is empty or half of the prefetched items have been
1411                          * handled to avoid too frequent thread schedule. */
1412                         if (lad->lad_prefetched == 0 ||
1413                             (bk->lb_async_windows != 0 &&
1414                              bk->lb_async_windows / 2 ==
1415                              lad->lad_prefetched))
1416                                 wakeup = true;
1417                         spin_unlock(&lad->lad_lock);
1418                         if (wakeup)
1419                                 wake_up_all(&mthread->t_ctl_waitq);
1420
1421                         lao->la_req_fini(env, lar);
1422                         if (rc < 0 && bk->lb_param & LPF_FAILOUT)
1423                                 GOTO(cleanup1, rc);
1424                 }
1425
1426                 l_wait_event(athread->t_ctl_waitq,
1427                              !lfsck_assistant_req_empty(lad) ||
1428                              lad->lad_exit ||
1429                              lad->lad_to_post ||
1430                              lad->lad_to_double_scan,
1431                              &lwi);
1432
1433                 if (unlikely(lad->lad_exit))
1434                         GOTO(cleanup1, rc = lad->lad_post_result);
1435
1436                 if (!list_empty(&lad->lad_req_list))
1437                         continue;
1438
1439                 if (lad->lad_to_post) {
1440                         CDEBUG(D_LFSCK, "%s: %s LFSCK assistant thread post\n",
1441                                lfsck_lfsck2name(lfsck), lad->lad_name);
1442
1443                         if (unlikely(lad->lad_exit))
1444                                 GOTO(cleanup1, rc = lad->lad_post_result);
1445
1446                         lad->lad_to_post = 0;
1447                         LASSERT(lad->lad_post_result > 0);
1448
1449                         memset(lr, 0, sizeof(*lr));
1450                         lr->lr_event = LE_PHASE1_DONE;
1451                         lr->lr_status = lad->lad_post_result;
1452                         rc = lfsck_assistant_notify_others(env, com, lr);
1453                         if (rc != 0)
1454                                 CDEBUG(D_LFSCK, "%s: LFSCK assistant failed to "
1455                                        "notify others for %s post: rc = %d\n",
1456                                        lfsck_lfsck2name(lfsck),
1457                                        lad->lad_name, rc);
1458
1459                         /* Wake up the master engine so it can go ahead. */
1460                         wake_up_all(&mthread->t_ctl_waitq);
1461                 }
1462
1463                 if (lad->lad_to_double_scan) {
1464                         lad->lad_to_double_scan = 0;
1465                         atomic_inc(&lfsck->li_double_scan_count);
1466                         lad->lad_in_double_scan = 1;
1467                         wake_up_all(&mthread->t_ctl_waitq);
1468
1469                         com->lc_new_checked = 0;
1470                         com->lc_new_scanned = 0;
1471                         com->lc_time_last_checkpoint = cfs_time_current();
1472                         com->lc_time_next_checkpoint =
1473                                 com->lc_time_last_checkpoint +
1474                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1475
1476                         /* Flush async updates before handling orphan. */
1477                         dt_sync(env, lfsck->li_next);
1478
1479                         CDEBUG(D_LFSCK, "%s: LFSCK assistant phase2 "
1480                                "scan start\n", lfsck_lfsck2name(lfsck));
1481
1482                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_DOUBLESCAN))
1483                                 GOTO(cleanup2, rc = 0);
1484
1485                         while (lad->lad_in_double_scan) {
1486                                 rc = lfsck_assistant_query_others(env, com);
1487                                 if (lfsck_phase2_next_ready(lad))
1488                                         goto p2_next;
1489
1490                                 if (rc < 0)
1491                                         GOTO(cleanup2, rc);
1492
1493                                 /* Pull LFSCK status on related targets once
1494                                  * per 30 seconds if we are not notified. */
1495                                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(30),
1496                                                            cfs_time_seconds(1),
1497                                                            NULL, NULL);
1498                                 rc = l_wait_event(athread->t_ctl_waitq,
1499                                         lfsck_phase2_next_ready(lad) ||
1500                                         lad->lad_exit ||
1501                                         !thread_is_running(mthread),
1502                                         &lwi);
1503
1504                                 if (unlikely(lad->lad_exit ||
1505                                              !thread_is_running(mthread)))
1506                                         GOTO(cleanup2, rc = 0);
1507
1508                                 if (rc == -ETIMEDOUT)
1509                                         continue;
1510
1511                                 if (rc < 0)
1512                                         GOTO(cleanup2, rc);
1513
1514 p2_next:
1515                                 rc = lao->la_handler_p2(env, com);
1516                                 if (rc != 0)
1517                                         GOTO(cleanup2, rc);
1518
1519                                 if (unlikely(lad->lad_exit ||
1520                                              !thread_is_running(mthread)))
1521                                         GOTO(cleanup2, rc = 0);
1522                         }
1523                 }
1524         }
1525
1526 cleanup1:
1527         /* Cleanup the unfinished requests. */
1528         spin_lock(&lad->lad_lock);
1529         if (rc < 0)
1530                 lad->lad_assistant_status = rc;
1531
1532         if (lad->lad_exit && lad->lad_post_result <= 0)
1533                 lao->la_fill_pos(env, com, &lfsck->li_pos_checkpoint);
1534
1535         while (!list_empty(&lad->lad_req_list)) {
1536                 lar = list_entry(lad->lad_req_list.next,
1537                                  struct lfsck_assistant_req,
1538                                  lar_list);
1539                 list_del_init(&lar->lar_list);
1540                 lad->lad_prefetched--;
1541                 spin_unlock(&lad->lad_lock);
1542                 lao->la_req_fini(env, lar);
1543                 spin_lock(&lad->lad_lock);
1544         }
1545         spin_unlock(&lad->lad_lock);
1546
1547         LASSERTF(lad->lad_prefetched == 0, "unmatched prefeteched objs %d\n",
1548                  lad->lad_prefetched);
1549
1550 cleanup2:
1551         memset(lr, 0, sizeof(*lr));
1552         if (rc > 0) {
1553                 lr->lr_event = LE_PHASE2_DONE;
1554                 lr->lr_status = rc;
1555         } else if (rc == 0) {
1556                 if (lfsck->li_flags & LPF_ALL_TGT) {
1557                         lr->lr_event = LE_STOP;
1558                         lr->lr_status = LS_STOPPED;
1559                 } else {
1560                         lr->lr_event = LE_PEER_EXIT;
1561                         switch (lfsck->li_status) {
1562                         case LS_PAUSED:
1563                         case LS_CO_PAUSED:
1564                                 lr->lr_status = LS_CO_PAUSED;
1565                                 break;
1566                         case LS_STOPPED:
1567                         case LS_CO_STOPPED:
1568                                 lr->lr_status = LS_CO_STOPPED;
1569                                 break;
1570                         default:
1571                                 CDEBUG(D_LFSCK, "%s: LFSCK assistant unknown "
1572                                        "status: rc = %d\n",
1573                                        lfsck_lfsck2name(lfsck),
1574                                        lfsck->li_status);
1575                                 lr->lr_status = LS_CO_FAILED;
1576                                 break;
1577                         }
1578                 }
1579         } else {
1580                 if (lfsck->li_flags & LPF_ALL_TGT) {
1581                         lr->lr_event = LE_STOP;
1582                         lr->lr_status = LS_FAILED;
1583                 } else {
1584                         lr->lr_event = LE_PEER_EXIT;
1585                         lr->lr_status = LS_CO_FAILED;
1586                 }
1587         }
1588
1589         rc1 = lfsck_assistant_notify_others(env, com, lr);
1590         if (rc1 != 0) {
1591                 CDEBUG(D_LFSCK, "%s: LFSCK assistant failed to notify "
1592                        "others for %s quit: rc = %d\n",
1593                        lfsck_lfsck2name(lfsck), lad->lad_name, rc1);
1594                 rc = rc1;
1595         }
1596
1597         /* Flush async updates before exit. */
1598         dt_sync(env, lfsck->li_next);
1599
1600         /* In the forced-exit case, some requests may be freed without
1601          * verification; those objects should be re-handled on the next
1602          * run, so do not update the on-disk tracing file in that case. */
1603         if (lad->lad_in_double_scan) {
1604                 if (!lad->lad_exit)
1605                         rc1 = lao->la_double_scan_result(env, com, rc);
1606
1607                 CDEBUG(D_LFSCK, "%s: LFSCK assistant phase2 scan "
1608                        "finished: rc = %d\n",
1609                        lfsck_lfsck2name(lfsck), rc1 != 0 ? rc1 : rc);
1610         }
1611
1612 fini:
1613         if (lad->lad_in_double_scan)
1614                 atomic_dec(&lfsck->li_double_scan_count);
1615
1616         spin_lock(&lad->lad_lock);
1617         lad->lad_assistant_status = (rc1 != 0 ? rc1 : rc);
1618         thread_set_flags(athread, SVC_STOPPED);
1619         wake_up_all(&mthread->t_ctl_waitq);
1620         spin_unlock(&lad->lad_lock);
1621
1622         CDEBUG(D_LFSCK, "%s: %s LFSCK assistant thread exit: rc = %d\n",
1623                lfsck_lfsck2name(lfsck), lad->lad_name,
1624                lad->lad_assistant_status);
1625
1626         lfsck_thread_args_fini(lta);
1627
1628         return rc;
1629 }