Whamcloud - gitweb
LU-4970 tests: wait async LFSCK updates to be done
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_namespace.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lustre/lustre_idl.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fid.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre/lustre_user.h>
41
42 #include "lfsck_internal.h"
43
/* Expected value of lfsck_namespace::ln_magic; lfsck_namespace_load()
 * reports the loaded structure as broken when the magic differs. */
#define LFSCK_NAMESPACE_MAGIC   0xA0629D03

/* Name, under the local root directory, of the object used by the namespace
 * LFSCK component (unlinked and re-created in lfsck_namespace_reset()). */
static const char lfsck_namespace_name[] = "lfsck_namespace";
47
48 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
49                                       struct lfsck_namespace *src)
50 {
51         dst->ln_magic = le32_to_cpu(src->ln_magic);
52         dst->ln_status = le32_to_cpu(src->ln_status);
53         dst->ln_flags = le32_to_cpu(src->ln_flags);
54         dst->ln_success_count = le32_to_cpu(src->ln_success_count);
55         dst->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
56         dst->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
57         dst->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
58         dst->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
59         dst->ln_time_last_checkpoint =
60                                 le64_to_cpu(src->ln_time_last_checkpoint);
61         lfsck_position_le_to_cpu(&dst->ln_pos_latest_start,
62                                  &src->ln_pos_latest_start);
63         lfsck_position_le_to_cpu(&dst->ln_pos_last_checkpoint,
64                                  &src->ln_pos_last_checkpoint);
65         lfsck_position_le_to_cpu(&dst->ln_pos_first_inconsistent,
66                                  &src->ln_pos_first_inconsistent);
67         dst->ln_items_checked = le64_to_cpu(src->ln_items_checked);
68         dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
69         dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
70         dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
71         dst->ln_mlinked_checked = le64_to_cpu(src->ln_mlinked_checked);
72         dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
73         dst->ln_objs_repaired_phase2 =
74                                 le64_to_cpu(src->ln_objs_repaired_phase2);
75         dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
76         dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
77         dst->ln_objs_lost_found = le64_to_cpu(src->ln_objs_lost_found);
78         fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2,
79                       &src->ln_fid_latest_scanned_phase2);
80         dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
81         dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
82 }
83
84 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
85                                       struct lfsck_namespace *src)
86 {
87         dst->ln_magic = cpu_to_le32(src->ln_magic);
88         dst->ln_status = cpu_to_le32(src->ln_status);
89         dst->ln_flags = cpu_to_le32(src->ln_flags);
90         dst->ln_success_count = cpu_to_le32(src->ln_success_count);
91         dst->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
92         dst->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
93         dst->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
94         dst->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
95         dst->ln_time_last_checkpoint =
96                                 cpu_to_le64(src->ln_time_last_checkpoint);
97         lfsck_position_cpu_to_le(&dst->ln_pos_latest_start,
98                                  &src->ln_pos_latest_start);
99         lfsck_position_cpu_to_le(&dst->ln_pos_last_checkpoint,
100                                  &src->ln_pos_last_checkpoint);
101         lfsck_position_cpu_to_le(&dst->ln_pos_first_inconsistent,
102                                  &src->ln_pos_first_inconsistent);
103         dst->ln_items_checked = cpu_to_le64(src->ln_items_checked);
104         dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
105         dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
106         dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
107         dst->ln_mlinked_checked = cpu_to_le64(src->ln_mlinked_checked);
108         dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
109         dst->ln_objs_repaired_phase2 =
110                                 cpu_to_le64(src->ln_objs_repaired_phase2);
111         dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
112         dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
113         dst->ln_objs_lost_found = cpu_to_le64(src->ln_objs_lost_found);
114         fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2,
115                       &src->ln_fid_latest_scanned_phase2);
116         dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
117         dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
118 }
119
120 /**
121  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
122  * \retval 0: succeed.
123  * \retval -ve: failed cases.
124  */
125 static int lfsck_namespace_load(const struct lu_env *env,
126                                 struct lfsck_component *com)
127 {
128         int len = com->lc_file_size;
129         int rc;
130
131         rc = dt_xattr_get(env, com->lc_obj,
132                           lfsck_buf_get(env, com->lc_file_disk, len),
133                           XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
134         if (rc == len) {
135                 struct lfsck_namespace *ns = com->lc_file_ram;
136
137                 lfsck_namespace_le_to_cpu(ns,
138                                 (struct lfsck_namespace *)com->lc_file_disk);
139                 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
140                         CDEBUG(D_LFSCK, "%s: invalid lfsck_namespace magic "
141                                "%#x != %#x\n", lfsck_lfsck2name(com->lc_lfsck),
142                                ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
143                         rc = 1;
144                 } else {
145                         rc = 0;
146                 }
147         } else if (rc != -ENODATA) {
148                 CDEBUG(D_LFSCK, "%s: fail to load lfsck_namespace, "
149                        "expected = %d: rc = %d\n",
150                        lfsck_lfsck2name(com->lc_lfsck), len, rc);
151                 if (rc >= 0)
152                         rc = 1;
153         }
154         return rc;
155 }
156
157 static int lfsck_namespace_store(const struct lu_env *env,
158                                  struct lfsck_component *com, bool init)
159 {
160         struct dt_object        *obj    = com->lc_obj;
161         struct lfsck_instance   *lfsck  = com->lc_lfsck;
162         struct thandle          *handle;
163         int                      len    = com->lc_file_size;
164         int                      rc;
165         ENTRY;
166
167         lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
168                                   (struct lfsck_namespace *)com->lc_file_ram);
169         handle = dt_trans_create(env, lfsck->li_bottom);
170         if (IS_ERR(handle))
171                 GOTO(log, rc = PTR_ERR(handle));
172
173         rc = dt_declare_xattr_set(env, obj,
174                                   lfsck_buf_get(env, com->lc_file_disk, len),
175                                   XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
176         if (rc != 0)
177                 GOTO(out, rc);
178
179         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
180         if (rc != 0)
181                 GOTO(out, rc);
182
183         rc = dt_xattr_set(env, obj,
184                           lfsck_buf_get(env, com->lc_file_disk, len),
185                           XATTR_NAME_LFSCK_NAMESPACE,
186                           init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
187                           handle, BYPASS_CAPA);
188
189         GOTO(out, rc);
190
191 out:
192         dt_trans_stop(env, lfsck->li_bottom, handle);
193
194 log:
195         if (rc != 0)
196                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_namespace: rc = %d\n",
197                        lfsck_lfsck2name(lfsck), rc);
198         return rc;
199 }
200
201 static int lfsck_namespace_init(const struct lu_env *env,
202                                 struct lfsck_component *com)
203 {
204         struct lfsck_namespace *ns = com->lc_file_ram;
205         int rc;
206
207         memset(ns, 0, sizeof(*ns));
208         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
209         ns->ln_status = LS_INIT;
210         down_write(&com->lc_sem);
211         rc = lfsck_namespace_store(env, com, true);
212         up_write(&com->lc_sem);
213         return rc;
214 }
215
216 static int lfsck_namespace_lookup(const struct lu_env *env,
217                                   struct lfsck_component *com,
218                                   const struct lu_fid *fid, __u8 *flags)
219 {
220         struct lu_fid *key = &lfsck_env_info(env)->lti_fid;
221         int            rc;
222
223         fid_cpu_to_be(key, fid);
224         rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)flags,
225                        (const struct dt_key *)key, BYPASS_CAPA);
226         return rc;
227 }
228
229 static int lfsck_namespace_delete(const struct lu_env *env,
230                                   struct lfsck_component *com,
231                                   const struct lu_fid *fid)
232 {
233         struct lfsck_instance   *lfsck  = com->lc_lfsck;
234         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
235         struct thandle          *handle;
236         struct dt_object        *obj    = com->lc_obj;
237         int                      rc;
238         ENTRY;
239
240         handle = dt_trans_create(env, lfsck->li_bottom);
241         if (IS_ERR(handle))
242                 RETURN(PTR_ERR(handle));
243
244         rc = dt_declare_delete(env, obj, (const struct dt_key *)fid, handle);
245         if (rc != 0)
246                 GOTO(out, rc);
247
248         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
249         if (rc != 0)
250                 GOTO(out, rc);
251
252         fid_cpu_to_be(key, fid);
253         rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
254                        BYPASS_CAPA);
255
256         GOTO(out, rc);
257
258 out:
259         dt_trans_stop(env, lfsck->li_bottom, handle);
260         return rc;
261 }
262
263 static int lfsck_namespace_update(const struct lu_env *env,
264                                   struct lfsck_component *com,
265                                   const struct lu_fid *fid,
266                                   __u8 flags, bool force)
267 {
268         struct lfsck_instance   *lfsck  = com->lc_lfsck;
269         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
270         struct thandle          *handle;
271         struct dt_object        *obj    = com->lc_obj;
272         int                      rc;
273         bool                     exist  = false;
274         __u8                     tf;
275         ENTRY;
276
277         rc = lfsck_namespace_lookup(env, com, fid, &tf);
278         if (rc != 0 && rc != -ENOENT)
279                 RETURN(rc);
280
281         if (rc == 0) {
282                 if (!force || flags == tf)
283                         RETURN(0);
284
285                 exist = true;
286                 handle = dt_trans_create(env, lfsck->li_bottom);
287                 if (IS_ERR(handle))
288                         RETURN(PTR_ERR(handle));
289
290                 rc = dt_declare_delete(env, obj, (const struct dt_key *)fid,
291                                        handle);
292                 if (rc != 0)
293                         GOTO(out, rc);
294         } else {
295                 handle = dt_trans_create(env, lfsck->li_bottom);
296                 if (IS_ERR(handle))
297                         RETURN(PTR_ERR(handle));
298         }
299
300         rc = dt_declare_insert(env, obj, (const struct dt_rec *)&flags,
301                                (const struct dt_key *)fid, handle);
302         if (rc != 0)
303                 GOTO(out, rc);
304
305         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
306         if (rc != 0)
307                 GOTO(out, rc);
308
309         fid_cpu_to_be(key, fid);
310         if (exist) {
311                 rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
312                                BYPASS_CAPA);
313                 if (rc != 0)
314                         GOTO(out, rc);
315         }
316
317         rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
318                        (const struct dt_key *)key, handle, BYPASS_CAPA, 1);
319
320         GOTO(out, rc);
321
322 out:
323         dt_trans_stop(env, lfsck->li_bottom, handle);
324         return rc;
325 }
326
327 static int lfsck_namespace_check_exist(const struct lu_env *env,
328                                        struct lfsck_instance *lfsck,
329                                        struct dt_object *obj, const char *name)
330 {
331         struct dt_object *dir = lfsck->li_obj_dir;
332         struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
333         int               rc;
334         ENTRY;
335
336         if (unlikely(lfsck_is_dead_obj(obj)))
337                 RETURN(LFSCK_NAMEENTRY_DEAD);
338
339         rc = dt_lookup(env, dir, (struct dt_rec *)fid,
340                        (const struct dt_key *)name, BYPASS_CAPA);
341         if (rc == -ENOENT)
342                 RETURN(LFSCK_NAMEENTRY_REMOVED);
343
344         if (rc < 0)
345                 RETURN(rc);
346
347         if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
348                 RETURN(LFSCK_NAMEENTRY_RECREATED);
349
350         RETURN(0);
351 }
352
353 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
354                                             struct dt_object *obj,
355                                             struct thandle *handle)
356 {
357         int rc;
358
359         /* For destroying all invalid linkEA entries. */
360         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
361         if (rc != 0)
362                 return rc;
363
364         /* For insert new linkEA entry. */
365         rc = dt_declare_xattr_set(env, obj,
366                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
367                         XATTR_NAME_LINK, 0, handle);
368         return rc;
369 }
370
371 static int lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
372                             struct linkea_data *ldata)
373 {
374         int rc;
375
376         ldata->ld_buf =
377                 lu_buf_check_and_alloc(&lfsck_env_info(env)->lti_linkea_buf,
378                                        PAGE_CACHE_SIZE);
379         if (ldata->ld_buf->lb_buf == NULL)
380                 return -ENOMEM;
381
382         if (!dt_object_exists(obj))
383                 return -ENODATA;
384
385         rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
386         if (rc == -ERANGE) {
387                 /* Buf was too small, figure out what we need. */
388                 lu_buf_free(ldata->ld_buf);
389                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
390                                   BYPASS_CAPA);
391                 if (rc < 0)
392                         return rc;
393
394                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
395                 if (ldata->ld_buf->lb_buf == NULL)
396                         return -ENOMEM;
397
398                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
399                                   BYPASS_CAPA);
400         }
401         if (rc < 0)
402                 return rc;
403
404         linkea_init(ldata);
405
406         return 0;
407 }
408
409 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
410                              struct linkea_data *ldata, struct thandle *handle)
411 {
412         const struct lu_buf *buf = lfsck_buf_get_const(env,
413                                                        ldata->ld_buf->lb_buf,
414                                                        ldata->ld_leh->leh_len);
415
416         return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle,
417                             BYPASS_CAPA);
418 }
419
420 /**
421  * \retval ve: removed entries
422  */
423 static int lfsck_linkea_entry_unpack(struct lfsck_instance *lfsck,
424                                      struct linkea_data *ldata,
425                                      struct lu_name *cname,
426                                      struct lu_fid *pfid)
427 {
428         struct link_ea_entry    *oldlee;
429         int                      oldlen;
430         int                      removed = 0;
431
432         linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
433         oldlee = ldata->ld_lee;
434         oldlen = ldata->ld_reclen;
435         linkea_next_entry(ldata);
436         while (ldata->ld_lee != NULL) {
437                 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
438                                    ldata->ld_lee->lee_reclen[1];
439                 if (unlikely(ldata->ld_reclen == oldlen &&
440                              memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
441                         linkea_del_buf(ldata, cname);
442                         removed++;
443                 } else {
444                         linkea_next_entry(ldata);
445                 }
446         }
447         ldata->ld_lee = oldlee;
448         ldata->ld_reclen = oldlen;
449         return removed;
450 }
451
452 /**
453  * \retval +ve  repaired
454  * \retval 0    no need to repair
455  * \retval -ve  error cases
456  */
457 static int lfsck_namespace_double_scan_one(const struct lu_env *env,
458                                            struct lfsck_component *com,
459                                            struct dt_object *child, __u8 flags)
460 {
461         struct lfsck_thread_info *info    = lfsck_env_info(env);
462         struct lu_attr           *la      = &info->lti_la;
463         struct lu_name           *cname   = &info->lti_name;
464         struct lu_fid            *pfid    = &info->lti_fid;
465         struct lu_fid            *cfid    = &info->lti_fid2;
466         struct lfsck_instance   *lfsck    = com->lc_lfsck;
467         struct lfsck_bookmark   *bk       = &lfsck->li_bookmark_ram;
468         struct lfsck_namespace  *ns       = com->lc_file_ram;
469         struct linkea_data       ldata    = { 0 };
470         struct thandle          *handle   = NULL;
471         bool                     locked   = false;
472         bool                     update   = false;
473         int                      rc;
474         ENTRY;
475
476         if (com->lc_journal) {
477
478 again:
479                 LASSERT(!locked);
480
481                 update = false;
482                 com->lc_journal = 1;
483                 handle = dt_trans_create(env, lfsck->li_next);
484                 if (IS_ERR(handle))
485                         RETURN(rc = PTR_ERR(handle));
486
487                 rc = dt_declare_xattr_set(env, child,
488                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
489                         XATTR_NAME_LINK, 0, handle);
490                 if (rc != 0)
491                         GOTO(stop, rc);
492
493                 rc = dt_trans_start(env, lfsck->li_next, handle);
494                 if (rc != 0)
495                         GOTO(stop, rc);
496
497                 dt_write_lock(env, child, MOR_TGT_CHILD);
498                 locked = true;
499         }
500
501         if (unlikely(lfsck_is_dead_obj(child)))
502                 GOTO(stop, rc = 0);
503
504         rc = dt_attr_get(env, child, la, BYPASS_CAPA);
505         if (rc == 0)
506                 rc = lfsck_links_read(env, child, &ldata);
507         if (rc != 0) {
508                 if ((bk->lb_param & LPF_DRYRUN) &&
509                     (rc == -EINVAL || rc == -ENODATA))
510                         rc = 1;
511
512                 GOTO(stop, rc);
513         }
514
515         linkea_first_entry(&ldata);
516         while (ldata.ld_lee != NULL) {
517                 struct dt_object *parent = NULL;
518
519                 rc = lfsck_linkea_entry_unpack(lfsck, &ldata, cname, pfid);
520                 if (rc > 0)
521                         update = true;
522
523                 if (!fid_is_sane(pfid))
524                         goto shrink;
525
526                 parent = lfsck_object_find(env, lfsck, pfid);
527                 if (parent == NULL)
528                         goto shrink;
529                 else if (IS_ERR(parent))
530                         GOTO(stop, rc = PTR_ERR(parent));
531
532                 if (!dt_object_exists(parent))
533                         goto shrink;
534
535                 /* XXX: Currently, skip remote object, the consistency for
536                  *      remote object will be processed in LFSCK phase III. */
537                 if (dt_object_remote(parent)) {
538                         lfsck_object_put(env, parent);
539                         linkea_next_entry(&ldata);
540                         continue;
541                 }
542
543                 if (unlikely(!dt_try_as_dir(env, parent)))
544                         goto shrink;
545
546                 /* To guarantee the 'name' is terminated with '0'. */
547                 memcpy(info->lti_key, cname->ln_name, cname->ln_namelen);
548                 info->lti_key[cname->ln_namelen] = 0;
549                 cname->ln_name = info->lti_key;
550                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
551                                (const struct dt_key *)cname->ln_name,
552                                BYPASS_CAPA);
553                 if (rc != 0 && rc != -ENOENT) {
554                         lfsck_object_put(env, parent);
555                         GOTO(stop, rc);
556                 }
557
558                 if (rc == 0) {
559                         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
560                                 lfsck_object_put(env, parent);
561                                 linkea_next_entry(&ldata);
562                                 continue;
563                         }
564
565                         goto shrink;
566                 }
567
568                 /* If there is no name entry in the parent dir and the object
569                  * link count is less than the linkea entries count, then the
570                  * linkea entry should be removed. */
571                 if (ldata.ld_leh->leh_reccount > la->la_nlink)
572                         goto shrink;
573
574                 /* XXX: For the case of there is a linkea entry, but without
575                  *      name entry pointing to the object and its hard links
576                  *      count is not less than the object name entries count,
577                  *      then seems we should add the 'missed' name entry back
578                  *      to namespace, but before LFSCK phase III finished, we
579                  *      do not know whether the object has some inconsistency
580                  *      on other MDTs. So now, do NOT add the name entry back
581                  *      to the namespace, but keep the linkEA entry. LU-2914 */
582                 lfsck_object_put(env, parent);
583                 linkea_next_entry(&ldata);
584                 continue;
585
586 shrink:
587                 if (parent != NULL)
588                         lfsck_object_put(env, parent);
589                 if (bk->lb_param & LPF_DRYRUN)
590                         RETURN(1);
591
592                 CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
593                       "for the object: "DFID", parent "DFID", name %.*s\n",
594                       lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)),
595                       PFID(pfid), cname->ln_namelen, cname->ln_name);
596
597                 linkea_del_buf(&ldata, cname);
598                 update = true;
599         }
600
601         if (update) {
602                 if (!com->lc_journal) {
603                         com->lc_journal = 1;
604                         goto again;
605                 }
606
607                 rc = lfsck_links_write(env, child, &ldata, handle);
608         }
609
610         GOTO(stop, rc);
611
612 stop:
613         if (locked) {
614         /* XXX: For the case linkea entries count does not match the object hard
615          *      links count, we cannot update the later one simply. Before LFSCK
616          *      phase III finished, we cannot know whether there are some remote
617          *      name entries to be repaired or not. LU-2914 */
618                 if (rc == 0 && !lfsck_is_dead_obj(child) &&
619                     ldata.ld_leh != NULL &&
620                     ldata.ld_leh->leh_reccount != la->la_nlink)
621                         CDEBUG(D_LFSCK, "%s: the object "DFID" linkEA entry "
622                                "count %u may not match its hardlink count %u\n",
623                                lfsck_lfsck2name(lfsck), PFID(cfid),
624                                ldata.ld_leh->leh_reccount, la->la_nlink);
625
626                 dt_write_unlock(env, child);
627         }
628
629         if (handle != NULL)
630                 dt_trans_stop(env, lfsck->li_next, handle);
631
632         if (rc == 0 && update) {
633                 ns->ln_objs_nlink_repaired++;
634                 rc = 1;
635         }
636
637         return rc;
638 }
639
640 /* namespace APIs */
641
642 static int lfsck_namespace_reset(const struct lu_env *env,
643                                  struct lfsck_component *com, bool init)
644 {
645         struct lfsck_instance   *lfsck = com->lc_lfsck;
646         struct lfsck_namespace  *ns    = com->lc_file_ram;
647         struct dt_object        *root;
648         struct dt_object        *dto;
649         int                      rc;
650         ENTRY;
651
652         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
653         if (IS_ERR(root))
654                 GOTO(log, rc = PTR_ERR(root));
655
656         if (unlikely(!dt_try_as_dir(env, root)))
657                 GOTO(put, rc = -ENOTDIR);
658
659         down_write(&com->lc_sem);
660         if (init) {
661                 memset(ns, 0, sizeof(*ns));
662         } else {
663                 __u32 count = ns->ln_success_count;
664                 __u64 last_time = ns->ln_time_last_complete;
665
666                 memset(ns, 0, sizeof(*ns));
667                 ns->ln_success_count = count;
668                 ns->ln_time_last_complete = last_time;
669         }
670         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
671         ns->ln_status = LS_INIT;
672
673         rc = local_object_unlink(env, lfsck->li_bottom, root,
674                                  lfsck_namespace_name);
675         if (rc != 0)
676                 GOTO(out, rc);
677
678         lfsck_object_put(env, com->lc_obj);
679         com->lc_obj = NULL;
680         dto = local_index_find_or_create(env, lfsck->li_los, root,
681                                          lfsck_namespace_name,
682                                          S_IFREG | S_IRUGO | S_IWUSR,
683                                          &dt_lfsck_features);
684         if (IS_ERR(dto))
685                 GOTO(out, rc = PTR_ERR(dto));
686
687         com->lc_obj = dto;
688         rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
689         if (rc != 0)
690                 GOTO(out, rc);
691
692         rc = lfsck_namespace_store(env, com, true);
693
694         GOTO(out, rc);
695
696 out:
697         up_write(&com->lc_sem);
698
699 put:
700         lu_object_put(env, &root->do_lu);
701 log:
702         CDEBUG(D_LFSCK, "%s: namespace LFSCK reset: rc = %d\n",
703                lfsck_lfsck2name(lfsck), rc);
704         return rc;
705 }
706
707 static void
708 lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
709                      bool new_checked)
710 {
711         struct lfsck_namespace *ns = com->lc_file_ram;
712
713         down_write(&com->lc_sem);
714         if (new_checked)
715                 com->lc_new_checked++;
716         ns->ln_items_failed++;
717         if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
718                 lfsck_pos_fill(env, com->lc_lfsck,
719                                &ns->ln_pos_first_inconsistent, false);
720
721                 CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
722                        "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n",
723                        lfsck_lfsck2name(com->lc_lfsck),
724                        ns->ln_pos_first_inconsistent.lp_oit_cookie,
725                        PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
726                        ns->ln_pos_first_inconsistent.lp_dir_cookie);
727         }
728         up_write(&com->lc_sem);
729 }
730
731 static int lfsck_namespace_checkpoint(const struct lu_env *env,
732                                       struct lfsck_component *com, bool init)
733 {
734         struct lfsck_instance   *lfsck = com->lc_lfsck;
735         struct lfsck_namespace  *ns    = com->lc_file_ram;
736         int                      rc;
737
738         if (com->lc_new_checked == 0 && !init)
739                 return 0;
740
741         down_write(&com->lc_sem);
742         if (init) {
743                 ns->ln_pos_latest_start = lfsck->li_pos_current;
744         } else {
745                 ns->ln_pos_last_checkpoint = lfsck->li_pos_current;
746                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
747                                 HALF_SEC - lfsck->li_time_last_checkpoint);
748                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
749                 ns->ln_items_checked += com->lc_new_checked;
750                 com->lc_new_checked = 0;
751         }
752
753         rc = lfsck_namespace_store(env, com, false);
754         up_write(&com->lc_sem);
755
756         CDEBUG(D_LFSCK, "%s: namespace LFSCK checkpoint at the pos ["LPU64
757                ", "DFID", "LPX64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
758                lfsck->li_pos_current.lp_oit_cookie,
759                PFID(&lfsck->li_pos_current.lp_dir_parent),
760                lfsck->li_pos_current.lp_dir_cookie, rc);
761
762         return rc;
763 }
764
765 static int lfsck_namespace_prep(const struct lu_env *env,
766                                 struct lfsck_component *com,
767                                 struct lfsck_start_param *lsp)
768 {
769         struct lfsck_instance   *lfsck  = com->lc_lfsck;
770         struct lfsck_namespace  *ns     = com->lc_file_ram;
771         struct lfsck_position   *pos    = &com->lc_pos_start;
772
773         if (ns->ln_status == LS_COMPLETED) {
774                 int rc;
775
776                 rc = lfsck_namespace_reset(env, com, false);
777                 if (rc == 0)
778                         rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
779
780                 if (rc != 0) {
781                         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: "
782                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
783
784                         return rc;
785                 }
786         }
787
788         down_write(&com->lc_sem);
789         ns->ln_time_latest_start = cfs_time_current_sec();
790
791         spin_lock(&lfsck->li_lock);
792         if (ns->ln_flags & LF_SCANNED_ONCE) {
793                 if (!lfsck->li_drop_dryrun ||
794                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
795                         ns->ln_status = LS_SCANNING_PHASE2;
796                         cfs_list_del_init(&com->lc_link);
797                         cfs_list_add_tail(&com->lc_link,
798                                           &lfsck->li_list_double_scan);
799                         if (!cfs_list_empty(&com->lc_link_dir))
800                                 cfs_list_del_init(&com->lc_link_dir);
801                         lfsck_pos_set_zero(pos);
802                 } else {
803                         ns->ln_status = LS_SCANNING_PHASE1;
804                         ns->ln_run_time_phase1 = 0;
805                         ns->ln_run_time_phase2 = 0;
806                         ns->ln_items_checked = 0;
807                         ns->ln_items_repaired = 0;
808                         ns->ln_items_failed = 0;
809                         ns->ln_dirs_checked = 0;
810                         ns->ln_mlinked_checked = 0;
811                         ns->ln_objs_checked_phase2 = 0;
812                         ns->ln_objs_repaired_phase2 = 0;
813                         ns->ln_objs_failed_phase2 = 0;
814                         ns->ln_objs_nlink_repaired = 0;
815                         ns->ln_objs_lost_found = 0;
816                         fid_zero(&ns->ln_fid_latest_scanned_phase2);
817                         if (cfs_list_empty(&com->lc_link_dir))
818                                 cfs_list_add_tail(&com->lc_link_dir,
819                                                   &lfsck->li_list_dir);
820                         *pos = ns->ln_pos_first_inconsistent;
821                 }
822         } else {
823                 ns->ln_status = LS_SCANNING_PHASE1;
824                 if (cfs_list_empty(&com->lc_link_dir))
825                         cfs_list_add_tail(&com->lc_link_dir,
826                                           &lfsck->li_list_dir);
827                 if (!lfsck->li_drop_dryrun ||
828                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
829                         *pos = ns->ln_pos_last_checkpoint;
830                         pos->lp_oit_cookie++;
831                 } else {
832                         *pos = ns->ln_pos_first_inconsistent;
833                 }
834         }
835         spin_unlock(&lfsck->li_lock);
836         up_write(&com->lc_sem);
837
838         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep done, start pos ["LPU64", "
839                DFID", "LPX64"]\n", lfsck_lfsck2name(lfsck), pos->lp_oit_cookie,
840                PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
841
842         return 0;
843 }
844
845 static int lfsck_namespace_exec_oit(const struct lu_env *env,
846                                     struct lfsck_component *com,
847                                     struct dt_object *obj)
848 {
849         down_write(&com->lc_sem);
850         com->lc_new_checked++;
851         if (S_ISDIR(lfsck_object_type(obj)))
852                 ((struct lfsck_namespace *)com->lc_file_ram)->ln_dirs_checked++;
853         up_write(&com->lc_sem);
854         return 0;
855 }
856
857 static int lfsck_namespace_exec_dir(const struct lu_env *env,
858                                     struct lfsck_component *com,
859                                     struct dt_object *obj,
860                                     struct lu_dirent *ent)
861 {
862         struct lfsck_thread_info   *info     = lfsck_env_info(env);
863         struct lu_attr             *la       = &info->lti_la;
864         struct lfsck_instance      *lfsck    = com->lc_lfsck;
865         struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
866         struct lfsck_namespace     *ns       = com->lc_file_ram;
867         struct linkea_data          ldata    = { 0 };
868         const struct lu_fid        *pfid     = lfsck_dto2fid(lfsck->li_obj_dir);
869         const struct lu_fid        *cfid     = lfsck_dto2fid(obj);
870         const struct lu_name       *cname;
871         struct thandle             *handle   = NULL;
872         bool                        repaired = false;
873         bool                        locked   = false;
874         bool                        remove;
875         bool                        newdata;
876         bool                        log      = false;
877         int                         count    = 0;
878         int                         rc;
879         ENTRY;
880
881         cname = lfsck_name_get_const(env, ent->lde_name, ent->lde_namelen);
882         down_write(&com->lc_sem);
883         com->lc_new_checked++;
884
885         if (ent->lde_attrs & LUDA_UPGRADE) {
886                 ns->ln_flags |= LF_UPGRADE;
887                 ns->ln_dirent_repaired++;
888                 repaired = true;
889         } else if (ent->lde_attrs & LUDA_REPAIR) {
890                 ns->ln_flags |= LF_INCONSISTENT;
891                 ns->ln_dirent_repaired++;
892                 repaired = true;
893         }
894
895         if (ent->lde_name[0] == '.' &&
896             (ent->lde_namelen == 1 ||
897              (ent->lde_namelen == 2 && ent->lde_name[1] == '.') ||
898              fid_seq_is_dot_lustre(fid_seq(&ent->lde_fid))))
899                 GOTO(out, rc = 0);
900
901         if (!(bk->lb_param & LPF_DRYRUN) &&
902             (com->lc_journal || repaired)) {
903
904 again:
905                 LASSERT(!locked);
906
907                 com->lc_journal = 1;
908                 handle = dt_trans_create(env, lfsck->li_next);
909                 if (IS_ERR(handle))
910                         GOTO(out, rc = PTR_ERR(handle));
911
912                 rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
913                 if (rc != 0)
914                         GOTO(stop, rc);
915
916                 rc = dt_trans_start(env, lfsck->li_next, handle);
917                 if (rc != 0)
918                         GOTO(stop, rc);
919
920                 dt_write_lock(env, obj, MOR_TGT_CHILD);
921                 locked = true;
922         }
923
924         rc = lfsck_namespace_check_exist(env, lfsck, obj, ent->lde_name);
925         if (rc != 0)
926                 GOTO(stop, rc);
927
928         rc = lfsck_links_read(env, obj, &ldata);
929         if (rc == 0) {
930                 count = ldata.ld_leh->leh_reccount;
931                 rc = linkea_links_find(&ldata, cname, pfid);
932                 if ((rc == 0) &&
933                     (count == 1 || !S_ISDIR(lfsck_object_type(obj))))
934                         goto record;
935
936                 ns->ln_flags |= LF_INCONSISTENT;
937                 /* For dir, if there are more than one linkea entries, or the
938                  * linkea entry does not match the name entry, then remove all
939                  * and add the correct one. */
940                 if (S_ISDIR(lfsck_object_type(obj))) {
941                         remove = true;
942                         newdata = true;
943                 } else {
944                         remove = false;
945                         newdata = false;
946                 }
947                 goto nodata;
948         } else if (unlikely(rc == -EINVAL)) {
949                 count = 1;
950                 ns->ln_flags |= LF_INCONSISTENT;
951                 /* The magic crashed, we are not sure whether there are more
952                  * corrupt data in the linkea, so remove all linkea entries. */
953                 remove = true;
954                 newdata = true;
955                 goto nodata;
956         } else if (rc == -ENODATA) {
957                 count = 1;
958                 ns->ln_flags |= LF_UPGRADE;
959                 remove = false;
960                 newdata = true;
961
962 nodata:
963                 if (bk->lb_param & LPF_DRYRUN) {
964                         ns->ln_linkea_repaired++;
965                         log = true;
966                         repaired = true;
967                         goto record;
968                 }
969
970                 if (!com->lc_journal)
971                         goto again;
972
973                 if (remove) {
974                         LASSERT(newdata);
975
976                         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
977                                           BYPASS_CAPA);
978                         if (rc != 0)
979                                 GOTO(stop, rc);
980                 }
981
982                 if (newdata) {
983                         rc = linkea_data_new(&ldata,
984                                         &lfsck_env_info(env)->lti_linkea_buf);
985                         if (rc != 0)
986                                 GOTO(stop, rc);
987                 }
988
989                 rc = linkea_add_buf(&ldata, cname, pfid);
990                 if (rc != 0)
991                         GOTO(stop, rc);
992
993                 rc = lfsck_links_write(env, obj, &ldata, handle);
994                 if (rc != 0)
995                         GOTO(stop, rc);
996
997                 count = ldata.ld_leh->leh_reccount;
998                 ns->ln_linkea_repaired++;
999                 log = true;
1000                 repaired = true;
1001         } else {
1002                 GOTO(stop, rc);
1003         }
1004
1005 record:
1006         LASSERT(count > 0);
1007
1008         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
1009         if (rc != 0)
1010                 GOTO(stop, rc);
1011
1012         if ((count == 1) &&
1013             (la->la_nlink == 1 || S_ISDIR(lfsck_object_type(obj))))
1014                 /* Usually, it is for single linked object or dir, do nothing.*/
1015                 GOTO(stop, rc);
1016
1017         /* Following modification will be in another transaction.  */
1018         if (handle != NULL) {
1019                 LASSERT(dt_write_locked(env, obj));
1020
1021                 dt_write_unlock(env, obj);
1022                 locked = false;
1023
1024                 dt_trans_stop(env, lfsck->li_next, handle);
1025                 handle = NULL;
1026
1027                 if (log)
1028                         CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired "
1029                               "linkEA for the object: "DFID", parent "
1030                               DFID", name %.*s\n",
1031                               lfsck_lfsck2name(lfsck), PFID(cfid), PFID(pfid),
1032                               ent->lde_namelen, ent->lde_name);
1033         }
1034
1035         ns->ln_mlinked_checked++;
1036         rc = lfsck_namespace_update(env, com, cfid,
1037                         count != la->la_nlink ? LLF_UNMATCH_NLINKS : 0, false);
1038
1039         GOTO(out, rc);
1040
1041 stop:
1042         if (locked)
1043                 dt_write_unlock(env, obj);
1044
1045         if (handle != NULL)
1046                 dt_trans_stop(env, lfsck->li_next, handle);
1047
1048 out:
1049         if (rc < 0) {
1050                 CDEBUG(D_LFSCK, "%s: namespace LFSCK exec_dir failed, "
1051                        "parent "DFID", child name %.*s, child FID "DFID
1052                        ": rc = %d\n", lfsck_lfsck2name(lfsck), PFID(pfid),
1053                        ent->lde_namelen, ent->lde_name, PFID(cfid), rc);
1054
1055                 ns->ln_items_failed++;
1056                 if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
1057                         lfsck_pos_fill(env, lfsck,
1058                                        &ns->ln_pos_first_inconsistent, false);
1059                 if (!(bk->lb_param & LPF_FAILOUT))
1060                         rc = 0;
1061         } else {
1062                 if (repaired) {
1063                         ns->ln_items_repaired++;
1064                         if (bk->lb_param & LPF_DRYRUN &&
1065                             lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
1066                                 lfsck_pos_fill(env, lfsck,
1067                                                &ns->ln_pos_first_inconsistent,
1068                                                false);
1069                 } else {
1070                         com->lc_journal = 0;
1071                 }
1072                 rc = 0;
1073         }
1074         up_write(&com->lc_sem);
1075         return rc;
1076 }
1077
1078 static int lfsck_namespace_post(const struct lu_env *env,
1079                                 struct lfsck_component *com,
1080                                 int result, bool init)
1081 {
1082         struct lfsck_instance   *lfsck = com->lc_lfsck;
1083         struct lfsck_namespace  *ns    = com->lc_file_ram;
1084         int                      rc;
1085
1086         down_write(&com->lc_sem);
1087         spin_lock(&lfsck->li_lock);
1088         if (!init)
1089                 ns->ln_pos_last_checkpoint = lfsck->li_pos_current;
1090         if (result > 0) {
1091                 ns->ln_status = LS_SCANNING_PHASE2;
1092                 ns->ln_flags |= LF_SCANNED_ONCE;
1093                 ns->ln_flags &= ~LF_UPGRADE;
1094                 cfs_list_del_init(&com->lc_link);
1095                 cfs_list_del_init(&com->lc_link_dir);
1096                 cfs_list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
1097         } else if (result == 0) {
1098                 ns->ln_status = lfsck->li_status;
1099                 if (ns->ln_status == 0)
1100                         ns->ln_status = LS_STOPPED;
1101                 if (ns->ln_status != LS_PAUSED) {
1102                         cfs_list_del_init(&com->lc_link);
1103                         cfs_list_del_init(&com->lc_link_dir);
1104                         cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1105                 }
1106         } else {
1107                 ns->ln_status = LS_FAILED;
1108                 cfs_list_del_init(&com->lc_link);
1109                 cfs_list_del_init(&com->lc_link_dir);
1110                 cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1111         }
1112         spin_unlock(&lfsck->li_lock);
1113
1114         if (!init) {
1115                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1116                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1117                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
1118                 ns->ln_items_checked += com->lc_new_checked;
1119                 com->lc_new_checked = 0;
1120         }
1121
1122         rc = lfsck_namespace_store(env, com, false);
1123         up_write(&com->lc_sem);
1124
1125         CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n",
1126                lfsck_lfsck2name(lfsck), rc);
1127
1128         return rc;
1129 }
1130
1131 static int
1132 lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
1133                      struct seq_file *m)
1134 {
1135         struct lfsck_instance   *lfsck = com->lc_lfsck;
1136         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1137         struct lfsck_namespace  *ns    = com->lc_file_ram;
1138         int                      rc;
1139
1140         down_read(&com->lc_sem);
1141         seq_printf(m, "name: lfsck_namespace\n"
1142                    "magic: %#x\n"
1143                    "version: %d\n"
1144                    "status: %s\n",
1145                    ns->ln_magic,
1146                    bk->lb_version,
1147                    lfsck_status2names(ns->ln_status));
1148
1149         rc = lfsck_bits_dump(m, ns->ln_flags, lfsck_flags_names, "flags");
1150         if (rc < 0)
1151                 goto out;
1152
1153         rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
1154         if (rc < 0)
1155                 goto out;
1156
1157         rc = lfsck_time_dump(m, ns->ln_time_last_complete,
1158                              "time_since_last_completed");
1159         if (rc < 0)
1160                 goto out;
1161
1162         rc = lfsck_time_dump(m, ns->ln_time_latest_start,
1163                              "time_since_latest_start");
1164         if (rc < 0)
1165                 goto out;
1166
1167         rc = lfsck_time_dump(m, ns->ln_time_last_checkpoint,
1168                              "time_since_last_checkpoint");
1169         if (rc < 0)
1170                 goto out;
1171
1172         rc = lfsck_pos_dump(m, &ns->ln_pos_latest_start,
1173                             "latest_start_position");
1174         if (rc < 0)
1175                 goto out;
1176
1177         rc = lfsck_pos_dump(m, &ns->ln_pos_last_checkpoint,
1178                             "last_checkpoint_position");
1179         if (rc < 0)
1180                 goto out;
1181
1182         rc = lfsck_pos_dump(m, &ns->ln_pos_first_inconsistent,
1183                             "first_failure_position");
1184         if (rc < 0)
1185                 goto out;
1186
1187         if (ns->ln_status == LS_SCANNING_PHASE1) {
1188                 struct lfsck_position pos;
1189                 const struct dt_it_ops *iops;
1190                 cfs_duration_t duration = cfs_time_current() -
1191                                           lfsck->li_time_last_checkpoint;
1192                 __u64 checked = ns->ln_items_checked + com->lc_new_checked;
1193                 __u64 speed = checked;
1194                 __u64 new_checked = com->lc_new_checked * HZ;
1195                 __u32 rtime = ns->ln_run_time_phase1 +
1196                               cfs_duration_sec(duration + HALF_SEC);
1197
1198                 if (duration != 0)
1199                         do_div(new_checked, duration);
1200                 if (rtime != 0)
1201                         do_div(speed, rtime);
1202                 seq_printf(m, "checked_phase1: "LPU64"\n"
1203                               "checked_phase2: "LPU64"\n"
1204                               "updated_phase1: "LPU64"\n"
1205                               "updated_phase2: "LPU64"\n"
1206                               "failed_phase1: "LPU64"\n"
1207                               "failed_phase2: "LPU64"\n"
1208                               "directories: "LPU64"\n"
1209                               "multi_linked_files: "LPU64"\n"
1210                               "dirent_repaired: "LPU64"\n"
1211                               "linkea_repaired: "LPU64"\n"
1212                               "nlinks_repaired: "LPU64"\n"
1213                               "lost_found: "LPU64"\n"
1214                               "success_count: %u\n"
1215                               "run_time_phase1: %u seconds\n"
1216                               "run_time_phase2: %u seconds\n"
1217                               "average_speed_phase1: "LPU64" items/sec\n"
1218                               "average_speed_phase2: N/A\n"
1219                               "real_time_speed_phase1: "LPU64" items/sec\n"
1220                               "real_time_speed_phase2: N/A\n",
1221                               checked,
1222                               ns->ln_objs_checked_phase2,
1223                               ns->ln_items_repaired,
1224                               ns->ln_objs_repaired_phase2,
1225                               ns->ln_items_failed,
1226                               ns->ln_objs_failed_phase2,
1227                               ns->ln_dirs_checked,
1228                               ns->ln_mlinked_checked,
1229                               ns->ln_dirent_repaired,
1230                               ns->ln_linkea_repaired,
1231                               ns->ln_objs_nlink_repaired,
1232                               ns->ln_objs_lost_found,
1233                               ns->ln_success_count,
1234                               rtime,
1235                               ns->ln_run_time_phase2,
1236                               speed,
1237                               new_checked);
1238
1239                 LASSERT(lfsck->li_di_oit != NULL);
1240
1241                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1242
1243                 /* The low layer otable-based iteration position may NOT
1244                  * exactly match the namespace-based directory traversal
1245                  * cookie. Generally, it is not a serious issue. But the
1246                  * caller should NOT make assumption on that. */
1247                 pos.lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1248                 if (!lfsck->li_current_oit_processed)
1249                         pos.lp_oit_cookie--;
1250
1251                 spin_lock(&lfsck->li_lock);
1252                 if (lfsck->li_di_dir != NULL) {
1253                         pos.lp_dir_cookie = lfsck->li_cookie_dir;
1254                         if (pos.lp_dir_cookie >= MDS_DIR_END_OFF) {
1255                                 fid_zero(&pos.lp_dir_parent);
1256                                 pos.lp_dir_cookie = 0;
1257                         } else {
1258                                 pos.lp_dir_parent =
1259                                         *lfsck_dto2fid(lfsck->li_obj_dir);
1260                         }
1261                 } else {
1262                         fid_zero(&pos.lp_dir_parent);
1263                         pos.lp_dir_cookie = 0;
1264                 }
1265                 spin_unlock(&lfsck->li_lock);
1266                 lfsck_pos_dump(m, &pos, "current_position");
1267         } else if (ns->ln_status == LS_SCANNING_PHASE2) {
1268                 cfs_duration_t duration = cfs_time_current() -
1269                                           lfsck->li_time_last_checkpoint;
1270                 __u64 checked = ns->ln_objs_checked_phase2 +
1271                                 com->lc_new_checked;
1272                 __u64 speed1 = ns->ln_items_checked;
1273                 __u64 speed2 = checked;
1274                 __u64 new_checked = com->lc_new_checked * HZ;
1275                 __u32 rtime = ns->ln_run_time_phase2 +
1276                               cfs_duration_sec(duration + HALF_SEC);
1277
1278                 if (duration != 0)
1279                         do_div(new_checked, duration);
1280                 if (ns->ln_run_time_phase1 != 0)
1281                         do_div(speed1, ns->ln_run_time_phase1);
1282                 if (rtime != 0)
1283                         do_div(speed2, rtime);
1284                 seq_printf(m, "checked_phase1: "LPU64"\n"
1285                               "checked_phase2: "LPU64"\n"
1286                               "updated_phase1: "LPU64"\n"
1287                               "updated_phase2: "LPU64"\n"
1288                               "failed_phase1: "LPU64"\n"
1289                               "failed_phase2: "LPU64"\n"
1290                               "directories: "LPU64"\n"
1291                               "multi_linked_files: "LPU64"\n"
1292                               "dirent_repaired: "LPU64"\n"
1293                               "linkea_repaired: "LPU64"\n"
1294                               "nlinks_repaired: "LPU64"\n"
1295                               "lost_found: "LPU64"\n"
1296                               "success_count: %u\n"
1297                               "run_time_phase1: %u seconds\n"
1298                               "run_time_phase2: %u seconds\n"
1299                               "average_speed_phase1: "LPU64" items/sec\n"
1300                               "average_speed_phase2: "LPU64" objs/sec\n"
1301                               "real_time_speed_phase1: N/A\n"
1302                               "real_time_speed_phase2: "LPU64" objs/sec\n"
1303                               "current_position: "DFID"\n",
1304                               ns->ln_items_checked,
1305                               checked,
1306                               ns->ln_items_repaired,
1307                               ns->ln_objs_repaired_phase2,
1308                               ns->ln_items_failed,
1309                               ns->ln_objs_failed_phase2,
1310                               ns->ln_dirs_checked,
1311                               ns->ln_mlinked_checked,
1312                               ns->ln_dirent_repaired,
1313                               ns->ln_linkea_repaired,
1314                               ns->ln_objs_nlink_repaired,
1315                               ns->ln_objs_lost_found,
1316                               ns->ln_success_count,
1317                               ns->ln_run_time_phase1,
1318                               rtime,
1319                               speed1,
1320                               speed2,
1321                               new_checked,
1322                               PFID(&ns->ln_fid_latest_scanned_phase2));
1323         } else {
1324                 __u64 speed1 = ns->ln_items_checked;
1325                 __u64 speed2 = ns->ln_objs_checked_phase2;
1326
1327                 if (ns->ln_run_time_phase1 != 0)
1328                         do_div(speed1, ns->ln_run_time_phase1);
1329                 if (ns->ln_run_time_phase2 != 0)
1330                         do_div(speed2, ns->ln_run_time_phase2);
1331                 seq_printf(m, "checked_phase1: "LPU64"\n"
1332                               "checked_phase2: "LPU64"\n"
1333                               "updated_phase1: "LPU64"\n"
1334                               "updated_phase2: "LPU64"\n"
1335                               "failed_phase1: "LPU64"\n"
1336                               "failed_phase2: "LPU64"\n"
1337                               "directories: "LPU64"\n"
1338                               "multi_linked_files: "LPU64"\n"
1339                               "dirent_repaired: "LPU64"\n"
1340                               "linkea_repaired: "LPU64"\n"
1341                               "nlinks_repaired: "LPU64"\n"
1342                               "lost_found: "LPU64"\n"
1343                               "success_count: %u\n"
1344                               "run_time_phase1: %u seconds\n"
1345                               "run_time_phase2: %u seconds\n"
1346                               "average_speed_phase1: "LPU64" items/sec\n"
1347                               "average_speed_phase2: "LPU64" objs/sec\n"
1348                               "real_time_speed_phase1: N/A\n"
1349                               "real_time_speed_phase2: N/A\n"
1350                               "current_position: N/A\n",
1351                               ns->ln_items_checked,
1352                               ns->ln_objs_checked_phase2,
1353                               ns->ln_items_repaired,
1354                               ns->ln_objs_repaired_phase2,
1355                               ns->ln_items_failed,
1356                               ns->ln_objs_failed_phase2,
1357                               ns->ln_dirs_checked,
1358                               ns->ln_mlinked_checked,
1359                               ns->ln_dirent_repaired,
1360                               ns->ln_linkea_repaired,
1361                               ns->ln_objs_nlink_repaired,
1362                               ns->ln_objs_lost_found,
1363                               ns->ln_success_count,
1364                               ns->ln_run_time_phase1,
1365                               ns->ln_run_time_phase2,
1366                               speed1,
1367                               speed2);
1368         }
1369 out:
1370         up_read(&com->lc_sem);
1371         return 0;
1372 }
1373
1374 static int lfsck_namespace_double_scan_main(void *args)
1375 {
1376         struct lfsck_thread_args *lta   = args;
1377         const struct lu_env     *env    = &lta->lta_env;
1378         struct lfsck_component  *com    = lta->lta_com;
1379         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1380         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1381         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1382         struct lfsck_namespace  *ns     = com->lc_file_ram;
1383         struct dt_object        *obj    = com->lc_obj;
1384         const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
1385         struct dt_object        *target;
1386         struct dt_it            *di;
1387         struct dt_key           *key;
1388         struct lu_fid            fid;
1389         int                      rc;
1390         __u8                     flags = 0;
1391         ENTRY;
1392
1393         CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
1394                lfsck_lfsck2name(lfsck));
1395
1396         com->lc_new_checked = 0;
1397         com->lc_new_scanned = 0;
1398         com->lc_time_last_checkpoint = cfs_time_current();
1399         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
1400                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1401
1402         di = iops->init(env, obj, 0, BYPASS_CAPA);
1403         if (IS_ERR(di))
1404                 GOTO(out, rc = PTR_ERR(di));
1405
1406         fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
1407         rc = iops->get(env, di, (const struct dt_key *)&fid);
1408         if (rc < 0)
1409                 GOTO(fini, rc);
1410
1411         /* Skip the start one, which either has been processed or non-exist. */
1412         rc = iops->next(env, di);
1413         if (rc != 0)
1414                 GOTO(put, rc);
1415
1416         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_DOUBLESCAN))
1417                 GOTO(put, rc = 0);
1418
1419         do {
1420                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
1421                     cfs_fail_val > 0) {
1422                         struct l_wait_info lwi;
1423
1424                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
1425                                           NULL, NULL);
1426                         l_wait_event(thread->t_ctl_waitq,
1427                                      !thread_is_running(thread),
1428                                      &lwi);
1429                 }
1430
1431                 key = iops->key(env, di);
1432                 fid_be_to_cpu(&fid, (const struct lu_fid *)key);
1433                 target = lfsck_object_find(env, lfsck, &fid);
1434                 down_write(&com->lc_sem);
1435                 if (target == NULL) {
1436                         rc = 0;
1437                         goto checkpoint;
1438                 } else if (IS_ERR(target)) {
1439                         rc = PTR_ERR(target);
1440                         goto checkpoint;
1441                 }
1442
1443                 /* XXX: Currently, skip remote object, the consistency for
1444                  *      remote object will be processed in LFSCK phase III. */
1445                 if (dt_object_exists(target) && !dt_object_remote(target)) {
1446                         rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
1447                         if (rc == 0)
1448                                 rc = lfsck_namespace_double_scan_one(env, com,
1449                                                                 target, flags);
1450                 }
1451
1452                 lfsck_object_put(env, target);
1453
1454 checkpoint:
1455                 com->lc_new_checked++;
1456                 com->lc_new_scanned++;
1457                 ns->ln_fid_latest_scanned_phase2 = fid;
1458                 if (rc > 0)
1459                         ns->ln_objs_repaired_phase2++;
1460                 else if (rc < 0)
1461                         ns->ln_objs_failed_phase2++;
1462                 up_write(&com->lc_sem);
1463
1464                 if ((rc == 0) || ((rc > 0) && !(bk->lb_param & LPF_DRYRUN))) {
1465                         lfsck_namespace_delete(env, com, &fid);
1466                 } else if (rc < 0) {
1467                         flags |= LLF_REPAIR_FAILED;
1468                         lfsck_namespace_update(env, com, &fid, flags, true);
1469                 }
1470
1471                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
1472                         GOTO(put, rc);
1473
1474                 if (unlikely(cfs_time_beforeq(com->lc_time_next_checkpoint,
1475                                               cfs_time_current())) &&
1476                     com->lc_new_checked != 0) {
1477                         down_write(&com->lc_sem);
1478                         ns->ln_run_time_phase2 +=
1479                                 cfs_duration_sec(cfs_time_current() +
1480                                 HALF_SEC - com->lc_time_last_checkpoint);
1481                         ns->ln_time_last_checkpoint = cfs_time_current_sec();
1482                         ns->ln_objs_checked_phase2 += com->lc_new_checked;
1483                         com->lc_new_checked = 0;
1484                         rc = lfsck_namespace_store(env, com, false);
1485                         up_write(&com->lc_sem);
1486                         if (rc != 0)
1487                                 GOTO(put, rc);
1488
1489                         com->lc_time_last_checkpoint = cfs_time_current();
1490                         com->lc_time_next_checkpoint =
1491                                 com->lc_time_last_checkpoint +
1492                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1493                 }
1494
1495                 lfsck_control_speed_by_self(com);
1496                 if (unlikely(!thread_is_running(thread)))
1497                         GOTO(put, rc = 0);
1498
1499                 rc = iops->next(env, di);
1500         } while (rc == 0);
1501
1502         GOTO(put, rc);
1503
1504 put:
1505         iops->put(env, di);
1506
1507 fini:
1508         iops->fini(env, di);
1509
1510 out:
1511         down_write(&com->lc_sem);
1512         ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1513                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1514         ns->ln_time_last_checkpoint = cfs_time_current_sec();
1515         ns->ln_objs_checked_phase2 += com->lc_new_checked;
1516         com->lc_new_checked = 0;
1517
1518         if (rc > 0) {
1519                 com->lc_journal = 0;
1520                 ns->ln_status = LS_COMPLETED;
1521                 if (!(bk->lb_param & LPF_DRYRUN))
1522                         ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1523                 ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
1524                 ns->ln_success_count++;
1525         } else if (rc == 0) {
1526                 ns->ln_status = lfsck->li_status;
1527                 if (ns->ln_status == 0)
1528                         ns->ln_status = LS_STOPPED;
1529         } else {
1530                 ns->ln_status = LS_FAILED;
1531         }
1532
1533         CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan finished, status %d: "
1534               "rc = %d\n", lfsck_lfsck2name(lfsck), ns->ln_status, rc);
1535
1536         rc = lfsck_namespace_store(env, com, false);
1537         up_write(&com->lc_sem);
1538         if (atomic_dec_and_test(&lfsck->li_double_scan_count))
1539                 wake_up_all(&thread->t_ctl_waitq);
1540
1541         lfsck_thread_args_fini(lta);
1542
1543         return rc;
1544 }
1545
1546 static int lfsck_namespace_double_scan(const struct lu_env *env,
1547                                        struct lfsck_component *com)
1548 {
1549         struct lfsck_instance           *lfsck = com->lc_lfsck;
1550         struct lfsck_namespace          *ns    = com->lc_file_ram;
1551         struct lfsck_thread_args        *lta;
1552         struct task_struct              *task;
1553         int                              rc;
1554         ENTRY;
1555
1556         if (unlikely(ns->ln_status != LS_SCANNING_PHASE2))
1557                 RETURN(0);
1558
1559         lta = lfsck_thread_args_init(lfsck, com, NULL);
1560         if (IS_ERR(lta))
1561                 GOTO(out, rc = PTR_ERR(lta));
1562
1563         atomic_inc(&lfsck->li_double_scan_count);
1564         task = kthread_run(lfsck_namespace_double_scan_main, lta,
1565                            "lfsck_namespace");
1566         if (IS_ERR(task)) {
1567                 atomic_dec(&lfsck->li_double_scan_count);
1568                 lfsck_thread_args_fini(lta);
1569                 GOTO(out, rc = PTR_ERR(task));
1570         }
1571
1572         RETURN(0);
1573
1574 out:
1575         CERROR("%s: cannot start LFSCK namespace thread: rc = %d\n",
1576                lfsck_lfsck2name(lfsck), rc);
1577         return rc;
1578 }
1579
/*
 * Notification hook of the namespace LFSCK component.
 *
 * The arguments are ignored and 0 is returned unconditionally.
 *
 * \param[in] env       pointer to the thread context (unused)
 * \param[in] com       pointer to the namespace LFSCK component (unused)
 * \param[in] lr        pointer to the request (unused)
 *
 * \retval              0 always
 */
