Whamcloud - gitweb
LU-4788 lfsck: replace cfs_list_t with list_head
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_namespace.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lustre/lustre_idl.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fid.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre/lustre_user.h>
41
42 #include "lfsck_internal.h"
43
44 #define LFSCK_NAMESPACE_MAGIC   0xA0629D03
45
46 static const char lfsck_namespace_name[] = "lfsck_namespace";
47
48 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
49                                       struct lfsck_namespace *src)
50 {
51         dst->ln_magic = le32_to_cpu(src->ln_magic);
52         dst->ln_status = le32_to_cpu(src->ln_status);
53         dst->ln_flags = le32_to_cpu(src->ln_flags);
54         dst->ln_success_count = le32_to_cpu(src->ln_success_count);
55         dst->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
56         dst->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
57         dst->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
58         dst->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
59         dst->ln_time_last_checkpoint =
60                                 le64_to_cpu(src->ln_time_last_checkpoint);
61         lfsck_position_le_to_cpu(&dst->ln_pos_latest_start,
62                                  &src->ln_pos_latest_start);
63         lfsck_position_le_to_cpu(&dst->ln_pos_last_checkpoint,
64                                  &src->ln_pos_last_checkpoint);
65         lfsck_position_le_to_cpu(&dst->ln_pos_first_inconsistent,
66                                  &src->ln_pos_first_inconsistent);
67         dst->ln_items_checked = le64_to_cpu(src->ln_items_checked);
68         dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
69         dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
70         dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
71         dst->ln_mlinked_checked = le64_to_cpu(src->ln_mlinked_checked);
72         dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
73         dst->ln_objs_repaired_phase2 =
74                                 le64_to_cpu(src->ln_objs_repaired_phase2);
75         dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
76         dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
77         dst->ln_objs_lost_found = le64_to_cpu(src->ln_objs_lost_found);
78         fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2,
79                       &src->ln_fid_latest_scanned_phase2);
80         dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
81         dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
82 }
83
84 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
85                                       struct lfsck_namespace *src)
86 {
87         dst->ln_magic = cpu_to_le32(src->ln_magic);
88         dst->ln_status = cpu_to_le32(src->ln_status);
89         dst->ln_flags = cpu_to_le32(src->ln_flags);
90         dst->ln_success_count = cpu_to_le32(src->ln_success_count);
91         dst->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
92         dst->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
93         dst->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
94         dst->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
95         dst->ln_time_last_checkpoint =
96                                 cpu_to_le64(src->ln_time_last_checkpoint);
97         lfsck_position_cpu_to_le(&dst->ln_pos_latest_start,
98                                  &src->ln_pos_latest_start);
99         lfsck_position_cpu_to_le(&dst->ln_pos_last_checkpoint,
100                                  &src->ln_pos_last_checkpoint);
101         lfsck_position_cpu_to_le(&dst->ln_pos_first_inconsistent,
102                                  &src->ln_pos_first_inconsistent);
103         dst->ln_items_checked = cpu_to_le64(src->ln_items_checked);
104         dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
105         dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
106         dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
107         dst->ln_mlinked_checked = cpu_to_le64(src->ln_mlinked_checked);
108         dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
109         dst->ln_objs_repaired_phase2 =
110                                 cpu_to_le64(src->ln_objs_repaired_phase2);
111         dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
112         dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
113         dst->ln_objs_lost_found = cpu_to_le64(src->ln_objs_lost_found);
114         fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2,
115                       &src->ln_fid_latest_scanned_phase2);
116         dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
117         dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
118 }
119
120 /**
121  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
122  * \retval 0: succeed.
123  * \retval -ve: failed cases.
124  */
125 static int lfsck_namespace_load(const struct lu_env *env,
126                                 struct lfsck_component *com)
127 {
128         int len = com->lc_file_size;
129         int rc;
130
131         rc = dt_xattr_get(env, com->lc_obj,
132                           lfsck_buf_get(env, com->lc_file_disk, len),
133                           XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
134         if (rc == len) {
135                 struct lfsck_namespace *ns = com->lc_file_ram;
136
137                 lfsck_namespace_le_to_cpu(ns,
138                                 (struct lfsck_namespace *)com->lc_file_disk);
139                 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
140                         CDEBUG(D_LFSCK, "%s: invalid lfsck_namespace magic "
141                                "%#x != %#x\n", lfsck_lfsck2name(com->lc_lfsck),
142                                ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
143                         rc = 1;
144                 } else {
145                         rc = 0;
146                 }
147         } else if (rc != -ENODATA) {
148                 CDEBUG(D_LFSCK, "%s: fail to load lfsck_namespace, "
149                        "expected = %d: rc = %d\n",
150                        lfsck_lfsck2name(com->lc_lfsck), len, rc);
151                 if (rc >= 0)
152                         rc = 1;
153         }
154         return rc;
155 }
156
157 static int lfsck_namespace_store(const struct lu_env *env,
158                                  struct lfsck_component *com, bool init)
159 {
160         struct dt_object        *obj    = com->lc_obj;
161         struct lfsck_instance   *lfsck  = com->lc_lfsck;
162         struct thandle          *handle;
163         int                      len    = com->lc_file_size;
164         int                      rc;
165         ENTRY;
166
167         lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
168                                   (struct lfsck_namespace *)com->lc_file_ram);
169         handle = dt_trans_create(env, lfsck->li_bottom);
170         if (IS_ERR(handle))
171                 GOTO(log, rc = PTR_ERR(handle));
172
173         rc = dt_declare_xattr_set(env, obj,
174                                   lfsck_buf_get(env, com->lc_file_disk, len),
175                                   XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
176         if (rc != 0)
177                 GOTO(out, rc);
178
179         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
180         if (rc != 0)
181                 GOTO(out, rc);
182
183         rc = dt_xattr_set(env, obj,
184                           lfsck_buf_get(env, com->lc_file_disk, len),
185                           XATTR_NAME_LFSCK_NAMESPACE,
186                           init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
187                           handle, BYPASS_CAPA);
188
189         GOTO(out, rc);
190
191 out:
192         dt_trans_stop(env, lfsck->li_bottom, handle);
193
194 log:
195         if (rc != 0)
196                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_namespace: rc = %d\n",
197                        lfsck_lfsck2name(lfsck), rc);
198         return rc;
199 }
200
201 static int lfsck_namespace_init(const struct lu_env *env,
202                                 struct lfsck_component *com)
203 {
204         struct lfsck_namespace *ns = com->lc_file_ram;
205         int rc;
206
207         memset(ns, 0, sizeof(*ns));
208         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
209         ns->ln_status = LS_INIT;
210         down_write(&com->lc_sem);
211         rc = lfsck_namespace_store(env, com, true);
212         up_write(&com->lc_sem);
213         return rc;
214 }
215
216 static int lfsck_namespace_lookup(const struct lu_env *env,
217                                   struct lfsck_component *com,
218                                   const struct lu_fid *fid, __u8 *flags)
219 {
220         struct lu_fid *key = &lfsck_env_info(env)->lti_fid;
221         int            rc;
222
223         fid_cpu_to_be(key, fid);
224         rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)flags,
225                        (const struct dt_key *)key, BYPASS_CAPA);
226         return rc;
227 }
228
229 static int lfsck_namespace_delete(const struct lu_env *env,
230                                   struct lfsck_component *com,
231                                   const struct lu_fid *fid)
232 {
233         struct lfsck_instance   *lfsck  = com->lc_lfsck;
234         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
235         struct thandle          *handle;
236         struct dt_object        *obj    = com->lc_obj;
237         int                      rc;
238         ENTRY;
239
240         handle = dt_trans_create(env, lfsck->li_bottom);
241         if (IS_ERR(handle))
242                 RETURN(PTR_ERR(handle));
243
244         rc = dt_declare_delete(env, obj, (const struct dt_key *)fid, handle);
245         if (rc != 0)
246                 GOTO(out, rc);
247
248         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
249         if (rc != 0)
250                 GOTO(out, rc);
251
252         fid_cpu_to_be(key, fid);
253         rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
254                        BYPASS_CAPA);
255
256         GOTO(out, rc);
257
258 out:
259         dt_trans_stop(env, lfsck->li_bottom, handle);
260         return rc;
261 }
262
263 static int lfsck_namespace_update(const struct lu_env *env,
264                                   struct lfsck_component *com,
265                                   const struct lu_fid *fid,
266                                   __u8 flags, bool force)
267 {
268         struct lfsck_instance   *lfsck  = com->lc_lfsck;
269         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
270         struct thandle          *handle;
271         struct dt_object        *obj    = com->lc_obj;
272         int                      rc;
273         bool                     exist  = false;
274         __u8                     tf;
275         ENTRY;
276
277         rc = lfsck_namespace_lookup(env, com, fid, &tf);
278         if (rc != 0 && rc != -ENOENT)
279                 RETURN(rc);
280
281         if (rc == 0) {
282                 if (!force || flags == tf)
283                         RETURN(0);
284
285                 exist = true;
286                 handle = dt_trans_create(env, lfsck->li_bottom);
287                 if (IS_ERR(handle))
288                         RETURN(PTR_ERR(handle));
289
290                 rc = dt_declare_delete(env, obj, (const struct dt_key *)fid,
291                                        handle);
292                 if (rc != 0)
293                         GOTO(out, rc);
294         } else {
295                 handle = dt_trans_create(env, lfsck->li_bottom);
296                 if (IS_ERR(handle))
297                         RETURN(PTR_ERR(handle));
298         }
299
300         rc = dt_declare_insert(env, obj, (const struct dt_rec *)&flags,
301                                (const struct dt_key *)fid, handle);
302         if (rc != 0)
303                 GOTO(out, rc);
304
305         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
306         if (rc != 0)
307                 GOTO(out, rc);
308
309         fid_cpu_to_be(key, fid);
310         if (exist) {
311                 rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
312                                BYPASS_CAPA);
313                 if (rc != 0)
314                         GOTO(out, rc);
315         }
316
317         rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
318                        (const struct dt_key *)key, handle, BYPASS_CAPA, 1);
319
320         GOTO(out, rc);
321
322 out:
323         dt_trans_stop(env, lfsck->li_bottom, handle);
324         return rc;
325 }
326
327 static int lfsck_namespace_check_exist(const struct lu_env *env,
328                                        struct lfsck_instance *lfsck,
329                                        struct dt_object *obj, const char *name)
330 {
331         struct dt_object *dir = lfsck->li_obj_dir;
332         struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
333         int               rc;
334         ENTRY;
335
336         if (unlikely(lfsck_is_dead_obj(obj)))
337                 RETURN(LFSCK_NAMEENTRY_DEAD);
338
339         rc = dt_lookup(env, dir, (struct dt_rec *)fid,
340                        (const struct dt_key *)name, BYPASS_CAPA);
341         if (rc == -ENOENT)
342                 RETURN(LFSCK_NAMEENTRY_REMOVED);
343
344         if (rc < 0)
345                 RETURN(rc);
346
347         if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
348                 RETURN(LFSCK_NAMEENTRY_RECREATED);
349
350         RETURN(0);
351 }
352
353 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
354                                             struct dt_object *obj,
355                                             struct thandle *handle)
356 {
357         int rc;
358
359         /* For destroying all invalid linkEA entries. */
360         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
361         if (rc != 0)
362                 return rc;
363
364         /* For insert new linkEA entry. */
365         rc = dt_declare_xattr_set(env, obj,
366                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
367                         XATTR_NAME_LINK, 0, handle);
368         return rc;
369 }
370
371 static int lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
372                             struct linkea_data *ldata)
373 {
374         int rc;
375
376         ldata->ld_buf =
377                 lu_buf_check_and_alloc(&lfsck_env_info(env)->lti_linkea_buf,
378                                        PAGE_CACHE_SIZE);
379         if (ldata->ld_buf->lb_buf == NULL)
380                 return -ENOMEM;
381
382         if (!dt_object_exists(obj))
383                 return -ENODATA;
384
385         rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
386         if (rc == -ERANGE) {
387                 /* Buf was too small, figure out what we need. */
388                 lu_buf_free(ldata->ld_buf);
389                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
390                                   BYPASS_CAPA);
391                 if (rc < 0)
392                         return rc;
393
394                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
395                 if (ldata->ld_buf->lb_buf == NULL)
396                         return -ENOMEM;
397
398                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
399                                   BYPASS_CAPA);
400         }
401         if (rc < 0)
402                 return rc;
403
404         linkea_init(ldata);
405
406         return 0;
407 }
408
409 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
410                              struct linkea_data *ldata, struct thandle *handle)
411 {
412         const struct lu_buf *buf = lfsck_buf_get_const(env,
413                                                        ldata->ld_buf->lb_buf,
414                                                        ldata->ld_leh->leh_len);
415
416         return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle,
417                             BYPASS_CAPA);
418 }
419
420 /**
421  * \retval ve: removed entries
422  */
423 static int lfsck_linkea_entry_unpack(struct lfsck_instance *lfsck,
424                                      struct linkea_data *ldata,
425                                      struct lu_name *cname,
426                                      struct lu_fid *pfid)
427 {
428         struct link_ea_entry    *oldlee;
429         int                      oldlen;
430         int                      removed = 0;
431
432         linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
433         oldlee = ldata->ld_lee;
434         oldlen = ldata->ld_reclen;
435         linkea_next_entry(ldata);
436         while (ldata->ld_lee != NULL) {
437                 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
438                                    ldata->ld_lee->lee_reclen[1];
439                 if (unlikely(ldata->ld_reclen == oldlen &&
440                              memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
441                         linkea_del_buf(ldata, cname);
442                         removed++;
443                 } else {
444                         linkea_next_entry(ldata);
445                 }
446         }
447         ldata->ld_lee = oldlee;
448         ldata->ld_reclen = oldlen;
449         return removed;
450 }
451
452 /**
453  * \retval +ve  repaired
454  * \retval 0    no need to repair
455  * \retval -ve  error cases
456  */
457 static int lfsck_namespace_double_scan_one(const struct lu_env *env,
458                                            struct lfsck_component *com,
459                                            struct dt_object *child, __u8 flags)
460 {
461         struct lfsck_thread_info *info    = lfsck_env_info(env);
462         struct lu_attr           *la      = &info->lti_la;
463         struct lu_name           *cname   = &info->lti_name;
464         struct lu_fid            *pfid    = &info->lti_fid;
465         struct lu_fid            *cfid    = &info->lti_fid2;
466         struct lfsck_instance   *lfsck    = com->lc_lfsck;
467         struct lfsck_bookmark   *bk       = &lfsck->li_bookmark_ram;
468         struct lfsck_namespace  *ns       = com->lc_file_ram;
469         struct linkea_data       ldata    = { 0 };
470         struct thandle          *handle   = NULL;
471         bool                     locked   = false;
472         bool                     update   = false;
473         int                      rc;
474         ENTRY;
475
476         if (com->lc_journal) {
477
478 again:
479                 LASSERT(!locked);
480
481                 update = false;
482                 com->lc_journal = 1;
483                 handle = dt_trans_create(env, lfsck->li_next);
484                 if (IS_ERR(handle))
485                         RETURN(rc = PTR_ERR(handle));
486
487                 rc = dt_declare_xattr_set(env, child,
488                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
489                         XATTR_NAME_LINK, 0, handle);
490                 if (rc != 0)
491                         GOTO(stop, rc);
492
493                 rc = dt_trans_start(env, lfsck->li_next, handle);
494                 if (rc != 0)
495                         GOTO(stop, rc);
496
497                 dt_write_lock(env, child, MOR_TGT_CHILD);
498                 locked = true;
499         }
500
501         if (unlikely(lfsck_is_dead_obj(child)))
502                 GOTO(stop, rc = 0);
503
504         rc = dt_attr_get(env, child, la, BYPASS_CAPA);
505         if (rc == 0)
506                 rc = lfsck_links_read(env, child, &ldata);
507         if (rc != 0) {
508                 if ((bk->lb_param & LPF_DRYRUN) &&
509                     (rc == -EINVAL || rc == -ENODATA))
510                         rc = 1;
511
512                 GOTO(stop, rc);
513         }
514
515         linkea_first_entry(&ldata);
516         while (ldata.ld_lee != NULL) {
517                 struct dt_object *parent = NULL;
518
519                 rc = lfsck_linkea_entry_unpack(lfsck, &ldata, cname, pfid);
520                 if (rc > 0)
521                         update = true;
522
523                 if (!fid_is_sane(pfid))
524                         goto shrink;
525
526                 parent = lfsck_object_find(env, lfsck, pfid);
527                 if (parent == NULL)
528                         goto shrink;
529                 else if (IS_ERR(parent))
530                         GOTO(stop, rc = PTR_ERR(parent));
531
532                 if (!dt_object_exists(parent))
533                         goto shrink;
534
535                 /* XXX: Currently, skip remote object, the consistency for
536                  *      remote object will be processed in LFSCK phase III. */
537                 if (dt_object_remote(parent)) {
538                         lfsck_object_put(env, parent);
539                         linkea_next_entry(&ldata);
540                         continue;
541                 }
542
543                 if (unlikely(!dt_try_as_dir(env, parent)))
544                         goto shrink;
545
546                 /* To guarantee the 'name' is terminated with '0'. */
547                 memcpy(info->lti_key, cname->ln_name, cname->ln_namelen);
548                 info->lti_key[cname->ln_namelen] = 0;
549                 cname->ln_name = info->lti_key;
550                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
551                                (const struct dt_key *)cname->ln_name,
552                                BYPASS_CAPA);
553                 if (rc != 0 && rc != -ENOENT) {
554                         lfsck_object_put(env, parent);
555                         GOTO(stop, rc);
556                 }
557
558                 if (rc == 0) {
559                         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
560                                 lfsck_object_put(env, parent);
561                                 linkea_next_entry(&ldata);
562                                 continue;
563                         }
564
565                         goto shrink;
566                 }
567
568                 /* If there is no name entry in the parent dir and the object
569                  * link count is less than the linkea entries count, then the
570                  * linkea entry should be removed. */
571                 if (ldata.ld_leh->leh_reccount > la->la_nlink)
572                         goto shrink;
573
574                 /* XXX: For the case of there is a linkea entry, but without
575                  *      name entry pointing to the object and its hard links
576                  *      count is not less than the object name entries count,
577                  *      then seems we should add the 'missed' name entry back
578                  *      to namespace, but before LFSCK phase III finished, we
579                  *      do not know whether the object has some inconsistency
580                  *      on other MDTs. So now, do NOT add the name entry back
581                  *      to the namespace, but keep the linkEA entry. LU-2914 */
582                 lfsck_object_put(env, parent);
583                 linkea_next_entry(&ldata);
584                 continue;
585
586 shrink:
587                 if (parent != NULL)
588                         lfsck_object_put(env, parent);
589                 if (bk->lb_param & LPF_DRYRUN)
590                         RETURN(1);
591
592                 CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
593                       "for the object: "DFID", parent "DFID", name %.*s\n",
594                       lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)),
595                       PFID(pfid), cname->ln_namelen, cname->ln_name);
596
597                 linkea_del_buf(&ldata, cname);
598                 update = true;
599         }
600
601         if (update) {
602                 if (!com->lc_journal) {
603                         com->lc_journal = 1;
604                         goto again;
605                 }
606
607                 rc = lfsck_links_write(env, child, &ldata, handle);
608         }
609
610         GOTO(stop, rc);
611
612 stop:
613         if (locked) {
614         /* XXX: For the case linkea entries count does not match the object hard
615          *      links count, we cannot update the later one simply. Before LFSCK
616          *      phase III finished, we cannot know whether there are some remote
617          *      name entries to be repaired or not. LU-2914 */
618                 if (rc == 0 && !lfsck_is_dead_obj(child) &&
619                     ldata.ld_leh != NULL &&
620                     ldata.ld_leh->leh_reccount != la->la_nlink)
621                         CDEBUG(D_LFSCK, "%s: the object "DFID" linkEA entry "
622                                "count %u may not match its hardlink count %u\n",
623                                lfsck_lfsck2name(lfsck), PFID(cfid),
624                                ldata.ld_leh->leh_reccount, la->la_nlink);
625
626                 dt_write_unlock(env, child);
627         }
628
629         if (handle != NULL)
630                 dt_trans_stop(env, lfsck->li_next, handle);
631
632         if (rc == 0 && update) {
633                 ns->ln_objs_nlink_repaired++;
634                 rc = 1;
635         }
636
637         return rc;
638 }
639
640 /* namespace APIs */
641
642 static int lfsck_namespace_reset(const struct lu_env *env,
643                                  struct lfsck_component *com, bool init)
644 {
645         struct lfsck_instance   *lfsck = com->lc_lfsck;
646         struct lfsck_namespace  *ns    = com->lc_file_ram;
647         struct dt_object        *root;
648         struct dt_object        *dto;
649         int                      rc;
650         ENTRY;
651
652         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
653         if (IS_ERR(root))
654                 GOTO(log, rc = PTR_ERR(root));
655
656         if (unlikely(!dt_try_as_dir(env, root)))
657                 GOTO(put, rc = -ENOTDIR);
658
659         down_write(&com->lc_sem);
660         if (init) {
661                 memset(ns, 0, sizeof(*ns));
662         } else {
663                 __u32 count = ns->ln_success_count;
664                 __u64 last_time = ns->ln_time_last_complete;
665
666                 memset(ns, 0, sizeof(*ns));
667                 ns->ln_success_count = count;
668                 ns->ln_time_last_complete = last_time;
669         }
670         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
671         ns->ln_status = LS_INIT;
672
673         rc = local_object_unlink(env, lfsck->li_bottom, root,
674                                  lfsck_namespace_name);
675         if (rc != 0)
676                 GOTO(out, rc);
677
678         lfsck_object_put(env, com->lc_obj);
679         com->lc_obj = NULL;
680         dto = local_index_find_or_create(env, lfsck->li_los, root,
681                                          lfsck_namespace_name,
682                                          S_IFREG | S_IRUGO | S_IWUSR,
683                                          &dt_lfsck_features);
684         if (IS_ERR(dto))
685                 GOTO(out, rc = PTR_ERR(dto));
686
687         com->lc_obj = dto;
688         rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
689         if (rc != 0)
690                 GOTO(out, rc);
691
692         rc = lfsck_namespace_store(env, com, true);
693
694         GOTO(out, rc);
695
696 out:
697         up_write(&com->lc_sem);
698
699 put:
700         lu_object_put(env, &root->do_lu);
701 log:
702         CDEBUG(D_LFSCK, "%s: namespace LFSCK reset: rc = %d\n",
703                lfsck_lfsck2name(lfsck), rc);
704         return rc;
705 }
706
707 static void
708 lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
709                      bool new_checked)
710 {
711         struct lfsck_namespace *ns = com->lc_file_ram;
712
713         down_write(&com->lc_sem);
714         if (new_checked)
715                 com->lc_new_checked++;
716         ns->ln_items_failed++;
717         if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
718                 lfsck_pos_fill(env, com->lc_lfsck,
719                                &ns->ln_pos_first_inconsistent, false);
720
721                 CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
722                        "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n",
723                        lfsck_lfsck2name(com->lc_lfsck),
724                        ns->ln_pos_first_inconsistent.lp_oit_cookie,
725                        PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
726                        ns->ln_pos_first_inconsistent.lp_dir_cookie);
727         }
728         up_write(&com->lc_sem);
729 }
730
731 static int lfsck_namespace_checkpoint(const struct lu_env *env,
732                                       struct lfsck_component *com, bool init)
733 {
734         struct lfsck_instance   *lfsck = com->lc_lfsck;
735         struct lfsck_namespace  *ns    = com->lc_file_ram;
736         int                      rc;
737
738         if (com->lc_new_checked == 0 && !init)
739                 return 0;
740
741         down_write(&com->lc_sem);
742         if (init) {
743                 ns->ln_pos_latest_start = lfsck->li_pos_current;
744         } else {
745                 ns->ln_pos_last_checkpoint = lfsck->li_pos_current;
746                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
747                                 HALF_SEC - lfsck->li_time_last_checkpoint);
748                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
749                 ns->ln_items_checked += com->lc_new_checked;
750                 com->lc_new_checked = 0;
751         }
752
753         rc = lfsck_namespace_store(env, com, false);
754         up_write(&com->lc_sem);
755
756         CDEBUG(D_LFSCK, "%s: namespace LFSCK checkpoint at the pos ["LPU64
757                ", "DFID", "LPX64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
758                lfsck->li_pos_current.lp_oit_cookie,
759                PFID(&lfsck->li_pos_current.lp_dir_parent),
760                lfsck->li_pos_current.lp_dir_cookie, rc);
761
762         return rc;
763 }
764
765 static int lfsck_namespace_prep(const struct lu_env *env,
766                                 struct lfsck_component *com,
767                                 struct lfsck_start_param *lsp)
768 {
769         struct lfsck_instance   *lfsck  = com->lc_lfsck;
770         struct lfsck_namespace  *ns     = com->lc_file_ram;
771         struct lfsck_position   *pos    = &com->lc_pos_start;
772
773         if (ns->ln_status == LS_COMPLETED) {
774                 int rc;
775
776                 rc = lfsck_namespace_reset(env, com, false);
777                 if (rc == 0)
778                         rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
779
780                 if (rc != 0) {
781                         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: "
782                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
783
784                         return rc;
785                 }
786         }
787
788         down_write(&com->lc_sem);
789         ns->ln_time_latest_start = cfs_time_current_sec();
790
791         spin_lock(&lfsck->li_lock);
792         if (ns->ln_flags & LF_SCANNED_ONCE) {
793                 if (!lfsck->li_drop_dryrun ||
794                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
795                         ns->ln_status = LS_SCANNING_PHASE2;
796                         list_move_tail(&com->lc_link,
797                                        &lfsck->li_list_double_scan);
798                         if (!list_empty(&com->lc_link_dir))
799                                 list_del_init(&com->lc_link_dir);
800                         lfsck_pos_set_zero(pos);
801                 } else {
802                         ns->ln_status = LS_SCANNING_PHASE1;
803                         ns->ln_run_time_phase1 = 0;
804                         ns->ln_run_time_phase2 = 0;
805                         ns->ln_items_checked = 0;
806                         ns->ln_items_repaired = 0;
807                         ns->ln_items_failed = 0;
808                         ns->ln_dirs_checked = 0;
809                         ns->ln_mlinked_checked = 0;
810                         ns->ln_objs_checked_phase2 = 0;
811                         ns->ln_objs_repaired_phase2 = 0;
812                         ns->ln_objs_failed_phase2 = 0;
813                         ns->ln_objs_nlink_repaired = 0;
814                         ns->ln_objs_lost_found = 0;
815                         fid_zero(&ns->ln_fid_latest_scanned_phase2);
816                         if (list_empty(&com->lc_link_dir))
817                                 list_add_tail(&com->lc_link_dir,
818                                               &lfsck->li_list_dir);
819                         *pos = ns->ln_pos_first_inconsistent;
820                 }
821         } else {
822                 ns->ln_status = LS_SCANNING_PHASE1;
823                 if (list_empty(&com->lc_link_dir))
824                         list_add_tail(&com->lc_link_dir,
825                                       &lfsck->li_list_dir);
826                 if (!lfsck->li_drop_dryrun ||
827                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
828                         *pos = ns->ln_pos_last_checkpoint;
829                         pos->lp_oit_cookie++;
830                 } else {
831                         *pos = ns->ln_pos_first_inconsistent;
832                 }
833         }
834         spin_unlock(&lfsck->li_lock);
835         up_write(&com->lc_sem);
836
837         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep done, start pos ["LPU64", "
838                DFID", "LPX64"]\n", lfsck_lfsck2name(lfsck), pos->lp_oit_cookie,
839                PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
840
841         return 0;
842 }
843
844 static int lfsck_namespace_exec_oit(const struct lu_env *env,
845                                     struct lfsck_component *com,
846                                     struct dt_object *obj)
847 {
848         down_write(&com->lc_sem);
849         com->lc_new_checked++;
850         if (S_ISDIR(lfsck_object_type(obj)))
851                 ((struct lfsck_namespace *)com->lc_file_ram)->ln_dirs_checked++;
852         up_write(&com->lc_sem);
853         return 0;
854 }
855
/*
 * Process one directory entry for the namespace LFSCK.
 *
 * \a ent is a name entry of the directory lfsck->li_obj_dir (the "parent")
 * and \a obj is the object the entry refers to (the "child").  The function
 * checks whether the child's linkEA holds a record for (entry name, parent
 * FID) and, unless LPF_DRYRUN is set, rewrites the linkEA when it does not.
 * Afterwards, objects that are not trivially single-linked are handed to
 * lfsck_namespace_update(), flagged LLF_UNMATCH_NLINKS when the linkEA
 * record count differs from la_nlink.
 *
 * The whole function runs with com->lc_sem held for write.
 *
 * Returns a negative error only when the work failed and LPF_FAILOUT is
 * set in the bookmark; returns 0 in every other case.
 */
856 static int lfsck_namespace_exec_dir(const struct lu_env *env,
857                                     struct lfsck_component *com,
858                                     struct dt_object *obj,
859                                     struct lu_dirent *ent)
860 {
861         struct lfsck_thread_info   *info     = lfsck_env_info(env);
862         struct lu_attr             *la       = &info->lti_la;
863         struct lfsck_instance      *lfsck    = com->lc_lfsck;
864         struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
865         struct lfsck_namespace     *ns       = com->lc_file_ram;
866         struct linkea_data          ldata    = { 0 };
867         const struct lu_fid        *pfid     = lfsck_dto2fid(lfsck->li_obj_dir);
868         const struct lu_fid        *cfid     = lfsck_dto2fid(obj);
869         const struct lu_name       *cname;
870         struct thandle             *handle   = NULL;
871         bool                        repaired = false;
872         bool                        locked   = false;
873         bool                        remove;
874         bool                        newdata;
875         bool                        log      = false;
876         int                         count    = 0;
877         int                         rc;
878         ENTRY;
879
880         cname = lfsck_name_get_const(env, ent->lde_name, ent->lde_namelen);
881         down_write(&com->lc_sem);
882         com->lc_new_checked++;
883
            /* The entry itself may already be marked as upgraded or
             * repaired; count that as a dirent repair. */
884         if (ent->lde_attrs & LUDA_UPGRADE) {
885                 ns->ln_flags |= LF_UPGRADE;
886                 ns->ln_dirent_repaired++;
887                 repaired = true;
888         } else if (ent->lde_attrs & LUDA_REPAIR) {
889                 ns->ln_flags |= LF_INCONSISTENT;
890                 ns->ln_dirent_repaired++;
891                 repaired = true;
892         }
893
            /* No linkEA check for ".", ".." and for dot-prefixed names
             * whose FID sequence is the .lustre one. */
894         if (ent->lde_name[0] == '.' &&
895             (ent->lde_namelen == 1 ||
896              (ent->lde_namelen == 2 && ent->lde_name[1] == '.') ||
897              fid_seq_is_dot_lustre(fid_seq(&ent->lde_fid))))
898                 GOTO(out, rc = 0);
899
            /* Outside dry-run mode, open a transaction and write-lock the
             * child up front if the previous entry needed one (lc_journal)
             * or this entry was already repaired.  The "again" label is
             * also entered from the "nodata" path below when a repair turns
             * out to be needed but no transaction was started. */
900         if (!(bk->lb_param & LPF_DRYRUN) &&
901             (com->lc_journal || repaired)) {
902
903 again:
904                 LASSERT(!locked);
905
906                 com->lc_journal = 1;
907                 handle = dt_trans_create(env, lfsck->li_next);
908                 if (IS_ERR(handle))
909                         GOTO(out, rc = PTR_ERR(handle));
910
911                 rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
912                 if (rc != 0)
913                         GOTO(stop, rc);
914
915                 rc = dt_trans_start(env, lfsck->li_next, handle);
916                 if (rc != 0)
917                         GOTO(stop, rc);
918
919                 dt_write_lock(env, obj, MOR_TGT_CHILD);
920                 locked = true;
921         }
922
            /* NOTE(review): presumably verifies that the name entry is
             * still there; any non-zero result ends the processing of this
             * entry -- confirm against lfsck_namespace_check_exist(). */
923         rc = lfsck_namespace_check_exist(env, lfsck, obj, ent->lde_name);
924         if (rc != 0)
925                 GOTO(stop, rc);
926
            /* Load the child's linkEA and decide how to fix it:
             *   remove  - delete the existing linkEA xattr first;
             *   newdata - build the linkEA from scratch instead of adding
             *             to the one just read. */
927         rc = lfsck_links_read(env, obj, &ldata);
928         if (rc == 0) {
929                 count = ldata.ld_leh->leh_reccount;
930                 rc = linkea_links_find(&ldata, cname, pfid);
931                 if ((rc == 0) &&
932                     (count == 1 || !S_ISDIR(lfsck_object_type(obj))))
933                         goto record;
934
935                 ns->ln_flags |= LF_INCONSISTENT;
936                 /* For dir, if there are more than one linkea entries, or the
937                  * linkea entry does not match the name entry, then remove all
938                  * and add the correct one. */
939                 if (S_ISDIR(lfsck_object_type(obj))) {
940                         remove = true;
941                         newdata = true;
942                 } else {
943                         remove = false;
944                         newdata = false;
945                 }
946                 goto nodata;
947         } else if (unlikely(rc == -EINVAL)) {
948                 count = 1;
949                 ns->ln_flags |= LF_INCONSISTENT;
950                 /* The magic crashed, we are not sure whether there are more
951                  * corrupt data in the linkea, so remove all linkea entries. */
952                 remove = true;
953                 newdata = true;
954                 goto nodata;
955         } else if (rc == -ENODATA) {
                    /* No linkEA at all: treated as an upgrade case, a new
                     * linkEA is created. */
956                 count = 1;
957                 ns->ln_flags |= LF_UPGRADE;
958                 remove = false;
959                 newdata = true;
960
961 nodata:
                    /* Common repair path for all three cases above.  In
                     * dry-run mode the repair is only counted. */
962                 if (bk->lb_param & LPF_DRYRUN) {
963                         ns->ln_linkea_repaired++;
964                         log = true;
965                         repaired = true;
966                         goto record;
967                 }
968
                    /* lc_journal is set whenever the transaction block above
                     * has been entered, so a clear flag means no transaction
                     * is open: start one and redo the checks under it. */
969                 if (!com->lc_journal)
970                         goto again;
971
972                 if (remove) {
973                         LASSERT(newdata);
974
975                         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
976                                           BYPASS_CAPA);
977                         if (rc != 0)
978                                 GOTO(stop, rc);
979                 }
980
981                 if (newdata) {
982                         rc = linkea_data_new(&ldata,
983                                         &lfsck_env_info(env)->lti_linkea_buf);
984                         if (rc != 0)
985                                 GOTO(stop, rc);
986                 }
987
988                 rc = linkea_add_buf(&ldata, cname, pfid);
989                 if (rc != 0)
990                         GOTO(stop, rc);
991
992                 rc = lfsck_links_write(env, obj, &ldata, handle);
993                 if (rc != 0)
994                         GOTO(stop, rc);
995
996                 count = ldata.ld_leh->leh_reccount;
997                 ns->ln_linkea_repaired++;
998                 log = true;
999                 repaired = true;
1000         } else {
1001                 GOTO(stop, rc);
1002         }
1003
             /* "count" now holds the number of linkEA records (or 1 when
              * the linkEA was missing or had a bad magic). */
1004 record:
1005         LASSERT(count > 0);
1006
1007         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
1008         if (rc != 0)
1009                 GOTO(stop, rc);
1010
1011         if ((count == 1) &&
1012             (la->la_nlink == 1 || S_ISDIR(lfsck_object_type(obj))))
1013                 /* Usually, it is for single linked object or dir, do nothing.*/
1014                 GOTO(stop, rc);
1015
1016         /* Following modification will be in another transaction.  */
1017         if (handle != NULL) {
1018                 LASSERT(dt_write_locked(env, obj));
1019
1020                 dt_write_unlock(env, obj);
1021                 locked = false;
1022
1023                 dt_trans_stop(env, lfsck->li_next, handle);
1024                 handle = NULL;
1025
1026                 if (log)
1027                         CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired "
1028                               "linkEA for the object: "DFID", parent "
1029                               DFID", name %.*s\n",
1030                               lfsck_lfsck2name(lfsck), PFID(cfid), PFID(pfid),
1031                               ent->lde_namelen, ent->lde_name);
1032         }
1033
             /* The object is recorded through lfsck_namespace_update(),
              * flagged when the linkEA record count and nlink disagree. */
