Whamcloud - gitweb
dc4bb5e1c64106bdde220748bddfac45fa1d1ef4
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_namespace.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lustre/lustre_idl.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fid.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre/lustre_user.h>
41
42 #include "lfsck_internal.h"
43
44 #define LFSCK_NAMESPACE_MAGIC   0xA0629D03
45
46 static const char lfsck_namespace_name[] = "lfsck_namespace";
47
48 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
49                                       struct lfsck_namespace *src)
50 {
51         dst->ln_magic = le32_to_cpu(src->ln_magic);
52         dst->ln_status = le32_to_cpu(src->ln_status);
53         dst->ln_flags = le32_to_cpu(src->ln_flags);
54         dst->ln_success_count = le32_to_cpu(src->ln_success_count);
55         dst->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
56         dst->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
57         dst->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
58         dst->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
59         dst->ln_time_last_checkpoint =
60                                 le64_to_cpu(src->ln_time_last_checkpoint);
61         lfsck_position_le_to_cpu(&dst->ln_pos_latest_start,
62                                  &src->ln_pos_latest_start);
63         lfsck_position_le_to_cpu(&dst->ln_pos_last_checkpoint,
64                                  &src->ln_pos_last_checkpoint);
65         lfsck_position_le_to_cpu(&dst->ln_pos_first_inconsistent,
66                                  &src->ln_pos_first_inconsistent);
67         dst->ln_items_checked = le64_to_cpu(src->ln_items_checked);
68         dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
69         dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
70         dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
71         dst->ln_mlinked_checked = le64_to_cpu(src->ln_mlinked_checked);
72         dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
73         dst->ln_objs_repaired_phase2 =
74                                 le64_to_cpu(src->ln_objs_repaired_phase2);
75         dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
76         dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
77         dst->ln_objs_lost_found = le64_to_cpu(src->ln_objs_lost_found);
78         fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2,
79                       &src->ln_fid_latest_scanned_phase2);
80         dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
81         dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
82 }
83
84 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
85                                       struct lfsck_namespace *src)
86 {
87         dst->ln_magic = cpu_to_le32(src->ln_magic);
88         dst->ln_status = cpu_to_le32(src->ln_status);
89         dst->ln_flags = cpu_to_le32(src->ln_flags);
90         dst->ln_success_count = cpu_to_le32(src->ln_success_count);
91         dst->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
92         dst->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
93         dst->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
94         dst->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
95         dst->ln_time_last_checkpoint =
96                                 cpu_to_le64(src->ln_time_last_checkpoint);
97         lfsck_position_cpu_to_le(&dst->ln_pos_latest_start,
98                                  &src->ln_pos_latest_start);
99         lfsck_position_cpu_to_le(&dst->ln_pos_last_checkpoint,
100                                  &src->ln_pos_last_checkpoint);
101         lfsck_position_cpu_to_le(&dst->ln_pos_first_inconsistent,
102                                  &src->ln_pos_first_inconsistent);
103         dst->ln_items_checked = cpu_to_le64(src->ln_items_checked);
104         dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
105         dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
106         dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
107         dst->ln_mlinked_checked = cpu_to_le64(src->ln_mlinked_checked);
108         dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
109         dst->ln_objs_repaired_phase2 =
110                                 cpu_to_le64(src->ln_objs_repaired_phase2);
111         dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
112         dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
113         dst->ln_objs_lost_found = cpu_to_le64(src->ln_objs_lost_found);
114         fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2,
115                       &src->ln_fid_latest_scanned_phase2);
116         dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
117         dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
118 }
119
120 /**
121  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
122  * \retval 0: succeed.
123  * \retval -ve: failed cases.
124  */
125 static int lfsck_namespace_load(const struct lu_env *env,
126                                 struct lfsck_component *com)
127 {
128         int len = com->lc_file_size;
129         int rc;
130
131         rc = dt_xattr_get(env, com->lc_obj,
132                           lfsck_buf_get(env, com->lc_file_disk, len),
133                           XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
134         if (rc == len) {
135                 struct lfsck_namespace *ns = com->lc_file_ram;
136
137                 lfsck_namespace_le_to_cpu(ns,
138                                 (struct lfsck_namespace *)com->lc_file_disk);
139                 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
140                         CDEBUG(D_LFSCK, "%s: invalid lfsck_namespace magic "
141                                "%#x != %#x\n", lfsck_lfsck2name(com->lc_lfsck),
142                                ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
143                         rc = 1;
144                 } else {
145                         rc = 0;
146                 }
147         } else if (rc != -ENODATA) {
148                 CDEBUG(D_LFSCK, "%s: fail to load lfsck_namespace, "
149                        "expected = %d: rc = %d\n",
150                        lfsck_lfsck2name(com->lc_lfsck), len, rc);
151                 if (rc >= 0)
152                         rc = 1;
153         }
154         return rc;
155 }
156
157 static int lfsck_namespace_store(const struct lu_env *env,
158                                  struct lfsck_component *com, bool init)
159 {
160         struct dt_object        *obj    = com->lc_obj;
161         struct lfsck_instance   *lfsck  = com->lc_lfsck;
162         struct thandle          *handle;
163         int                      len    = com->lc_file_size;
164         int                      rc;
165         ENTRY;
166
167         lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
168                                   (struct lfsck_namespace *)com->lc_file_ram);
169         handle = dt_trans_create(env, lfsck->li_bottom);
170         if (IS_ERR(handle))
171                 GOTO(log, rc = PTR_ERR(handle));
172
173         rc = dt_declare_xattr_set(env, obj,
174                                   lfsck_buf_get(env, com->lc_file_disk, len),
175                                   XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
176         if (rc != 0)
177                 GOTO(out, rc);
178
179         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
180         if (rc != 0)
181                 GOTO(out, rc);
182
183         rc = dt_xattr_set(env, obj,
184                           lfsck_buf_get(env, com->lc_file_disk, len),
185                           XATTR_NAME_LFSCK_NAMESPACE,
186                           init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
187                           handle, BYPASS_CAPA);
188
189         GOTO(out, rc);
190
191 out:
192         dt_trans_stop(env, lfsck->li_bottom, handle);
193
194 log:
195         if (rc != 0)
196                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_namespace: rc = %d\n",
197                        lfsck_lfsck2name(lfsck), rc);
198         return rc;
199 }
200
201 static int lfsck_namespace_init(const struct lu_env *env,
202                                 struct lfsck_component *com)
203 {
204         struct lfsck_namespace *ns = com->lc_file_ram;
205         int rc;
206
207         memset(ns, 0, sizeof(*ns));
208         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
209         ns->ln_status = LS_INIT;
210         down_write(&com->lc_sem);
211         rc = lfsck_namespace_store(env, com, true);
212         up_write(&com->lc_sem);
213         return rc;
214 }
215
216 static int lfsck_namespace_lookup(const struct lu_env *env,
217                                   struct lfsck_component *com,
218                                   const struct lu_fid *fid, __u8 *flags)
219 {
220         struct lu_fid *key = &lfsck_env_info(env)->lti_fid;
221         int            rc;
222
223         fid_cpu_to_be(key, fid);
224         rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)flags,
225                        (const struct dt_key *)key, BYPASS_CAPA);
226         return rc;
227 }
228
229 static int lfsck_namespace_delete(const struct lu_env *env,
230                                   struct lfsck_component *com,
231                                   const struct lu_fid *fid)
232 {
233         struct lfsck_instance   *lfsck  = com->lc_lfsck;
234         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
235         struct thandle          *handle;
236         struct dt_object        *obj    = com->lc_obj;
237         int                      rc;
238         ENTRY;
239
240         handle = dt_trans_create(env, lfsck->li_bottom);
241         if (IS_ERR(handle))
242                 RETURN(PTR_ERR(handle));
243
244         rc = dt_declare_delete(env, obj, (const struct dt_key *)fid, handle);
245         if (rc != 0)
246                 GOTO(out, rc);
247
248         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
249         if (rc != 0)
250                 GOTO(out, rc);
251
252         fid_cpu_to_be(key, fid);
253         rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
254                        BYPASS_CAPA);
255
256         GOTO(out, rc);
257
258 out:
259         dt_trans_stop(env, lfsck->li_bottom, handle);
260         return rc;
261 }
262
263 static int lfsck_namespace_update(const struct lu_env *env,
264                                   struct lfsck_component *com,
265                                   const struct lu_fid *fid,
266                                   __u8 flags, bool force)
267 {
268         struct lfsck_instance   *lfsck  = com->lc_lfsck;
269         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
270         struct thandle          *handle;
271         struct dt_object        *obj    = com->lc_obj;
272         int                      rc;
273         bool                     exist  = false;
274         __u8                     tf;
275         ENTRY;
276
277         rc = lfsck_namespace_lookup(env, com, fid, &tf);
278         if (rc != 0 && rc != -ENOENT)
279                 RETURN(rc);
280
281         if (rc == 0) {
282                 if (!force || flags == tf)
283                         RETURN(0);
284
285                 exist = true;
286                 handle = dt_trans_create(env, lfsck->li_bottom);
287                 if (IS_ERR(handle))
288                         RETURN(PTR_ERR(handle));
289
290                 rc = dt_declare_delete(env, obj, (const struct dt_key *)fid,
291                                        handle);
292                 if (rc != 0)
293                         GOTO(out, rc);
294         } else {
295                 handle = dt_trans_create(env, lfsck->li_bottom);
296                 if (IS_ERR(handle))
297                         RETURN(PTR_ERR(handle));
298         }
299
300         rc = dt_declare_insert(env, obj, (const struct dt_rec *)&flags,
301                                (const struct dt_key *)fid, handle);
302         if (rc != 0)
303                 GOTO(out, rc);
304
305         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
306         if (rc != 0)
307                 GOTO(out, rc);
308
309         fid_cpu_to_be(key, fid);
310         if (exist) {
311                 rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
312                                BYPASS_CAPA);
313                 if (rc != 0)
314                         GOTO(out, rc);
315         }
316
317         rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
318                        (const struct dt_key *)key, handle, BYPASS_CAPA, 1);
319
320         GOTO(out, rc);
321
322 out:
323         dt_trans_stop(env, lfsck->li_bottom, handle);
324         return rc;
325 }
326
327 static int lfsck_namespace_check_exist(const struct lu_env *env,
328                                        struct lfsck_instance *lfsck,
329                                        struct dt_object *obj, const char *name)
330 {
331         struct dt_object *dir = lfsck->li_obj_dir;
332         struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
333         int               rc;
334         ENTRY;
335
336         if (unlikely(lfsck_is_dead_obj(obj)))
337                 RETURN(LFSCK_NAMEENTRY_DEAD);
338
339         rc = dt_lookup(env, dir, (struct dt_rec *)fid,
340                        (const struct dt_key *)name, BYPASS_CAPA);
341         if (rc == -ENOENT)
342                 RETURN(LFSCK_NAMEENTRY_REMOVED);
343
344         if (rc < 0)
345                 RETURN(rc);
346
347         if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
348                 RETURN(LFSCK_NAMEENTRY_RECREATED);
349
350         RETURN(0);
351 }
352
353 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
354                                             struct dt_object *obj,
355                                             struct thandle *handle)
356 {
357         int rc;
358
359         /* For destroying all invalid linkEA entries. */
360         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
361         if (rc != 0)
362                 return rc;
363
364         /* For insert new linkEA entry. */
365         rc = dt_declare_xattr_set(env, obj,
366                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
367                         XATTR_NAME_LINK, 0, handle);
368         return rc;
369 }
370
371 static int lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
372                             struct linkea_data *ldata)
373 {
374         int rc;
375
376         ldata->ld_buf =
377                 lu_buf_check_and_alloc(&lfsck_env_info(env)->lti_linkea_buf,
378                                        PAGE_CACHE_SIZE);
379         if (ldata->ld_buf->lb_buf == NULL)
380                 return -ENOMEM;
381
382         if (!dt_object_exists(obj))
383                 return -ENODATA;
384
385         rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
386         if (rc == -ERANGE) {
387                 /* Buf was too small, figure out what we need. */
388                 lu_buf_free(ldata->ld_buf);
389                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
390                                   BYPASS_CAPA);
391                 if (rc < 0)
392                         return rc;
393
394                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
395                 if (ldata->ld_buf->lb_buf == NULL)
396                         return -ENOMEM;
397
398                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
399                                   BYPASS_CAPA);
400         }
401         if (rc < 0)
402                 return rc;
403
404         linkea_init(ldata);
405
406         return 0;
407 }
408
409 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
410                              struct linkea_data *ldata, struct thandle *handle)
411 {
412         const struct lu_buf *buf = lfsck_buf_get_const(env,
413                                                        ldata->ld_buf->lb_buf,
414                                                        ldata->ld_leh->leh_len);
415
416         return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle,
417                             BYPASS_CAPA);
418 }
419
420 /**
421  * \retval ve: removed entries
422  */
423 static int lfsck_linkea_entry_unpack(struct lfsck_instance *lfsck,
424                                      struct linkea_data *ldata,
425                                      struct lu_name *cname,
426                                      struct lu_fid *pfid)
427 {
428         struct link_ea_entry    *oldlee;
429         int                      oldlen;
430         int                      removed = 0;
431
432         linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
433         oldlee = ldata->ld_lee;
434         oldlen = ldata->ld_reclen;
435         linkea_next_entry(ldata);
436         while (ldata->ld_lee != NULL) {
437                 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
438                                    ldata->ld_lee->lee_reclen[1];
439                 if (unlikely(ldata->ld_reclen == oldlen &&
440                              memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
441                         linkea_del_buf(ldata, cname);
442                         removed++;
443                 } else {
444                         linkea_next_entry(ldata);
445                 }
446         }
447         ldata->ld_lee = oldlee;
448         ldata->ld_reclen = oldlen;
449         return removed;
450 }
451
452 /**
453  * \retval +ve  repaired
454  * \retval 0    no need to repair
455  * \retval -ve  error cases
456  */
457 static int lfsck_namespace_double_scan_one(const struct lu_env *env,
458                                            struct lfsck_component *com,
459                                            struct dt_object *child, __u8 flags)
460 {
461         struct lfsck_thread_info *info    = lfsck_env_info(env);
462         struct lu_attr           *la      = &info->lti_la;
463         struct lu_name           *cname   = &info->lti_name;
464         struct lu_fid            *pfid    = &info->lti_fid;
465         struct lu_fid            *cfid    = &info->lti_fid2;
466         struct lfsck_instance   *lfsck    = com->lc_lfsck;
467         struct lfsck_bookmark   *bk       = &lfsck->li_bookmark_ram;
468         struct lfsck_namespace  *ns       = com->lc_file_ram;
469         struct linkea_data       ldata    = { 0 };
470         struct thandle          *handle   = NULL;
471         bool                     locked   = false;
472         bool                     update   = false;
473         int                      rc;
474         ENTRY;
475
476         if (com->lc_journal) {
477
478 again:
479                 LASSERT(!locked);
480
481                 update = false;
482                 com->lc_journal = 1;
483                 handle = dt_trans_create(env, lfsck->li_next);
484                 if (IS_ERR(handle))
485                         RETURN(rc = PTR_ERR(handle));
486
487                 rc = dt_declare_xattr_set(env, child,
488                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
489                         XATTR_NAME_LINK, 0, handle);
490                 if (rc != 0)
491                         GOTO(stop, rc);
492
493                 rc = dt_trans_start(env, lfsck->li_next, handle);
494                 if (rc != 0)
495                         GOTO(stop, rc);
496
497                 dt_write_lock(env, child, MOR_TGT_CHILD);
498                 locked = true;
499         }
500
501         if (unlikely(lfsck_is_dead_obj(child)))
502                 GOTO(stop, rc = 0);
503
504         rc = dt_attr_get(env, child, la, BYPASS_CAPA);
505         if (rc == 0)
506                 rc = lfsck_links_read(env, child, &ldata);
507         if (rc != 0) {
508                 if ((bk->lb_param & LPF_DRYRUN) &&
509                     (rc == -EINVAL || rc == -ENODATA))
510                         rc = 1;
511
512                 GOTO(stop, rc);
513         }
514
515         linkea_first_entry(&ldata);
516         while (ldata.ld_lee != NULL) {
517                 struct dt_object *parent = NULL;
518
519                 rc = lfsck_linkea_entry_unpack(lfsck, &ldata, cname, pfid);
520                 if (rc > 0)
521                         update = true;
522
523                 if (!fid_is_sane(pfid))
524                         goto shrink;
525
526                 parent = lfsck_object_find(env, lfsck, pfid);
527                 if (IS_ERR(parent))
528                         GOTO(stop, rc = PTR_ERR(parent));
529
530                 if (!dt_object_exists(parent))
531                         goto shrink;
532
533                 /* XXX: Currently, skip remote object, the consistency for
534                  *      remote object will be processed in LFSCK phase III. */
535                 if (dt_object_remote(parent)) {
536                         lfsck_object_put(env, parent);
537                         linkea_next_entry(&ldata);
538                         continue;
539                 }
540
541                 if (unlikely(!dt_try_as_dir(env, parent)))
542                         goto shrink;
543
544                 /* To guarantee the 'name' is terminated with '0'. */
545                 memcpy(info->lti_key, cname->ln_name, cname->ln_namelen);
546                 info->lti_key[cname->ln_namelen] = 0;
547                 cname->ln_name = info->lti_key;
548                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
549                                (const struct dt_key *)cname->ln_name,
550                                BYPASS_CAPA);
551                 if (rc != 0 && rc != -ENOENT) {
552                         lfsck_object_put(env, parent);
553                         GOTO(stop, rc);
554                 }
555
556                 if (rc == 0) {
557                         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
558                                 lfsck_object_put(env, parent);
559                                 linkea_next_entry(&ldata);
560                                 continue;
561                         }
562
563                         goto shrink;
564                 }
565
566                 /* If there is no name entry in the parent dir and the object
567                  * link count is less than the linkea entries count, then the
568                  * linkea entry should be removed. */
569                 if (ldata.ld_leh->leh_reccount > la->la_nlink)
570                         goto shrink;
571
572                 /* XXX: For the case of there is a linkea entry, but without
573                  *      name entry pointing to the object and its hard links
574                  *      count is not less than the object name entries count,
575                  *      then seems we should add the 'missed' name entry back
576                  *      to namespace, but before LFSCK phase III finished, we
577                  *      do not know whether the object has some inconsistency
578                  *      on other MDTs. So now, do NOT add the name entry back
579                  *      to the namespace, but keep the linkEA entry. LU-2914 */
580                 lfsck_object_put(env, parent);
581                 linkea_next_entry(&ldata);
582                 continue;
583
584 shrink:
585                 if (parent != NULL)
586                         lfsck_object_put(env, parent);
587                 if (bk->lb_param & LPF_DRYRUN)
588                         RETURN(1);
589
590                 CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
591                       "for the object: "DFID", parent "DFID", name %.*s\n",
592                       lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)),
593                       PFID(pfid), cname->ln_namelen, cname->ln_name);
594
595                 linkea_del_buf(&ldata, cname);
596                 update = true;
597         }
598
599         if (update) {
600                 if (!com->lc_journal) {
601                         com->lc_journal = 1;
602                         goto again;
603                 }
604
605                 rc = lfsck_links_write(env, child, &ldata, handle);
606         }
607
608         GOTO(stop, rc);
609
610 stop:
611         if (locked) {
612         /* XXX: For the case linkea entries count does not match the object hard
613          *      links count, we cannot update the later one simply. Before LFSCK
614          *      phase III finished, we cannot know whether there are some remote
615          *      name entries to be repaired or not. LU-2914 */
616                 if (rc == 0 && !lfsck_is_dead_obj(child) &&
617                     ldata.ld_leh != NULL &&
618                     ldata.ld_leh->leh_reccount != la->la_nlink)
619                         CDEBUG(D_LFSCK, "%s: the object "DFID" linkEA entry "
620                                "count %u may not match its hardlink count %u\n",
621                                lfsck_lfsck2name(lfsck), PFID(cfid),
622                                ldata.ld_leh->leh_reccount, la->la_nlink);
623
624                 dt_write_unlock(env, child);
625         }
626
627         if (handle != NULL)
628                 dt_trans_stop(env, lfsck->li_next, handle);
629
630         if (rc == 0 && update) {
631                 ns->ln_objs_nlink_repaired++;
632                 rc = 1;
633         }
634
635         return rc;
636 }
637
638 /* namespace APIs */
639
640 static int lfsck_namespace_reset(const struct lu_env *env,
641                                  struct lfsck_component *com, bool init)
642 {
643         struct lfsck_instance   *lfsck = com->lc_lfsck;
644         struct lfsck_namespace  *ns    = com->lc_file_ram;
645         struct dt_object        *root;
646         struct dt_object        *dto;
647         int                      rc;
648         ENTRY;
649
650         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
651         if (IS_ERR(root))
652                 GOTO(log, rc = PTR_ERR(root));
653
654         if (unlikely(!dt_try_as_dir(env, root)))
655                 GOTO(put, rc = -ENOTDIR);
656
657         down_write(&com->lc_sem);
658         if (init) {
659                 memset(ns, 0, sizeof(*ns));
660         } else {
661                 __u32 count = ns->ln_success_count;
662                 __u64 last_time = ns->ln_time_last_complete;
663
664                 memset(ns, 0, sizeof(*ns));
665                 ns->ln_success_count = count;
666                 ns->ln_time_last_complete = last_time;
667         }
668         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
669         ns->ln_status = LS_INIT;
670
671         rc = local_object_unlink(env, lfsck->li_bottom, root,
672                                  lfsck_namespace_name);
673         if (rc != 0)
674                 GOTO(out, rc);
675
676         lfsck_object_put(env, com->lc_obj);
677         com->lc_obj = NULL;
678         dto = local_index_find_or_create(env, lfsck->li_los, root,
679                                          lfsck_namespace_name,
680                                          S_IFREG | S_IRUGO | S_IWUSR,
681                                          &dt_lfsck_features);
682         if (IS_ERR(dto))
683                 GOTO(out, rc = PTR_ERR(dto));
684
685         com->lc_obj = dto;
686         rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
687         if (rc != 0)
688                 GOTO(out, rc);
689
690         rc = lfsck_namespace_store(env, com, true);
691
692         GOTO(out, rc);
693
694 out:
695         up_write(&com->lc_sem);
696
697 put:
698         lu_object_put(env, &root->do_lu);
699 log:
700         CDEBUG(D_LFSCK, "%s: namespace LFSCK reset: rc = %d\n",
701                lfsck_lfsck2name(lfsck), rc);
702         return rc;
703 }
704
705 static void
706 lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
707                      bool new_checked)
708 {
709         struct lfsck_namespace *ns = com->lc_file_ram;
710
711         down_write(&com->lc_sem);
712         if (new_checked)
713                 com->lc_new_checked++;
714         ns->ln_items_failed++;
715         if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
716                 lfsck_pos_fill(env, com->lc_lfsck,
717                                &ns->ln_pos_first_inconsistent, false);
718
719                 CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
720                        "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n",
721                        lfsck_lfsck2name(com->lc_lfsck),
722                        ns->ln_pos_first_inconsistent.lp_oit_cookie,
723                        PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
724                        ns->ln_pos_first_inconsistent.lp_dir_cookie);
725         }
726         up_write(&com->lc_sem);
727 }
728
729 static int lfsck_namespace_checkpoint(const struct lu_env *env,
730                                       struct lfsck_component *com, bool init)
731 {
732         struct lfsck_instance   *lfsck = com->lc_lfsck;
733         struct lfsck_namespace  *ns    = com->lc_file_ram;
734         int                      rc;
735
736         if (com->lc_new_checked == 0 && !init)
737                 return 0;
738
739         down_write(&com->lc_sem);
740         if (init) {
741                 ns->ln_pos_latest_start = lfsck->li_pos_current;
742         } else {
743                 ns->ln_pos_last_checkpoint = lfsck->li_pos_current;
744                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
745                                 HALF_SEC - lfsck->li_time_last_checkpoint);
746                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
747                 ns->ln_items_checked += com->lc_new_checked;
748                 com->lc_new_checked = 0;
749         }
750
751         rc = lfsck_namespace_store(env, com, false);
752         up_write(&com->lc_sem);
753
754         CDEBUG(D_LFSCK, "%s: namespace LFSCK checkpoint at the pos ["LPU64
755                ", "DFID", "LPX64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
756                lfsck->li_pos_current.lp_oit_cookie,
757                PFID(&lfsck->li_pos_current.lp_dir_parent),
758                lfsck->li_pos_current.lp_dir_cookie, rc);
759
760         return rc;
761 }
762
763 static int lfsck_namespace_prep(const struct lu_env *env,
764                                 struct lfsck_component *com,
765                                 struct lfsck_start_param *lsp)
766 {
767         struct lfsck_instance   *lfsck  = com->lc_lfsck;
768         struct lfsck_namespace  *ns     = com->lc_file_ram;
769         struct lfsck_position   *pos    = &com->lc_pos_start;
770
771         if (ns->ln_status == LS_COMPLETED) {
772                 int rc;
773
774                 rc = lfsck_namespace_reset(env, com, false);
775                 if (rc == 0)
776                         rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
777
778                 if (rc != 0) {
779                         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: "
780                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
781
782                         return rc;
783                 }
784         }
785
786         down_write(&com->lc_sem);
787         ns->ln_time_latest_start = cfs_time_current_sec();
788
789         spin_lock(&lfsck->li_lock);
790         if (ns->ln_flags & LF_SCANNED_ONCE) {
791                 if (!lfsck->li_drop_dryrun ||
792                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
793                         ns->ln_status = LS_SCANNING_PHASE2;
794                         list_move_tail(&com->lc_link,
795                                        &lfsck->li_list_double_scan);
796                         if (!list_empty(&com->lc_link_dir))
797                                 list_del_init(&com->lc_link_dir);
798                         lfsck_pos_set_zero(pos);
799                 } else {
800                         ns->ln_status = LS_SCANNING_PHASE1;
801                         ns->ln_run_time_phase1 = 0;
802                         ns->ln_run_time_phase2 = 0;
803                         ns->ln_items_checked = 0;
804                         ns->ln_items_repaired = 0;
805                         ns->ln_items_failed = 0;
806                         ns->ln_dirs_checked = 0;
807                         ns->ln_mlinked_checked = 0;
808                         ns->ln_objs_checked_phase2 = 0;
809                         ns->ln_objs_repaired_phase2 = 0;
810                         ns->ln_objs_failed_phase2 = 0;
811                         ns->ln_objs_nlink_repaired = 0;
812                         ns->ln_objs_lost_found = 0;
813                         fid_zero(&ns->ln_fid_latest_scanned_phase2);
814                         if (list_empty(&com->lc_link_dir))
815                                 list_add_tail(&com->lc_link_dir,
816                                               &lfsck->li_list_dir);
817                         *pos = ns->ln_pos_first_inconsistent;
818                 }
819         } else {
820                 ns->ln_status = LS_SCANNING_PHASE1;
821                 if (list_empty(&com->lc_link_dir))
822                         list_add_tail(&com->lc_link_dir,
823                                       &lfsck->li_list_dir);
824                 if (!lfsck->li_drop_dryrun ||
825                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
826                         *pos = ns->ln_pos_last_checkpoint;
827                         pos->lp_oit_cookie++;
828                 } else {
829                         *pos = ns->ln_pos_first_inconsistent;
830                 }
831         }
832         spin_unlock(&lfsck->li_lock);
833         up_write(&com->lc_sem);
834
835         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep done, start pos ["LPU64", "
836                DFID", "LPX64"]\n", lfsck_lfsck2name(lfsck), pos->lp_oit_cookie,
837                PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
838
839         return 0;
840 }
841
842 static int lfsck_namespace_exec_oit(const struct lu_env *env,
843                                     struct lfsck_component *com,
844                                     struct dt_object *obj)
845 {
846         down_write(&com->lc_sem);
847         com->lc_new_checked++;
848         if (S_ISDIR(lfsck_object_type(obj)))
849                 ((struct lfsck_namespace *)com->lc_file_ram)->ln_dirs_checked++;
850         up_write(&com->lc_sem);
851         return 0;
852 }
853
854 static int lfsck_namespace_exec_dir(const struct lu_env *env,
855                                     struct lfsck_component *com,
856                                     struct dt_object *obj,
857                                     struct lu_dirent *ent)
858 {
859         struct lfsck_thread_info   *info     = lfsck_env_info(env);
860         struct lu_attr             *la       = &info->lti_la;
861         struct lfsck_instance      *lfsck    = com->lc_lfsck;
862         struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
863         struct lfsck_namespace     *ns       = com->lc_file_ram;
864         struct linkea_data          ldata    = { 0 };
865         const struct lu_fid        *pfid     = lfsck_dto2fid(lfsck->li_obj_dir);
866         const struct lu_fid        *cfid     = lfsck_dto2fid(obj);
867         const struct lu_name       *cname;
868         struct thandle             *handle   = NULL;
869         bool                        repaired = false;
870         bool                        locked   = false;
871         bool                        remove;
872         bool                        newdata;
873         bool                        log      = false;
874         int                         count    = 0;
875         int                         rc;
876         ENTRY;
877
878         cname = lfsck_name_get_const(env, ent->lde_name, ent->lde_namelen);
879         down_write(&com->lc_sem);
880         com->lc_new_checked++;
881
882         if (ent->lde_attrs & LUDA_UPGRADE) {
883                 ns->ln_flags |= LF_UPGRADE;
884                 ns->ln_dirent_repaired++;
885                 repaired = true;
886         } else if (ent->lde_attrs & LUDA_REPAIR) {
887                 ns->ln_flags |= LF_INCONSISTENT;
888                 ns->ln_dirent_repaired++;
889                 repaired = true;
890         }
891
892         if (ent->lde_name[0] == '.' &&
893             (ent->lde_namelen == 1 ||
894              (ent->lde_namelen == 2 && ent->lde_name[1] == '.') ||
895              fid_seq_is_dot(fid_seq(&ent->lde_fid))))
896                 GOTO(out, rc = 0);
897
898         if (!(bk->lb_param & LPF_DRYRUN) &&
899             (com->lc_journal || repaired)) {
900
901 again:
902                 LASSERT(!locked);
903
904                 com->lc_journal = 1;
905                 handle = dt_trans_create(env, lfsck->li_next);
906                 if (IS_ERR(handle))
907                         GOTO(out, rc = PTR_ERR(handle));
908
909                 rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
910                 if (rc != 0)
911                         GOTO(stop, rc);
912
913                 rc = dt_trans_start(env, lfsck->li_next, handle);
914                 if (rc != 0)
915                         GOTO(stop, rc);
916
917                 dt_write_lock(env, obj, MOR_TGT_CHILD);
918                 locked = true;
919         }
920
921         rc = lfsck_namespace_check_exist(env, lfsck, obj, ent->lde_name);
922         if (rc != 0)
923                 GOTO(stop, rc);
924
925         rc = lfsck_links_read(env, obj, &ldata);
926         if (rc == 0) {
927                 count = ldata.ld_leh->leh_reccount;
928                 rc = linkea_links_find(&ldata, cname, pfid);
929                 if ((rc == 0) &&
930                     (count == 1 || !S_ISDIR(lfsck_object_type(obj))))
931                         goto record;
932
933                 ns->ln_flags |= LF_INCONSISTENT;
934                 /* For dir, if there are more than one linkea entries, or the
935                  * linkea entry does not match the name entry, then remove all
936                  * and add the correct one. */
937                 if (S_ISDIR(lfsck_object_type(obj))) {
938                         remove = true;
939                         newdata = true;
940                 } else {
941                         remove = false;
942                         newdata = false;
943                 }
944                 goto nodata;
945         } else if (unlikely(rc == -EINVAL)) {
946                 count = 1;
947                 ns->ln_flags |= LF_INCONSISTENT;
948                 /* The magic crashed, we are not sure whether there are more
949                  * corrupt data in the linkea, so remove all linkea entries. */
950                 remove = true;
951                 newdata = true;
952                 goto nodata;
953         } else if (rc == -ENODATA) {
954                 count = 1;
955                 ns->ln_flags |= LF_UPGRADE;
956                 remove = false;
957                 newdata = true;
958
959 nodata:
960                 if (bk->lb_param & LPF_DRYRUN) {
961                         ns->ln_linkea_repaired++;
962                         log = true;
963                         repaired = true;
964                         goto record;
965                 }
966
967                 if (!com->lc_journal)
968                         goto again;
969
970                 if (remove) {
971                         LASSERT(newdata);
972
973                         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
974                                           BYPASS_CAPA);
975                         if (rc != 0)
976                                 GOTO(stop, rc);
977                 }
978
979                 if (newdata) {
980                         rc = linkea_data_new(&ldata,
981                                         &lfsck_env_info(env)->lti_linkea_buf);
982                         if (rc != 0)
983                                 GOTO(stop, rc);
984                 }
985
986                 rc = linkea_add_buf(&ldata, cname, pfid);
987                 if (rc != 0)
988                         GOTO(stop, rc);
989
990                 rc = lfsck_links_write(env, obj, &ldata, handle);
991                 if (rc != 0)
992                         GOTO(stop, rc);
993
994                 count = ldata.ld_leh->leh_reccount;
995                 ns->ln_linkea_repaired++;
996                 log = true;
997                 repaired = true;
998         } else {
999                 GOTO(stop, rc);
1000         }
1001
1002 record:
1003         LASSERT(count > 0);
1004
1005         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
1006         if (rc != 0)
1007                 GOTO(stop, rc);
1008
1009         if ((count == 1) &&
1010             (la->la_nlink == 1 || S_ISDIR(lfsck_object_type(obj))))
1011                 /* Usually, it is for single linked object or dir, do nothing.*/
1012                 GOTO(stop, rc);
1013
1014         /* Following modification will be in another transaction.  */
1015         if (handle != NULL) {
1016                 LASSERT(dt_write_locked(env, obj));
1017
1018                 dt_write_unlock(env, obj);
1019                 locked = false;
1020
1021                 dt_trans_stop(env, lfsck->li_next, handle);
1022                 handle = NULL;
1023
1024                 if (log)
1025                         CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired "
1026                               "linkEA for the object: "DFID", parent "
1027                               DFID", name %.*s\n",
1028                               lfsck_lfsck2name(lfsck), PFID(cfid), PFID(pfid),
1029                               ent->lde_namelen, ent->lde_name);
1030         }
1031
1032         ns->ln_mlinked_checked++;
1033         rc = lfsck_namespace_update(env, com, cfid,
1034                         count != la->la_nlink ? LLF_UNMATCH_NLINKS : 0, false);
1035
1036         GOTO(out, rc);
1037
1038 stop:
1039         if (locked)
1040                 dt_write_unlock(env, obj);
1041
1042         if (handle != NULL)
1043                 dt_trans_stop(env, lfsck->li_next, handle);
1044
1045 out:
1046         if (rc < 0) {
1047                 CDEBUG(D_LFSCK, "%s: namespace LFSCK exec_dir failed, "
1048                        "parent "DFID", child name %.*s, child FID "DFID
1049                        ": rc = %d\n", lfsck_lfsck2name(lfsck), PFID(pfid),
1050                        ent->lde_namelen, ent->lde_name, PFID(cfid), rc);
1051
1052                 ns->ln_items_failed++;
1053                 if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
1054                         lfsck_pos_fill(env, lfsck,
1055                                        &ns->ln_pos_first_inconsistent, false);
1056                 if (!(bk->lb_param & LPF_FAILOUT))
1057                         rc = 0;
1058         } else {
1059                 if (repaired) {
1060                         ns->ln_items_repaired++;
1061                         if (bk->lb_param & LPF_DRYRUN &&
1062                             lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
1063                                 lfsck_pos_fill(env, lfsck,
1064                                                &ns->ln_pos_first_inconsistent,
1065                                                false);
1066                 } else {
1067                         com->lc_journal = 0;
1068                 }
1069                 rc = 0;
1070         }
1071         up_write(&com->lc_sem);
1072         return rc;
1073 }
1074
1075 static int lfsck_namespace_post(const struct lu_env *env,
1076                                 struct lfsck_component *com,
1077                                 int result, bool init)
1078 {
1079         struct lfsck_instance   *lfsck = com->lc_lfsck;
1080         struct lfsck_namespace  *ns    = com->lc_file_ram;
1081         int                      rc;
1082
1083         down_write(&com->lc_sem);
1084         spin_lock(&lfsck->li_lock);
1085         if (!init)
1086                 ns->ln_pos_last_checkpoint = lfsck->li_pos_current;
1087         if (result > 0) {
1088                 ns->ln_status = LS_SCANNING_PHASE2;
1089                 ns->ln_flags |= LF_SCANNED_ONCE;
1090                 ns->ln_flags &= ~LF_UPGRADE;
1091                 list_del_init(&com->lc_link_dir);
1092                 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
1093         } else if (result == 0) {
1094                 ns->ln_status = lfsck->li_status;
1095                 if (ns->ln_status == 0)
1096                         ns->ln_status = LS_STOPPED;
1097                 if (ns->ln_status != LS_PAUSED) {
1098                         list_del_init(&com->lc_link_dir);
1099                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
1100                 }
1101         } else {
1102                 ns->ln_status = LS_FAILED;
1103                 list_del_init(&com->lc_link_dir);
1104                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
1105         }
1106         spin_unlock(&lfsck->li_lock);
1107
1108         if (!init) {
1109                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1110                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1111                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
1112                 ns->ln_items_checked += com->lc_new_checked;
1113                 com->lc_new_checked = 0;
1114         }
1115
1116         rc = lfsck_namespace_store(env, com, false);
1117         up_write(&com->lc_sem);
1118
1119         CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n",
1120                lfsck_lfsck2name(lfsck), rc);
1121
1122         return rc;
1123 }
1124
1125 static int
1126 lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
1127                      struct seq_file *m)
1128 {
1129         struct lfsck_instance   *lfsck = com->lc_lfsck;
1130         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1131         struct lfsck_namespace  *ns    = com->lc_file_ram;
1132         int                      rc;
1133
1134         down_read(&com->lc_sem);
1135         seq_printf(m, "name: lfsck_namespace\n"
1136                    "magic: %#x\n"
1137                    "version: %d\n"
1138                    "status: %s\n",
1139                    ns->ln_magic,
1140                    bk->lb_version,
1141                    lfsck_status2names(ns->ln_status));
1142
1143         rc = lfsck_bits_dump(m, ns->ln_flags, lfsck_flags_names, "flags");
1144         if (rc < 0)
1145                 goto out;
1146
1147         rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
1148         if (rc < 0)
1149                 goto out;
1150
1151         rc = lfsck_time_dump(m, ns->ln_time_last_complete,
1152                              "time_since_last_completed");
1153         if (rc < 0)
1154                 goto out;
1155
1156         rc = lfsck_time_dump(m, ns->ln_time_latest_start,
1157                              "time_since_latest_start");
1158         if (rc < 0)
1159                 goto out;
1160
1161         rc = lfsck_time_dump(m, ns->ln_time_last_checkpoint,
1162                              "time_since_last_checkpoint");
1163         if (rc < 0)
1164                 goto out;
1165
1166         rc = lfsck_pos_dump(m, &ns->ln_pos_latest_start,
1167                             "latest_start_position");
1168         if (rc < 0)
1169                 goto out;
1170
1171         rc = lfsck_pos_dump(m, &ns->ln_pos_last_checkpoint,
1172                             "last_checkpoint_position");
1173         if (rc < 0)
1174                 goto out;
1175
1176         rc = lfsck_pos_dump(m, &ns->ln_pos_first_inconsistent,
1177                             "first_failure_position");
1178         if (rc < 0)
1179                 goto out;
1180
1181         if (ns->ln_status == LS_SCANNING_PHASE1) {
1182                 struct lfsck_position pos;
1183                 const struct dt_it_ops *iops;
1184                 cfs_duration_t duration = cfs_time_current() -
1185                                           lfsck->li_time_last_checkpoint;
1186                 __u64 checked = ns->ln_items_checked + com->lc_new_checked;
1187                 __u64 speed = checked;
1188                 __u64 new_checked = com->lc_new_checked * HZ;
1189                 __u32 rtime = ns->ln_run_time_phase1 +
1190                               cfs_duration_sec(duration + HALF_SEC);
1191
1192                 if (duration != 0)
1193                         do_div(new_checked, duration);
1194                 if (rtime != 0)
1195                         do_div(speed, rtime);
1196                 seq_printf(m, "checked_phase1: "LPU64"\n"
1197                               "checked_phase2: "LPU64"\n"
1198                               "updated_phase1: "LPU64"\n"
1199                               "updated_phase2: "LPU64"\n"
1200                               "failed_phase1: "LPU64"\n"
1201                               "failed_phase2: "LPU64"\n"
1202                               "directories: "LPU64"\n"
1203                               "multi_linked_files: "LPU64"\n"
1204                               "dirent_repaired: "LPU64"\n"
1205                               "linkea_repaired: "LPU64"\n"
1206                               "nlinks_repaired: "LPU64"\n"
1207                               "lost_found: "LPU64"\n"
1208                               "success_count: %u\n"
1209                               "run_time_phase1: %u seconds\n"
1210                               "run_time_phase2: %u seconds\n"
1211                               "average_speed_phase1: "LPU64" items/sec\n"
1212                               "average_speed_phase2: N/A\n"
1213                               "real_time_speed_phase1: "LPU64" items/sec\n"
1214                               "real_time_speed_phase2: N/A\n",
1215                               checked,
1216                               ns->ln_objs_checked_phase2,
1217                               ns->ln_items_repaired,
1218                               ns->ln_objs_repaired_phase2,
1219                               ns->ln_items_failed,
1220                               ns->ln_objs_failed_phase2,
1221                               ns->ln_dirs_checked,
1222                               ns->ln_mlinked_checked,
1223                               ns->ln_dirent_repaired,
1224                               ns->ln_linkea_repaired,
1225                               ns->ln_objs_nlink_repaired,
1226                               ns->ln_objs_lost_found,
1227                               ns->ln_success_count,
1228                               rtime,
1229                               ns->ln_run_time_phase2,
1230                               speed,
1231                               new_checked);
1232
1233                 LASSERT(lfsck->li_di_oit != NULL);
1234
1235                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1236
1237                 /* The low layer otable-based iteration position may NOT
1238                  * exactly match the namespace-based directory traversal
1239                  * cookie. Generally, it is not a serious issue. But the
1240                  * caller should NOT make assumption on that. */
1241                 pos.lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1242                 if (!lfsck->li_current_oit_processed)
1243                         pos.lp_oit_cookie--;
1244
1245                 spin_lock(&lfsck->li_lock);
1246                 if (lfsck->li_di_dir != NULL) {
1247                         pos.lp_dir_cookie = lfsck->li_cookie_dir;
1248                         if (pos.lp_dir_cookie >= MDS_DIR_END_OFF) {
1249                                 fid_zero(&pos.lp_dir_parent);
1250                                 pos.lp_dir_cookie = 0;
1251                         } else {
1252                                 pos.lp_dir_parent =
1253                                         *lfsck_dto2fid(lfsck->li_obj_dir);
1254                         }
1255                 } else {
1256                         fid_zero(&pos.lp_dir_parent);
1257                         pos.lp_dir_cookie = 0;
1258                 }
1259                 spin_unlock(&lfsck->li_lock);
1260                 lfsck_pos_dump(m, &pos, "current_position");
1261         } else if (ns->ln_status == LS_SCANNING_PHASE2) {
1262                 cfs_duration_t duration = cfs_time_current() -
1263                                           lfsck->li_time_last_checkpoint;
1264                 __u64 checked = ns->ln_objs_checked_phase2 +
1265                                 com->lc_new_checked;
1266                 __u64 speed1 = ns->ln_items_checked;
1267                 __u64 speed2 = checked;
1268                 __u64 new_checked = com->lc_new_checked * HZ;
1269                 __u32 rtime = ns->ln_run_time_phase2 +
1270                               cfs_duration_sec(duration + HALF_SEC);
1271
1272                 if (duration != 0)
1273                         do_div(new_checked, duration);
1274                 if (ns->ln_run_time_phase1 != 0)
1275                         do_div(speed1, ns->ln_run_time_phase1);
1276                 if (rtime != 0)
1277                         do_div(speed2, rtime);
1278                 seq_printf(m, "checked_phase1: "LPU64"\n"
1279                               "checked_phase2: "LPU64"\n"
1280                               "updated_phase1: "LPU64"\n"
1281                               "updated_phase2: "LPU64"\n"
1282                               "failed_phase1: "LPU64"\n"
1283                               "failed_phase2: "LPU64"\n"
1284                               "directories: "LPU64"\n"
1285                               "multi_linked_files: "LPU64"\n"
1286                               "dirent_repaired: "LPU64"\n"
1287                               "linkea_repaired: "LPU64"\n"
1288                               "nlinks_repaired: "LPU64"\n"
1289                               "lost_found: "LPU64"\n"
1290                               "success_count: %u\n"
1291                               "run_time_phase1: %u seconds\n"
1292                               "run_time_phase2: %u seconds\n"
1293                               "average_speed_phase1: "LPU64" items/sec\n"
1294                               "average_speed_phase2: "LPU64" objs/sec\n"
1295                               "real_time_speed_phase1: N/A\n"
1296                               "real_time_speed_phase2: "LPU64" objs/sec\n"
1297                               "current_position: "DFID"\n",
1298                               ns->ln_items_checked,
1299                               checked,
1300                               ns->ln_items_repaired,
1301                               ns->ln_objs_repaired_phase2,
1302                               ns->ln_items_failed,
1303                               ns->ln_objs_failed_phase2,
1304                               ns->ln_dirs_checked,
1305                               ns->ln_mlinked_checked,
1306                               ns->ln_dirent_repaired,
1307                               ns->ln_linkea_repaired,
1308                               ns->ln_objs_nlink_repaired,
1309                               ns->ln_objs_lost_found,
1310                               ns->ln_success_count,
1311                               ns->ln_run_time_phase1,
1312                               rtime,
1313                               speed1,
1314                               speed2,
1315                               new_checked,
1316                               PFID(&ns->ln_fid_latest_scanned_phase2));
1317         } else {
1318                 __u64 speed1 = ns->ln_items_checked;
1319                 __u64 speed2 = ns->ln_objs_checked_phase2;
1320
1321                 if (ns->ln_run_time_phase1 != 0)
1322                         do_div(speed1, ns->ln_run_time_phase1);
1323                 if (ns->ln_run_time_phase2 != 0)
1324                         do_div(speed2, ns->ln_run_time_phase2);
1325                 seq_printf(m, "checked_phase1: "LPU64"\n"
1326                               "checked_phase2: "LPU64"\n"
1327                               "updated_phase1: "LPU64"\n"
1328                               "updated_phase2: "LPU64"\n"
1329                               "failed_phase1: "LPU64"\n"
1330                               "failed_phase2: "LPU64"\n"
1331                               "directories: "LPU64"\n"
1332                               "multi_linked_files: "LPU64"\n"
1333                               "dirent_repaired: "LPU64"\n"
1334                               "linkea_repaired: "LPU64"\n"
1335                               "nlinks_repaired: "LPU64"\n"
1336                               "lost_found: "LPU64"\n"
1337                               "success_count: %u\n"
1338                               "run_time_phase1: %u seconds\n"
1339                               "run_time_phase2: %u seconds\n"
1340                               "average_speed_phase1: "LPU64" items/sec\n"
1341                               "average_speed_phase2: "LPU64" objs/sec\n"
1342                               "real_time_speed_phase1: N/A\n"
1343                               "real_time_speed_phase2: N/A\n"
1344                               "current_position: N/A\n",
1345                               ns->ln_items_checked,
1346                               ns->ln_objs_checked_phase2,
1347                               ns->ln_items_repaired,
1348                               ns->ln_objs_repaired_phase2,
1349                               ns->ln_items_failed,
1350                               ns->ln_objs_failed_phase2,
1351                               ns->ln_dirs_checked,
1352                               ns->ln_mlinked_checked,
1353                               ns->ln_dirent_repaired,
1354                               ns->ln_linkea_repaired,
1355                               ns->ln_objs_nlink_repaired,
1356                               ns->ln_objs_lost_found,
1357                               ns->ln_success_count,
1358                               ns->ln_run_time_phase1,
1359                               ns->ln_run_time_phase2,
1360                               speed1,
1361                               speed2);
1362         }
1363 out:
1364         up_read(&com->lc_sem);
1365         return 0;
1366 }
1367
1368 static int lfsck_namespace_double_scan_main(void *args)
1369 {
1370         struct lfsck_thread_args *lta   = args;
1371         const struct lu_env     *env    = &lta->lta_env;
1372         struct lfsck_component  *com    = lta->lta_com;
1373         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1374         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1375         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1376         struct lfsck_namespace  *ns     = com->lc_file_ram;
1377         struct dt_object        *obj    = com->lc_obj;
1378         const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
1379         struct dt_object        *target;
1380         struct dt_it            *di;
1381         struct dt_key           *key;
1382         struct lu_fid            fid;
1383         int                      rc;
1384         __u8                     flags = 0;
1385         ENTRY;
1386
1387         CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
1388                lfsck_lfsck2name(lfsck));
1389
1390         com->lc_new_checked = 0;
1391         com->lc_new_scanned = 0;
1392         com->lc_time_last_checkpoint = cfs_time_current();
1393         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
1394                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1395
1396         di = iops->init(env, obj, 0, BYPASS_CAPA);
1397         if (IS_ERR(di))
1398                 GOTO(out, rc = PTR_ERR(di));
1399
1400         fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
1401         rc = iops->get(env, di, (const struct dt_key *)&fid);
1402         if (rc < 0)
1403                 GOTO(fini, rc);
1404
1405         /* Skip the start one, which either has been processed or non-exist. */
1406         rc = iops->next(env, di);
1407         if (rc != 0)
1408                 GOTO(put, rc);
1409
1410         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_DOUBLESCAN))
1411                 GOTO(put, rc = 0);
1412
1413         do {
1414                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
1415                     cfs_fail_val > 0) {
1416                         struct l_wait_info lwi;
1417
1418                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
1419                                           NULL, NULL);
1420                         l_wait_event(thread->t_ctl_waitq,
1421                                      !thread_is_running(thread),
1422                                      &lwi);
1423                 }
1424
1425                 key = iops->key(env, di);
1426                 fid_be_to_cpu(&fid, (const struct lu_fid *)key);
1427                 target = lfsck_object_find(env, lfsck, &fid);
1428                 down_write(&com->lc_sem);
1429                 if (IS_ERR(target)) {
1430                         rc = PTR_ERR(target);
1431                         goto checkpoint;
1432                 }
1433
1434                 /* XXX: Currently, skip remote object, the consistency for
1435                  *      remote object will be processed in LFSCK phase III. */
1436                 if (dt_object_exists(target) && !dt_object_remote(target)) {
1437                         rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
1438                         if (rc == 0)
1439                                 rc = lfsck_namespace_double_scan_one(env, com,
1440                                                                 target, flags);
1441                 }
1442
1443                 lfsck_object_put(env, target);
1444
1445 checkpoint:
1446                 com->lc_new_checked++;
1447                 com->lc_new_scanned++;
1448                 ns->ln_fid_latest_scanned_phase2 = fid;
1449                 if (rc > 0)
1450                         ns->ln_objs_repaired_phase2++;
1451                 else if (rc < 0)
1452                         ns->ln_objs_failed_phase2++;
1453                 up_write(&com->lc_sem);
1454
1455                 if ((rc == 0) || ((rc > 0) && !(bk->lb_param & LPF_DRYRUN))) {
1456                         lfsck_namespace_delete(env, com, &fid);
1457                 } else if (rc < 0) {
1458                         flags |= LLF_REPAIR_FAILED;
1459                         lfsck_namespace_update(env, com, &fid, flags, true);
1460                 }
1461
1462                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
1463                         GOTO(put, rc);
1464
1465                 if (unlikely(cfs_time_beforeq(com->lc_time_next_checkpoint,
1466                                               cfs_time_current())) &&
1467                     com->lc_new_checked != 0) {
1468                         down_write(&com->lc_sem);
1469                         ns->ln_run_time_phase2 +=
1470                                 cfs_duration_sec(cfs_time_current() +
1471                                 HALF_SEC - com->lc_time_last_checkpoint);
1472                         ns->ln_time_last_checkpoint = cfs_time_current_sec();
1473                         ns->ln_objs_checked_phase2 += com->lc_new_checked;
1474                         com->lc_new_checked = 0;
1475                         rc = lfsck_namespace_store(env, com, false);
1476                         up_write(&com->lc_sem);
1477                         if (rc != 0)
1478                                 GOTO(put, rc);
1479
1480                         com->lc_time_last_checkpoint = cfs_time_current();
1481                         com->lc_time_next_checkpoint =
1482                                 com->lc_time_last_checkpoint +
1483                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1484                 }
1485
1486                 lfsck_control_speed_by_self(com);
1487                 if (unlikely(!thread_is_running(thread)))
1488                         GOTO(put, rc = 0);
1489
1490                 rc = iops->next(env, di);
1491         } while (rc == 0);
1492
1493         GOTO(put, rc);
1494
1495 put:
1496         iops->put(env, di);
1497
1498 fini:
1499         iops->fini(env, di);
1500
1501 out:
1502         down_write(&com->lc_sem);
1503         ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1504                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1505         ns->ln_time_last_checkpoint = cfs_time_current_sec();
1506         ns->ln_objs_checked_phase2 += com->lc_new_checked;
1507         com->lc_new_checked = 0;
1508
1509         if (rc > 0) {
1510                 com->lc_journal = 0;
1511                 ns->ln_status = LS_COMPLETED;
1512                 if (!(bk->lb_param & LPF_DRYRUN))
1513                         ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1514                 ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
1515                 ns->ln_success_count++;
1516         } else if (rc == 0) {
1517                 ns->ln_status = lfsck->li_status;
1518                 if (ns->ln_status == 0)
1519                         ns->ln_status = LS_STOPPED;
1520         } else {
1521                 ns->ln_status = LS_FAILED;
1522         }
1523
1524         CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan finished, status %d: "
1525               "rc = %d\n", lfsck_lfsck2name(lfsck), ns->ln_status, rc);
1526
1527         rc = lfsck_namespace_store(env, com, false);
1528         up_write(&com->lc_sem);
1529         if (atomic_dec_and_test(&lfsck->li_double_scan_count))
1530                 wake_up_all(&thread->t_ctl_waitq);
1531
1532         lfsck_thread_args_fini(lta);
1533
1534         return rc;
1535 }
1536
1537 static int lfsck_namespace_double_scan(const struct lu_env *env,
1538                                        struct lfsck_component *com)
1539 {
1540         struct lfsck_instance           *lfsck = com->lc_lfsck;
1541         struct lfsck_namespace          *ns    = com->lc_file_ram;
1542         struct lfsck_thread_args        *lta;
1543         struct task_struct              *task;
1544         int                              rc;
1545         ENTRY;
1546
1547         if (unlikely(ns->ln_status != LS_SCANNING_PHASE2))
1548                 RETURN(0);
1549
1550         lta = lfsck_thread_args_init(lfsck, com, NULL);
1551         if (IS_ERR(lta))
1552                 GOTO(out, rc = PTR_ERR(lta));
1553
1554         atomic_inc(&lfsck->li_double_scan_count);
1555         task = kthread_run(lfsck_namespace_double_scan_main, lta,
1556                            "lfsck_namespace");
1557         if (IS_ERR(task)) {
1558                 atomic_dec(&lfsck->li_double_scan_count);
1559                 lfsck_thread_args_fini(lta);
1560                 GOTO(out, rc = PTR_ERR(task));
1561         }
1562
1563         RETURN(0);
1564
1565 out:
1566         CERROR("%s: cannot start LFSCK namespace thread: rc = %d\n",
1567                lfsck_lfsck2name(lfsck), rc);
1568         return rc;
1569 }
1570
1571 static int lfsck_namespace_in_notify(const struct lu_env *env,
1572                                      struct lfsck_component *com,
1573                                      struct lfsck_request *lr)
1574 {
1575         return 0;
1576 }
1577
1578 static int lfsck_namespace_query(const struct lu_env *env,
1579                                  struct lfsck_component *com)
1580 {
1581         struct lfsck_namespace *ns = com->lc_file_ram;
1582
1583         return ns->ln_status;
1584 }
1585
1586 static struct lfsck_operations lfsck_namespace_ops = {
1587         .lfsck_reset            = lfsck_namespace_reset,
1588         .lfsck_fail             = lfsck_namespace_fail,
1589         .lfsck_checkpoint       = lfsck_namespace_checkpoint,
1590         .lfsck_prep             = lfsck_namespace_prep,
1591         .lfsck_exec_oit         = lfsck_namespace_exec_oit,
1592         .lfsck_exec_dir         = lfsck_namespace_exec_dir,
1593         .lfsck_post             = lfsck_namespace_post,
1594         .lfsck_dump             = lfsck_namespace_dump,
1595         .lfsck_double_scan      = lfsck_namespace_double_scan,
1596         .lfsck_in_notify        = lfsck_namespace_in_notify,
1597         .lfsck_query            = lfsck_namespace_query,
1598 };
1599
1600 /**
1601  * Verify the specified linkEA entry for the given directory object.
1602  * If the object has no such linkEA entry or it has more other linkEA
1603  * entries, then re-generate the linkEA with the given information.
1604  *
1605  * \param[in] env       pointer to the thread context
1606  * \param[in] dev       pointer to the dt_device
1607  * \param[in] obj       pointer to the dt_object to be handled
1608  * \param[in] cname     the name for the child in the parent directory
1609  * \param[in] pfid      the parent directory's FID for the linkEA
1610  *
1611  * \retval              0 for success
1612  * \retval              negative error number on failure
1613  */
1614 int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev,
1615                         struct dt_object *obj, const struct lu_name *cname,
1616                         const struct lu_fid *pfid)
1617 {
1618         struct linkea_data       ldata  = { 0 };
1619         struct lu_buf            linkea_buf;
1620         struct thandle          *th;
1621         int                      rc;
1622         int                      fl     = LU_XATTR_CREATE;
1623         bool                     dirty  = false;
1624         ENTRY;
1625
1626         LASSERT(S_ISDIR(lfsck_object_type(obj)));
1627
1628         rc = lfsck_links_read(env, obj, &ldata);
1629         if (rc == -ENODATA) {
1630                 dirty = true;
1631         } else if (rc == 0) {
1632                 fl = LU_XATTR_REPLACE;
1633                 if (ldata.ld_leh->leh_reccount != 1) {
1634                         dirty = true;
1635                 } else {
1636                         rc = linkea_links_find(&ldata, cname, pfid);
1637                         if (rc != 0)
1638                                 dirty = true;
1639                 }
1640         }
1641
1642         if (!dirty)
1643                 RETURN(rc);
1644
1645         rc = linkea_data_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf);
1646         if (rc != 0)
1647                 RETURN(rc);
1648
1649         rc = linkea_add_buf(&ldata, cname, pfid);
1650         if (rc != 0)
1651                 RETURN(rc);
1652
1653         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
1654                        ldata.ld_leh->leh_len);
1655         th = dt_trans_create(env, dev);
1656         if (IS_ERR(th))
1657                 RETURN(PTR_ERR(th));
1658
1659         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1660                                   XATTR_NAME_LINK, fl, th);
1661         if (rc != 0)
1662                 GOTO(stop, rc = PTR_ERR(th));
1663
1664         rc = dt_trans_start_local(env, dev, th);
1665         if (rc != 0)
1666                 GOTO(stop, rc);
1667
1668         dt_write_lock(env, obj, 0);
1669         rc = dt_xattr_set(env, obj, &linkea_buf,
1670                           XATTR_NAME_LINK, fl, th, BYPASS_CAPA);
1671         dt_write_unlock(env, obj);
1672
1673         GOTO(stop, rc);
1674
1675 stop:
1676         dt_trans_stop(env, dev, th);
1677         return rc;
1678 }
1679
1680 /**
1681  * Get the name and parent directory's FID from the first linkEA entry.
1682  *
1683  * \param[in] env       pointer to the thread context
1684  * \param[in] obj       pointer to the object which get linkEA from
1685  * \param[out] name     pointer to the buffer to hold the name
1686  *                      in the first linkEA entry
1687  * \param[out] pfid     pointer to the buffer to hold the parent
1688  *                      directory's FID in the first linkEA entry
1689  *
1690  * \retval              0 for success
1691  * \retval              negative error number on failure
1692  */
1693 int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj,
1694                           char *name, struct lu_fid *pfid)
1695 {
1696         struct lu_name           *cname = &lfsck_env_info(env)->lti_name;
1697         struct linkea_data        ldata = { 0 };
1698         int                       rc;
1699
1700         rc = lfsck_links_read(env, obj, &ldata);
1701         if (rc != 0)
1702                 return rc;
1703
1704         linkea_first_entry(&ldata);
1705         if (ldata.ld_lee == NULL)
1706                 return -ENODATA;
1707
1708         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
1709         /* To guarantee the 'name' is terminated with '0'. */
1710         memcpy(name, cname->ln_name, cname->ln_namelen);
1711         name[cname->ln_namelen] = 0;
1712
1713         return 0;
1714 }
1715
1716 /**
1717  * Remove the name entry from the parent directory.
1718  *
1719  * No need to care about the object referenced by the name entry,
1720  * either the name entry is invalid or redundant, or the referenced
1721  * object has been processed has been or will be handled by others.
1722  *
1723  * \param[in] env       pointer to the thread context
1724  * \param[in] lfsck     pointer to the lfsck instance
1725  * \param[in] parent    pointer to the lost+found object
1726  * \param[in] name      the name for the name entry to be removed
1727  * \param[in] type      the type for the name entry to be removed
1728  *
1729  * \retval              0 for success
1730  * \retval              negative error number on failure
1731  */
1732 int lfsck_remove_name_entry(const struct lu_env *env,
1733                             struct lfsck_instance *lfsck,
1734                             struct dt_object *parent,
1735                             const char *name, __u32 type)
1736 {
1737         struct dt_device        *dev    = lfsck->li_next;
1738         struct thandle          *th;
1739         struct lustre_handle     lh     = { 0 };
1740         int                      rc;
1741         ENTRY;
1742
1743         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
1744                               MDS_INODELOCK_UPDATE, LCK_EX);
1745         if (rc != 0)
1746                 RETURN(rc);
1747
1748         th = dt_trans_create(env, dev);
1749         if (IS_ERR(th))
1750                 GOTO(unlock, rc = PTR_ERR(th));
1751
1752         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
1753         if (rc != 0)
1754                 GOTO(stop, rc);
1755
1756         if (S_ISDIR(type)) {
1757                 rc = dt_declare_ref_del(env, parent, th);
1758                 if (rc != 0)
1759                         GOTO(stop, rc);
1760         }
1761
1762         rc = dt_trans_start(env, dev, th);
1763         if (rc != 0)
1764                 GOTO(stop, rc);
1765
1766         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
1767                        BYPASS_CAPA);
1768         if (rc != 0)
1769                 GOTO(stop, rc);
1770
1771         if (S_ISDIR(type)) {
1772                 dt_write_lock(env, parent, 0);
1773                 rc = dt_ref_del(env, parent, th);
1774                 dt_write_unlock(env, parent);
1775         }
1776
1777         GOTO(stop, rc);
1778
1779 stop:
1780         dt_trans_stop(env, dev, th);
1781
1782 unlock:
1783         lfsck_ibits_unlock(&lh, LCK_EX);
1784
1785         CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s "
1786                "with type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
1787                PFID(lfsck_dto2fid(parent)), name, type, rc);
1788
1789         return rc;
1790 }
1791
1792 /**
1793  * Update the object's name entry with the given FID.
1794  *
1795  * \param[in] env       pointer to the thread context
1796  * \param[in] lfsck     pointer to the lfsck instance
1797  * \param[in] parent    pointer to the parent directory that holds
1798  *                      the name entry
1799  * \param[in] name      the name for the entry to be updated
1800  * \param[in] pfid      the new PFID for the name entry
1801  * \param[in] type      the type for the name entry to be updated
1802  *
1803  * \retval              0 for success
1804  * \retval              negative error number on failure
1805  */
1806 int lfsck_update_name_entry(const struct lu_env *env,
1807                             struct lfsck_instance *lfsck,
1808                             struct dt_object *parent, const char *name,
1809                             const struct lu_fid *pfid, __u32 type)
1810 {
1811         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
1812         struct dt_device        *dev    = lfsck->li_next;
1813         struct lustre_handle     lh     = { 0 };
1814         struct thandle          *th;
1815         int                      rc;
1816         bool                     exists = true;
1817         ENTRY;
1818
1819         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
1820                               MDS_INODELOCK_UPDATE, LCK_EX);
1821         if (rc != 0)
1822                 RETURN(rc);
1823
1824         th = dt_trans_create(env, dev);
1825         if (IS_ERR(th))
1826                 GOTO(unlock, rc = PTR_ERR(th));
1827
1828         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
1829         if (rc != 0)
1830                 GOTO(stop, rc);
1831
1832         rec->rec_type = type;
1833         rec->rec_fid = pfid;
1834         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1835                                (const struct dt_key *)name, th);
1836         if (rc != 0)
1837                 GOTO(stop, rc);
1838
1839         rc = dt_declare_ref_add(env, parent, th);
1840         if (rc != 0)
1841                 GOTO(stop, rc);
1842
1843         rc = dt_trans_start(env, dev, th);
1844         if (rc != 0)
1845                 GOTO(stop, rc);
1846
1847         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
1848                        BYPASS_CAPA);
1849         if (rc == -ENOENT) {
1850                 exists = false;
1851                 rc = 0;
1852         }
1853
1854         if (rc != 0)
1855                 GOTO(stop, rc);
1856
1857         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1858                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
1859         if (rc == 0 && S_ISDIR(type) && !exists) {
1860                 dt_write_lock(env, parent, 0);
1861                 rc = dt_ref_add(env, parent, th);
1862                 dt_write_unlock(env, parent);
1863         }
1864
1865         GOTO(stop, rc);
1866
1867 stop:
1868         dt_trans_stop(env, dev, th);
1869
1870 unlock:
1871         lfsck_ibits_unlock(&lh, LCK_EX);
1872
1873         CDEBUG(D_LFSCK, "%s: update name entry "DFID"/%s with the FID "DFID
1874                " and the type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
1875                PFID(lfsck_dto2fid(parent)), name, PFID(pfid), type, rc);
1876
1877         return rc;
1878 }
1879
1880 int lfsck_namespace_setup(const struct lu_env *env,
1881                           struct lfsck_instance *lfsck)
1882 {
1883         struct lfsck_component  *com;
1884         struct lfsck_namespace  *ns;
1885         struct dt_object        *root = NULL;
1886         struct dt_object        *obj;
1887         int                      rc;
1888         ENTRY;
1889
1890         LASSERT(lfsck->li_master);
1891
1892         OBD_ALLOC_PTR(com);
1893         if (com == NULL)
1894                 RETURN(-ENOMEM);
1895
1896         INIT_LIST_HEAD(&com->lc_link);
1897         INIT_LIST_HEAD(&com->lc_link_dir);
1898         init_rwsem(&com->lc_sem);
1899         atomic_set(&com->lc_ref, 1);
1900         com->lc_lfsck = lfsck;
1901         com->lc_type = LFSCK_TYPE_NAMESPACE;
1902         com->lc_ops = &lfsck_namespace_ops;
1903         com->lc_file_size = sizeof(struct lfsck_namespace);
1904         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
1905         if (com->lc_file_ram == NULL)
1906                 GOTO(out, rc = -ENOMEM);
1907
1908         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
1909         if (com->lc_file_disk == NULL)
1910                 GOTO(out, rc = -ENOMEM);
1911
1912         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
1913         if (IS_ERR(root))
1914                 GOTO(out, rc = PTR_ERR(root));
1915
1916         if (unlikely(!dt_try_as_dir(env, root)))
1917                 GOTO(out, rc = -ENOTDIR);
1918
1919         obj = local_index_find_or_create(env, lfsck->li_los, root,
1920                                          lfsck_namespace_name,
1921                                          S_IFREG | S_IRUGO | S_IWUSR,
1922                                          &dt_lfsck_features);
1923         if (IS_ERR(obj))
1924                 GOTO(out, rc = PTR_ERR(obj));
1925
1926         com->lc_obj = obj;
1927         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
1928         if (rc != 0)
1929                 GOTO(out, rc);
1930
1931         rc = lfsck_namespace_load(env, com);
1932         if (rc > 0)
1933                 rc = lfsck_namespace_reset(env, com, true);
1934         else if (rc == -ENODATA)
1935                 rc = lfsck_namespace_init(env, com);
1936         if (rc != 0)
1937                 GOTO(out, rc);
1938
1939         ns = com->lc_file_ram;
1940         switch (ns->ln_status) {
1941         case LS_INIT:
1942         case LS_COMPLETED:
1943         case LS_FAILED:
1944         case LS_STOPPED:
1945                 spin_lock(&lfsck->li_lock);
1946                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1947                 spin_unlock(&lfsck->li_lock);
1948                 break;
1949         default:
1950                 CERROR("%s: unknown lfsck_namespace status %d\n",
1951                        lfsck_lfsck2name(lfsck), ns->ln_status);
1952                 /* fall through */
1953         case LS_SCANNING_PHASE1:
1954         case LS_SCANNING_PHASE2:
1955                 /* No need to store the status to disk right now.
1956                  * If the system crashed before the status stored,
1957                  * it will be loaded back when next time. */
1958                 ns->ln_status = LS_CRASHED;
1959                 /* fall through */
1960         case LS_PAUSED:
1961         case LS_CRASHED:
1962                 spin_lock(&lfsck->li_lock);
1963                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
1964                 list_add_tail(&com->lc_link_dir, &lfsck->li_list_dir);
1965                 spin_unlock(&lfsck->li_lock);
1966                 break;
1967         }
1968
1969         GOTO(out, rc = 0);
1970
1971 out:
1972         if (root != NULL && !IS_ERR(root))
1973                 lu_object_put(env, &root->do_lu);
1974         if (rc != 0) {
1975                 lfsck_component_cleanup(env, com);
1976                 CERROR("%s: fail to init namespace LFSCK component: rc = %d\n",
1977                        lfsck_lfsck2name(lfsck), rc);
1978         }
1979         return rc;
1980 }