static int lfsck_namespace_in_notify(const struct lu_env *env,
                                     struct lfsck_component *com,
                                     struct lfsck_request *lr)
{
        const int rc = 0;

        return rc;
}
1586
1587 static int lfsck_namespace_query(const struct lu_env *env,
1588                                  struct lfsck_component *com)
1589 {
1590         struct lfsck_namespace *ns = com->lc_file_ram;
1591
1592         return ns->ln_status;
1593 }
1594
1595 static struct lfsck_operations lfsck_namespace_ops = {
1596         .lfsck_reset            = lfsck_namespace_reset,
1597         .lfsck_fail             = lfsck_namespace_fail,
1598         .lfsck_checkpoint       = lfsck_namespace_checkpoint,
1599         .lfsck_prep             = lfsck_namespace_prep,
1600         .lfsck_exec_oit         = lfsck_namespace_exec_oit,
1601         .lfsck_exec_dir         = lfsck_namespace_exec_dir,
1602         .lfsck_post             = lfsck_namespace_post,
1603         .lfsck_dump             = lfsck_namespace_dump,
1604         .lfsck_double_scan      = lfsck_namespace_double_scan,
1605         .lfsck_in_notify        = lfsck_namespace_in_notify,
1606         .lfsck_query            = lfsck_namespace_query,
1607 };
1608
1609 /**
1610  * Verify the specified linkEA entry for the given directory object.
1611  * If the object has no such linkEA entry or it has more other linkEA
1612  * entries, then re-generate the linkEA with the given information.
1613  *
1614  * \param[in] env       pointer to the thread context
1615  * \param[in] dev       pointer to the dt_device
1616  * \param[in] obj       pointer to the dt_object to be handled
1617  * \param[in] cname     the name for the child in the parent directory
1618  * \param[in] pfid      the parent directory's FID for the linkEA
1619  *
1620  * \retval              0 for success
1621  * \retval              negative error number on failure
1622  */
1623 int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev,
1624                         struct dt_object *obj, const struct lu_name *cname,
1625                         const struct lu_fid *pfid)
1626 {
1627         struct linkea_data       ldata  = { 0 };
1628         struct lu_buf            linkea_buf;
1629         struct thandle          *th;
1630         int                      rc;
1631         int                      fl     = LU_XATTR_CREATE;
1632         bool                     dirty  = false;
1633         ENTRY;
1634
1635         LASSERT(S_ISDIR(lfsck_object_type(obj)));
1636
1637         rc = lfsck_links_read(env, obj, &ldata);
1638         if (rc == -ENODATA) {
1639                 dirty = true;
1640         } else if (rc == 0) {
1641                 fl = LU_XATTR_REPLACE;
1642                 if (ldata.ld_leh->leh_reccount != 1) {
1643                         dirty = true;
1644                 } else {
1645                         rc = linkea_links_find(&ldata, cname, pfid);
1646                         if (rc != 0)
1647                                 dirty = true;
1648                 }
1649         }
1650
1651         if (!dirty)
1652                 RETURN(rc);
1653
1654         rc = linkea_data_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf);
1655         if (rc != 0)
1656                 RETURN(rc);
1657
1658         rc = linkea_add_buf(&ldata, cname, pfid);
1659         if (rc != 0)
1660                 RETURN(rc);
1661
1662         linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
1663         linkea_buf.lb_len = ldata.ld_leh->leh_len;
1664         th = dt_trans_create(env, dev);
1665         if (IS_ERR(th))
1666                 RETURN(PTR_ERR(th));
1667
1668         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1669                                   XATTR_NAME_LINK, fl, th);
1670         if (rc != 0)
1671                 GOTO(stop, rc = PTR_ERR(th));
1672
1673         rc = dt_trans_start_local(env, dev, th);
1674         if (rc != 0)
1675                 GOTO(stop, rc);
1676
1677         dt_write_lock(env, obj, 0);
1678         rc = dt_xattr_set(env, obj, &linkea_buf,
1679                           XATTR_NAME_LINK, fl, th, BYPASS_CAPA);
1680         dt_write_unlock(env, obj);
1681
1682         GOTO(stop, rc);
1683
1684 stop:
1685         dt_trans_stop(env, dev, th);
1686         return rc;
1687 }
1688
1689 int lfsck_namespace_setup(const struct lu_env *env,
1690                           struct lfsck_instance *lfsck)
1691 {
1692         struct lfsck_component  *com;
1693         struct lfsck_namespace  *ns;
1694         struct dt_object        *root = NULL;
1695         struct dt_object        *obj;
1696         int                      rc;
1697         ENTRY;
1698
1699         LASSERT(lfsck->li_master);
1700
1701         OBD_ALLOC_PTR(com);
1702         if (com == NULL)
1703                 RETURN(-ENOMEM);
1704
1705         CFS_INIT_LIST_HEAD(&com->lc_link);
1706         CFS_INIT_LIST_HEAD(&com->lc_link_dir);
1707         init_rwsem(&com->lc_sem);
1708         atomic_set(&com->lc_ref, 1);
1709         com->lc_lfsck = lfsck;
1710         com->lc_type = LFSCK_TYPE_NAMESPACE;
1711         com->lc_ops = &lfsck_namespace_ops;
1712         com->lc_file_size = sizeof(struct lfsck_namespace);
1713         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
1714         if (com->lc_file_ram == NULL)
1715                 GOTO(out, rc = -ENOMEM);
1716
1717         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
1718         if (com->lc_file_disk == NULL)
1719                 GOTO(out, rc = -ENOMEM);
1720
1721         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
1722         if (IS_ERR(root))
1723                 GOTO(out, rc = PTR_ERR(root));
1724
1725         if (unlikely(!dt_try_as_dir(env, root)))
1726                 GOTO(out, rc = -ENOTDIR);
1727
1728         obj = local_index_find_or_create(env, lfsck->li_los, root,
1729                                          lfsck_namespace_name,
1730                                          S_IFREG | S_IRUGO | S_IWUSR,
1731                                          &dt_lfsck_features);
1732         if (IS_ERR(obj))
1733                 GOTO(out, rc = PTR_ERR(obj));
1734
1735         com->lc_obj = obj;
1736         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
1737         if (rc != 0)
1738                 GOTO(out, rc);
1739
1740         rc = lfsck_namespace_load(env, com);
1741         if (rc > 0)
1742                 rc = lfsck_namespace_reset(env, com, true);
1743         else if (rc == -ENODATA)
1744                 rc = lfsck_namespace_init(env, com);
1745         if (rc != 0)
1746                 GOTO(out, rc);
1747
1748         ns = com->lc_file_ram;
1749         switch (ns->ln_status) {
1750         case LS_INIT:
1751         case LS_COMPLETED:
1752         case LS_FAILED:
1753         case LS_STOPPED:
1754                 spin_lock(&lfsck->li_lock);
1755                 cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1756                 spin_unlock(&lfsck->li_lock);
1757                 break;
1758         default:
1759                 CERROR("%s: unknown lfsck_namespace status %d\n",
1760                        lfsck_lfsck2name(lfsck), ns->ln_status);
1761                 /* fall through */
1762         case LS_SCANNING_PHASE1:
1763         case LS_SCANNING_PHASE2:
1764                 /* No need to store the status to disk right now.
1765                  * If the system crashed before the status stored,
1766                  * it will be loaded back when next time. */
1767                 ns->ln_status = LS_CRASHED;
1768                 /* fall through */
1769         case LS_PAUSED:
1770         case LS_CRASHED:
1771                 spin_lock(&lfsck->li_lock);
1772                 cfs_list_add_tail(&com->lc_link, &lfsck->li_list_scan);
1773                 cfs_list_add_tail(&com->lc_link_dir, &lfsck->li_list_dir);
1774                 spin_unlock(&lfsck->li_lock);
1775                 break;
1776         }
1777
1778         GOTO(out, rc = 0);
1779
1780 out:
1781         if (root != NULL && !IS_ERR(root))
1782                 lu_object_put(env, &root->do_lu);
1783         if (rc != 0) {
1784                 lfsck_component_cleanup(env, com);
1785                 CERROR("%s: fail to init namespace LFSCK component: rc = %d\n",
1786                        lfsck_lfsck2name(lfsck), rc);
1787         }
1788         return rc;
1789 }