1034         ns->ln_mlinked_checked++;
1035         rc = lfsck_namespace_update(env, com, cfid,
1036                         count != la->la_nlink ? LLF_UNMATCH_NLINKS : 0, false);
1037
1038         GOTO(out, rc);
1039
             /* Release the child lock and the transaction if still held. */
1040 stop:
1041         if (locked)
1042                 dt_write_unlock(env, obj);
1043
1044         if (handle != NULL)
1045                 dt_trans_stop(env, lfsck->li_next, handle);
1046
             /* Final accounting.  A failure is counted and its position
              * remembered if none was recorded yet; the error is passed to
              * the caller only with LPF_FAILOUT.  On success a repair is
              * counted (in dry-run mode its position is remembered as well),
              * while an entry that needed no repair clears lc_journal so the
              * next entry does not open a transaction up front. */
1047 out:
1048         if (rc < 0) {
1049                 CDEBUG(D_LFSCK, "%s: namespace LFSCK exec_dir failed, "
1050                        "parent "DFID", child name %.*s, child FID "DFID
1051                        ": rc = %d\n", lfsck_lfsck2name(lfsck), PFID(pfid),
1052                        ent->lde_namelen, ent->lde_name, PFID(cfid), rc);
1053
1054                 ns->ln_items_failed++;
1055                 if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
1056                         lfsck_pos_fill(env, lfsck,
1057                                        &ns->ln_pos_first_inconsistent, false);
1058                 if (!(bk->lb_param & LPF_FAILOUT))
1059                         rc = 0;
1060         } else {
1061                 if (repaired) {
1062                         ns->ln_items_repaired++;
1063                         if (bk->lb_param & LPF_DRYRUN &&
1064                             lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
1065                                 lfsck_pos_fill(env, lfsck,
1066                                                &ns->ln_pos_first_inconsistent,
1067                                                false);
1068                 } else {
1069                         com->lc_journal = 0;
1070                 }
1071                 rc = 0;
1072         }
1073         up_write(&com->lc_sem);
1074         return rc;
1075 }
1076
1077 static int lfsck_namespace_post(const struct lu_env *env,
1078                                 struct lfsck_component *com,
1079                                 int result, bool init)
1080 {
1081         struct lfsck_instance   *lfsck = com->lc_lfsck;
1082         struct lfsck_namespace  *ns    = com->lc_file_ram;
1083         int                      rc;
1084
1085         down_write(&com->lc_sem);
1086         spin_lock(&lfsck->li_lock);
1087         if (!init)
1088                 ns->ln_pos_last_checkpoint = lfsck->li_pos_current;
1089         if (result > 0) {
1090                 ns->ln_status = LS_SCANNING_PHASE2;
1091                 ns->ln_flags |= LF_SCANNED_ONCE;
1092                 ns->ln_flags &= ~LF_UPGRADE;
1093                 list_del_init(&com->lc_link_dir);
1094                 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
1095         } else if (result == 0) {
1096                 ns->ln_status = lfsck->li_status;
1097                 if (ns->ln_status == 0)
1098                         ns->ln_status = LS_STOPPED;
1099                 if (ns->ln_status != LS_PAUSED) {
1100                         list_del_init(&com->lc_link_dir);
1101                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
1102                 }
1103         } else {
1104                 ns->ln_status = LS_FAILED;
1105                 list_del_init(&com->lc_link_dir);
1106                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
1107         }
1108         spin_unlock(&lfsck->li_lock);
1109
1110         if (!init) {
1111                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1112                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1113                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
1114                 ns->ln_items_checked += com->lc_new_checked;
1115                 com->lc_new_checked = 0;
1116         }
1117
1118         rc = lfsck_namespace_store(env, com, false);
1119         up_write(&com->lc_sem);
1120
1121         CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n",
1122                lfsck_lfsck2name(lfsck), rc);
1123
1124         return rc;
1125 }
1126
1127 static int
1128 lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
1129                      struct seq_file *m)
1130 {
1131         struct lfsck_instance   *lfsck = com->lc_lfsck;
1132         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1133         struct lfsck_namespace  *ns    = com->lc_file_ram;
1134         int                      rc;
1135
1136         down_read(&com->lc_sem);
1137         seq_printf(m, "name: lfsck_namespace\n"
1138                    "magic: %#x\n"
1139                    "version: %d\n"
1140                    "status: %s\n",
1141                    ns->ln_magic,
1142                    bk->lb_version,
1143                    lfsck_status2names(ns->ln_status));
1144
1145         rc = lfsck_bits_dump(m, ns->ln_flags, lfsck_flags_names, "flags");
1146         if (rc < 0)
1147                 goto out;
1148
1149         rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
1150         if (rc < 0)
1151                 goto out;
1152
1153         rc = lfsck_time_dump(m, ns->ln_time_last_complete,
1154                              "time_since_last_completed");
1155         if (rc < 0)
1156                 goto out;
1157
1158         rc = lfsck_time_dump(m, ns->ln_time_latest_start,
1159                              "time_since_latest_start");
1160         if (rc < 0)
1161                 goto out;
1162
1163         rc = lfsck_time_dump(m, ns->ln_time_last_checkpoint,
1164                              "time_since_last_checkpoint");
1165         if (rc < 0)
1166                 goto out;
1167
1168         rc = lfsck_pos_dump(m, &ns->ln_pos_latest_start,
1169                             "latest_start_position");
1170         if (rc < 0)
1171                 goto out;
1172
1173         rc = lfsck_pos_dump(m, &ns->ln_pos_last_checkpoint,
1174                             "last_checkpoint_position");
1175         if (rc < 0)
1176                 goto out;
1177
1178         rc = lfsck_pos_dump(m, &ns->ln_pos_first_inconsistent,
1179                             "first_failure_position");
1180         if (rc < 0)
1181                 goto out;
1182
1183         if (ns->ln_status == LS_SCANNING_PHASE1) {
1184                 struct lfsck_position pos;
1185                 const struct dt_it_ops *iops;
1186                 cfs_duration_t duration = cfs_time_current() -
1187                                           lfsck->li_time_last_checkpoint;
1188                 __u64 checked = ns->ln_items_checked + com->lc_new_checked;
1189                 __u64 speed = checked;
1190                 __u64 new_checked = com->lc_new_checked * HZ;
1191                 __u32 rtime = ns->ln_run_time_phase1 +
1192                               cfs_duration_sec(duration + HALF_SEC);
1193
1194                 if (duration != 0)
1195                         do_div(new_checked, duration);
1196                 if (rtime != 0)
1197                         do_div(speed, rtime);
1198                 seq_printf(m, "checked_phase1: "LPU64"\n"
1199                               "checked_phase2: "LPU64"\n"
1200                               "updated_phase1: "LPU64"\n"
1201                               "updated_phase2: "LPU64"\n"
1202                               "failed_phase1: "LPU64"\n"
1203                               "failed_phase2: "LPU64"\n"
1204                               "directories: "LPU64"\n"
1205                               "multi_linked_files: "LPU64"\n"
1206                               "dirent_repaired: "LPU64"\n"
1207                               "linkea_repaired: "LPU64"\n"
1208                               "nlinks_repaired: "LPU64"\n"
1209                               "lost_found: "LPU64"\n"
1210                               "success_count: %u\n"
1211                               "run_time_phase1: %u seconds\n"
1212                               "run_time_phase2: %u seconds\n"
1213                               "average_speed_phase1: "LPU64" items/sec\n"
1214                               "average_speed_phase2: N/A\n"
1215                               "real_time_speed_phase1: "LPU64" items/sec\n"
1216                               "real_time_speed_phase2: N/A\n",
1217                               checked,
1218                               ns->ln_objs_checked_phase2,
1219                               ns->ln_items_repaired,
1220                               ns->ln_objs_repaired_phase2,
1221                               ns->ln_items_failed,
1222                               ns->ln_objs_failed_phase2,
1223                               ns->ln_dirs_checked,
1224                               ns->ln_mlinked_checked,
1225                               ns->ln_dirent_repaired,
1226                               ns->ln_linkea_repaired,
1227                               ns->ln_objs_nlink_repaired,
1228                               ns->ln_objs_lost_found,
1229                               ns->ln_success_count,
1230                               rtime,
1231                               ns->ln_run_time_phase2,
1232                               speed,
1233                               new_checked);
1234
1235                 LASSERT(lfsck->li_di_oit != NULL);
1236
1237                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1238
1239                 /* The low layer otable-based iteration position may NOT
1240                  * exactly match the namespace-based directory traversal
1241                  * cookie. Generally, it is not a serious issue. But the
1242                  * caller should NOT make assumption on that. */
1243                 pos.lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1244                 if (!lfsck->li_current_oit_processed)
1245                         pos.lp_oit_cookie--;
1246
1247                 spin_lock(&lfsck->li_lock);
1248                 if (lfsck->li_di_dir != NULL) {
1249                         pos.lp_dir_cookie = lfsck->li_cookie_dir;
1250                         if (pos.lp_dir_cookie >= MDS_DIR_END_OFF) {
1251                                 fid_zero(&pos.lp_dir_parent);
1252                                 pos.lp_dir_cookie = 0;
1253                         } else {
1254                                 pos.lp_dir_parent =
1255                                         *lfsck_dto2fid(lfsck->li_obj_dir);
1256                         }
1257                 } else {
1258                         fid_zero(&pos.lp_dir_parent);
1259                         pos.lp_dir_cookie = 0;
1260                 }
1261                 spin_unlock(&lfsck->li_lock);
1262                 lfsck_pos_dump(m, &pos, "current_position");
1263         } else if (ns->ln_status == LS_SCANNING_PHASE2) {
1264                 cfs_duration_t duration = cfs_time_current() -
1265                                           lfsck->li_time_last_checkpoint;
1266                 __u64 checked = ns->ln_objs_checked_phase2 +
1267                                 com->lc_new_checked;
1268                 __u64 speed1 = ns->ln_items_checked;
1269                 __u64 speed2 = checked;
1270                 __u64 new_checked = com->lc_new_checked * HZ;
1271                 __u32 rtime = ns->ln_run_time_phase2 +
1272                               cfs_duration_sec(duration + HALF_SEC);
1273
1274                 if (duration != 0)
1275                         do_div(new_checked, duration);
1276                 if (ns->ln_run_time_phase1 != 0)
1277                         do_div(speed1, ns->ln_run_time_phase1);
1278                 if (rtime != 0)
1279                         do_div(speed2, rtime);
1280                 seq_printf(m, "checked_phase1: "LPU64"\n"
1281                               "checked_phase2: "LPU64"\n"
1282                               "updated_phase1: "LPU64"\n"
1283                               "updated_phase2: "LPU64"\n"
1284                               "failed_phase1: "LPU64"\n"
1285                               "failed_phase2: "LPU64"\n"
1286                               "directories: "LPU64"\n"
1287                               "multi_linked_files: "LPU64"\n"
1288                               "dirent_repaired: "LPU64"\n"
1289                               "linkea_repaired: "LPU64"\n"
1290                               "nlinks_repaired: "LPU64"\n"
1291                               "lost_found: "LPU64"\n"
1292                               "success_count: %u\n"
1293                               "run_time_phase1: %u seconds\n"
1294                               "run_time_phase2: %u seconds\n"
1295                               "average_speed_phase1: "LPU64" items/sec\n"
1296                               "average_speed_phase2: "LPU64" objs/sec\n"
1297                               "real_time_speed_phase1: N/A\n"
1298                               "real_time_speed_phase2: "LPU64" objs/sec\n"
1299                               "current_position: "DFID"\n",
1300                               ns->ln_items_checked,
1301                               checked,
1302                               ns->ln_items_repaired,
1303                               ns->ln_objs_repaired_phase2,
1304                               ns->ln_items_failed,
1305                               ns->ln_objs_failed_phase2,
1306                               ns->ln_dirs_checked,
1307                               ns->ln_mlinked_checked,
1308                               ns->ln_dirent_repaired,
1309                               ns->ln_linkea_repaired,
1310                               ns->ln_objs_nlink_repaired,
1311                               ns->ln_objs_lost_found,
1312                               ns->ln_success_count,
1313                               ns->ln_run_time_phase1,
1314                               rtime,
1315                               speed1,
1316                               speed2,
1317                               new_checked,
1318                               PFID(&ns->ln_fid_latest_scanned_phase2));
1319         } else {
1320                 __u64 speed1 = ns->ln_items_checked;
1321                 __u64 speed2 = ns->ln_objs_checked_phase2;
1322
1323                 if (ns->ln_run_time_phase1 != 0)
1324                         do_div(speed1, ns->ln_run_time_phase1);
1325                 if (ns->ln_run_time_phase2 != 0)
1326                         do_div(speed2, ns->ln_run_time_phase2);
1327                 seq_printf(m, "checked_phase1: "LPU64"\n"
1328                               "checked_phase2: "LPU64"\n"
1329                               "updated_phase1: "LPU64"\n"
1330                               "updated_phase2: "LPU64"\n"
1331                               "failed_phase1: "LPU64"\n"
1332                               "failed_phase2: "LPU64"\n"
1333                               "directories: "LPU64"\n"
1334                               "multi_linked_files: "LPU64"\n"
1335                               "dirent_repaired: "LPU64"\n"
1336                               "linkea_repaired: "LPU64"\n"
1337                               "nlinks_repaired: "LPU64"\n"
1338                               "lost_found: "LPU64"\n"
1339                               "success_count: %u\n"
1340                               "run_time_phase1: %u seconds\n"
1341                               "run_time_phase2: %u seconds\n"
1342                               "average_speed_phase1: "LPU64" items/sec\n"
1343                               "average_speed_phase2: "LPU64" objs/sec\n"
1344                               "real_time_speed_phase1: N/A\n"
1345                               "real_time_speed_phase2: N/A\n"
1346                               "current_position: N/A\n",
1347                               ns->ln_items_checked,
1348                               ns->ln_objs_checked_phase2,
1349                               ns->ln_items_repaired,
1350                               ns->ln_objs_repaired_phase2,
1351                               ns->ln_items_failed,
1352                               ns->ln_objs_failed_phase2,
1353                               ns->ln_dirs_checked,
1354                               ns->ln_mlinked_checked,
1355                               ns->ln_dirent_repaired,
1356                               ns->ln_linkea_repaired,
1357                               ns->ln_objs_nlink_repaired,
1358                               ns->ln_objs_lost_found,
1359                               ns->ln_success_count,
1360                               ns->ln_run_time_phase1,
1361                               ns->ln_run_time_phase2,
1362                               speed1,
1363                               speed2);
1364         }
1365 out:
1366         up_read(&com->lc_sem);
1367         return 0;
1368 }
1369
1370 static int lfsck_namespace_double_scan_main(void *args)
1371 {
1372         struct lfsck_thread_args *lta   = args;
1373         const struct lu_env     *env    = &lta->lta_env;
1374         struct lfsck_component  *com    = lta->lta_com;
1375         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1376         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1377         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1378         struct lfsck_namespace  *ns     = com->lc_file_ram;
1379         struct dt_object        *obj    = com->lc_obj;
1380         const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
1381         struct dt_object        *target;
1382         struct dt_it            *di;
1383         struct dt_key           *key;
1384         struct lu_fid            fid;
1385         int                      rc;
1386         __u8                     flags = 0;
1387         ENTRY;
1388
1389         CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
1390                lfsck_lfsck2name(lfsck));
1391
1392         com->lc_new_checked = 0;
1393         com->lc_new_scanned = 0;
1394         com->lc_time_last_checkpoint = cfs_time_current();
1395         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
1396                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1397
1398         di = iops->init(env, obj, 0, BYPASS_CAPA);
1399         if (IS_ERR(di))
1400                 GOTO(out, rc = PTR_ERR(di));
1401
1402         fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
1403         rc = iops->get(env, di, (const struct dt_key *)&fid);
1404         if (rc < 0)
1405                 GOTO(fini, rc);
1406
1407         /* Skip the start one, which either has been processed or non-exist. */
1408         rc = iops->next(env, di);
1409         if (rc != 0)
1410                 GOTO(put, rc);
1411
1412         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_DOUBLESCAN))
1413                 GOTO(put, rc = 0);
1414
1415         do {
1416                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
1417                     cfs_fail_val > 0) {
1418                         struct l_wait_info lwi;
1419
1420                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
1421                                           NULL, NULL);
1422                         l_wait_event(thread->t_ctl_waitq,
1423                                      !thread_is_running(thread),
1424                                      &lwi);
1425                 }
1426
1427                 key = iops->key(env, di);
1428                 fid_be_to_cpu(&fid, (const struct lu_fid *)key);
1429                 target = lfsck_object_find(env, lfsck, &fid);
1430                 down_write(&com->lc_sem);
1431                 if (target == NULL) {
1432                         rc = 0;
1433                         goto checkpoint;
1434                 } else if (IS_ERR(target)) {
1435                         rc = PTR_ERR(target);
1436                         goto checkpoint;
1437                 }
1438
1439                 /* XXX: Currently, skip remote object, the consistency for
1440                  *      remote object will be processed in LFSCK phase III. */
1441                 if (dt_object_exists(target) && !dt_object_remote(target)) {
1442                         rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
1443                         if (rc == 0)
1444                                 rc = lfsck_namespace_double_scan_one(env, com,
1445                                                                 target, flags);
1446                 }
1447
1448                 lfsck_object_put(env, target);
1449
1450 checkpoint:
1451                 com->lc_new_checked++;
1452                 com->lc_new_scanned++;
1453                 ns->ln_fid_latest_scanned_phase2 = fid;
1454                 if (rc > 0)
1455                         ns->ln_objs_repaired_phase2++;
1456                 else if (rc < 0)
1457                         ns->ln_objs_failed_phase2++;
1458                 up_write(&com->lc_sem);
1459
1460                 if ((rc == 0) || ((rc > 0) && !(bk->lb_param & LPF_DRYRUN))) {
1461                         lfsck_namespace_delete(env, com, &fid);
1462                 } else if (rc < 0) {
1463                         flags |= LLF_REPAIR_FAILED;
1464                         lfsck_namespace_update(env, com, &fid, flags, true);
1465                 }
1466
1467                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
1468                         GOTO(put, rc);
1469
1470                 if (unlikely(cfs_time_beforeq(com->lc_time_next_checkpoint,
1471                                               cfs_time_current())) &&
1472                     com->lc_new_checked != 0) {
1473                         down_write(&com->lc_sem);
1474                         ns->ln_run_time_phase2 +=
1475                                 cfs_duration_sec(cfs_time_current() +
1476                                 HALF_SEC - com->lc_time_last_checkpoint);
1477                         ns->ln_time_last_checkpoint = cfs_time_current_sec();
1478                         ns->ln_objs_checked_phase2 += com->lc_new_checked;
1479                         com->lc_new_checked = 0;
1480                         rc = lfsck_namespace_store(env, com, false);
1481                         up_write(&com->lc_sem);
1482                         if (rc != 0)
1483                                 GOTO(put, rc);
1484
1485                         com->lc_time_last_checkpoint = cfs_time_current();
1486                         com->lc_time_next_checkpoint =
1487                                 com->lc_time_last_checkpoint +
1488                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1489                 }
1490
1491                 lfsck_control_speed_by_self(com);
1492                 if (unlikely(!thread_is_running(thread)))
1493                         GOTO(put, rc = 0);
1494
1495                 rc = iops->next(env, di);
1496         } while (rc == 0);
1497
1498         GOTO(put, rc);
1499
1500 put:
1501         iops->put(env, di);
1502
1503 fini:
1504         iops->fini(env, di);
1505
1506 out:
1507         down_write(&com->lc_sem);
1508         ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1509                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1510         ns->ln_time_last_checkpoint = cfs_time_current_sec();
1511         ns->ln_objs_checked_phase2 += com->lc_new_checked;
1512         com->lc_new_checked = 0;
1513
1514         if (rc > 0) {
1515                 com->lc_journal = 0;
1516                 ns->ln_status = LS_COMPLETED;
1517                 if (!(bk->lb_param & LPF_DRYRUN))
1518                         ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1519                 ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
1520                 ns->ln_success_count++;
1521         } else if (rc == 0) {
1522                 ns->ln_status = lfsck->li_status;
1523                 if (ns->ln_status == 0)
1524                         ns->ln_status = LS_STOPPED;
1525         } else {
1526                 ns->ln_status = LS_FAILED;
1527         }
1528
1529         CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan finished, status %d: "
1530               "rc = %d\n", lfsck_lfsck2name(lfsck), ns->ln_status, rc);
1531
1532         rc = lfsck_namespace_store(env, com, false);
1533         up_write(&com->lc_sem);
1534         if (atomic_dec_and_test(&lfsck->li_double_scan_count))
1535                 wake_up_all(&thread->t_ctl_waitq);
1536
1537         lfsck_thread_args_fini(lta);
1538
1539         return rc;
1540 }
1541
1542 static int lfsck_namespace_double_scan(const struct lu_env *env,
1543                                        struct lfsck_component *com)
1544 {
1545         struct lfsck_instance           *lfsck = com->lc_lfsck;
1546         struct lfsck_namespace          *ns    = com->lc_file_ram;
1547         struct lfsck_thread_args        *lta;
1548         struct task_struct              *task;
1549         int                              rc;
1550         ENTRY;
1551
1552         if (unlikely(ns->ln_status != LS_SCANNING_PHASE2))
1553                 RETURN(0);
1554
1555         lta = lfsck_thread_args_init(lfsck, com, NULL);
1556         if (IS_ERR(lta))
1557                 GOTO(out, rc = PTR_ERR(lta));
1558
1559         atomic_inc(&lfsck->li_double_scan_count);
1560         task = kthread_run(lfsck_namespace_double_scan_main, lta,
1561                            "lfsck_namespace");
1562         if (IS_ERR(task)) {
1563                 atomic_dec(&lfsck->li_double_scan_count);
1564                 lfsck_thread_args_fini(lta);
1565                 GOTO(out, rc = PTR_ERR(task));
1566         }
1567
1568         RETURN(0);
1569
1570 out:
1571         CERROR("%s: cannot start LFSCK namespace thread: rc = %d\n",
1572                lfsck_lfsck2name(lfsck), rc);
1573         return rc;
1574 }
1575
/**
 * Notification handler of the namespace LFSCK component.
 *
 * None of the arguments is examined; the handler unconditionally
 * reports success.
 *
 * \param[in] env	pointer to the thread context (unused)
 * \param[in] com	pointer to the namespace LFSCK component (unused)
 * \param[in] lr	pointer to the incoming request (unused)
 *
 * \retval		0 always
 */
static int lfsck_namespace_in_notify(const struct lu_env *env,
				     struct lfsck_component *com,
				     struct lfsck_request *lr)
{
	/* No notification is acted upon by this component. */
	return 0;
}
1582
1583 static int lfsck_namespace_query(const struct lu_env *env,
1584                                  struct lfsck_component *com)
1585 {
1586         struct lfsck_namespace *ns = com->lc_file_ram;
1587
1588         return ns->ln_status;
1589 }
1590
1591 static struct lfsck_operations lfsck_namespace_ops = {
1592         .lfsck_reset            = lfsck_namespace_reset,
1593         .lfsck_fail             = lfsck_namespace_fail,
1594         .lfsck_checkpoint       = lfsck_namespace_checkpoint,
1595         .lfsck_prep             = lfsck_namespace_prep,
1596         .lfsck_exec_oit         = lfsck_namespace_exec_oit,
1597         .lfsck_exec_dir         = lfsck_namespace_exec_dir,
1598         .lfsck_post             = lfsck_namespace_post,
1599         .lfsck_dump             = lfsck_namespace_dump,
1600         .lfsck_double_scan      = lfsck_namespace_double_scan,
1601         .lfsck_in_notify        = lfsck_namespace_in_notify,
1602         .lfsck_query            = lfsck_namespace_query,
1603 };
1604
1605 /**
1606  * Verify the specified linkEA entry for the given directory object.
1607  * If the object has no such linkEA entry or it has more other linkEA
1608  * entries, then re-generate the linkEA with the given information.
1609  *
1610  * \param[in] env       pointer to the thread context
1611  * \param[in] dev       pointer to the dt_device
1612  * \param[in] obj       pointer to the dt_object to be handled
1613  * \param[in] cname     the name for the child in the parent directory
1614  * \param[in] pfid      the parent directory's FID for the linkEA
1615  *
1616  * \retval              0 for success
1617  * \retval              negative error number on failure
1618  */
1619 int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev,
1620                         struct dt_object *obj, const struct lu_name *cname,
1621                         const struct lu_fid *pfid)
1622 {
1623         struct linkea_data       ldata  = { 0 };
1624         struct lu_buf            linkea_buf;
1625         struct thandle          *th;
1626         int                      rc;
1627         int                      fl     = LU_XATTR_CREATE;
1628         bool                     dirty  = false;
1629         ENTRY;
1630
1631         LASSERT(S_ISDIR(lfsck_object_type(obj)));
1632
1633         rc = lfsck_links_read(env, obj, &ldata);
1634         if (rc == -ENODATA) {
1635                 dirty = true;
1636         } else if (rc == 0) {
1637                 fl = LU_XATTR_REPLACE;
1638                 if (ldata.ld_leh->leh_reccount != 1) {
1639                         dirty = true;
1640                 } else {
1641                         rc = linkea_links_find(&ldata, cname, pfid);
1642                         if (rc != 0)
1643                                 dirty = true;
1644                 }
1645         }
1646
1647         if (!dirty)
1648                 RETURN(rc);
1649
1650         rc = linkea_data_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf);
1651         if (rc != 0)
1652                 RETURN(rc);
1653
1654         rc = linkea_add_buf(&ldata, cname, pfid);
1655         if (rc != 0)
1656                 RETURN(rc);
1657
1658         linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
1659         linkea_buf.lb_len = ldata.ld_leh->leh_len;
1660         th = dt_trans_create(env, dev);
1661         if (IS_ERR(th))
1662                 RETURN(PTR_ERR(th));
1663
1664         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1665                                   XATTR_NAME_LINK, fl, th);
1666         if (rc != 0)
1667                 GOTO(stop, rc = PTR_ERR(th));
1668
1669         rc = dt_trans_start_local(env, dev, th);
1670         if (rc != 0)
1671                 GOTO(stop, rc);
1672
1673         dt_write_lock(env, obj, 0);
1674         rc = dt_xattr_set(env, obj, &linkea_buf,
1675                           XATTR_NAME_LINK, fl, th, BYPASS_CAPA);
1676         dt_write_unlock(env, obj);
1677
1678         GOTO(stop, rc);
1679
1680 stop:
1681         dt_trans_stop(env, dev, th);
1682         return rc;
1683 }
1684
1685 int lfsck_namespace_setup(const struct lu_env *env,
1686                           struct lfsck_instance *lfsck)
1687 {
1688         struct lfsck_component  *com;
1689         struct lfsck_namespace  *ns;
1690         struct dt_object        *root = NULL;
1691         struct dt_object        *obj;
1692         int                      rc;
1693         ENTRY;
1694
1695         LASSERT(lfsck->li_master);
1696
1697         OBD_ALLOC_PTR(com);
1698         if (com == NULL)
1699                 RETURN(-ENOMEM);
1700
1701         INIT_LIST_HEAD(&com->lc_link);
1702         INIT_LIST_HEAD(&com->lc_link_dir);
1703         init_rwsem(&com->lc_sem);
1704         atomic_set(&com->lc_ref, 1);
1705         com->lc_lfsck = lfsck;
1706         com->lc_type = LFSCK_TYPE_NAMESPACE;
1707         com->lc_ops = &lfsck_namespace_ops;
1708         com->lc_file_size = sizeof(struct lfsck_namespace);
1709         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
1710         if (com->lc_file_ram == NULL)
1711                 GOTO(out, rc = -ENOMEM);
1712
1713         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
1714         if (com->lc_file_disk == NULL)
1715                 GOTO(out, rc = -ENOMEM);
1716
1717         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
1718         if (IS_ERR(root))
1719                 GOTO(out, rc = PTR_ERR(root));
1720
1721         if (unlikely(!dt_try_as_dir(env, root)))
1722                 GOTO(out, rc = -ENOTDIR);
1723
1724         obj = local_index_find_or_create(env, lfsck->li_los, root,
1725                                          lfsck_namespace_name,
1726                                          S_IFREG | S_IRUGO | S_IWUSR,
1727                                          &dt_lfsck_features);
1728         if (IS_ERR(obj))
1729                 GOTO(out, rc = PTR_ERR(obj));
1730
1731         com->lc_obj = obj;
1732         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
1733         if (rc != 0)
1734                 GOTO(out, rc);
1735
1736         rc = lfsck_namespace_load(env, com);
1737         if (rc > 0)
1738                 rc = lfsck_namespace_reset(env, com, true);
1739         else if (rc == -ENODATA)
1740                 rc = lfsck_namespace_init(env, com);
1741         if (rc != 0)
1742                 GOTO(out, rc);
1743
1744         ns = com->lc_file_ram;
1745         switch (ns->ln_status) {
1746         case LS_INIT:
1747         case LS_COMPLETED:
1748         case LS_FAILED:
1749         case LS_STOPPED:
1750                 spin_lock(&lfsck->li_lock);
1751                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1752                 spin_unlock(&lfsck->li_lock);
1753                 break;
1754         default:
1755                 CERROR("%s: unknown lfsck_namespace status %d\n",
1756                        lfsck_lfsck2name(lfsck), ns->ln_status);
1757                 /* fall through */
1758         case LS_SCANNING_PHASE1:
1759         case LS_SCANNING_PHASE2:
1760                 /* No need to store the status to disk right now.
1761                  * If the system crashed before the status stored,
1762                  * it will be loaded back when next time. */
1763                 ns->ln_status = LS_CRASHED;
1764                 /* fall through */
1765         case LS_PAUSED:
1766         case LS_CRASHED:
1767                 spin_lock(&lfsck->li_lock);
1768                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
1769                 list_add_tail(&com->lc_link_dir, &lfsck->li_list_dir);
1770                 spin_unlock(&lfsck->li_lock);
1771                 break;
1772         }
1773
1774         GOTO(out, rc = 0);
1775
1776 out:
1777         if (root != NULL && !IS_ERR(root))
1778                 lu_object_put(env, &root->do_lu);
1779         if (rc != 0) {
1780                 lfsck_component_cleanup(env, com);
1781                 CERROR("%s: fail to init namespace LFSCK component: rc = %d\n",
1782                        lfsck_lfsck2name(lfsck), rc);
1783         }
1784         return rc;
1785 }