Whamcloud - gitweb
LU-6109 lfsck: check FID validity before locating object
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2014, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_namespace.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lustre/lustre_idl.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fid.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre/lustre_user.h>
41
42 #include "lfsck_internal.h"
43
/* Magic numbers identifying the layout of the namespace LFSCK trace file. */
#define LFSCK_NAMESPACE_MAGIC_V1        0xA0629D03
#define LFSCK_NAMESPACE_MAGIC_V2        0xA0621A0B

/* For Lustre-2.x (x <= 6), the namespace LFSCK used LFSCK_NAMESPACE_MAGIC_V1
 * as the trace file magic. After downgrading to such an old release, the old
 * LFSCK will not recognize the new LFSCK_NAMESPACE_MAGIC_V2 in the new trace
 * file, so it will reset the whole LFSCK rather than fail to start. The
 * same thing happens when upgrading from such an old release. */
#define LFSCK_NAMESPACE_MAGIC           LFSCK_NAMESPACE_MAGIC_V2

/* Positive results returned by lfsck_namespace_check_exist(); that function
 * returns 0 when the name entry still references the given object. */
enum lfsck_nameentry_check {
        LFSCK_NAMEENTRY_DEAD            = 1, /* The object has been unlinked. */
        LFSCK_NAMEENTRY_REMOVED         = 2, /* The entry has been removed. */
        LFSCK_NAMEENTRY_RECREATED       = 3, /* The entry has been recreated. */
};
59
60 static struct lfsck_namespace_req *
61 lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
62                                    struct lu_dirent *ent, __u16 type)
63 {
64         struct lfsck_namespace_req *lnr;
65         int                         size;
66
67         size = sizeof(*lnr) + (ent->lde_namelen & ~3) + 4;
68         OBD_ALLOC(lnr, size);
69         if (lnr == NULL)
70                 return ERR_PTR(-ENOMEM);
71
72         INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
73         lnr->lnr_obj = lfsck_object_get(lfsck->li_obj_dir);
74         lnr->lnr_lmv = lfsck_lmv_get(lfsck->li_lmv);
75         lnr->lnr_fid = ent->lde_fid;
76         lnr->lnr_oit_cookie = lfsck->li_pos_current.lp_oit_cookie;
77         lnr->lnr_dir_cookie = ent->lde_hash;
78         lnr->lnr_attr = ent->lde_attrs;
79         lnr->lnr_size = size;
80         lnr->lnr_type = type;
81         lnr->lnr_namelen = ent->lde_namelen;
82         memcpy(lnr->lnr_name, ent->lde_name, ent->lde_namelen);
83
84         return lnr;
85 }
86
87 static void lfsck_namespace_assistant_req_fini(const struct lu_env *env,
88                                                struct lfsck_assistant_req *lar)
89 {
90         struct lfsck_namespace_req *lnr =
91                         container_of0(lar, struct lfsck_namespace_req, lnr_lar);
92
93         if (lnr->lnr_lmv != NULL)
94                 lfsck_lmv_put(env, lnr->lnr_lmv);
95
96         lu_object_put(env, &lnr->lnr_obj->do_lu);
97         OBD_FREE(lnr, lnr->lnr_size);
98 }
99
100 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
101                                       struct lfsck_namespace *src)
102 {
103         dst->ln_magic = le32_to_cpu(src->ln_magic);
104         dst->ln_status = le32_to_cpu(src->ln_status);
105         dst->ln_flags = le32_to_cpu(src->ln_flags);
106         dst->ln_success_count = le32_to_cpu(src->ln_success_count);
107         dst->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
108         dst->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
109         dst->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
110         dst->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
111         dst->ln_time_last_checkpoint =
112                                 le64_to_cpu(src->ln_time_last_checkpoint);
113         lfsck_position_le_to_cpu(&dst->ln_pos_latest_start,
114                                  &src->ln_pos_latest_start);
115         lfsck_position_le_to_cpu(&dst->ln_pos_last_checkpoint,
116                                  &src->ln_pos_last_checkpoint);
117         lfsck_position_le_to_cpu(&dst->ln_pos_first_inconsistent,
118                                  &src->ln_pos_first_inconsistent);
119         dst->ln_items_checked = le64_to_cpu(src->ln_items_checked);
120         dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
121         dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
122         dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
123         dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
124         dst->ln_objs_repaired_phase2 =
125                                 le64_to_cpu(src->ln_objs_repaired_phase2);
126         dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
127         dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
128         fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2,
129                       &src->ln_fid_latest_scanned_phase2);
130         dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
131         dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
132         dst->ln_mul_linked_checked = le64_to_cpu(src->ln_mul_linked_checked);
133         dst->ln_mul_linked_repaired = le64_to_cpu(src->ln_mul_linked_repaired);
134         dst->ln_unknown_inconsistency =
135                                 le64_to_cpu(src->ln_unknown_inconsistency);
136         dst->ln_unmatched_pairs_repaired =
137                                 le64_to_cpu(src->ln_unmatched_pairs_repaired);
138         dst->ln_dangling_repaired = le64_to_cpu(src->ln_dangling_repaired);
139         dst->ln_mul_ref_repaired = le64_to_cpu(src->ln_mul_ref_repaired);
140         dst->ln_bad_type_repaired = le64_to_cpu(src->ln_bad_type_repaired);
141         dst->ln_lost_dirent_repaired =
142                                 le64_to_cpu(src->ln_lost_dirent_repaired);
143         dst->ln_striped_dirs_scanned =
144                                 le64_to_cpu(src->ln_striped_dirs_scanned);
145         dst->ln_striped_dirs_repaired =
146                                 le64_to_cpu(src->ln_striped_dirs_repaired);
147         dst->ln_striped_dirs_failed =
148                                 le64_to_cpu(src->ln_striped_dirs_failed);
149         dst->ln_striped_dirs_disabled =
150                                 le64_to_cpu(src->ln_striped_dirs_disabled);
151         dst->ln_striped_dirs_skipped =
152                                 le64_to_cpu(src->ln_striped_dirs_skipped);
153         dst->ln_striped_shards_scanned =
154                                 le64_to_cpu(src->ln_striped_shards_scanned);
155         dst->ln_striped_shards_repaired =
156                                 le64_to_cpu(src->ln_striped_shards_repaired);
157         dst->ln_striped_shards_failed =
158                                 le64_to_cpu(src->ln_striped_shards_failed);
159         dst->ln_striped_shards_skipped =
160                                 le64_to_cpu(src->ln_striped_shards_skipped);
161         dst->ln_name_hash_repaired = le64_to_cpu(src->ln_name_hash_repaired);
162         dst->ln_local_lpf_scanned = le64_to_cpu(src->ln_local_lpf_scanned);
163         dst->ln_local_lpf_moved = le64_to_cpu(src->ln_local_lpf_moved);
164         dst->ln_local_lpf_skipped = le64_to_cpu(src->ln_local_lpf_skipped);
165         dst->ln_local_lpf_failed = le64_to_cpu(src->ln_local_lpf_failed);
166         dst->ln_bitmap_size = le32_to_cpu(src->ln_bitmap_size);
167 }
168
169 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
170                                       struct lfsck_namespace *src)
171 {
172         dst->ln_magic = cpu_to_le32(src->ln_magic);
173         dst->ln_status = cpu_to_le32(src->ln_status);
174         dst->ln_flags = cpu_to_le32(src->ln_flags);
175         dst->ln_success_count = cpu_to_le32(src->ln_success_count);
176         dst->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
177         dst->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
178         dst->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
179         dst->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
180         dst->ln_time_last_checkpoint =
181                                 cpu_to_le64(src->ln_time_last_checkpoint);
182         lfsck_position_cpu_to_le(&dst->ln_pos_latest_start,
183                                  &src->ln_pos_latest_start);
184         lfsck_position_cpu_to_le(&dst->ln_pos_last_checkpoint,
185                                  &src->ln_pos_last_checkpoint);
186         lfsck_position_cpu_to_le(&dst->ln_pos_first_inconsistent,
187                                  &src->ln_pos_first_inconsistent);
188         dst->ln_items_checked = cpu_to_le64(src->ln_items_checked);
189         dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
190         dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
191         dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
192         dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
193         dst->ln_objs_repaired_phase2 =
194                                 cpu_to_le64(src->ln_objs_repaired_phase2);
195         dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
196         dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
197         fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2,
198                       &src->ln_fid_latest_scanned_phase2);
199         dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
200         dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
201         dst->ln_mul_linked_checked = cpu_to_le64(src->ln_mul_linked_checked);
202         dst->ln_mul_linked_repaired = cpu_to_le64(src->ln_mul_linked_repaired);
203         dst->ln_unknown_inconsistency =
204                                 cpu_to_le64(src->ln_unknown_inconsistency);
205         dst->ln_unmatched_pairs_repaired =
206                                 cpu_to_le64(src->ln_unmatched_pairs_repaired);
207         dst->ln_dangling_repaired = cpu_to_le64(src->ln_dangling_repaired);
208         dst->ln_mul_ref_repaired = cpu_to_le64(src->ln_mul_ref_repaired);
209         dst->ln_bad_type_repaired = cpu_to_le64(src->ln_bad_type_repaired);
210         dst->ln_lost_dirent_repaired =
211                                 cpu_to_le64(src->ln_lost_dirent_repaired);
212         dst->ln_striped_dirs_scanned =
213                                 cpu_to_le64(src->ln_striped_dirs_scanned);
214         dst->ln_striped_dirs_repaired =
215                                 cpu_to_le64(src->ln_striped_dirs_repaired);
216         dst->ln_striped_dirs_failed =
217                                 cpu_to_le64(src->ln_striped_dirs_failed);
218         dst->ln_striped_dirs_disabled =
219                                 cpu_to_le64(src->ln_striped_dirs_disabled);
220         dst->ln_striped_dirs_skipped =
221                                 cpu_to_le64(src->ln_striped_dirs_skipped);
222         dst->ln_striped_shards_scanned =
223                                 cpu_to_le64(src->ln_striped_shards_scanned);
224         dst->ln_striped_shards_repaired =
225                                 cpu_to_le64(src->ln_striped_shards_repaired);
226         dst->ln_striped_shards_failed =
227                                 cpu_to_le64(src->ln_striped_shards_failed);
228         dst->ln_striped_shards_skipped =
229                                 cpu_to_le64(src->ln_striped_shards_skipped);
230         dst->ln_name_hash_repaired = cpu_to_le64(src->ln_name_hash_repaired);
231         dst->ln_local_lpf_scanned = cpu_to_le64(src->ln_local_lpf_scanned);
232         dst->ln_local_lpf_moved = cpu_to_le64(src->ln_local_lpf_moved);
233         dst->ln_local_lpf_skipped = cpu_to_le64(src->ln_local_lpf_skipped);
234         dst->ln_local_lpf_failed = cpu_to_le64(src->ln_local_lpf_failed);
235         dst->ln_bitmap_size = cpu_to_le32(src->ln_bitmap_size);
236 }
237
238 static void lfsck_namespace_record_failure(const struct lu_env *env,
239                                            struct lfsck_instance *lfsck,
240                                            struct lfsck_namespace *ns)
241 {
242         struct lfsck_position pos;
243
244         ns->ln_items_failed++;
245         lfsck_pos_fill(env, lfsck, &pos, false);
246         if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent) ||
247             lfsck_pos_is_eq(&pos, &ns->ln_pos_first_inconsistent) < 0) {
248                 ns->ln_pos_first_inconsistent = pos;
249
250                 CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
251                        "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n",
252                        lfsck_lfsck2name(lfsck),
253                        ns->ln_pos_first_inconsistent.lp_oit_cookie,
254                        PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
255                        ns->ln_pos_first_inconsistent.lp_dir_cookie);
256         }
257 }
258
259 /**
260  * Load the MDT bitmap from the lfsck_namespace trace file.
261  *
262  * \param[in] env       pointer to the thread context
263  * \param[in] com       pointer to the lfsck component
264  *
265  * \retval              0 for success
266  * \retval              negative error number on failure or data corruption
267  */
268 static int lfsck_namespace_load_bitmap(const struct lu_env *env,
269                                        struct lfsck_component *com)
270 {
271         struct dt_object                *obj    = com->lc_obj;
272         struct lfsck_assistant_data     *lad    = com->lc_data;
273         struct lfsck_namespace          *ns     = com->lc_file_ram;
274         cfs_bitmap_t                    *bitmap = lad->lad_bitmap;
275         ssize_t                          size;
276         __u32                            nbits;
277         int                              rc;
278         ENTRY;
279
280         if (com->lc_lfsck->li_mdt_descs.ltd_tgts_bitmap->size >
281             ns->ln_bitmap_size)
282                 nbits = com->lc_lfsck->li_mdt_descs.ltd_tgts_bitmap->size;
283         else
284                 nbits = ns->ln_bitmap_size;
285
286         if (unlikely(nbits < BITS_PER_LONG))
287                 nbits = BITS_PER_LONG;
288
289         if (nbits > bitmap->size) {
290                 __u32 new_bits = bitmap->size;
291                 cfs_bitmap_t *new_bitmap;
292
293                 while (new_bits < nbits)
294                         new_bits <<= 1;
295
296                 new_bitmap = CFS_ALLOCATE_BITMAP(new_bits);
297                 if (new_bitmap == NULL)
298                         RETURN(-ENOMEM);
299
300                 lad->lad_bitmap = new_bitmap;
301                 CFS_FREE_BITMAP(bitmap);
302                 bitmap = new_bitmap;
303         }
304
305         if (ns->ln_bitmap_size == 0) {
306                 lad->lad_incomplete = 0;
307                 CFS_RESET_BITMAP(bitmap);
308
309                 RETURN(0);
310         }
311
312         size = (ns->ln_bitmap_size + 7) >> 3;
313         rc = dt_xattr_get(env, obj,
314                           lfsck_buf_get(env, bitmap->data, size),
315                           XATTR_NAME_LFSCK_BITMAP, BYPASS_CAPA);
316         if (rc != size)
317                 RETURN(rc >= 0 ? -EINVAL : rc);
318
319         if (cfs_bitmap_check_empty(bitmap))
320                 lad->lad_incomplete = 0;
321         else
322                 lad->lad_incomplete = 1;
323
324         RETURN(0);
325 }
326
327 /**
328  * Load namespace LFSCK statistics information from the trace file.
329  *
330  * For old release (Lustre-2.6 or older), the statistics information was
331  * stored as XATTR_NAME_LFSCK_NAMESPACE_OLD EA. But in Lustre-2.7, we need
332  * more statistics information. To avoid confusing old MDT when downgrade,
333  * Lustre-2.7 stores the namespace LFSCK statistics information as new
334  * XATTR_NAME_LFSCK_NAMESPACE EA.
335  *
336  * \param[in] env       pointer to the thread context
337  * \param[in] com       pointer to the lfsck component
338  *
339  * \retval              0 for success
340  * \retval              negative error number on failure
341  */
342 static int lfsck_namespace_load(const struct lu_env *env,
343                                 struct lfsck_component *com)
344 {
345         int len = com->lc_file_size;
346         int rc;
347
348         rc = dt_xattr_get(env, com->lc_obj,
349                           lfsck_buf_get(env, com->lc_file_disk, len),
350                           XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
351         if (rc == len) {
352                 struct lfsck_namespace *ns = com->lc_file_ram;
353
354                 lfsck_namespace_le_to_cpu(ns,
355                                 (struct lfsck_namespace *)com->lc_file_disk);
356                 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
357                         CDEBUG(D_LFSCK, "%s: invalid lfsck_namespace magic "
358                                "%#x != %#x\n", lfsck_lfsck2name(com->lc_lfsck),
359                                ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
360                         rc = -ESTALE;
361                 } else {
362                         rc = 0;
363                 }
364         } else if (rc != -ENODATA) {
365                 CDEBUG(D_LFSCK, "%s: fail to load lfsck_namespace, "
366                        "expected = %d: rc = %d\n",
367                        lfsck_lfsck2name(com->lc_lfsck), len, rc);
368                 if (rc >= 0)
369                         rc = -ESTALE;
370         } else {
371                 /* Check whether it is old trace file or not.
372                  * If yes, it should be reset via returning -ESTALE. */
373                 rc = dt_xattr_get(env, com->lc_obj,
374                                   lfsck_buf_get(env, com->lc_file_disk, len),
375                                   XATTR_NAME_LFSCK_NAMESPACE_OLD, BYPASS_CAPA);
376                 if (rc >= 0)
377                         rc = -ESTALE;
378         }
379
380         return rc;
381 }
382
383 static int lfsck_namespace_store(const struct lu_env *env,
384                                  struct lfsck_component *com, bool init)
385 {
386         struct dt_object                *obj    = com->lc_obj;
387         struct lfsck_instance           *lfsck  = com->lc_lfsck;
388         struct lfsck_namespace          *ns     = com->lc_file_ram;
389         struct lfsck_assistant_data     *lad    = com->lc_data;
390         cfs_bitmap_t                    *bitmap = NULL;
391         struct thandle                  *handle;
392         __u32                            nbits  = 0;
393         int                              len    = com->lc_file_size;
394         int                              rc;
395 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0)
396         struct lu_buf            tbuf   = { &len, sizeof(len) };
397 #endif
398         ENTRY;
399
400         if (lad != NULL) {
401                 bitmap = lad->lad_bitmap;
402                 nbits = bitmap->size;
403
404                 LASSERT(nbits > 0);
405                 LASSERTF((nbits & 7) == 0, "Invalid nbits %u\n", nbits);
406         }
407
408         ns->ln_bitmap_size = nbits;
409         lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
410                                   ns);
411         handle = dt_trans_create(env, lfsck->li_bottom);
412         if (IS_ERR(handle))
413                 GOTO(log, rc = PTR_ERR(handle));
414
415         rc = dt_declare_xattr_set(env, obj,
416                                   lfsck_buf_get(env, com->lc_file_disk, len),
417                                   XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
418         if (rc != 0)
419                 GOTO(out, rc);
420
421         if (bitmap != NULL) {
422                 rc = dt_declare_xattr_set(env, obj,
423                                 lfsck_buf_get(env, bitmap->data, nbits >> 3),
424                                 XATTR_NAME_LFSCK_BITMAP, 0, handle);
425                 if (rc != 0)
426                         GOTO(out, rc);
427         }
428
429 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0)
430         /* To be compatible with old Lustre-2.x MDT (x <= 6), generate dummy
431          * XATTR_NAME_LFSCK_NAMESPACE_OLD EA, then when downgrade to Lustre-2.x,
432          * the old LFSCK will find "invalid" XATTR_NAME_LFSCK_NAMESPACE_OLD EA,
433          * then reset the namespace LFSCK trace file. */
434         if (init) {
435                 rc = dt_declare_xattr_set(env, obj, &tbuf,
436                                           XATTR_NAME_LFSCK_NAMESPACE_OLD,
437                                           LU_XATTR_CREATE, handle);
438                 if (rc != 0)
439                         GOTO(out, rc);
440         }
441 #endif
442
443         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
444         if (rc != 0)
445                 GOTO(out, rc);
446
447         rc = dt_xattr_set(env, obj,
448                           lfsck_buf_get(env, com->lc_file_disk, len),
449                           XATTR_NAME_LFSCK_NAMESPACE, 0, handle, BYPASS_CAPA);
450         if (rc == 0 && bitmap != NULL)
451                 rc = dt_xattr_set(env, obj,
452                                   lfsck_buf_get(env, bitmap->data, nbits >> 3),
453                                   XATTR_NAME_LFSCK_BITMAP, 0, handle,
454                                   BYPASS_CAPA);
455
456 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0)
457         if (rc == 0 && init)
458                 rc = dt_xattr_set(env, obj, &tbuf,
459                                   XATTR_NAME_LFSCK_NAMESPACE_OLD,
460                                   LU_XATTR_CREATE, handle, BYPASS_CAPA);
461 #endif
462
463         GOTO(out, rc);
464
465 out:
466         dt_trans_stop(env, lfsck->li_bottom, handle);
467
468 log:
469         if (rc != 0)
470                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_namespace: rc = %d\n",
471                        lfsck_lfsck2name(lfsck), rc);
472         return rc;
473 }
474
475 static struct dt_object *
476 lfsck_namespace_load_one_trace_file(const struct lu_env *env,
477                                     struct lfsck_component *com,
478                                     struct dt_object *parent,
479                                     const char *name,
480                                     const struct dt_index_features *ft,
481                                     bool reset)
482 {
483         struct lfsck_instance   *lfsck = com->lc_lfsck;
484         struct dt_object        *obj;
485         int                      rc;
486
487         if (reset) {
488                 rc = local_object_unlink(env, lfsck->li_bottom, parent, name);
489                 if (rc != 0 && rc != -ENOENT)
490                         return ERR_PTR(rc);
491         }
492
493         if (ft != NULL)
494                 obj = local_index_find_or_create(env, lfsck->li_los, parent,
495                                         name, S_IFREG | S_IRUGO | S_IWUSR, ft);
496         else
497                 obj = local_file_find_or_create(env, lfsck->li_los, parent,
498                                         name, S_IFREG | S_IRUGO | S_IWUSR);
499
500         return obj;
501 }
502
503 static int lfsck_namespace_load_sub_trace_files(const struct lu_env *env,
504                                                 struct lfsck_component *com,
505                                                 bool reset)
506 {
507         char                            *name = lfsck_env_info(env)->lti_key;
508         struct lfsck_sub_trace_obj      *lsto;
509         struct dt_object                *obj;
510         int                              rc;
511         int                              i;
512
513         for (i = 0, lsto = &com->lc_sub_trace_objs[0];
514              i < LFSCK_STF_COUNT; i++, lsto++) {
515                 snprintf(name, NAME_MAX, "%s_%02d", LFSCK_NAMESPACE, i);
516                 if (lsto->lsto_obj != NULL) {
517                         if (!reset)
518                                 continue;
519
520                         lu_object_put(env, &lsto->lsto_obj->do_lu);
521                         lsto->lsto_obj = NULL;
522                 }
523
524                 obj = lfsck_namespace_load_one_trace_file(env, com,
525                                         com->lc_lfsck->li_lfsck_dir,
526                                         name, &dt_lfsck_features, reset);
527                 if (IS_ERR(obj))
528                         return PTR_ERR(obj);
529
530                 lsto->lsto_obj = obj;
531                 rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
532                 if (rc != 0)
533                         return rc;
534         }
535
536         return 0;
537 }
538
539 static int lfsck_namespace_init(const struct lu_env *env,
540                                 struct lfsck_component *com)
541 {
542         struct lfsck_namespace *ns = com->lc_file_ram;
543         int rc;
544
545         memset(ns, 0, sizeof(*ns));
546         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
547         ns->ln_status = LS_INIT;
548         down_write(&com->lc_sem);
549         rc = lfsck_namespace_store(env, com, true);
550         up_write(&com->lc_sem);
551         if (rc == 0)
552                 rc = lfsck_namespace_load_sub_trace_files(env, com, true);
553
554         return rc;
555 }
556
557 /**
558  * Update the namespace LFSCK trace file for the given @fid
559  *
560  * \param[in] env       pointer to the thread context
561  * \param[in] com       pointer to the lfsck component
562  * \param[in] fid       the fid which flags to be updated in the lfsck
563  *                      trace file
564  * \param[in] add       true if add new flags, otherwise remove flags
565  *
566  * \retval              0 for succeed or nothing to be done
567  * \retval              negative error number on failure
568  */
569 int lfsck_namespace_trace_update(const struct lu_env *env,
570                                  struct lfsck_component *com,
571                                  const struct lu_fid *fid,
572                                  const __u8 flags, bool add)
573 {
574         struct lfsck_instance   *lfsck  = com->lc_lfsck;
575         struct dt_object        *obj;
576         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid3;
577         struct dt_device        *dev    = lfsck->li_bottom;
578         struct thandle          *th     = NULL;
579         int                      idx;
580         int                      rc     = 0;
581         __u8                     old    = 0;
582         __u8                     new    = 0;
583         ENTRY;
584
585         LASSERT(flags != 0);
586
587         if (unlikely(!fid_is_sane(fid)))
588                 RETURN(0);
589
590         idx = lfsck_sub_trace_file_fid2idx(fid);
591         obj = com->lc_sub_trace_objs[idx].lsto_obj;
592         mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
593         fid_cpu_to_be(key, fid);
594         rc = dt_lookup(env, obj, (struct dt_rec *)&old,
595                        (const struct dt_key *)key, BYPASS_CAPA);
596         if (rc == -ENOENT) {
597                 if (!add)
598                         GOTO(unlock, rc = 0);
599
600                 old = 0;
601                 new = flags;
602         } else if (rc == 0) {
603                 if (add) {
604                         if ((old & flags) == flags)
605                                 GOTO(unlock, rc = 0);
606
607                         new = old | flags;
608                 } else {
609                         if ((old & flags) == 0)
610                                 GOTO(unlock, rc = 0);
611
612                         new = old & ~flags;
613                 }
614         } else {
615                 GOTO(log, rc);
616         }
617
618         th = dt_trans_create(env, dev);
619         if (IS_ERR(th))
620                 GOTO(log, rc = PTR_ERR(th));
621
622         if (old != 0) {
623                 rc = dt_declare_delete(env, obj,
624                                        (const struct dt_key *)key, th);
625                 if (rc != 0)
626                         GOTO(log, rc);
627         }
628
629         if (new != 0) {
630                 rc = dt_declare_insert(env, obj,
631                                        (const struct dt_rec *)&new,
632                                        (const struct dt_key *)key, th);
633                 if (rc != 0)
634                         GOTO(log, rc);
635         }
636
637         rc = dt_trans_start_local(env, dev, th);
638         if (rc != 0)
639                 GOTO(log, rc);
640
641         if (old != 0) {
642                 rc = dt_delete(env, obj, (const struct dt_key *)key,
643                                th, BYPASS_CAPA);
644                 if (rc != 0)
645                         GOTO(log, rc);
646         }
647
648         if (new != 0) {
649                 rc = dt_insert(env, obj, (const struct dt_rec *)&new,
650                                (const struct dt_key *)key, th, BYPASS_CAPA, 1);
651                 if (rc != 0)
652                         GOTO(log, rc);
653         }
654
655         GOTO(log, rc);
656
657 log:
658         if (th != NULL && !IS_ERR(th))
659                 dt_trans_stop(env, dev, th);
660
661         CDEBUG(D_LFSCK, "%s: namespace LFSCK %s flags for "DFID" in the "
662                "trace file, flags %x, old %x, new %x: rc = %d\n",
663                lfsck_lfsck2name(lfsck), add ? "add" : "del", PFID(fid),
664                (__u32)flags, (__u32)old, (__u32)new, rc);
665
666 unlock:
667         mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
668
669         return rc;
670 }
671
672 int lfsck_namespace_check_exist(const struct lu_env *env,
673                                 struct dt_object *dir,
674                                 struct dt_object *obj, const char *name)
675 {
676         struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
677         int               rc;
678         ENTRY;
679
680         if (unlikely(lfsck_is_dead_obj(obj)))
681                 RETURN(LFSCK_NAMEENTRY_DEAD);
682
683         rc = dt_lookup(env, dir, (struct dt_rec *)fid,
684                        (const struct dt_key *)name, BYPASS_CAPA);
685         if (rc == -ENOENT)
686                 RETURN(LFSCK_NAMEENTRY_REMOVED);
687
688         if (rc < 0)
689                 RETURN(rc);
690
691         if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
692                 RETURN(LFSCK_NAMEENTRY_RECREATED);
693
694         RETURN(0);
695 }
696
697 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
698                                             struct dt_object *obj,
699                                             struct thandle *handle)
700 {
701         int rc;
702
703         /* For destroying all invalid linkEA entries. */
704         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
705         if (rc != 0)
706                 return rc;
707
708         /* For insert new linkEA entry. */
709         rc = dt_declare_xattr_set(env, obj,
710                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
711                         XATTR_NAME_LINK, 0, handle);
712         return rc;
713 }
714
715 int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
716                        struct linkea_data *ldata)
717 {
718         int rc;
719
720         if (ldata->ld_buf->lb_buf == NULL)
721                 return -ENOMEM;
722
723         if (!dt_object_exists(obj))
724                 return -ENOENT;
725
726         rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
727         if (rc == -ERANGE) {
728                 /* Buf was too small, figure out what we need. */
729                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LINK,
730                                   BYPASS_CAPA);
731                 if (rc <= 0)
732                         return rc;
733
734                 lu_buf_realloc(ldata->ld_buf, rc);
735                 if (ldata->ld_buf->lb_buf == NULL)
736                         return -ENOMEM;
737
738                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
739                                   BYPASS_CAPA);
740         }
741
742         if (rc > 0)
743                 rc = linkea_init(ldata);
744
745         return rc;
746 }
747
748 /**
749  * Remove linkEA for the given object.
750  *
751  * The caller should take the ldlm lock before the calling.
752  *
753  * \param[in] env       pointer to the thread context
754  * \param[in] com       pointer to the lfsck component
755  * \param[in] obj       pointer to the dt_object to be handled
756  *
757  * \retval              0 for repaired cases
758  * \retval              negative error number on failure
759  */
760 static int lfsck_namespace_links_remove(const struct lu_env *env,
761                                         struct lfsck_component *com,
762                                         struct dt_object *obj)
763 {
764         struct lfsck_instance           *lfsck  = com->lc_lfsck;
765         struct dt_device                *dev    = lfsck->li_bottom;
766         struct thandle                  *th     = NULL;
767         int                              rc     = 0;
768         ENTRY;
769
770         LASSERT(dt_object_remote(obj) == 0);
771
772         th = dt_trans_create(env, dev);
773         if (IS_ERR(th))
774                 GOTO(log, rc = PTR_ERR(th));
775
776         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, th);
777         if (rc != 0)
778                 GOTO(stop, rc);
779
780         rc = dt_trans_start_local(env, dev, th);
781         if (rc != 0)
782                 GOTO(stop, rc);
783
784         dt_write_lock(env, obj, 0);
785         if (unlikely(lfsck_is_dead_obj(obj)))
786                 GOTO(unlock, rc = -ENOENT);
787
788         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
789                 GOTO(unlock, rc = 0);
790
791         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th, BYPASS_CAPA);
792
793         GOTO(unlock, rc);
794
795 unlock:
796         dt_write_unlock(env, obj);
797
798 stop:
799         dt_trans_stop(env, dev, th);
800
801 log:
802         CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
803                "for the object "DFID": rc = %d\n",
804                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
805
806         if (rc == 0) {
807                 struct lfsck_namespace *ns = com->lc_file_ram;
808
809                 ns->ln_flags |= LF_INCONSISTENT;
810         }
811
812         return rc;
813 }
814
815 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
816                              struct linkea_data *ldata, struct thandle *handle)
817 {
818         const struct lu_buf *buf = lfsck_buf_get_const(env,
819                                                        ldata->ld_buf->lb_buf,
820                                                        ldata->ld_leh->leh_len);
821
822         return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle,
823                             BYPASS_CAPA);
824 }
825
826 static void lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata,
827                                                 struct lu_name *cname,
828                                                 struct lu_fid *pfid,
829                                                 char *buf)
830 {
831         linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
832         /* To guarantee the 'name' is terminated with '0'. */
833         memcpy(buf, cname->ln_name, cname->ln_namelen);
834         buf[cname->ln_namelen] = 0;
835         cname->ln_name = buf;
836 }
837
838 static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata,
839                                                struct lu_name *cname,
840                                                struct lu_fid *pfid,
841                                                bool remove)
842 {
843         struct link_ea_entry    *oldlee;
844         int                      oldlen;
845         int                      repeated = 0;
846
847         oldlee = ldata->ld_lee;
848         oldlen = ldata->ld_reclen;
849         linkea_next_entry(ldata);
850         while (ldata->ld_lee != NULL) {
851                 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
852                                    ldata->ld_lee->lee_reclen[1];
853                 if (unlikely(ldata->ld_reclen == oldlen &&
854                              memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
855                         repeated++;
856                         if (!remove)
857                                 break;
858
859                         linkea_del_buf(ldata, cname);
860                 } else {
861                         linkea_next_entry(ldata);
862                 }
863         }
864         ldata->ld_lee = oldlee;
865         ldata->ld_reclen = oldlen;
866
867         return repeated;
868 }
869
870 /**
871  * Insert orphan into .lustre/lost+found/MDTxxxx/ locally.
872  *
873  * Add the specified orphan MDT-object to the .lustre/lost+found/MDTxxxx/
874  * with the given type to generate the name, the detailed rules for name
875  * have been described as following.
876  *
877  * The function also generates the linkEA corresponding to the name entry
878  * under the .lustre/lost+found/MDTxxxx/ for the orphan MDT-object.
879  *
880  * \param[in] env       pointer to the thread context
881  * \param[in] com       pointer to the lfsck component
882  * \param[in] orphan    pointer to the orphan MDT-object
883  * \param[in] infix     additional information for the orphan name, such as
884  *                      the FID for original
885  * \param[in] type      the type for describing why the orphan MDT-object is
886  *                      created. The rules are as following:
887  *
888  *  type "D":           The MDT-object is a directory, it may knows its parent
889  *                      but because there is no valid linkEA, the LFSCK cannot
890  *                      know where to put it back to the namespace.
891  *  type "O":           The MDT-object has no linkEA, and there is no name
892  *                      entry that references the MDT-object.
893  *
894  *  type "S":           The orphan MDT-object is a shard of a striped directory
895  *
896  * \see lfsck_layout_recreate_parent() for more types.
897  *
898  * The orphan name will be like:
899  * ${FID}-${infix}-${type}-${conflict_version}
900  *
901  * \param[out] count    if some others inserted some linkEA entries by race,
902  *                      then return the linkEA entries count.
903  *
904  * \retval              positive number for repaired cases
905  * \retval              0 if needs to repair nothing
906  * \retval              negative error number on failure
     *
     * In dry-run mode (LPF_DRYRUN) nothing is modified; 1 is returned if the
     * orphan has no usable linkEA when it is re-checked under its write lock.
     * Any non-zero result (repaired or failed) sets LF_INCONSISTENT in the
     * in-RAM namespace LFSCK file.
907  */
908 static int lfsck_namespace_insert_orphan(const struct lu_env *env,
909                                          struct lfsck_component *com,
910                                          struct dt_object *orphan,
911                                          const char *infix, const char *type,
912                                          int *count)
913 {
914         struct lfsck_thread_info        *info   = lfsck_env_info(env);
915         struct lu_name                  *cname  = &info->lti_name;
916         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
917         struct lu_attr                  *la     = &info->lti_la3;
918         const struct lu_fid             *cfid   = lfsck_dto2fid(orphan);
919         const struct lu_fid             *pfid;
920         struct lu_fid                    tfid;
921         struct lfsck_instance           *lfsck  = com->lc_lfsck;
922         struct dt_device                *dev    = lfsck->li_bottom;
923         struct dt_object                *parent;
924         struct thandle                  *th     = NULL;
925         struct lustre_handle             plh    = { 0 };
926         struct lustre_handle             clh    = { 0 };
927         struct linkea_data               ldata  = { NULL };
928         struct lu_buf                    linkea_buf;
929         int                              namelen;
930         int                              idx    = 0;
931         int                              rc     = 0;
932         bool                             exist  = false;
933         ENTRY;
934
            /* Keep the name NULL until one has been chosen, so that the final
             * log message prints "<NULL>" when we bail out early. */
935         cname->ln_name = NULL;
936         if (unlikely(lfsck->li_lpf_obj == NULL))
937                 GOTO(log, rc = -ENXIO);
938
939         parent = lfsck->li_lpf_obj;
940         pfid = lfsck_dto2fid(parent);
941
942         /* Hold update lock on the parent to prevent others to access. */
943         rc = lfsck_ibits_lock(env, lfsck, parent, &plh,
944                               MDS_INODELOCK_UPDATE, LCK_EX);
945         if (rc != 0)
946                 GOTO(log, rc);
947
            /* Probe candidate names with an increasing conflict version until
             * either an unused name is found (-ENOENT), or an existing entry
             * that already references the orphan is found ('exist' is set, and
             * then no new name entry will be inserted below). */
948         do {
949                 namelen = snprintf(info->lti_key, NAME_MAX, DFID"%s-%s-%d",
950                                    PFID(cfid), infix, type, idx++);
951                 rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
952                                (const struct dt_key *)info->lti_key,
953                                BYPASS_CAPA);
954                 if (rc != 0 && rc != -ENOENT)
955                         GOTO(log, rc);
956
957                 if (unlikely(rc == 0 && lu_fid_eq(cfid, &tfid)))
958                         exist = true;
959         } while (rc == 0 && !exist);
960
            /* Build a new linkEA with a single entry that back references the
             * chosen name under the lost+found parent. */
961         cname->ln_name = info->lti_key;
962         cname->ln_namelen = namelen;
963         rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
964         if (rc != 0)
965                 GOTO(log, rc);
966
967         rc = linkea_add_buf(&ldata, cname, pfid);
968         if (rc != 0)
969                 GOTO(log, rc);
970
971         rc = lfsck_ibits_lock(env, lfsck, orphan, &clh,
972                               MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP,
973                               LCK_EX);
974         if (rc != 0)
975                 GOTO(log, rc);
976
977         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
978                        ldata.ld_leh->leh_len);
979         th = dt_trans_create(env, dev);
980         if (IS_ERR(th))
981                 GOTO(log, rc = PTR_ERR(th));
982
            /* Declaration phase. For a directory orphan, its ".." entry is
             * going to be replaced to reference the lost+found parent. */
983         if (S_ISDIR(lfsck_object_type(orphan))) {
984                 rc = dt_declare_delete(env, orphan,
985                                        (const struct dt_key *)dotdot, th);
986                 if (rc != 0)
987                         GOTO(stop, rc);
988
989                 rec->rec_type = S_IFDIR;
990                 rec->rec_fid = pfid;
991                 rc = dt_declare_insert(env, orphan, (const struct dt_rec *)rec,
992                                        (const struct dt_key *)dotdot, th);
993                 if (rc != 0)
994                         GOTO(stop, rc);
995         }
996
997         rc = dt_declare_xattr_set(env, orphan, &linkea_buf,
998                                   XATTR_NAME_LINK, 0, th);
999         if (rc != 0)
1000                 GOTO(stop, rc);
1001
1002         if (!exist) {
1003                 rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
1004                 rec->rec_fid = cfid;
1005                 rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1006                                        (const struct dt_key *)cname->ln_name,
1007                                        th);
1008                 if (rc != 0)
1009                         GOTO(stop, rc);
1010
1011                 if (S_ISDIR(rec->rec_type)) {
1012                         rc = dt_declare_ref_add(env, parent, th);
1013                         if (rc != 0)
1014                                 GOTO(stop, rc);
1015                 }
1016         }
1017
1018         memset(la, 0, sizeof(*la));
1019         la->la_ctime = cfs_time_current_sec();
1020         la->la_valid = LA_CTIME;
1021         rc = dt_declare_attr_set(env, orphan, la, th);
1022         if (rc != 0)
1023                 GOTO(stop, rc);
1024
1025         rc = dt_trans_start_local(env, dev, th);
1026         if (rc != 0)
1027                 GOTO(stop, rc);
1028
             /* Execution phase. Re-read the linkEA under the object write lock
              * and only go ahead if the orphan still has no usable linkEA
              * (missing, invalid, or without any entry). Note that this read
              * reuses 'ldata', while 'linkea_buf' was initialized before it. */
1029         dt_write_lock(env, orphan, 0);
1030         rc = lfsck_links_read(env, orphan, &ldata);
1031         if (likely((rc == -ENODATA) || (rc == -EINVAL) ||
1032                    (rc == 0 && ldata.ld_leh->leh_reccount == 0))) {
1033                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1034                         GOTO(unlock, rc = 1);
1035
1036                 if (S_ISDIR(lfsck_object_type(orphan))) {
1037                         rc = dt_delete(env, orphan,
1038                                        (const struct dt_key *)dotdot, th,
1039                                        BYPASS_CAPA);
1040                         if (rc != 0)
1041                                 GOTO(unlock, rc);
1042
1043                         rec->rec_type = S_IFDIR;
1044                         rec->rec_fid = pfid;
1045                         rc = dt_insert(env, orphan, (const struct dt_rec *)rec,
1046                                        (const struct dt_key *)dotdot, th,
1047                                        BYPASS_CAPA, 1);
1048                         if (rc != 0)
1049                                 GOTO(unlock, rc);
1050                 }
1051
1052                 rc = dt_xattr_set(env, orphan, &linkea_buf, XATTR_NAME_LINK, 0,
1053                                   th, BYPASS_CAPA);
1054         } else {
                     /* Either the linkEA could not be read (rc < 0), or it
                      * already holds some entries (rc == 0): report the count
                      * in the latter case and leave the orphan unchanged. */
1055                 if (rc == 0 && count != NULL)
1056                         *count = ldata.ld_leh->leh_reccount;
1057
1058                 GOTO(unlock, rc);
1059         }
1060         dt_write_unlock(env, orphan);
1061
             /* Insert the name entry under the lost+found parent, unless a
              * matching entry has been found by the probing loop above. */
1062         if (rc == 0 && !exist) {
1063                 rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
1064                 rec->rec_fid = cfid;
1065                 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1066                                (const struct dt_key *)cname->ln_name,
1067                                th, BYPASS_CAPA, 1);
1068                 if (rc == 0 && S_ISDIR(rec->rec_type)) {
1069                         dt_write_lock(env, parent, 0);
1070                         rc = dt_ref_add(env, parent, th);
1071                         dt_write_unlock(env, parent);
1072                 }
1073         }
1074
1075         if (rc == 0)
1076                 rc = dt_attr_set(env, orphan, la, th, BYPASS_CAPA);
1077
             /* Success is reported to the caller as 1 (repaired). */
1078         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1079
1080 unlock:
1081         dt_write_unlock(env, orphan);
1082
1083 stop:
1084         dt_trans_stop(env, dev, th);
1085
1086 log:
             /* NOTE(review): this label is also reached before 'clh' (and even
              * 'plh') has been locked; presumably lfsck_ibits_unlock() ignores
              * a handle that was never used - confirm. */
1087         lfsck_ibits_unlock(&clh, LCK_EX);
1088         lfsck_ibits_unlock(&plh, LCK_EX);
1089         CDEBUG(D_LFSCK, "%s: namespace LFSCK insert orphan for the "
1090                "object "DFID", name = %s: rc = %d\n",
1091                lfsck_lfsck2name(lfsck), PFID(cfid),
1092                cname->ln_name != NULL ? cname->ln_name : "<NULL>", rc);
1093
             /* Both the repaired (positive) and the failed (negative) cases
              * flag the namespace as inconsistent. */
1094         if (rc != 0) {
1095                 struct lfsck_namespace *ns = com->lc_file_ram;
1096
1097                 ns->ln_flags |= LF_INCONSISTENT;
1098         }
1099
1100         return rc;
1101 }
1102
1103 /**
1104  * Add the specified name entry back to namespace.
1105  *
1106  * If there is a linkEA entry that back references a name entry under
1107  * some parent directory, but such parent directory does not have the
1108  * claimed name entry. On the other hand, the linkEA entries count is
1109  * not larger than the MDT-object's hard link count. Under such case,
1110  * it is quite possible that the name entry is lost. Then the LFSCK
1111  * should add the name entry back to the namespace.
1112  *
1113  * \param[in] env       pointer to the thread context
1114  * \param[in] com       pointer to the lfsck component
1115  * \param[in] parent    pointer to the directory under which the name entry
1116  *                      will be inserted into
1117  * \param[in] child     pointer to the object referenced by the name entry
1118  *                      that to be inserted into the parent
1119  * \param[in] name      the name for the child in the parent directory
1120  *
1121  * \retval              positive number for repaired cases
1122  * \retval              0 if nothing to be repaired
1123  * \retval              negative error number on failure
      *
      * In dry-run mode (LPF_DRYRUN) nothing is modified and 1 is returned.
      * Any non-zero result sets LF_INCONSISTENT, and a positive result also
      * increases ln_lost_dirent_repaired in the in-RAM namespace LFSCK file.
1124  */
1125 static int lfsck_namespace_insert_normal(const struct lu_env *env,
1126                                          struct lfsck_component *com,
1127                                          struct dt_object *parent,
1128                                          struct dt_object *child,
1129                                          const char *name)
1130 {
1131         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1132         struct lu_attr                  *la     = &info->lti_la;
1133         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1134         struct lfsck_instance           *lfsck  = com->lc_lfsck;
             /* NOTE(review): the transaction is created on li_next here, while
              * the orphan helpers in this file use li_bottom - presumably
              * because @parent/@child may be handled by the upper device;
              * confirm. */
1135         struct dt_device                *dev    = lfsck->li_next;
1136         struct thandle                  *th     = NULL;
1137         struct lustre_handle             lh     = { 0 };
1138         int                              rc     = 0;
1139         ENTRY;
1140
1141         if (unlikely(!dt_try_as_dir(env, parent)))
1142                 GOTO(log, rc = -ENOTDIR);
1143
             /* Dry-run: report the entry as repaired without locking or
              * modifying anything. */
1144         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1145                 GOTO(log, rc = 1);
1146
1147         /* Hold update lock on the parent to prevent others to access. */
1148         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
1149                               MDS_INODELOCK_UPDATE, LCK_EX);
1150         if (rc != 0)
1151                 GOTO(log, rc);
1152
1153         th = dt_trans_create(env, dev);
1154         if (IS_ERR(th))
1155                 GOTO(unlock, rc = PTR_ERR(th));
1156
             /* Declaration phase: the name entry, the parent reference for a
              * directory child, and the ctime updates on both objects. */
1157         rec->rec_type = lfsck_object_type(child) & S_IFMT;
1158         rec->rec_fid = lfsck_dto2fid(child);
1159         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1160                                (const struct dt_key *)name, th);
1161         if (rc != 0)
1162                 GOTO(stop, rc);
1163
1164         if (S_ISDIR(rec->rec_type)) {
1165                 rc = dt_declare_ref_add(env, parent, th);
1166                 if (rc != 0)
1167                         GOTO(stop, rc);
1168         }
1169
1170         memset(la, 0, sizeof(*la));
1171         la->la_ctime = cfs_time_current_sec();
1172         la->la_valid = LA_CTIME;
1173         rc = dt_declare_attr_set(env, parent, la, th);
1174         if (rc != 0)
1175                 GOTO(stop, rc);
1176
1177         rc = dt_declare_attr_set(env, child, la, th);
1178         if (rc != 0)
1179                 GOTO(stop, rc);
1180
1181         rc = dt_trans_start_local(env, dev, th);
1182         if (rc != 0)
1183                 GOTO(stop, rc);
1184
             /* Execution phase, in the same order as declared above. */
1185         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1186                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
1187         if (rc != 0)
1188                 GOTO(stop, rc);
1189
1190         if (S_ISDIR(rec->rec_type)) {
1191                 dt_write_lock(env, parent, 0);
1192                 rc = dt_ref_add(env, parent, th);
1193                 dt_write_unlock(env, parent);
1194                 if (rc != 0)
1195                         GOTO(stop, rc);
1196         }
1197
             /* Refresh the timestamp; the same ctime goes to both objects. */
1198         la->la_ctime = cfs_time_current_sec();
1199         rc = dt_attr_set(env, parent, la, th, BYPASS_CAPA);
1200         if (rc != 0)
1201                 GOTO(stop, rc);
1202
1203         rc = dt_attr_set(env, child, la, th, BYPASS_CAPA);
1204
             /* Success is reported to the caller as 1 (repaired). */
1205         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1206
1207 stop:
1208         dt_trans_stop(env, dev, th);
1209
1210 unlock:
1211         lfsck_ibits_unlock(&lh, LCK_EX);
1212
1213 log:
1214         CDEBUG(D_LFSCK, "%s: namespace LFSCK insert object "DFID" with "
1215                "the name %s and type %o to the parent "DFID": rc = %d\n",
1216                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), name,
1217                lfsck_object_type(child) & S_IFMT,
1218                PFID(lfsck_dto2fid(parent)), rc);
1219
             /* Failures and repairs (including the dry-run case) both flag the
              * namespace as inconsistent; only repairs are counted. */
1220         if (rc != 0) {
1221                 struct lfsck_namespace *ns = com->lc_file_ram;
1222
1223                 ns->ln_flags |= LF_INCONSISTENT;
1224                 if (rc > 0)
1225                         ns->ln_lost_dirent_repaired++;
1226         }
1227
1228         return rc;
1229 }
1230
1231 /**
1232  * Create the specified orphan directory.
1233  *
1234  * For the case that the parent MDT-object stored in some MDT-object's
1235  * linkEA entry is lost, the LFSCK will re-create the parent object as
1236  * an orphan and insert it into .lustre/lost+found/MDTxxxx/ directory
1237  * with the name ${FID}-P-${conflict_version}.
1238  *
1239  * \param[in] env       pointer to the thread context
1240  * \param[in] com       pointer to the lfsck component
1241  * \param[in] orphan    pointer to the orphan MDT-object to be created
1242  * \param[in] lmv       pointer to master LMV EA that will be set to the orphan
1243  *
1244  * \retval              positive number for repaired cases
1245  * \retval              negative error number on failure
1246  */
1247 static int lfsck_namespace_create_orphan_dir(const struct lu_env *env,
1248                                              struct lfsck_component *com,
1249                                              struct dt_object *orphan,
1250                                              struct lmv_mds_md_v1 *lmv)
1251 {
1252         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1253         struct lu_attr                  *la     = &info->lti_la;
1254         struct dt_allocation_hint       *hint   = &info->lti_hint;
1255         struct dt_object_format         *dof    = &info->lti_dof;
1256         struct lu_name                  *cname  = &info->lti_name2;
1257         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1258         struct lmv_mds_md_v1            *lmv2   = &info->lti_lmv2;
1259         const struct lu_fid             *cfid   = lfsck_dto2fid(orphan);
1260         struct lu_fid                    tfid;
1261         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1262         struct lfsck_namespace          *ns     = com->lc_file_ram;
1263         struct dt_device                *dev;
1264         struct dt_object                *parent = NULL;
1265         struct dt_object                *child  = NULL;
1266         struct thandle                  *th     = NULL;
1267         struct lustre_handle             lh     = { 0 };
1268         struct linkea_data               ldata  = { NULL };
1269         struct lu_buf                    linkea_buf;
1270         struct lu_buf                    lmv_buf;
1271         char                             name[32];
1272         int                              namelen;
1273         int                              idx    = 0;
1274         int                              rc     = 0;
1275         ENTRY;
1276
1277         LASSERT(!dt_object_exists(orphan));
1278
1279         cname->ln_name = NULL;
1280         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1281                 GOTO(log, rc = 1);
1282
1283         if (dt_object_remote(orphan)) {
1284                 LASSERT(lfsck->li_lpf_root_obj != NULL);
1285
1286                 idx = lfsck_find_mdt_idx_by_fid(env, lfsck, cfid);
1287                 if (idx < 0)
1288                         GOTO(log, rc = idx);
1289
1290                 snprintf(name, 8, "MDT%04x", idx);
1291                 rc = dt_lookup(env, lfsck->li_lpf_root_obj,
1292                                (struct dt_rec *)&tfid,
1293                                (const struct dt_key *)name, BYPASS_CAPA);
1294                 if (rc != 0)
1295                         GOTO(log, rc = (rc == -ENOENT ? -ENXIO : rc));
1296
1297                 parent = lfsck_object_find_bottom(env, lfsck, &tfid);
1298                 if (IS_ERR(parent))
1299                         GOTO(log, rc = PTR_ERR(parent));
1300
1301                 if (unlikely(!dt_try_as_dir(env, parent)))
1302                         GOTO(log, rc = -ENOTDIR);
1303         } else {
1304                 if (unlikely(lfsck->li_lpf_obj == NULL))
1305                         GOTO(log, rc = -ENXIO);
1306
1307                 parent = lfsck->li_lpf_obj;
1308         }
1309
1310         dev = lfsck_find_dev_by_fid(env, lfsck, cfid);
1311         if (IS_ERR(dev))
1312                 GOTO(log, rc = PTR_ERR(dev));
1313
1314         child = lfsck_object_find_by_dev(env, dev, cfid);
1315         if (IS_ERR(child))
1316                 GOTO(log, rc = PTR_ERR(child));
1317
1318         /* Hold update lock on the parent to prevent others to access. */
1319         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
1320                               MDS_INODELOCK_UPDATE, LCK_EX);
1321         if (rc != 0)
1322                 GOTO(log, rc);
1323
1324         idx = 0;
1325         do {
1326                 namelen = snprintf(name, 31, DFID"-P-%d",
1327                                    PFID(cfid), idx++);
1328                 rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
1329                                (const struct dt_key *)name, BYPASS_CAPA);
1330                 if (rc != 0 && rc != -ENOENT)
1331                         GOTO(unlock1, rc);
1332         } while (rc == 0);
1333
1334         cname->ln_name = name;
1335         cname->ln_namelen = namelen;
1336
1337         memset(la, 0, sizeof(*la));
1338         la->la_mode = S_IFDIR | 0700;
1339         la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
1340                        LA_ATIME | LA_MTIME | LA_CTIME;
1341
1342         child->do_ops->do_ah_init(env, hint, parent, child,
1343                                   la->la_mode & S_IFMT);
1344
1345         memset(dof, 0, sizeof(*dof));
1346         dof->dof_type = dt_mode_to_dft(S_IFDIR);
1347
1348         rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
1349         if (rc != 0)
1350                 GOTO(unlock1, rc);
1351
1352         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
1353         if (rc != 0)
1354                 GOTO(unlock1, rc);
1355
1356         th = dt_trans_create(env, dev);
1357         if (IS_ERR(th))
1358                 GOTO(unlock1, rc = PTR_ERR(th));
1359
1360         /* Sync the remote transaction to guarantee that the subsequent
1361          * lock against the @orphan can find the @orphan in time. */
1362         if (dt_object_remote(orphan))
1363                 th->th_sync = 1;
1364
1365         rc = dt_declare_create(env, child, la, hint, dof, th);
1366         if (rc != 0)
1367                 GOTO(stop, rc);
1368
1369         if (unlikely(!dt_try_as_dir(env, child)))
1370                 GOTO(stop, rc = -ENOTDIR);
1371
1372         rec->rec_type = S_IFDIR;
1373         rec->rec_fid = cfid;
1374         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
1375                                (const struct dt_key *)dot, th);
1376         if (rc != 0)
1377                 GOTO(stop, rc);
1378
1379         rec->rec_fid = lfsck_dto2fid(parent);
1380         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
1381                                (const struct dt_key *)dotdot, th);
1382         if (rc == 0)
1383                 rc = dt_declare_ref_add(env, child, th);
1384
1385         if (rc != 0)
1386                 GOTO(stop, rc);
1387
1388         rc = dt_declare_ref_add(env, child, th);
1389         if (rc != 0)
1390                 GOTO(stop, rc);
1391
1392         if (lmv != NULL) {
1393                 lmv->lmv_magic = LMV_MAGIC;
1394                 lmv->lmv_master_mdt_index = lfsck_dev_idx(dev);
1395                 lfsck_lmv_header_cpu_to_le(lmv2, lmv);
1396                 lfsck_buf_init(&lmv_buf, lmv2, sizeof(*lmv2));
1397                 rc = dt_declare_xattr_set(env, child, &lmv_buf,
1398                                           XATTR_NAME_LMV, 0, th);
1399                 if (rc != 0)
1400                         GOTO(stop, rc);
1401         }
1402
1403         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
1404                        ldata.ld_leh->leh_len);
1405         rc = dt_declare_xattr_set(env, child, &linkea_buf,
1406                                   XATTR_NAME_LINK, 0, th);
1407         if (rc != 0)
1408                 GOTO(stop, rc);
1409
1410         rec->rec_fid = cfid;
1411         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1412                                (const struct dt_key *)name, th);
1413         if (rc == 0)
1414                 rc = dt_declare_ref_add(env, parent, th);
1415
1416         if (rc != 0)
1417                 GOTO(stop, rc);
1418
1419         rc = dt_trans_start_local(env, dev, th);
1420         if (rc != 0)
1421                 GOTO(stop, rc);
1422
1423         dt_write_lock(env, child, 0);
1424         rc = dt_create(env, child, la, hint, dof, th);
1425         if (rc != 0)
1426                 GOTO(unlock2, rc);
1427
1428         rec->rec_fid = cfid;
1429         rc = dt_insert(env, child, (const struct dt_rec *)rec,
1430                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
1431         if (rc != 0)
1432                 GOTO(unlock2, rc);
1433
1434         rec->rec_fid = lfsck_dto2fid(parent);
1435         rc = dt_insert(env, child, (const struct dt_rec *)rec,
1436                        (const struct dt_key *)dotdot, th,
1437                        BYPASS_CAPA, 1);
1438         if (rc != 0)
1439                 GOTO(unlock2, rc);
1440
1441         rc = dt_ref_add(env, child, th);
1442         if (rc != 0)
1443                 GOTO(unlock2, rc);
1444
1445         if (lmv != NULL) {
1446                 rc = dt_xattr_set(env, child, &lmv_buf, XATTR_NAME_LMV, 0,
1447                                   th, BYPASS_CAPA);
1448                 if (rc != 0)
1449                         GOTO(unlock2, rc);
1450         }
1451
1452         rc = dt_xattr_set(env, child, &linkea_buf,
1453                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1454         dt_write_unlock(env, child);
1455         if (rc != 0)
1456                 GOTO(stop, rc);
1457
1458         rec->rec_fid = cfid;
1459         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1460                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
1461         if (rc == 0) {
1462                 dt_write_lock(env, parent, 0);
1463                 rc = dt_ref_add(env, parent, th);
1464                 dt_write_unlock(env, parent);
1465         }
1466
1467         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1468
1469 unlock2:
1470         dt_write_unlock(env, child);
1471
1472 stop:
1473         dt_trans_stop(env, dev, th);
1474
1475 unlock1:
1476         lfsck_ibits_unlock(&lh, LCK_EX);
1477
1478 log:
1479         CDEBUG(D_LFSCK, "%s: namespace LFSCK create orphan dir for "
1480                "the object "DFID", name = %s: rc = %d\n",
1481                lfsck_lfsck2name(lfsck), PFID(cfid),
1482                cname->ln_name != NULL ? cname->ln_name : "<NULL>", rc);
1483
1484         if (child != NULL && !IS_ERR(child))
1485                 lfsck_object_put(env, child);
1486
1487         if (parent != NULL && !IS_ERR(parent) && parent != lfsck->li_lpf_obj)
1488                 lfsck_object_put(env, parent);
1489
1490         if (rc != 0)
1491                 ns->ln_flags |= LF_INCONSISTENT;
1492
1493         return rc;
1494 }
1495
1496 /**
1497  * Remove the specified entry from the linkEA.
1498  *
1499  * Locate the linkEA entry with the given @cname and @pfid, then
1500  * remove this entry or the other entries those are repeated with
1501  * this entry.
1502  *
1503  * \param[in] env       pointer to the thread context
1504  * \param[in] com       pointer to the lfsck component
1505  * \param[in] obj       pointer to the dt_object to be handled
1506  * \param[in,out]ldata  pointer to the buffer that holds the linkEA
1507  * \param[in] cname     the name for the child in the parent directory
1508  * \param[in] pfid      the parent directory's FID for the linkEA
1509  * \param[in] next      if true, then remove the first found linkEA
1510  *                      entry, and move the ldata->ld_lee to next entry
1511  *
1512  * \retval              positive number for repaired cases
1513  * \retval              0 if nothing to be repaired
1514  * \retval              negative error number on failure
1515  */
1516 static int lfsck_namespace_shrink_linkea(const struct lu_env *env,
1517                                          struct lfsck_component *com,
1518                                          struct dt_object *obj,
1519                                          struct linkea_data *ldata,
1520                                          struct lu_name *cname,
1521                                          struct lu_fid *pfid,
1522                                          bool next)
1523 {
1524         struct lfsck_instance           *lfsck     = com->lc_lfsck;
1525         struct dt_device                *dev       = lfsck->li_bottom;
1526         struct lfsck_bookmark           *bk        = &lfsck->li_bookmark_ram;
1527         struct thandle                  *th        = NULL;
1528         struct lustre_handle             lh        = { 0 };
1529         struct linkea_data               ldata_new = { NULL };
1530         struct lu_buf                    linkea_buf;
1531         int                              rc        = 0;
1532         ENTRY;
1533
1534         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
1535                               MDS_INODELOCK_UPDATE |
1536                               MDS_INODELOCK_XATTR, LCK_EX);
1537         if (rc != 0)
1538                 GOTO(log, rc);
1539
1540         if (next)
1541                 linkea_del_buf(ldata, cname);
1542         else
1543                 lfsck_namespace_filter_linkea_entry(ldata, cname, pfid,
1544                                                     true);
1545         lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
1546                        ldata->ld_leh->leh_len);
1547
1548 again:
1549         th = dt_trans_create(env, dev);
1550         if (IS_ERR(th))
1551                 GOTO(unlock1, rc = PTR_ERR(th));
1552
1553         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1554                                   XATTR_NAME_LINK, 0, th);
1555         if (rc != 0)
1556                 GOTO(stop, rc);
1557
1558         rc = dt_trans_start_local(env, dev, th);
1559         if (rc != 0)
1560                 GOTO(stop, rc);
1561
1562         dt_write_lock(env, obj, 0);
1563         if (unlikely(lfsck_is_dead_obj(obj)))
1564                 GOTO(unlock2, rc = -ENOENT);
1565
1566         rc = lfsck_links_read2(env, obj, &ldata_new);
1567         if (rc != 0)
1568                 GOTO(unlock2, rc);
1569
1570         /* The specified linkEA entry has been removed by race. */
1571         rc = linkea_links_find(&ldata_new, cname, pfid);
1572         if (rc != 0)
1573                 GOTO(unlock2, rc = 0);
1574
1575         if (bk->lb_param & LPF_DRYRUN)
1576                 GOTO(unlock2, rc = 1);
1577
1578         if (next)
1579                 linkea_del_buf(&ldata_new, cname);
1580         else
1581                 lfsck_namespace_filter_linkea_entry(&ldata_new, cname, pfid,
1582                                                     true);
1583
1584         if (linkea_buf.lb_len < ldata_new.ld_leh->leh_len) {
1585                 dt_write_unlock(env, obj);
1586                 dt_trans_stop(env, dev, th);
1587                 lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
1588                                ldata_new.ld_leh->leh_len);
1589                 goto again;
1590         }
1591
1592         lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
1593                        ldata_new.ld_leh->leh_len);
1594         rc = dt_xattr_set(env, obj, &linkea_buf,
1595                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1596
1597         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
1598
1599 unlock2:
1600         dt_write_unlock(env, obj);
1601
1602 stop:
1603         dt_trans_stop(env, dev, th);
1604
1605 unlock1:
1606         lfsck_ibits_unlock(&lh, LCK_EX);
1607
1608 log:
1609         CDEBUG(D_LFSCK, "%s: namespace LFSCK remove %s linkEA entry "
1610                "for the object: "DFID", parent "DFID", name %.*s\n",
1611                lfsck_lfsck2name(lfsck), next ? "invalid" : "redundant",
1612                PFID(lfsck_dto2fid(obj)), PFID(pfid), cname->ln_namelen,
1613                cname->ln_name);
1614
1615         if (rc != 0) {
1616                 struct lfsck_namespace *ns = com->lc_file_ram;
1617
1618                 ns->ln_flags |= LF_INCONSISTENT;
1619         }
1620
1621         return rc;
1622 }
1623
1624 /**
1625  * Conditionally remove the specified entry from the linkEA.
1626  *
1627  * Take the parent lock firstly, then check whether the specified
1628  * name entry exists or not: if yes, do nothing; otherwise, call
1629  * lfsck_namespace_shrink_linkea() to remove the linkea entry.
1630  *
1631  * \param[in] env       pointer to the thread context
1632  * \param[in] com       pointer to the lfsck component
1633  * \param[in] parent    pointer to the parent directory
1634  * \param[in] child     pointer to the child object that holds the linkEA
1635  * \param[in,out]ldata  pointer to the buffer that holds the linkEA
1636  * \param[in] cname     the name for the child in the parent directory
1637  * \param[in] pfid      the parent directory's FID for the linkEA
1638  *
1639  * \retval              positive number for repaired cases
1640  * \retval              0 if nothing to be repaired
1641  * \retval              negative error number on failure
1642  */
1643 static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env,
1644                                               struct lfsck_component *com,
1645                                               struct dt_object *parent,
1646                                               struct dt_object *child,
1647                                               struct linkea_data *ldata,
1648                                               struct lu_name *cname,
1649                                               struct lu_fid *pfid)
1650 {
1651         struct lu_fid           *cfid   = &lfsck_env_info(env)->lti_fid3;
1652         struct lustre_handle     lh     = { 0 };
1653         int                      rc;
1654         ENTRY;
1655
1656         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
1657                               MDS_INODELOCK_UPDATE, LCK_EX);
1658         if (rc != 0)
1659                 RETURN(rc);
1660
1661         dt_read_lock(env, parent, 0);
1662         if (unlikely(lfsck_is_dead_obj(parent))) {
1663                 dt_read_unlock(env, parent);
1664                 lfsck_ibits_unlock(&lh, LCK_EX);
1665                 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
1666                                                    cname, pfid, true);
1667
1668                 RETURN(rc);
1669         }
1670
1671         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1672                        (const struct dt_key *)cname->ln_name,
1673                        BYPASS_CAPA);
1674         dt_read_unlock(env, parent);
1675
1676         /* It is safe to release the ldlm lock, because when the logic come
1677          * here, we have got all the needed information above whether the
1678          * linkEA entry is valid or not. It is not important that others
1679          * may add new linkEA entry after the ldlm lock released. If other
1680          * has removed the specified linkEA entry by race, then it is OK,
1681          * because the subsequent lfsck_namespace_shrink_linkea() can handle
1682          * such case. */
1683         lfsck_ibits_unlock(&lh, LCK_EX);
1684         if (rc == -ENOENT) {
1685                 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
1686                                                    cname, pfid, true);
1687
1688                 RETURN(rc);
1689         }
1690
1691         if (rc != 0)
1692                 RETURN(rc);
1693
1694         /* The LFSCK just found some internal status of cross-MDTs
1695          * create operation. That is normal. */
1696         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
1697                 linkea_next_entry(ldata);
1698
1699                 RETURN(0);
1700         }
1701
1702         rc = lfsck_namespace_shrink_linkea(env, com, child, ldata, cname,
1703                                            pfid, true);
1704
1705         RETURN(rc);
1706 }
1707
1708 /**
1709  * Conditionally replace name entry in the parent.
1710  *
1711  * As required, the LFSCK may re-create the lost MDT-object for dangling
1712  * name entry, but such repairing may be wrong because of bad FID in the
1713  * name entry. As the LFSCK processing, the real MDT-object may be found,
1714  * then the LFSCK should check whether the former re-created MDT-object
1715  * has been modified or not, if not, then destroy it and update the name
1716  * entry in the parent to reference the real MDT-object.
1717  *
1718  * \param[in] env       pointer to the thread context
1719  * \param[in] com       pointer to the lfsck component
1720  * \param[in] parent    pointer to the parent directory
1721  * \param[in] child     pointer to the MDT-object that may be the real
1722  *                      MDT-object corresponding to the name entry in parent
1723  * \param[in] cfid      the current FID in the name entry
1724  * \param[in] cname     contains the name of the child in the parent directory
1725  *
1726  * \retval              positive number for repaired cases
1727  * \retval              0 if nothing to be repaired
1728  * \retval              negative error number on failure
1729  */
1730 static int lfsck_namespace_replace_cond(const struct lu_env *env,
1731                                         struct lfsck_component *com,
1732                                         struct dt_object *parent,
1733                                         struct dt_object *child,
1734                                         const struct lu_fid *cfid,
1735                                         const struct lu_name *cname)
1736 {
1737         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1738         struct lu_attr                  *la     = &info->lti_la;
1739         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1740         struct lu_fid                    tfid;
1741         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1742         struct dt_device                *dev    = lfsck->li_next;
1743         const char                      *name   = cname->ln_name;
1744         struct dt_object                *obj    = NULL;
1745         struct lustre_handle             plh    = { 0 };
1746         struct lustre_handle             clh    = { 0 };
1747         struct linkea_data               ldata  = { NULL };
1748         struct thandle                  *th     = NULL;
1749         bool                             exist  = true;
1750         int                              rc     = 0;
1751         ENTRY;
1752
1753         rc = lfsck_ibits_lock(env, lfsck, parent, &plh,
1754                               MDS_INODELOCK_UPDATE, LCK_EX);
1755         if (rc != 0)
1756                 GOTO(log, rc);
1757
1758         if (!fid_is_sane(cfid)) {
1759                 exist = false;
1760                 goto replace;
1761         }
1762
1763         obj = lfsck_object_find(env, lfsck, cfid);
1764         if (IS_ERR(obj)) {
1765                 rc = PTR_ERR(obj);
1766                 if (rc == -ENOENT) {
1767                         exist = false;
1768                         goto replace;
1769                 }
1770
1771                 GOTO(log, rc);
1772         }
1773
1774         if (!dt_object_exists(obj)) {
1775                 exist = false;
1776                 goto replace;
1777         }
1778
1779         rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
1780                        (const struct dt_key *)name, BYPASS_CAPA);
1781         if (rc == -ENOENT) {
1782                 exist = false;
1783                 goto replace;
1784         }
1785
1786         if (rc != 0)
1787                 GOTO(log, rc);
1788
1789         /* Someone changed the name entry, cannot replace it. */
1790         if (!lu_fid_eq(cfid, &tfid))
1791                 GOTO(log, rc = 0);
1792
1793         /* lock the object to be destroyed. */
1794         rc = lfsck_ibits_lock(env, lfsck, obj, &clh,
1795                               MDS_INODELOCK_UPDATE |
1796                               MDS_INODELOCK_XATTR, LCK_EX);
1797         if (rc != 0)
1798                 GOTO(log, rc);
1799
1800         if (unlikely(lfsck_is_dead_obj(obj))) {
1801                 exist = false;
1802                 goto replace;
1803         }
1804
1805         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
1806         if (rc != 0)
1807                 GOTO(log, rc);
1808
1809         /* The object has been modified by other(s), or it is not created by
1810          * LFSCK, the two cases are indistinguishable. So cannot replace it. */
1811         if (la->la_ctime != 0)
1812                 GOTO(log, rc);
1813
1814         if (S_ISREG(la->la_mode)) {
1815                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
1816                                   BYPASS_CAPA);
1817                 /* If someone has created related OST-object(s),
1818                  * then keep it. */
1819                 if ((rc > 0) || (rc < 0 && rc != -ENODATA))
1820                         GOTO(log, rc = (rc > 0 ? 0 : rc));
1821         }
1822
1823 replace:
1824         dt_read_lock(env, child, 0);
1825         rc = lfsck_links_read2(env, child, &ldata);
1826         dt_read_unlock(env, child);
1827
1828         /* Someone changed the child, no need to replace. */
1829         if (rc == -ENODATA)
1830                 GOTO(log, rc = 0);
1831
1832         if (rc != 0)
1833                 GOTO(log, rc);
1834
1835         rc = linkea_links_find(&ldata, cname, lfsck_dto2fid(parent));
1836         /* Someone moved the child, no need to replace. */
1837         if (rc != 0)
1838                 GOTO(log, rc = 0);
1839
1840         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1841                 GOTO(log, rc = 1);
1842
1843         th = dt_trans_create(env, dev);
1844         if (IS_ERR(th))
1845                 GOTO(log, rc = PTR_ERR(th));
1846
1847         if (exist) {
1848                 rc = dt_declare_destroy(env, obj, th);
1849                 if (rc != 0)
1850                         GOTO(stop, rc);
1851         }
1852
1853         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
1854         if (rc != 0)
1855                 GOTO(stop, rc);
1856
1857         rec->rec_type = S_IFDIR;
1858         rec->rec_fid = lfsck_dto2fid(child);
1859         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1860                                (const struct dt_key *)name, th);
1861         if (rc != 0)
1862                 GOTO(stop, rc);
1863
1864         rc = dt_trans_start(env, dev, th);
1865         if (rc != 0)
1866                 GOTO(stop, rc);
1867
1868         if (exist) {
1869                 rc = dt_destroy(env, obj, th);
1870                 if (rc != 0)
1871                         GOTO(stop, rc);
1872         }
1873
1874         /* The old name entry maybe not exist. */
1875         dt_delete(env, parent, (const struct dt_key *)name, th,
1876                   BYPASS_CAPA);
1877
1878         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1879                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
1880
1881         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1882
1883 stop:
1884         dt_trans_stop(env, dev, th);
1885
1886 log:
1887         lfsck_ibits_unlock(&clh, LCK_EX);
1888         lfsck_ibits_unlock(&plh, LCK_EX);
1889         if (obj != NULL && !IS_ERR(obj))
1890                 lfsck_object_put(env, obj);
1891
1892         CDEBUG(D_LFSCK, "%s: namespace LFSCK conditionally destroy the "
1893                "object "DFID" because of conflict with the object "DFID
1894                " under the parent "DFID" with name %s: rc = %d\n",
1895                lfsck_lfsck2name(lfsck), PFID(cfid),
1896                PFID(lfsck_dto2fid(child)), PFID(lfsck_dto2fid(parent)),
1897                name, rc);
1898
1899         return rc;
1900 }
1901
1902 /**
1903  * Overwrite the linkEA for the object with the given ldata.
1904  *
1905  * The caller should take the ldlm lock before the calling.
1906  *
1907  * \param[in] env       pointer to the thread context
1908  * \param[in] com       pointer to the lfsck component
1909  * \param[in] obj       pointer to the dt_object to be handled
1910  * \param[in] ldata     pointer to the new linkEA data
1911  *
1912  * \retval              positive number for repaired cases
1913  * \retval              0 if nothing to be repaired
1914  * \retval              negative error number on failure
1915  */
1916 int lfsck_namespace_rebuild_linkea(const struct lu_env *env,
1917                                    struct lfsck_component *com,
1918                                    struct dt_object *obj,
1919                                    struct linkea_data *ldata)
1920 {
1921         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1922         struct dt_device                *dev    = lfsck->li_bottom;
1923         struct thandle                  *th     = NULL;
1924         struct lu_buf                    linkea_buf;
1925         int                              rc     = 0;
1926         ENTRY;
1927
1928         th = dt_trans_create(env, dev);
1929         if (IS_ERR(th))
1930                 GOTO(log, rc = PTR_ERR(th));
1931
1932         lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
1933                        ldata->ld_leh->leh_len);
1934         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1935                                   XATTR_NAME_LINK, 0, th);
1936         if (rc != 0)
1937                 GOTO(stop, rc);
1938
1939         rc = dt_trans_start_local(env, dev, th);
1940         if (rc != 0)
1941                 GOTO(stop, rc);
1942
1943         dt_write_lock(env, obj, 0);
1944         if (unlikely(lfsck_is_dead_obj(obj)))
1945                 GOTO(unlock, rc = 0);
1946
1947         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1948                 GOTO(unlock, rc = 1);
1949
1950         rc = dt_xattr_set(env, obj, &linkea_buf,
1951                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1952
1953         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
1954
1955 unlock:
1956         dt_write_unlock(env, obj);
1957
1958 stop:
1959         dt_trans_stop(env, dev, th);
1960
1961 log:
1962         CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild linkEA for the "
1963                "object "DFID": rc = %d\n",
1964                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
1965
1966         if (rc != 0) {
1967                 struct lfsck_namespace *ns = com->lc_file_ram;
1968
1969                 ns->ln_flags |= LF_INCONSISTENT;
1970         }
1971
1972         return rc;
1973 }
1974
1975 /**
1976  * Repair invalid name entry.
1977  *
1978  * If the name entry contains invalid information, such as bad file type
1979  * or (and) corrupted object FID, then either remove the name entry or
1980  * udpate the name entry with the given (right) information.
1981  *
1982  * \param[in] env       pointer to the thread context
1983  * \param[in] com       pointer to the lfsck component
1984  * \param[in] parent    pointer to the parent directory
1985  * \param[in] child     pointer to the object referenced by the name entry
1986  * \param[in] name      the old name of the child under the parent directory
1987  * \param[in] name2     the new name of the child under the parent directory
1988  * \param[in] type      the type claimed by the name entry
1989  * \param[in] update    update the name entry if true; otherwise, remove it
1990  * \param[in] dec       decrease the parent nlink count if true
1991  *
1992  * \retval              positive number for repaired successfully
1993  * \retval              0 if nothing to be repaired
1994  * \retval              negative error number on failure
1995  */
1996 int lfsck_namespace_repair_dirent(const struct lu_env *env,
1997                                   struct lfsck_component *com,
1998                                   struct dt_object *parent,
1999                                   struct dt_object *child,
2000                                   const char *name, const char *name2,
2001                                   __u16 type, bool update, bool dec)
2002 {
2003         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
2004         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
2005         struct lu_fid            tfid;
2006         struct lfsck_instance   *lfsck  = com->lc_lfsck;
2007         struct dt_device        *dev    = lfsck->li_next;
2008         struct thandle          *th     = NULL;
2009         struct lustre_handle     lh     = { 0 };
2010         int                      rc     = 0;
2011         ENTRY;
2012
2013         if (unlikely(!dt_try_as_dir(env, parent)))
2014                 GOTO(log, rc = -ENOTDIR);
2015
2016         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
2017                               MDS_INODELOCK_UPDATE, LCK_EX);
2018         if (rc != 0)
2019                 GOTO(log, rc);
2020
2021         th = dt_trans_create(env, dev);
2022         if (IS_ERR(th))
2023                 GOTO(unlock1, rc = PTR_ERR(th));
2024
2025         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
2026         if (rc != 0)
2027                 GOTO(stop, rc);
2028
2029         if (update) {
2030                 rec->rec_type = lfsck_object_type(child) & S_IFMT;
2031                 rec->rec_fid = cfid;
2032                 rc = dt_declare_insert(env, parent,
2033                                        (const struct dt_rec *)rec,
2034                                        (const struct dt_key *)name2, th);
2035                 if (rc != 0)
2036                         GOTO(stop, rc);
2037         }
2038
2039         if (dec) {
2040                 rc = dt_declare_ref_del(env, parent, th);
2041                 if (rc != 0)
2042                         GOTO(stop, rc);
2043         }
2044
2045         rc = dt_trans_start(env, dev, th);
2046         if (rc != 0)
2047                 GOTO(stop, rc);
2048
2049         dt_write_lock(env, parent, 0);
2050         rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
2051                        (const struct dt_key *)name, BYPASS_CAPA);
2052         /* Someone has removed the bad name entry by race. */
2053         if (rc == -ENOENT)
2054                 GOTO(unlock2, rc = 0);
2055
2056         if (rc != 0)
2057                 GOTO(unlock2, rc);
2058
2059         /* Someone has removed the bad name entry and reused it for other
2060          * object by race. */
2061         if (!lu_fid_eq(&tfid, cfid))
2062                 GOTO(unlock2, rc = 0);
2063
2064         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
2065                 GOTO(unlock2, rc = 1);
2066
2067         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
2068                        BYPASS_CAPA);
2069         if (rc != 0)
2070                 GOTO(unlock2, rc);
2071
2072         if (update) {
2073                 rc = dt_insert(env, parent,
2074                                (const struct dt_rec *)rec,
2075                                (const struct dt_key *)name2, th,
2076                                BYPASS_CAPA, 1);
2077                 if (rc != 0)
2078                         GOTO(unlock2, rc);
2079         }
2080
2081         if (dec) {
2082                 rc = dt_ref_del(env, parent, th);
2083                 if (rc != 0)
2084                         GOTO(unlock2, rc);
2085         }
2086
2087         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
2088
2089 unlock2:
2090         dt_write_unlock(env, parent);
2091
2092 stop:
2093         dt_trans_stop(env, dev, th);
2094
2095         /* We are not sure whether the child will become orphan or not.
2096          * Record it in the LFSCK trace file for further checking in
2097          * the second-stage scanning. */
2098         if (!update && !dec && rc == 0)
2099                 lfsck_namespace_trace_update(env, com, cfid,
2100                                              LNTF_CHECK_LINKEA, true);
2101
2102 unlock1:
2103         lfsck_ibits_unlock(&lh, LCK_EX);
2104
2105 log:
2106         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found bad name "
2107                "entry for: parent "DFID", child "DFID", name %s, type "
2108                "in name entry %o, type claimed by child %o. repair it "
2109                "by %s with new name2 %s: rc = %d\n", lfsck_lfsck2name(lfsck),
2110                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
2111                name, type, update ? lfsck_object_type(child) : 0,
2112                update ? "updating" : "removing", name2, rc);
2113
2114         if (rc != 0) {
2115                 struct lfsck_namespace *ns = com->lc_file_ram;
2116
2117                 ns->ln_flags |= LF_INCONSISTENT;
2118         }
2119
2120         return rc;
2121 }
2122
2123 /**
2124  * Update the ".." name entry for the given object.
2125  *
2126  * The object's ".." is corrupted, this function will update the ".." name
2127  * entry with the given pfid, and the linkEA with the given ldata.
2128  *
2129  * The caller should take the ldlm lock before the calling.
2130  *
2131  * \param[in] env       pointer to the thread context
2132  * \param[in] com       pointer to the lfsck component
2133  * \param[in] obj       pointer to the dt_object to be handled
2134  * \param[in] pfid      the new fid for the object's ".." name entry
2135  * \param[in] cname     the name for the @obj in the parent directory
2136  *
2137  * \retval              positive number for repaired cases
2138  * \retval              0 if nothing to be repaired
2139  * \retval              negative error number on failure
2140  */
2141 static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env,
2142                                                   struct lfsck_component *com,
2143                                                   struct dt_object *obj,
2144                                                   const struct lu_fid *pfid,
2145                                                   struct lu_name *cname)
2146 {
2147         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2148         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
2149         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2150         struct dt_device                *dev    = lfsck->li_bottom;
2151         struct thandle                  *th     = NULL;
2152         struct linkea_data               ldata  = { NULL };
2153         struct lu_buf                    linkea_buf;
2154         int                              rc     = 0;
2155         ENTRY;
2156
2157         LASSERT(!dt_object_remote(obj));
2158         LASSERT(S_ISDIR(lfsck_object_type(obj)));
2159
2160         rc = linkea_data_new(&ldata, &info->lti_big_buf);
2161         if (rc != 0)
2162                 GOTO(log, rc);
2163
2164         rc = linkea_add_buf(&ldata, cname, pfid);
2165         if (rc != 0)
2166                 GOTO(log, rc);
2167
2168         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
2169                        ldata.ld_leh->leh_len);
2170
2171         th = dt_trans_create(env, dev);
2172         if (IS_ERR(th))
2173                 GOTO(log, rc = PTR_ERR(th));
2174
2175         rc = dt_declare_delete(env, obj, (const struct dt_key *)dotdot, th);
2176         if (rc != 0)
2177                 GOTO(stop, rc);
2178
2179         rec->rec_type = S_IFDIR;
2180         rec->rec_fid = pfid;
2181         rc = dt_declare_insert(env, obj, (const struct dt_rec *)rec,
2182                                (const struct dt_key *)dotdot, th);
2183         if (rc != 0)
2184                 GOTO(stop, rc);
2185
2186         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
2187                                   XATTR_NAME_LINK, 0, th);
2188         if (rc != 0)
2189                 GOTO(stop, rc);
2190
2191         rc = dt_trans_start_local(env, dev, th);
2192         if (rc != 0)
2193                 GOTO(stop, rc);
2194
2195         dt_write_lock(env, obj, 0);
2196         if (unlikely(lfsck_is_dead_obj(obj)))
2197                 GOTO(unlock, rc = 0);
2198
2199         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
2200                 GOTO(unlock, rc = 1);
2201
2202         /* The old ".." name entry maybe not exist. */
2203         dt_delete(env, obj, (const struct dt_key *)dotdot, th,
2204                   BYPASS_CAPA);
2205
2206         rc = dt_insert(env, obj, (const struct dt_rec *)rec,
2207                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
2208         if (rc != 0)
2209                 GOTO(unlock, rc);
2210
2211         rc = dt_xattr_set(env, obj, &linkea_buf,
2212                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
2213
2214         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
2215
2216 unlock:
2217         dt_write_unlock(env, obj);
2218
2219 stop:
2220         dt_trans_stop(env, dev, th);
2221
2222 log:
2223         CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild dotdot name entry for "
2224                "the object "DFID", new parent "DFID": rc = %d\n",
2225                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)),
2226                PFID(pfid), rc);
2227
2228         if (rc != 0) {
2229                 struct lfsck_namespace *ns = com->lc_file_ram;
2230
2231                 ns->ln_flags |= LF_INCONSISTENT;
2232         }
2233
2234         return rc;
2235 }
2236
2237 /**
2238  * Handle orphan @obj during Double Scan Directory.
2239  *
2240  * Remove the @obj's current (invalid) linkEA entries, and insert
2241  * it in the directory .lustre/lost+found/MDTxxxx/ with the name:
2242  * ${FID}-${PFID}-D-${conflict_version}
2243  *
2244  * The caller should take the ldlm lock before the calling.
2245  *
2246  * \param[in] env       pointer to the thread context
2247  * \param[in] com       pointer to the lfsck component
2248  * \param[in] obj       pointer to the orphan object to be handled
2249  * \param[in] pfid      the new fid for the object's ".." name entry
2250  * \param[in,out] lh    ldlm lock handler for the given @obj
2251  * \param[out] type     to tell the caller what the inconsistency is
2252  *
2253  * \retval              positive number for repaired cases
2254  * \retval              0 if nothing to be repaired
2255  * \retval              negative error number on failure
2256  */
2257 static int
2258 lfsck_namespace_dsd_orphan(const struct lu_env *env,
2259                            struct lfsck_component *com,
2260                            struct dt_object *obj,
2261                            const struct lu_fid *pfid,
2262                            struct lustre_handle *lh,
2263                            enum lfsck_namespace_inconsistency_type *type)
2264 {
2265         struct lfsck_thread_info *info = lfsck_env_info(env);
2266         struct lfsck_namespace   *ns   = com->lc_file_ram;
2267         int                       rc;
2268         ENTRY;
2269
2270         /* Remove the unrecognized linkEA. */
2271         rc = lfsck_namespace_links_remove(env, com, obj);
2272         lfsck_ibits_unlock(lh, LCK_EX);
2273         if (rc < 0 && rc != -ENODATA)
2274                 RETURN(rc);
2275
2276         *type = LNIT_MUL_REF;
2277
2278         /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT has
2279          * ever tried to verify some remote MDT-object that resides on this
2280          * MDT, but this MDT failed to respond such request. So means there
2281          * may be some remote name entry on other MDT that references this
2282          * object with another name, so we cannot know whether this linkEA
2283          * is valid or not. So keep it there and maybe resolved when next
2284          * LFSCK run. */
2285         if (ns->ln_flags & LF_INCOMPLETE)
2286                 RETURN(0);
2287
2288         /* The unique linkEA is invalid, even if the ".." name entry may be
2289          * valid, we still cannot know via which name entry this directory
2290          * will be referenced. Then handle it as pure orphan. */
2291         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
2292                  "-"DFID, PFID(pfid));
2293         rc = lfsck_namespace_insert_orphan(env, com, obj,
2294                                            info->lti_tmpbuf, "D", NULL);
2295
2296         RETURN(rc);
2297 }
2298
2299 /**
2300  * Double Scan Directory object for single linkEA entry case.
2301  *
2302  * The given @child has a unique linkEA entry. If the linkEA entry is valid,
2303  * then check whether the name is in the namespace or not; if not, add the
2304  * missing name entry back to the namespace. If the linkEA entry is invalid,
2305  * then remove it and insert the @child in the .lustre/lost+found/MDTxxxx/
2306  * as an orphan.
      *
      * Before any repair the function checks whether @lh is in use. If it is
      * not and @retry is non-NULL, nothing is repaired: *@retry is set to true
      * and 0 is returned, so that the caller can come back with the lock held.
      * Otherwise the repair goes ahead; most repair paths release @lh via
      * lfsck_ibits_unlock(lh, LCK_EX) first, or hand @lh to
      * lfsck_namespace_dsd_orphan().
2307  *
2308  * \param[in] env       pointer to the thread context
2309  * \param[in] com       pointer to the lfsck component
2310  * \param[in] child     pointer to the directory to be double scanned
2311  * \param[in] pfid      the FID corresponding to the ".." entry
2312  * \param[in] ldata     pointer to the linkEA data for the given @child
2313  * \param[in,out] lh    ldlm lock handler for the given @child
2314  * \param[out] type     to tell the caller what the inconsistency is
2315  * \param[out] retry    set to true when an inconsistency is found but the
2316  *                      caller does not hold the ldlm lock on the @child;
      *                      may be NULL, in which case the repair is done anyway
2317  *
2318  * \retval              positive number for repaired cases
2319  * \retval              0 if nothing to be repaired (also when *@retry is set)
2320  * \retval              negative error number on failure
2321  */
2322 static int
2323 lfsck_namespace_dsd_single(const struct lu_env *env,
2324                            struct lfsck_component *com,
2325                            struct dt_object *child,
2326                            const struct lu_fid *pfid,
2327                            struct linkea_data *ldata,
2328                            struct lustre_handle *lh,
2329                            enum lfsck_namespace_inconsistency_type *type,
2330                            bool *retry)
2331 {
2332         struct lfsck_thread_info *info          = lfsck_env_info(env);
2333         struct lu_name           *cname         = &info->lti_name;
2334         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
2335         struct lu_fid             tfid;
2336         struct lfsck_namespace   *ns            = com->lc_file_ram;
2337         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2338         struct dt_object         *parent        = NULL;
2339         struct lmv_mds_md_v1     *lmv;
2340         int                       rc            = 0;
2341         ENTRY;
2342
             /* Decode the linkEA entry: cname receives the entry name, tfid the
              * parent FID recorded in the entry (it is used to locate the
              * parent object below). */
2343         lfsck_namespace_unpack_linkea_entry(ldata, cname, &tfid, info->lti_key);
2344         /* The unique linkEA entry with bad parent will be handled as orphan. */
2345         if (!fid_is_sane(&tfid)) {
                     /* rc is still 0 here, so the retry case returns 0. */
2346                 if (!lustre_handle_is_used(lh) && retry != NULL)
2347                         *retry = true;
2348                 else
2349                         rc = lfsck_namespace_dsd_orphan(env, com, child,
2350                                                         pfid, lh, type);
2351
2352                 GOTO(out, rc);
2353         }
2354
2355         parent = lfsck_object_find_bottom(env, lfsck, &tfid);
2356         if (IS_ERR(parent))
2357                 GOTO(out, rc = PTR_ERR(parent));
2358
2359         /* We trust the unique linkEA entry in spite of whether it matches the
2360          * ".." name entry or not. Because even if the linkEA entry is wrong
2361          * and the ".." name entry is right, we still cannot know via which
2362          * name entry the child will be referenced, since all known entries
2363          * have been verified during the first-stage scanning. */
2364         if (!dt_object_exists(parent)) {
2365                 /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT
2366                  * has ever tried to verify some remote MDT-object that resides
2367                  * on this MDT, but this MDT failed to respond such request. So
2368                  * means there may be some remote name entry on other MDT that
2369                  * references this object with another name, so we cannot know
2370                  * whether this linkEA is valid or not. So keep it there and
2371                  * maybe resolved when next LFSCK run. */
2372                 if (ns->ln_flags & LF_INCOMPLETE)
2373                         GOTO(out, rc = 0);
2374
2375                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2376                         *retry = true;
2377
2378                         GOTO(out, rc = 0);
2379                 }
2380
2381                 lfsck_ibits_unlock(lh, LCK_EX);
2382
     /* This label is also entered by goto from the name check (-ENOENT) and
      * from the name entry insertion (-ESTALE) further below; in both of
      * those cases @lh has already been released. */
2383 lost_parent:
2384                 lmv = &info->lti_lmv;
2385                 rc = lfsck_read_stripe_lmv(env, child, lmv);
2386                 if (rc != 0 && rc != -ENODATA)
2387                         GOTO(out, rc);
2388
                     /* Without a stripe LMV on the @child, the lost parent is
                      * recreated with no LMV information (lmv == NULL). */
2389                 if (rc == -ENODATA || lmv->lmv_magic != LMV_MAGIC_STRIPE) {
2390                         lmv = NULL;
2391                 } else if (lfsck_shard_name_to_index(env,
2392                                         cname->ln_name, cname->ln_namelen,
2393                                         S_IFDIR, cfid) < 0) {
2394                         /* It is an invalid name entry, we
2395                          * cannot trust the parent also. */
2396                         rc = lfsck_namespace_shrink_linkea(env, com, child,
2397                                                 ldata, cname, &tfid, true);
2398                         if (rc < 0)
2399                                 GOTO(out, rc);
2400
                             /* Orphan name infix is "-<pfid>", the type tag
                              * passed for this case is "S". */
2401                         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
2402                                  "-"DFID, PFID(pfid));
2403                         rc = lfsck_namespace_insert_orphan(env, com, child,
2404                                                 info->lti_tmpbuf, "S", NULL);
2405
2406                         GOTO(out, rc);
2407                 }
2408
2409                 /* Create the lost parent as an orphan. */
2410                 rc = lfsck_namespace_create_orphan_dir(env, com, parent, lmv);
2411                 if (rc >= 0) {
2412                         /* Add the missing name entry to the parent. */
2413                         rc = lfsck_namespace_insert_normal(env, com, parent,
2414                                                         child, cname->ln_name);
2415                         if (unlikely(rc == -EEXIST)) {
2416                                 /* Unfortunately, someone reused the name
2417                                  * under the parent by race. So we have
2418                                  * to remove the linkEA entry from
2419                                  * current child object. It means that the
2420                                  * LFSCK cannot recover the system
2421                                  * totally back to its original status,
2422                                  * but it is necessary to make the
2423                                  * current system to be consistent. */
2424                                 rc = lfsck_namespace_shrink_linkea(env,
2425                                                 com, child, ldata,
2426                                                 cname, &tfid, true);
2427                                 if (rc >= 0) {
2428                                         snprintf(info->lti_tmpbuf,
2429                                                  sizeof(info->lti_tmpbuf),
2430                                                  "-"DFID, PFID(pfid));
2431                                         rc = lfsck_namespace_insert_orphan(env,
2432                                                 com, child, info->lti_tmpbuf,
2433                                                 "D", NULL);
2434                                 }
2435                         }
2436                 }
2437
2438                 GOTO(out, rc);
2439         }
2440
2441         /* The unique linkEA entry with bad parent will be handled as orphan. */
2442         if (unlikely(!dt_try_as_dir(env, parent))) {
2443                 if (!lustre_handle_is_used(lh) && retry != NULL)
2444                         *retry = true;
2445                 else
2446                         rc = lfsck_namespace_dsd_orphan(env, com, child,
2447                                                         pfid, lh, type);
2448
2449                 GOTO(out, rc);
2450         }
2451
             /* Look the linkEA name up under the parent. tfid is passed as the
              * record buffer, so from here on it is the lookup result that is
              * compared with cfid below, no longer the parent FID. */
2452         rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
2453                        (const struct dt_key *)cname->ln_name, BYPASS_CAPA);
2454         if (rc == -ENOENT) {
2455                 /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT
2456                  * has ever tried to verify some remote MDT-object that resides
2457                  * on this MDT, but this MDT failed to respond such request. So
2458                  * means there may be some remote name entry on other MDT that
2459                  * references this object with another name, so we cannot know
2460                  * whether this linkEA is valid or not. So keep it there and
2461                  * maybe resolved when next LFSCK run. */
2462                 if (ns->ln_flags & LF_INCOMPLETE)
2463                         GOTO(out, rc = 0);
2464
2465                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2466                         *retry = true;
2467
2468                         GOTO(out, rc = 0);
2469                 }
2470
2471                 lfsck_ibits_unlock(lh, LCK_EX);
                     /* Result handling below: -ENOENT is treated like a lost
                      * parent, other negatives are failures, a positive value
                      * marks an invalid name entry, 0 lets the entry be
                      * re-inserted. */
2472                 rc = lfsck_namespace_check_name(env, parent, child, cname);
2473                 if (rc == -ENOENT)
2474                         goto lost_parent;
2475
2476                 if (rc < 0)
2477                         GOTO(out, rc);
2478
2479                 /* It is an invalid name entry, drop it. */
2480                 if (unlikely(rc > 0)) {
2481                         rc = lfsck_namespace_shrink_linkea(env, com, child,
2482                                                 ldata, cname, &tfid, true);
2483                         if (rc >= 0) {
2484                                 snprintf(info->lti_tmpbuf,
2485                                          sizeof(info->lti_tmpbuf),
2486                                          "-"DFID, PFID(pfid));
2487                                 rc = lfsck_namespace_insert_orphan(env, com,
2488                                         child, info->lti_tmpbuf, "D", NULL);
2489                         }
2490
2491                         GOTO(out, rc);
2492                 }
2493
2494                 /* Add the missing name entry back to the namespace. */
2495                 rc = lfsck_namespace_insert_normal(env, com, parent, child,
2496                                                    cname->ln_name);
2497                 if (unlikely(rc == -ESTALE))
2498                         /* It may happen when the remote object has been
2499                          * removed, but the local MDT is not aware of that. */
2500                         goto lost_parent;
2501
2502                 if (unlikely(rc == -EEXIST)) {
2503                         /* Unfortunately, someone reused the name under the
2504                          * parent by race. So we have to remove the linkEA
2505                          * entry from current child object. It means that the
2506                          * LFSCK cannot recover the system totally back to
2507                          * its original status, but it is necessary to make
2508                          * the current system to be consistent.
2509                          *
2510                          * It also may be because of the LFSCK found some
2511                          * internal status of create operation. Under such
2512                          * case, nothing to be done. */
2513                         rc = lfsck_namespace_shrink_linkea_cond(env, com,
2514                                         parent, child, ldata, cname, &tfid);
2515                         if (rc >= 0) {
2516                                 snprintf(info->lti_tmpbuf,
2517                                          sizeof(info->lti_tmpbuf),
2518                                          "-"DFID, PFID(pfid));
2519                                 rc = lfsck_namespace_insert_orphan(env, com,
2520                                         child, info->lti_tmpbuf, "D", NULL);
2521                         }
2522                 }
2523
2524                 GOTO(out, rc);
2525         }
2526
             /* Any other lookup failure is returned to the caller as is. */
2527         if (rc != 0)
2528                 GOTO(out, rc);
2529
             /* The name exists under the parent but the looked-up FID is not
              * the @child's FID. */
2530         if (!lu_fid_eq(&tfid, cfid)) {
2531                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2532                         *retry = true;
2533
2534                         GOTO(out, rc = 0);
2535                 }
2536
2537                 lfsck_ibits_unlock(lh, LCK_EX);
2538                 /* The name entry references another MDT-object that
2539                  * may be created by the LFSCK for repairing dangling
2540                  * name entry. Try to replace it. */
2541                 rc = lfsck_namespace_replace_cond(env, com, parent, child,
2542                                                   &tfid, cname);
                     /* Only a 0 result falls back to the orphan handling; any
                      * other value is returned unchanged. */
2543                 if (rc == 0)
2544                         rc = lfsck_namespace_dsd_orphan(env, com, child,
2545                                                         pfid, lh, type);
2546
2547                 GOTO(out, rc);
2548         }
2549
             /* Zero @pfid: skip the ".." consistency check below. */
2550         if (fid_is_zero(pfid))
2551                 GOTO(out, rc = 0);
2552
2553         /* The ".." name entry is wrong, update it. */
2554         if (!lu_fid_eq(pfid, lfsck_dto2fid(parent))) {
2555                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2556                         *retry = true;
2557
2558                         GOTO(out, rc = 0);
2559                 }
2560
2561                 *type = LNIT_UNMATCHED_PAIRS;
2562                 rc = lfsck_namespace_repair_unmatched_pairs(env, com, child,
2563                                                 lfsck_dto2fid(parent), cname);
2564         }
2565
2566         GOTO(out, rc);
2567
2568 out:
             /* Drop the parent reference only if one was really obtained
              * (parent is NULL before the lookup and may be an ERR_PTR). */
2569         if (parent != NULL && !IS_ERR(parent))
2570                 lfsck_object_put(env, parent);
2571
2572         return rc;
2573 }
2574
2575 /**
2576  * Double Scan Directory object for multiple linkEA entries case.
2577  *
2578  * The given @child has multiple linkEA entries. There is at most one linkEA
2579  * entry will be valid, all the others will be removed. Firstly, the function
2580  * will try to find out the linkEA entry for which the name entry exists under
2581  * the given parent (@pfid). If there is no linkEA entry that matches the given
2582  * ".." name entry, then tries to find out the first linkEA entry that both the
2583  * parent and the name entry exist to rebuild a new ".." name entry.
      *
      * The linkEA entries in @ldata may be scanned up to three times, steered
      * by @lpf and the local flag "once":
      * - if @lpf is true, every entry is examined first, then @lpf is cleared
      *   and the scan restarts;
      * - while "once" is set and @lpf is false, only the entries whose parent
      *   FID equals @pfid are examined, the others are skipped over;
      * - finally "once" is cleared and every remaining entry is examined.
      * Entries found to be invalid are deleted from @ldata on the way. After
      * each scan, if exactly one entry is left the work is handed over to
      * lfsck_namespace_dsd_single() (with a NULL retry pointer), and if none
      * is left, to lfsck_namespace_dsd_orphan().
2584  *
2585  * \param[in] env       pointer to the thread context
2586  * \param[in] com       pointer to the lfsck component
2587  * \param[in] child     pointer to the directory to be double scanned
2588  * \param[in] pfid      the FID corresponding to the ".." entry
2589  * \param[in] ldata     pointer to the linkEA data for the given @child
2590  * \param[in,out] lh    ldlm lock handler for the given @child
2591  * \param[out] type     to tell the caller what the inconsistency is
2592  * \param[in] lpf       true if the ".." entry is under lost+found/MDTxxxx/
2593  *
2594  * \retval              positive number for repaired cases
2595  * \retval              0 if nothing to be repaired
2596  * \retval              negative error number on failure
2597  */
2598 static int
2599 lfsck_namespace_dsd_multiple(const struct lu_env *env,
2600                              struct lfsck_component *com,
2601                              struct dt_object *child,
2602                              const struct lu_fid *pfid,
2603                              struct linkea_data *ldata,
2604                              struct lustre_handle *lh,
2605                              enum lfsck_namespace_inconsistency_type *type,
2606                              bool lpf)
2607 {
2608         struct lfsck_thread_info *info          = lfsck_env_info(env);
2609         struct lu_name           *cname         = &info->lti_name;
2610         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
2611         struct lu_fid            *pfid2         = &info->lti_fid3;
2612         struct lu_fid             tfid;
2613         struct lfsck_namespace   *ns            = com->lc_file_ram;
2614         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2615         struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
2616         struct dt_object         *parent        = NULL;
2617         struct linkea_data        ldata_new     = { NULL };
2618         int                       dirent_count  = 0;
2619         int                       linkea_count  = 0;
2620         int                       rc            = 0;
2621         bool                      once          = true;
2622         ENTRY;
2623
             /* linkea_count is bumped for linkEA entries deleted from @ldata
              * (the final delete after a 0 result from replace_cond and the
              * cleanup loop deletes are not counted); dirent_count sums up
              * the non-negative results of lfsck_namespace_repair_dirent(). */
2624 again:
2625         while (ldata->ld_lee != NULL) {
2626                 lfsck_namespace_unpack_linkea_entry(ldata, cname, &tfid,
2627                                                     info->lti_key);
2628                 /* Drop repeated linkEA entries. */
2629                 lfsck_namespace_filter_linkea_entry(ldata, cname, &tfid, true);
2630                 /* Drop invalid linkEA entry. */
2631                 if (!fid_is_sane(&tfid)) {
2632                         linkea_del_buf(ldata, cname);
2633                         linkea_count++;
2634                         continue;
2635                 }
2636
2637                 /* If current dotdot is the .lustre/lost+found/MDTxxxx/,
2638                  * then it is possible that: the directory object has ever
2639                  * been lost, but its name entry was there. In the former
2640                  * LFSCK run, during the first-stage scanning, the LFSCK
2641                  * found the dangling name entry, but it did not recreate
2642                  * the lost object, and when moved to the second-stage
2643                  * scanning, some children objects of the lost directory
2644                  * object were found, then the LFSCK recreated such lost
2645                  * directory object as an orphan.
2646                  *
2647                  * When the LFSCK runs again, if the dangling name is still
2648                  * there, the LFSCK should move the orphan directory object
2649                  * back to the normal namespace. */
                     /* Outside the lost+found case, and as long as "once" is
                      * set, only the entries recorded under @pfid are looked
                      * at; the others are kept and stepped over. */
2650                 if (!lpf && !lu_fid_eq(pfid, &tfid) && once) {
2651                         linkea_next_entry(ldata);
2652                         continue;
2653                 }
2654
2655                 parent = lfsck_object_find_bottom(env, lfsck, &tfid);
2656                 if (IS_ERR(parent))
2657                         RETURN(PTR_ERR(parent));
2658
2659                 if (!dt_object_exists(parent)) {
2660                         lfsck_object_put(env, parent);
2661                         if (ldata->ld_leh->leh_reccount > 1) {
2662                                 /* If it is NOT the last linkEA entry, then
2663                                  * there is still other chance to make the
2664                                  * child to be visible via other parent, then
2665                                  * remove this linkEA entry. */
2666                                 linkea_del_buf(ldata, cname);
2667                                 linkea_count++;
2668                                 continue;
2669                         }
2670
                             /* Last entry left: keep it and let the handling
                              * after the loop deal with it. */
2671                         break;
2672                 }
2673
2674                 /* The linkEA entry with bad parent will be removed. */
2675                 if (unlikely(!dt_try_as_dir(env, parent))) {
2676                         lfsck_object_put(env, parent);
2677                         linkea_del_buf(ldata, cname);
2678                         linkea_count++;
2679                         continue;
2680                 }
2681
                     /* tfid is passed as the record buffer of the lookup, so
                      * it is the lookup result that is compared with cfid
                      * below. The parent FID is saved in pfid2 because the
                      * parent reference is dropped before pfid2 is used. */
2682                 rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
2683                                (const struct dt_key *)cname->ln_name,
2684                                BYPASS_CAPA);
2685                 *pfid2 = *lfsck_dto2fid(parent);
                     /* No such name under this parent: keep the linkEA entry
                      * for now and go on with the next one. */
2686                 if (rc == -ENOENT) {
2687                         lfsck_object_put(env, parent);
2688                         linkea_next_entry(ldata);
2689                         continue;
2690                 }
2691
2692                 if (rc != 0) {
2693                         lfsck_object_put(env, parent);
2694
2695                         RETURN(rc);
2696                 }
2697
                     /* The name entry under this parent references @child. */
2698                 if (lu_fid_eq(&tfid, cfid)) {
2699                         lfsck_object_put(env, parent);
                             /* The ".." FID disagrees with this parent: hand
                              * over to the unmatched-pairs repair and stop. */
2700                         if (!lu_fid_eq(pfid, pfid2)) {
2701                                 *type = LNIT_UNMATCHED_PAIRS;
2702                                 rc = lfsck_namespace_repair_unmatched_pairs(env,
2703                                                 com, child, pfid2, cname);
2704
2705                                 RETURN(rc);
2706                         }
2707
     /* This label is also entered by goto from further below, when
      * lfsck_namespace_replace_cond() returned a positive value; the parent
      * reference has been dropped on both ways in. */
2708 rebuild:
2709                         /* It is the most common case that we find the
2710                          * name entry corresponding to the linkEA entry
2711                          * that matches the ".." name entry. */
2712                         rc = linkea_data_new(&ldata_new, &info->lti_big_buf);
2713                         if (rc != 0)
2714                                 RETURN(rc);
2715
2716                         rc = linkea_add_buf(&ldata_new, cname, pfid2);
2717                         if (rc != 0)
2718                                 RETURN(rc);
2719
                             /* ldata_new holds the single pair (cname, pfid2). */
2720                         rc = lfsck_namespace_rebuild_linkea(env, com, child,
2721                                                             &ldata_new);
2722                         if (rc < 0)
2723                                 RETURN(rc);
2724
                             /* Take the kept entry out of the old list, then
                              * walk whatever is left from the beginning. */
2725                         linkea_del_buf(ldata, cname);
2726                         linkea_count++;
2727                         linkea_first_entry(ldata);
2728                         /* There may be some invalid dangling name entries under
2729                          * other parent directories, remove all of them. */
2730                         while (ldata->ld_lee != NULL) {
2731                                 lfsck_namespace_unpack_linkea_entry(ldata,
2732                                                 cname, &tfid, info->lti_key);
2733                                 if (!fid_is_sane(&tfid))
2734                                         goto next;
2735
2736                                 parent = lfsck_object_find_bottom(env, lfsck,
2737                                                                   &tfid);
2738                                 if (IS_ERR(parent)) {
2739                                         rc = PTR_ERR(parent);
                                             /* Only abort on errors other than
                                              * -ENOENT, and only if LPF_FAILOUT
                                              * is set. */
2740                                         if (rc != -ENOENT &&
2741                                             bk->lb_param & LPF_FAILOUT)
2742                                                 RETURN(rc);
2743
2744                                         goto next;
2745                                 }
2746
2747                                 if (!dt_object_exists(parent)) {
2748                                         lfsck_object_put(env, parent);
2749                                         goto next;
2750                                 }
2751
2752                                 rc = lfsck_namespace_repair_dirent(env, com,
2753                                         parent, child, cname->ln_name,
2754                                         cname->ln_name, S_IFDIR, false, true);
2755                                 lfsck_object_put(env, parent);
2756                                 if (rc < 0) {
2757                                         if (bk->lb_param & LPF_FAILOUT)
2758                                                 RETURN(rc);
2759
2760                                         goto next;
2761                                 }
2762
2763                                 dirent_count += rc;
2764
     /* Every iteration ends by deleting the current entry; presumably
      * linkea_del_buf() moves ld_lee on so that the loop terminates --
      * TODO confirm against linkea_del_buf(). */
2765 next:
2766                                 linkea_del_buf(ldata, cname);
2767                         }
2768
2769                         ns->ln_dirent_repaired += dirent_count;
2770
                             /* NOTE(review): rc is whatever the last statement
                              * that assigned it left behind -- the rebuild
                              * result if the loop body never ran, otherwise the
                              * last iteration's value, which can be a negative
                              * error that was tolerated without LPF_FAILOUT.
                              * Confirm that this is the intended return. */
2771                         RETURN(rc);
2772                 }
2773
2774                 lfsck_ibits_unlock(lh, LCK_EX);
2775                 /* The name entry references another MDT-object that may be
2776                  * created by the LFSCK for repairing dangling name entry.
2777                  * Try to replace it. */
2778                 rc = lfsck_namespace_replace_cond(env, com, parent, child,
2779                                                   &tfid, cname);
2780                 lfsck_object_put(env, parent);
2781                 if (rc < 0)
2782                         RETURN(rc);
2783
2784                 if (rc > 0)
2785                         goto rebuild;
2786
                     /* replace_cond returned 0: this linkEA entry is dropped. */
2787                 linkea_del_buf(ldata, cname);
2788         }
2789
2790         linkea_first_entry(ldata);
2791         if (ldata->ld_leh->leh_reccount == 1) {
                     /* retry is NULL, so dsd_single repairs without asking
                      * the caller to retry. */
2792                 rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata,
2793                                                 lh, type, NULL);
2794
                     /* Nothing was repaired and @pfid is zero, but entries were
                      * deleted from @ldata above: rebuild the linkEA from the
                      * shrunken @ldata. */
2795                 if (rc == 0 && fid_is_zero(pfid) && linkea_count > 0)
2796                         rc = lfsck_namespace_rebuild_linkea(env, com, child,
2797                                                             ldata);
2798
2799                 RETURN(rc);
2800         }
2801
2802         /* All linkEA entries are invalid and removed, then handle the @child
2803          * as an orphan. */
2804         if (ldata->ld_leh->leh_reccount == 0) {
2805                 rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, lh,
2806                                                 type);
2807
2808                 RETURN(rc);
2809         }
2810
2811         /* If the dangling name entry for the orphan directory object has
2812          * been removed, then just check whether the directory object is
2813          * still under the .lustre/lost+found/MDTxxxx/ or not. */
2814         if (lpf) {
2815                 lpf = false;
2816                 goto again;
2817         }
2818
2819         /* There is no linkEA entry that matches the ".." name entry. Find
2820          * the first linkEA entry that both parent and name entry exist to
2821          * rebuild a new ".." name entry. */
2822         if (once) {
2823                 once = false;
2824                 goto again;
2825         }
2826
             /* NOTE(review): more than one entry survived all scans. rc may
              * still hold the result of the last dt_lookup() (e.g. -ENOENT)
              * rather than a deliberate value -- confirm intended. */
2827         RETURN(rc);
2828 }
2829
2830 /**
2831  * Repair the object's nlink attribute.
2832  *
2833  * If all the known name entries have been verified, then the object's hard
2834  * link attribute should match the object's linkEA entries count unless the
2835  * object has too many hard links to be recorded in the linkEA. Such cases
2836  * should have been marked in the LFSCK trace file. Otherwise, trust the
2837  * linkEA to update the object's nlink attribute.
2838  *
2839  * \param[in] env       pointer to the thread context
2840  * \param[in] com       pointer to the lfsck component
2841  * \param[in] obj       pointer to the dt_object to be handled
2842  * \param[in,out] la    pointer to buffer to object's attribute before
2843  *                      and after the repairing
2844  *
2845  * \retval              positive number for repaired cases
2846  * \retval              0 if nothing to be repaired
2847  * \retval              negative error number on failure
2848  */
2849 static int lfsck_namespace_repair_nlink(const struct lu_env *env,
2850                                         struct lfsck_component *com,
2851                                         struct dt_object *obj,
2852                                         struct lu_attr *la)
2853 {
2854         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2855         struct lu_fid                   *tfid   = &info->lti_fid3;
2856         struct lfsck_namespace          *ns     = com->lc_file_ram;
2857         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2858         struct dt_device                *dev    = lfsck->li_bottom;
2859         const struct lu_fid             *cfid   = lfsck_dto2fid(obj);
2860         struct dt_object                *child  = NULL;
2861         struct thandle                  *th     = NULL;
2862         struct linkea_data               ldata  = { NULL };
2863         struct lustre_handle             lh     = { 0 };
2864         __u32                            old    = la->la_nlink;
2865         int                              idx;
2866         int                              rc     = 0;
2867         __u8                             flags;
2868         ENTRY;
2869
2870         LASSERT(!dt_object_remote(obj));
2871         LASSERT(S_ISREG(lfsck_object_type(obj)));
2872
2873         child = lfsck_object_find_by_dev(env, dev, cfid);
2874         if (IS_ERR(child))
2875                 GOTO(log, rc = PTR_ERR(child));
2876
2877         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
2878                               MDS_INODELOCK_UPDATE |
2879                               MDS_INODELOCK_XATTR, LCK_EX);
2880         if (rc != 0)
2881                 GOTO(log, rc);
2882
2883         th = dt_trans_create(env, dev);
2884         if (IS_ERR(th))
2885                 GOTO(log, rc = PTR_ERR(th));
2886
2887         la->la_valid = LA_NLINK;
2888         rc = dt_declare_attr_set(env, child, la, th);
2889         if (rc != 0)
2890                 GOTO(stop, rc);
2891
2892         rc = dt_trans_start_local(env, dev, th);
2893         if (rc != 0)
2894                 GOTO(stop, rc);
2895
2896         dt_write_lock(env, child, 0);
2897         /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT has
2898          * ever tried to verify some remote MDT-object that resides on this
2899          * MDT, but this MDT failed to respond such request. So means there
2900          * may be some remote name entry on other MDT that references this
2901          * object with another name, so we cannot know whether this linkEA
2902          * is valid or not. So keep it there and maybe resolved when next
2903          * LFSCK run. */
2904         if (ns->ln_flags & LF_INCOMPLETE)
2905                 GOTO(unlock, rc = 0);
2906
2907         fid_cpu_to_be(tfid, cfid);
2908         idx = lfsck_sub_trace_file_fid2idx(cfid);
2909         rc = dt_lookup(env, com->lc_sub_trace_objs[idx].lsto_obj,
2910                        (struct dt_rec *)&flags, (const struct dt_key *)tfid,
2911                        BYPASS_CAPA);
2912         if (rc != 0)
2913                 GOTO(unlock, rc);
2914
2915         if (flags & LNTF_SKIP_NLINK)
2916                 GOTO(unlock, rc = 0);
2917
2918         rc = dt_attr_get(env, child, la, BYPASS_CAPA);
2919         if (rc != 0)
2920                 GOTO(unlock, rc = (rc == -ENOENT ? 0 : rc));
2921
2922         rc = lfsck_links_read2(env, child, &ldata);
2923         if (rc != 0)
2924                 GOTO(unlock, rc = (rc == -ENODATA ? 0 : rc));
2925
2926         if (la->la_nlink == ldata.ld_leh->leh_reccount ||
2927             unlikely(la->la_nlink == 0))
2928                 GOTO(unlock, rc = 0);
2929
2930         la->la_nlink = ldata.ld_leh->leh_reccount;
2931         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
2932                 GOTO(unlock, rc = 1);
2933
2934         rc = dt_attr_set(env, child, la, th, BYPASS_CAPA);
2935
2936         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
2937
2938 unlock:
2939         dt_write_unlock(env, child);
2940
2941 stop:
2942         dt_trans_stop(env, dev, th);
2943
2944 log:
2945         lfsck_ibits_unlock(&lh, LCK_EX);
2946         if (child != NULL && !IS_ERR(child))
2947                 lfsck_object_put(env, child);
2948
2949         CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired the object "DFID"'s "
2950                "nlink count from %u to %u: rc = %d\n",
2951                lfsck_lfsck2name(lfsck), PFID(cfid), old, la->la_nlink, rc);
2952
2953         if (rc != 0)
2954                 ns->ln_flags |= LF_INCONSISTENT;
2955
2956         return rc;
2957 }
2958
2959 /**
2960  * Double scan the directory object for namespace LFSCK.
2961  *
2962  * This function will verify the <parent, child> pairs in the namespace tree:
2963  * the parent references the child via some name entry that should be in the
2964  * child's linkEA entry, the child should back references the parent via its
2965  * ".." name entry.
2966  *
2967  * The LFSCK will scan every linkEA entry in turn until find out the first
2968  * matched pairs. If found, then all other linkEA entries will be dropped.
2969  * If all the linkEA entries cannot match the ".." name entry, then there
2970  * are serveral possible cases:
2971  *
2972  * 1) If there is only one linkEA entry, then trust it as long as the PFID
2973  *    in the linkEA entry is valid.
2974  *
2975  * 2) If there are multiple linkEA entries, then try to find the linkEA
2976  *    that matches the ".." name entry. If found, then all other entries
2977  *    are invalid; otherwise, it is quite possible that the ".." name entry
2978  *    is corrupted. Under such case, the LFSCK will rebuild the ".." name
2979  *    entry according to the first valid linkEA entry (both the parent and
2980  *    the name entry should exist).
2981  *
2982  * 3) If the directory object has no (valid) linkEA entry, then the
2983  *    directory object will be handled as pure orphan and inserted
2984  *    in the .lustre/lost+found/MDTxxxx/ with the name:
2985  *    ${self_FID}-${PFID}-D-${conflict_version}
2986  *
2987  * \param[in] env       pointer to the thread context
2988  * \param[in] com       pointer to the lfsck component
2989  * \param[in] child     pointer to the directory object to be handled
2990  * \param[in] flags     to indicate the specical checking on the @child
2991  *
2992  * \retval              positive number for repaired cases
2993  * \retval              0 if nothing to be repaired
2994  * \retval              negative error number on failure
2995  */
2996 static int lfsck_namespace_double_scan_dir(const struct lu_env *env,
2997                                            struct lfsck_component *com,
2998                                            struct dt_object *child, __u8 flags)
2999 {
3000         struct lfsck_thread_info *info          = lfsck_env_info(env);
3001         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
3002         struct lu_fid            *pfid          = &info->lti_fid2;
3003         struct lfsck_namespace   *ns            = com->lc_file_ram;
3004         struct lfsck_instance    *lfsck         = com->lc_lfsck;
3005         struct lustre_handle      lh            = { 0 };
3006         struct linkea_data        ldata         = { NULL };
3007         bool                      unknown       = false;
3008         bool                      lpf           = false;
3009         bool                      retry         = false;
3010         enum lfsck_namespace_inconsistency_type type = LNIT_BAD_LINKEA;
3011         int                       rc            = 0;
3012         ENTRY;
3013
3014         LASSERT(!dt_object_remote(child));
3015
3016         if (flags & LNTF_UNCERTAIN_LMV) {
3017                 if (flags & LNTF_RECHECK_NAME_HASH) {
3018                         rc = lfsck_namespace_scan_shard(env, com, child);
3019                         if (rc < 0)
3020                                 RETURN(rc);
3021
3022                         ns->ln_striped_shards_scanned++;
3023                 } else {
3024                         ns->ln_striped_shards_skipped++;
3025                 }
3026         }
3027
3028         flags &= ~(LNTF_RECHECK_NAME_HASH | LNTF_UNCERTAIN_LMV);
3029         if (flags == 0)
3030                 RETURN(0);
3031
3032         if (flags & (LNTF_CHECK_LINKEA | LNTF_CHECK_PARENT) &&
3033             !(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) {
3034                 CDEBUG(D_LFSCK, "%s: some MDT(s) maybe NOT take part in the"
3035                        "the namespace LFSCK, then the LFSCK cannot guarantee"
3036                        "all the name entries have been verified in first-stage"
3037                        "scanning. So have to skip orphan related handling for"
3038                        "the directory object "DFID" with remote name entry\n",
3039                        lfsck_lfsck2name(lfsck), PFID(cfid));
3040
3041                 RETURN(0);
3042         }
3043
3044         if (unlikely(!dt_try_as_dir(env, child)))
3045                 GOTO(out, rc = -ENOTDIR);
3046
3047         /* We only take ldlm lock on the @child when required. When the
3048          * logic comes here for the first time, it is always false. */
3049         if (0) {
3050
3051 lock:
3052                 rc = lfsck_ibits_lock(env, lfsck, child, &lh,
3053                                       MDS_INODELOCK_UPDATE |
3054                                       MDS_INODELOCK_XATTR, LCK_EX);
3055                 if (rc != 0)
3056                         GOTO(out, rc);
3057         }
3058
3059         dt_read_lock(env, child, 0);
3060         if (unlikely(lfsck_is_dead_obj(child))) {
3061                 dt_read_unlock(env, child);
3062
3063                 GOTO(out, rc = 0);
3064         }
3065
3066         rc = dt_lookup(env, child, (struct dt_rec *)pfid,
3067                        (const struct dt_key *)dotdot, BYPASS_CAPA);
3068         if (rc != 0) {
3069                 if (rc != -ENOENT && rc != -ENODATA && rc != -EINVAL) {
3070                         dt_read_unlock(env, child);
3071
3072                         GOTO(out, rc);
3073                 }
3074
3075                 if (!lustre_handle_is_used(&lh)) {
3076                         dt_read_unlock(env, child);
3077                         goto lock;
3078                 }
3079
3080                 fid_zero(pfid);
3081         } else if (lfsck->li_lpf_obj != NULL &&
3082                    lu_fid_eq(pfid, lfsck_dto2fid(lfsck->li_lpf_obj))) {
3083                 lpf = true;
3084         } else if (unlikely(!fid_is_sane(pfid))) {
3085                 fid_zero(pfid);
3086         }
3087
3088         rc = lfsck_links_read(env, child, &ldata);
3089         dt_read_unlock(env, child);
3090         if (rc != 0) {
3091                 if (rc != -ENODATA && rc != -EINVAL)
3092                         GOTO(out, rc);
3093
3094                 if (!lustre_handle_is_used(&lh))
3095                         goto lock;
3096
3097                 if (rc == -EINVAL && !fid_is_zero(pfid)) {
3098                         /* Remove the corrupted linkEA. */
3099                         rc = lfsck_namespace_links_remove(env, com, child);
3100                         if (rc == 0)
3101                                 /* Here, because of the crashed linkEA, we
3102                                  * cannot know whether there is some parent
3103                                  * that references the child directory via
3104                                  * some name entry or not. So keep it there,
3105                                  * when the LFSCK run next time, if there is
3106                                  * some parent that references this object,
3107                                  * then the LFSCK can rebuild the linkEA;
3108                                  * otherwise, this object will be handled
3109                                  * as orphan as above. */
3110                                 unknown = true;
3111                 } else {
3112                         /* 1. If we have neither ".." nor linkEA,
3113                          *    then it is an orphan.
3114                          *
3115                          * 2. If we only have the ".." name entry,
3116                          *    but no parent references this child
3117                          *    directory, then handle it as orphan. */
3118                         lfsck_ibits_unlock(&lh, LCK_EX);
3119                         type = LNIT_MUL_REF;
3120
3121                         /* If the LFSCK is marked as LF_INCOMPLETE,
3122                          * then means some MDT has ever tried to
3123                          * verify some remote MDT-object that resides
3124                          * on this MDT, but this MDT failed to respond
3125                          * such request. So means there may be some
3126                          * remote name entry on other MDT that
3127                          * references this object with another name,
3128                          * so we cannot know whether this linkEA is
3129                          * valid or not. So keep it there and maybe
3130                          * resolved when next LFSCK run. */
3131                         if (ns->ln_flags & LF_INCOMPLETE)
3132                                 GOTO(out, rc = 0);
3133
3134                         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
3135                                  "-"DFID, PFID(pfid));
3136                         rc = lfsck_namespace_insert_orphan(env, com, child,
3137                                                 info->lti_tmpbuf, "D", NULL);
3138                 }
3139
3140                 GOTO(out, rc);
3141         }
3142
3143         linkea_first_entry(&ldata);
3144         /* This is the most common case: the object has unique linkEA entry. */
3145         if (ldata.ld_leh->leh_reccount == 1) {
3146                 rc = lfsck_namespace_dsd_single(env, com, child, pfid, &ldata,
3147                                                 &lh, &type, &retry);
3148                 if (retry) {
3149                         LASSERT(!lustre_handle_is_used(&lh));
3150
3151                         retry = false;
3152                         goto lock;
3153                 }
3154
3155                 GOTO(out, rc);
3156         }
3157
3158         if (!lustre_handle_is_used(&lh))
3159                 goto lock;
3160
3161         if (unlikely(ldata.ld_leh->leh_reccount == 0)) {
3162                 rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, &lh,
3163                                                 &type);
3164
3165                 GOTO(out, rc);
3166         }
3167
3168         /* When we come here, the cases usually like that:
3169          * 1) The directory object has a corrupted linkEA entry. During the
3170          *    first-stage scanning, the LFSCK cannot know such corruption,
3171          *    then it appends the right linkEA entry according to the found
3172          *    name entry after the bad one.
3173          *
3174          * 2) The directory object has a right linkEA entry. During the
3175          *    first-stage scanning, the LFSCK finds some bad name entry,
3176          *    but the LFSCK cannot aware that at that time, then it adds
3177          *    the bad linkEA entry for further processing. */
3178         rc = lfsck_namespace_dsd_multiple(env, com, child, pfid, &ldata,
3179                                           &lh, &type, lpf);
3180
3181         GOTO(out, rc);
3182
3183 out:
3184         lfsck_ibits_unlock(&lh, LCK_EX);
3185         if (rc > 0) {
3186                 switch (type) {
3187                 case LNIT_BAD_LINKEA:
3188                         ns->ln_linkea_repaired++;
3189                         break;
3190                 case LNIT_UNMATCHED_PAIRS:
3191                         ns->ln_unmatched_pairs_repaired++;
3192                         break;
3193                 case LNIT_MUL_REF:
3194                         ns->ln_mul_ref_repaired++;
3195                         break;
3196                 default:
3197                         break;
3198                 }
3199         }
3200
3201         if (unknown)
3202                 ns->ln_unknown_inconsistency++;
3203
3204         return rc;
3205 }
3206
3207 /**
3208  * Double scan the MDT-object for namespace LFSCK.
3209  *
3210  * If the MDT-object contains invalid or repeated linkEA entries, then drop
3211  * those entries from the linkEA; if the linkEA becomes empty or the object
3212  * has no linkEA, then it is an orphan and will be added into the directory
3213  * .lustre/lost+found/MDTxxxx/; if the remote parent is lost, then recreate
3214  * the remote parent; if the name entry corresponding to some linkEA entry
3215  * is lost, then add the name entry back to the namespace.
3216  *
3217  * \param[in] env       pointer to the thread context
3218  * \param[in] com       pointer to the lfsck component
3219  * \param[in] child     pointer to the dt_object to be handled
3220  * \param[in] flags     some hints to indicate how the @child should be handled
3221  *
3222  * \retval              positive number for repaired cases
3223  * \retval              0 if nothing to be repaired
3224  * \retval              negative error number on failure
3225  */
3226 static int lfsck_namespace_double_scan_one(const struct lu_env *env,
3227                                            struct lfsck_component *com,
3228                                            struct dt_object *child, __u8 flags)
3229 {
3230         struct lfsck_thread_info *info     = lfsck_env_info(env);
3231         struct lu_attr           *la       = &info->lti_la;
3232         struct lu_name           *cname    = &info->lti_name;
3233         struct lu_fid            *pfid     = &info->lti_fid;
3234         struct lu_fid            *cfid     = &info->lti_fid2;
3235         struct lfsck_instance    *lfsck    = com->lc_lfsck;
3236         struct lfsck_namespace   *ns       = com->lc_file_ram;
3237         struct dt_object         *parent   = NULL;
3238         struct linkea_data        ldata    = { NULL };
3239         bool                      repaired = false;
3240         int                       count    = 0;
3241         int                       rc;
3242         ENTRY;
3243
3244         dt_read_lock(env, child, 0);
3245         if (unlikely(lfsck_is_dead_obj(child))) {
3246                 dt_read_unlock(env, child);
3247
3248                 RETURN(0);
3249         }
3250
3251         if (S_ISDIR(lfsck_object_type(child))) {
3252                 dt_read_unlock(env, child);
3253                 rc = lfsck_namespace_double_scan_dir(env, com, child, flags);
3254
3255                 RETURN(rc);
3256         }
3257
3258         rc = lfsck_links_read(env, child, &ldata);
3259         dt_read_unlock(env, child);
3260         if (rc != 0)
3261                 GOTO(out, rc);
3262
3263         linkea_first_entry(&ldata);
3264         while (ldata.ld_lee != NULL) {
3265                 lfsck_namespace_unpack_linkea_entry(&ldata, cname, pfid,
3266                                                     info->lti_key);
3267                 rc = lfsck_namespace_filter_linkea_entry(&ldata, cname, pfid,
3268                                                          false);
3269                 /* Found repeated linkEA entries */
3270                 if (rc > 0) {
3271                         rc = lfsck_namespace_shrink_linkea(env, com, child,
3272                                                 &ldata, cname, pfid, false);
3273                         if (rc < 0)
3274                                 GOTO(out, rc);
3275
3276                         if (rc == 0)
3277                                 continue;
3278
3279                         repaired = true;
3280
3281                         /* fall through */
3282                 }
3283
3284                 /* Invalid PFID in the linkEA entry. */
3285                 if (!fid_is_sane(pfid)) {
3286                         rc = lfsck_namespace_shrink_linkea(env, com, child,
3287                                                 &ldata, cname, pfid, true);
3288                         if (rc < 0)
3289                                 GOTO(out, rc);
3290
3291                         if (rc > 0)
3292                                 repaired = true;
3293
3294                         continue;
3295                 }
3296
3297                 parent = lfsck_object_find_bottom(env, lfsck, pfid);
3298                 if (IS_ERR(parent))
3299                         GOTO(out, rc = PTR_ERR(parent));
3300
3301                 if (!dt_object_exists(parent)) {
3302
3303 lost_parent:
3304                         if (ldata.ld_leh->leh_reccount > 1) {
3305                                 /* If it is NOT the last linkEA entry, then
3306                                  * there is still other chance to make the
3307                                  * child to be visible via other parent, then
3308                                  * remove this linkEA entry. */
3309                                 rc = lfsck_namespace_shrink_linkea(env, com,
3310                                         child, &ldata, cname, pfid, true);
3311                         } else {
3312                                 /* If the LFSCK is marked as LF_INCOMPLETE,
3313                                  * then means some MDT has ever tried to
3314                                  * verify some remote MDT-object that resides
3315                                  * on this MDT, but this MDT failed to respond
3316                                  * such request. So means there may be some
3317                                  * remote name entry on other MDT that
3318                                  * references this object with another name,
3319                                  * so we cannot know whether this linkEA is
3320                                  * valid or not. So keep it there and maybe
3321                                  * resolved when next LFSCK run. */
3322                                 if (ns->ln_flags & LF_INCOMPLETE) {
3323                                         lfsck_object_put(env, parent);
3324
3325                                         GOTO(out, rc = 0);
3326                                 }
3327
3328                                 /* Create the lost parent as an orphan. */
3329                                 rc = lfsck_namespace_create_orphan_dir(env, com,
3330                                                                 parent, NULL);
3331                                 if (rc < 0) {
3332                                         lfsck_object_put(env, parent);
3333
3334                                         GOTO(out, rc);
3335                                 }
3336
3337                                 if (rc > 0)
3338                                         repaired = true;
3339
3340                                 /* Add the missing name entry to the parent. */
3341                                 rc = lfsck_namespace_insert_normal(env, com,
3342                                                 parent, child, cname->ln_name);
3343                                 if (unlikely(rc == -EEXIST))
3344                                         /* Unfortunately, someone reused the
3345                                          * name under the parent by race. So we
3346                                          * have to remove the linkEA entry from
3347                                          * current child object. It means that
3348                                          * the LFSCK cannot recover the system
3349                                          * totally back to its original status,
3350                                          * but it is necessary to make the
3351                                          * current system to be consistent. */
3352                                         rc = lfsck_namespace_shrink_linkea(env,
3353                                                         com, child, &ldata,
3354                                                         cname, pfid, true);
3355                                 else
3356                                         linkea_next_entry(&ldata);
3357                         }
3358
3359                         lfsck_object_put(env, parent);
3360                         if (rc < 0)
3361                                 GOTO(out, rc);
3362
3363                         if (rc > 0)
3364                                 repaired = true;
3365
3366                         continue;
3367                 }
3368
3369                 /* The linkEA entry with bad parent will be removed. */
3370                 if (unlikely(!dt_try_as_dir(env, parent))) {
3371                         lfsck_object_put(env, parent);
3372                         rc = lfsck_namespace_shrink_linkea(env, com, child,
3373                                                 &ldata, cname, pfid, true);
3374                         if (rc < 0)
3375                                 GOTO(out, rc);
3376
3377                         if (rc > 0)
3378                                 repaired = true;
3379
3380                         continue;
3381                 }
3382
3383                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
3384                                (const struct dt_key *)cname->ln_name,
3385                                BYPASS_CAPA);
3386                 if (rc != 0 && rc != -ENOENT) {
3387                         lfsck_object_put(env, parent);
3388
3389                         GOTO(out, rc);
3390                 }
3391
3392                 if (rc == 0) {
3393                         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
3394                                 /* It is the most common case that we
3395                                  * find the name entry corresponding
3396                                  * to the linkEA entry. */
3397                                 lfsck_object_put(env, parent);
3398                                 linkea_next_entry(&ldata);
3399                         } else {
3400                                 /* The name entry references another
3401                                  * MDT-object that may be created by
3402                                  * the LFSCK for repairing dangling
3403                                  * name entry. Try to replace it. */
3404                                 rc = lfsck_namespace_replace_cond(env, com,
3405                                                 parent, child, cfid, cname);
3406                                 lfsck_object_put(env, parent);
3407                                 if (rc < 0)
3408                                         GOTO(out, rc);
3409
3410                                 if (rc > 0) {
3411                                         repaired = true;
3412                                         linkea_next_entry(&ldata);
3413                                 } else {
3414                                         rc = lfsck_namespace_shrink_linkea(env,
3415                                                         com, child, &ldata,
3416                                                         cname, pfid, true);
3417                                         if (rc < 0)
3418                                                 GOTO(out, rc);
3419
3420                                         if (rc > 0)
3421                                                 repaired = true;
3422                                 }
3423                         }
3424
3425                         continue;
3426                 }
3427
3428                 rc = dt_attr_get(env, child, la, BYPASS_CAPA);
3429                 if (rc != 0)
3430                         GOTO(out, rc);
3431
3432                 /* If there is no name entry in the parent dir and the object
3433                  * link count is less than the linkea entries count, then the
3434                  * linkea entry should be removed. */
3435                 if (ldata.ld_leh->leh_reccount > la->la_nlink) {
3436                         rc = lfsck_namespace_shrink_linkea_cond(env, com,
3437                                         parent, child, &ldata, cname, pfid);
3438                         lfsck_object_put(env, parent);
3439                         if (rc < 0)
3440                                 GOTO(out, rc);
3441
3442                         if (rc > 0)
3443                                 repaired = true;
3444
3445                         continue;
3446                 }
3447
3448                 /* If the LFSCK is marked as LF_INCOMPLETE, then means some
3449                  * MDT has ever tried to verify some remote MDT-object that
3450                  * resides on this MDT, but this MDT failed to respond such
3451                  * request. So means there may be some remote name entry on
3452                  * other MDT that references this object with another name,
3453                  * so we cannot know whether this linkEA is valid or not.
3454                  * So keep it there and maybe resolved when next LFSCK run. */
3455                 if (ns->ln_flags & LF_INCOMPLETE) {
3456                         lfsck_object_put(env, parent);
3457
3458                         GOTO(out, rc = 0);
3459                 }
3460
3461                 rc = lfsck_namespace_check_name(env, parent, child, cname);
3462                 if (rc == -ENOENT)
3463                         goto lost_parent;
3464
3465                 if (rc < 0) {
3466                         lfsck_object_put(env, parent);
3467
3468                         GOTO(out, rc);
3469                 }
3470
3471                 /* It is an invalid name entry, drop it. */
3472                 if (unlikely(rc > 0)) {
3473                         lfsck_object_put(env, parent);
3474                         rc = lfsck_namespace_shrink_linkea(env, com, child,
3475                                                 &ldata, cname, pfid, true);
3476                         if (rc < 0)
3477                                 GOTO(out, rc);
3478
3479                         if (rc > 0)
3480                                 repaired = true;
3481
3482                         continue;
3483                 }
3484
3485                 /* Add the missing name entry back to the namespace. */
3486                 rc = lfsck_namespace_insert_normal(env, com, parent, child,
3487                                                    cname->ln_name);
3488                 if (unlikely(rc == -ESTALE))
3489                         /* It may happen when the remote object has been
3490                          * removed, but the local MDT is not aware of that. */
3491                         goto lost_parent;
3492
3493                 if (unlikely(rc == -EEXIST))
3494                         /* Unfortunately, someone reused the name under the
3495                          * parent by race. So we have to remove the linkEA
3496                          * entry from current child object. It means that the
3497                          * LFSCK cannot recover the system totally back to
3498                          * its original status, but it is necessary to make
3499                          * the current system to be consistent.
3500                          *
3501                          * It also may be because of the LFSCK found some
3502                          * internal status of create operation. Under such
3503                          * case, nothing to be done. */
3504                         rc = lfsck_namespace_shrink_linkea_cond(env, com,
3505                                         parent, child, &ldata, cname, pfid);
3506                 else
3507                         linkea_next_entry(&ldata);
3508
3509                 lfsck_object_put(env, parent);
3510                 if (rc < 0)
3511                         GOTO(out, rc);
3512
3513                 if (rc > 0)
3514                         repaired = true;
3515         }
3516
3517         GOTO(out, rc = 0);
3518
3519 out:
3520         if (rc < 0 && rc != -ENODATA)
3521                 return rc;
3522
3523         if (rc == 0) {
3524                 LASSERT(ldata.ld_leh != NULL);
3525
3526                 count = ldata.ld_leh->leh_reccount;
3527         }
3528
3529         if (count == 0) {
3530                 /* If the LFSCK is marked as LF_INCOMPLETE, then means some
3531                  * MDT has ever tried to verify some remote MDT-object that
3532                  * resides on this MDT, but this MDT failed to respond such
3533                  * request. So means there may be some remote name entry on
3534                  * other MDT that references this object with another name,
3535                  * so we cannot know whether this linkEA is valid or not.
3536                  * So keep it there and maybe resolved when next LFSCK run. */
3537                 if (!(ns->ln_flags & LF_INCOMPLETE)) {
3538                         /* If the child becomes orphan, then insert it into
3539                          * the global .lustre/lost+found/MDTxxxx directory. */
3540                         rc = lfsck_namespace_insert_orphan(env, com, child,
3541                                                            "", "O", &count);
3542                         if (rc < 0)
3543                                 return rc;
3544
3545                         if (rc > 0) {
3546                                 ns->ln_mul_ref_repaired++;
3547                                 repaired = true;
3548                         }
3549                 }
3550         } else {
3551                 rc = dt_attr_get(env, child, la, BYPASS_CAPA);
3552                 if (rc != 0)
3553                         return rc;
3554
3555                 if (la->la_nlink != 0 && la->la_nlink != count) {
3556                         rc = lfsck_namespace_repair_nlink(env, com, child, la);
3557                         if (rc > 0) {
3558                                 ns->ln_objs_nlink_repaired++;
3559                                 rc = 0;
3560                         }
3561                 }
3562         }
3563
3564         if (repaired) {
3565                 if (la->la_nlink > 1)
3566                         ns->ln_mul_linked_repaired++;
3567
3568                 if (rc == 0)
3569                         rc = 1;
3570         }
3571
3572         return rc;
3573 }
3574
3575 static void lfsck_namespace_dump_statistics(struct seq_file *m,
3576                                             struct lfsck_namespace *ns,
3577                                             __u64 checked_phase1,
3578                                             __u64 checked_phase2,
3579                                             __u32 time_phase1,
3580                                             __u32 time_phase2)
3581 {
3582         seq_printf(m, "checked_phase1: "LPU64"\n"
3583                       "checked_phase2: "LPU64"\n"
3584                       "updated_phase1: "LPU64"\n"
3585                       "updated_phase2: "LPU64"\n"
3586                       "failed_phase1: "LPU64"\n"
3587                       "failed_phase2: "LPU64"\n"
3588                       "directories: "LPU64"\n"
3589                       "dirent_repaired: "LPU64"\n"
3590                       "linkea_repaired: "LPU64"\n"
3591                       "nlinks_repaired: "LPU64"\n"
3592                       "multiple_linked_checked: "LPU64"\n"
3593                       "multiple_linked_repaired: "LPU64"\n"
3594                       "unknown_inconsistency: "LPU64"\n"
3595                       "unmatched_pairs_repaired: "LPU64"\n"
3596                       "dangling_repaired: "LPU64"\n"
3597                       "multiple_referenced_repaired: "LPU64"\n"
3598                       "bad_file_type_repaired: "LPU64"\n"
3599                       "lost_dirent_repaired: "LPU64"\n"
3600                       "local_lost_found_scanned: "LPU64"\n"
3601                       "local_lost_found_moved: "LPU64"\n"
3602                       "local_lost_found_skipped: "LPU64"\n"
3603                       "local_lost_found_failed: "LPU64"\n"
3604                       "striped_dirs_scanned: "LPU64"\n"
3605                       "striped_dirs_repaired: "LPU64"\n"
3606                       "striped_dirs_failed: "LPU64"\n"
3607                       "striped_dirs_disabled: "LPU64"\n"
3608                       "striped_dirs_skipped: "LPU64"\n"
3609                       "striped_shards_scanned: "LPU64"\n"
3610                       "striped_shards_repaired: "LPU64"\n"
3611                       "striped_shards_failed: "LPU64"\n"
3612                       "striped_shards_skipped: "LPU64"\n"
3613                       "name_hash_repaired: "LPU64"\n"
3614                       "success_count: %u\n"
3615                       "run_time_phase1: %u seconds\n"
3616                       "run_time_phase2: %u seconds\n",
3617                       checked_phase1,
3618                       checked_phase2,
3619                       ns->ln_items_repaired,
3620                       ns->ln_objs_repaired_phase2,
3621                       ns->ln_items_failed,
3622                       ns->ln_objs_failed_phase2,
3623                       ns->ln_dirs_checked,
3624                       ns->ln_dirent_repaired,
3625                       ns->ln_linkea_repaired,
3626                       ns->ln_objs_nlink_repaired,
3627                       ns->ln_mul_linked_checked,
3628                       ns->ln_mul_linked_repaired,
3629                       ns->ln_unknown_inconsistency,
3630                       ns->ln_unmatched_pairs_repaired,
3631                       ns->ln_dangling_repaired,
3632                       ns->ln_mul_ref_repaired,
3633                       ns->ln_bad_type_repaired,
3634                       ns->ln_lost_dirent_repaired,
3635                       ns->ln_local_lpf_scanned,
3636                       ns->ln_local_lpf_moved,
3637                       ns->ln_local_lpf_skipped,
3638                       ns->ln_local_lpf_failed,
3639                       ns->ln_striped_dirs_scanned,
3640                       ns->ln_striped_dirs_repaired,
3641                       ns->ln_striped_dirs_failed,
3642                       ns->ln_striped_dirs_disabled,
3643                       ns->ln_striped_dirs_skipped,
3644                       ns->ln_striped_shards_scanned,
3645                       ns->ln_striped_shards_repaired,
3646                       ns->ln_striped_shards_failed,
3647                       ns->ln_striped_shards_skipped,
3648                       ns->ln_name_hash_repaired,
3649                       ns->ln_success_count,
3650                       time_phase1,
3651                       time_phase2);
3652 }
3653
3654 static void lfsck_namespace_release_lmv(const struct lu_env *env,
3655                                         struct lfsck_component *com)
3656 {
3657         struct lfsck_instance           *lfsck  = com->lc_lfsck;
3658         struct lfsck_namespace          *ns     = com->lc_file_ram;
3659
3660         while (!list_empty(&lfsck->li_list_lmv)) {
3661                 struct lfsck_lmv_unit   *llu;
3662                 struct lfsck_lmv        *llmv;
3663
3664                 llu = list_entry(lfsck->li_list_lmv.next,
3665                                  struct lfsck_lmv_unit, llu_link);
3666                 llmv = &llu->llu_lmv;
3667
3668                 LASSERTF(atomic_read(&llmv->ll_ref) == 1,
3669                          "still in using: %u\n",
3670                          atomic_read(&llmv->ll_ref));
3671
3672                 ns->ln_striped_dirs_skipped++;
3673                 lfsck_lmv_put(env, llmv);
3674         }
3675 }
3676
3677 static int lfsck_namespace_check_for_double_scan(const struct lu_env *env,
3678                                                  struct lfsck_component *com,
3679                                                  struct dt_object *obj)
3680 {
3681         struct lu_attr *la = &lfsck_env_info(env)->lti_la;
3682         int             rc;
3683
3684         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
3685         if (rc != 0)
3686                 return rc;
3687
3688         /* zero-linkEA object may be orphan, but it also maybe because
3689          * of upgrading. Currently, we cannot record it for double scan.
3690          * Because it may cause the LFSCK trace file to be too large. */
3691
3692         /* "la_ctime" == 1 means that it has ever been removed from
3693          * backend /lost+found directory but not been added back to
3694          * the normal namespace yet. */
3695
3696         if ((S_ISREG(lfsck_object_type(obj)) && la->la_nlink > 1) ||
3697             unlikely(la->la_ctime == 1))
3698                 rc = lfsck_namespace_trace_update(env, com, lfsck_dto2fid(obj),
3699                                                   LNTF_CHECK_LINKEA, true);
3700
3701         return rc;
3702 }
3703
3704 /* namespace APIs */
3705
3706 static int lfsck_namespace_reset(const struct lu_env *env,
3707                                  struct lfsck_component *com, bool init)
3708 {
3709         struct lfsck_instance           *lfsck  = com->lc_lfsck;
3710         struct lfsck_namespace          *ns     = com->lc_file_ram;
3711         struct lfsck_assistant_data     *lad    = com->lc_data;
3712         struct dt_object                *root;
3713         struct dt_object                *dto;
3714         int                              rc;
3715         ENTRY;
3716
3717         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
3718         if (IS_ERR(root))
3719                 GOTO(log, rc = PTR_ERR(root));
3720
3721         if (unlikely(!dt_try_as_dir(env, root)))
3722                 GOTO(put, rc = -ENOTDIR);
3723
3724         down_write(&com->lc_sem);
3725         if (init) {
3726                 memset(ns, 0, sizeof(*ns));
3727         } else {
3728                 __u32 count = ns->ln_success_count;
3729                 __u64 last_time = ns->ln_time_last_complete;
3730
3731                 memset(ns, 0, sizeof(*ns));
3732                 ns->ln_success_count = count;
3733                 ns->ln_time_last_complete = last_time;
3734         }
3735         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
3736         ns->ln_status = LS_INIT;
3737
3738         lfsck_object_put(env, com->lc_obj);
3739         com->lc_obj = NULL;
3740         dto = lfsck_namespace_load_one_trace_file(env, com, root,
3741                                 LFSCK_NAMESPACE, NULL, true);
3742         if (IS_ERR(dto))
3743                 GOTO(out, rc = PTR_ERR(dto));
3744
3745         com->lc_obj = dto;
3746         rc = lfsck_namespace_load_sub_trace_files(env, com, true);
3747         if (rc != 0)
3748                 GOTO(out, rc);
3749
3750         lad->lad_incomplete = 0;
3751         CFS_RESET_BITMAP(lad->lad_bitmap);
3752
3753         rc = lfsck_namespace_store(env, com, true);
3754
3755         GOTO(out, rc);
3756
3757 out:
3758         up_write(&com->lc_sem);
3759
3760 put:
3761         lu_object_put(env, &root->do_lu);
3762 log:
3763         CDEBUG(D_LFSCK, "%s: namespace LFSCK reset: rc = %d\n",
3764                lfsck_lfsck2name(lfsck), rc);
3765         return rc;
3766 }
3767
3768 static void
3769 lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
3770                      bool new_checked)
3771 {
3772         struct lfsck_namespace *ns = com->lc_file_ram;
3773
3774         down_write(&com->lc_sem);
3775         if (new_checked)
3776                 com->lc_new_checked++;
3777         lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
3778         up_write(&com->lc_sem);
3779 }
3780
3781 static void lfsck_namespace_close_dir(const struct lu_env *env,
3782                                       struct lfsck_component *com)
3783 {
3784         struct lfsck_namespace          *ns     = com->lc_file_ram;
3785         struct lfsck_assistant_data     *lad    = com->lc_data;
3786         struct lfsck_instance           *lfsck  = com->lc_lfsck;
3787         struct lfsck_lmv                *llmv   = lfsck->li_lmv;
3788         struct lfsck_namespace_req      *lnr;
3789         __u32                            size   =
3790                                 sizeof(*lnr) + LFSCK_TMPBUF_LEN;
3791         bool                             wakeup = false;
3792         ENTRY;
3793
3794         if (llmv == NULL)
3795                 RETURN_EXIT;
3796
3797         OBD_ALLOC(lnr, size);
3798         if (lnr == NULL) {
3799                 ns->ln_striped_dirs_skipped++;
3800
3801                 RETURN_EXIT;
3802         }
3803
3804         /* Generate a dummy request to indicate that all shards' name entry
3805          * in this striped directory has been scanned for the first time. */
3806         INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
3807         lnr->lnr_obj = lfsck_object_get(lfsck->li_obj_dir);
3808         lnr->lnr_lmv = lfsck_lmv_get(llmv);
3809         lnr->lnr_fid = *lfsck_dto2fid(lfsck->li_obj_dir);
3810         lnr->lnr_oit_cookie = lfsck->li_pos_current.lp_oit_cookie;
3811         lnr->lnr_dir_cookie = MDS_DIR_END_OFF;
3812         lnr->lnr_size = size;
3813
3814         spin_lock(&lad->lad_lock);
3815         if (lad->lad_assistant_status < 0) {
3816                 spin_unlock(&lad->lad_lock);
3817                 lfsck_namespace_assistant_req_fini(env, &lnr->lnr_lar);
3818                 ns->ln_striped_dirs_skipped++;
3819
3820                 RETURN_EXIT;
3821         }
3822
3823         list_add_tail(&lnr->lnr_lar.lar_list, &lad->lad_req_list);
3824         if (lad->lad_prefetched == 0)
3825                 wakeup = true;
3826
3827         lad->lad_prefetched++;
3828         spin_unlock(&lad->lad_lock);
3829         if (wakeup)
3830                 wake_up_all(&lad->lad_thread.t_ctl_waitq);
3831
3832         EXIT;
3833 }
3834
3835 static int lfsck_namespace_open_dir(const struct lu_env *env,
3836                                     struct lfsck_component *com)
3837 {
3838         struct lfsck_instance   *lfsck  = com->lc_lfsck;
3839         struct lfsck_namespace  *ns     = com->lc_file_ram;
3840         struct lfsck_lmv        *llmv   = lfsck->li_lmv;
3841         int                      rc     = 0;
3842         ENTRY;
3843
3844         if (llmv == NULL)
3845                 RETURN(0);
3846
3847         if (llmv->ll_lmv_master) {
3848                 struct lmv_mds_md_v1 *lmv = &llmv->ll_lmv;
3849
3850                 if (lmv->lmv_master_mdt_index !=
3851                     lfsck_dev_idx(lfsck->li_bottom)) {
3852                         lmv->lmv_master_mdt_index =
3853                                 lfsck_dev_idx(lfsck->li_bottom);
3854                         ns->ln_flags |= LF_INCONSISTENT;
3855                         llmv->ll_lmv_updated = 1;
3856                 }
3857         } else {
3858                 rc = lfsck_namespace_verify_stripe_slave(env, com,
3859                                         lfsck->li_obj_dir, llmv);
3860         }
3861
3862         RETURN(rc > 0 ? 0 : rc);
3863 }
3864
3865 static int lfsck_namespace_checkpoint(const struct lu_env *env,
3866                                       struct lfsck_component *com, bool init)
3867 {
3868         struct lfsck_instance   *lfsck = com->lc_lfsck;
3869         struct lfsck_namespace  *ns    = com->lc_file_ram;
3870         int                      rc;
3871
3872         if (!init) {
3873                 rc = lfsck_checkpoint_generic(env, com);
3874                 if (rc != 0)
3875                         goto log;
3876         }
3877
3878         down_write(&com->lc_sem);
3879         if (init) {
3880                 ns->ln_pos_latest_start = lfsck->li_pos_checkpoint;
3881         } else {
3882                 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
3883                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
3884                                 HALF_SEC - lfsck->li_time_last_checkpoint);
3885                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
3886                 ns->ln_items_checked += com->lc_new_checked;
3887                 com->lc_new_checked = 0;
3888         }
3889
3890         rc = lfsck_namespace_store(env, com, false);
3891         up_write(&com->lc_sem);
3892
3893 log:
3894         CDEBUG(D_LFSCK, "%s: namespace LFSCK checkpoint at the pos ["LPU64
3895                ", "DFID", "LPX64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
3896                lfsck->li_pos_current.lp_oit_cookie,
3897                PFID(&lfsck->li_pos_current.lp_dir_parent),
3898                lfsck->li_pos_current.lp_dir_cookie, rc);
3899
3900         return rc > 0 ? 0 : rc;
3901 }
3902
3903 static int lfsck_namespace_prep(const struct lu_env *env,
3904                                 struct lfsck_component *com,
3905                                 struct lfsck_start_param *lsp)
3906 {
3907         struct lfsck_instance   *lfsck  = com->lc_lfsck;
3908         struct lfsck_namespace  *ns     = com->lc_file_ram;
3909         struct lfsck_position   *pos    = &com->lc_pos_start;
3910         int                      rc;
3911
3912         rc = lfsck_namespace_load_bitmap(env, com);
3913         if (rc != 0 || ns->ln_status == LS_COMPLETED) {
3914                 rc = lfsck_namespace_reset(env, com, false);
3915                 if (rc == 0)
3916                         rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
3917
3918                 if (rc != 0) {
3919                         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: "
3920                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
3921
3922                         return rc;
3923                 }
3924         }
3925
3926         down_write(&com->lc_sem);
3927         ns->ln_time_latest_start = cfs_time_current_sec();
3928         spin_lock(&lfsck->li_lock);
3929
3930         if (ns->ln_flags & LF_SCANNED_ONCE) {
3931                 if (!lfsck->li_drop_dryrun ||
3932                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
3933                         ns->ln_status = LS_SCANNING_PHASE2;
3934                         list_move_tail(&com->lc_link,
3935                                        &lfsck->li_list_double_scan);
3936                         if (!list_empty(&com->lc_link_dir))
3937                                 list_del_init(&com->lc_link_dir);
3938                         lfsck_pos_set_zero(pos);
3939                 } else {
3940                         ns->ln_status = LS_SCANNING_PHASE1;
3941                         ns->ln_run_time_phase1 = 0;
3942                         ns->ln_run_time_phase2 = 0;
3943                         ns->ln_items_checked = 0;
3944                         ns->ln_items_repaired = 0;
3945                         ns->ln_items_failed = 0;
3946                         ns->ln_dirs_checked = 0;
3947                         ns->ln_objs_checked_phase2 = 0;
3948                         ns->ln_objs_repaired_phase2 = 0;
3949                         ns->ln_objs_failed_phase2 = 0;
3950                         ns->ln_objs_nlink_repaired = 0;
3951                         ns->ln_dirent_repaired = 0;
3952                         ns->ln_linkea_repaired = 0;
3953                         ns->ln_mul_linked_checked = 0;
3954                         ns->ln_mul_linked_repaired = 0;
3955                         ns->ln_unknown_inconsistency = 0;
3956                         ns->ln_unmatched_pairs_repaired = 0;
3957                         ns->ln_dangling_repaired = 0;
3958                         ns->ln_mul_ref_repaired = 0;
3959                         ns->ln_bad_type_repaired = 0;
3960                         ns->ln_lost_dirent_repaired = 0;
3961                         ns->ln_striped_dirs_scanned = 0;
3962                         ns->ln_striped_dirs_repaired = 0;
3963                         ns->ln_striped_dirs_failed = 0;
3964                         ns->ln_striped_dirs_disabled = 0;
3965                         ns->ln_striped_dirs_skipped = 0;
3966                         ns->ln_striped_shards_scanned = 0;
3967                         ns->ln_striped_shards_repaired = 0;
3968                         ns->ln_striped_shards_failed = 0;
3969                         ns->ln_striped_shards_skipped = 0;
3970                         ns->ln_name_hash_repaired = 0;
3971                         fid_zero(&ns->ln_fid_latest_scanned_phase2);
3972                         if (list_empty(&com->lc_link_dir))
3973                                 list_add_tail(&com->lc_link_dir,
3974                                               &lfsck->li_list_dir);
3975                         *pos = ns->ln_pos_first_inconsistent;
3976                 }
3977         } else {
3978                 ns->ln_status = LS_SCANNING_PHASE1;
3979                 if (list_empty(&com->lc_link_dir))
3980                         list_add_tail(&com->lc_link_dir,
3981                                       &lfsck->li_list_dir);
3982                 if (!lfsck->li_drop_dryrun ||
3983                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
3984                         *pos = ns->ln_pos_last_checkpoint;
3985                         pos->lp_oit_cookie++;
3986                 } else {
3987                         *pos = ns->ln_pos_first_inconsistent;
3988                 }
3989         }
3990
3991         spin_unlock(&lfsck->li_lock);
3992         up_write(&com->lc_sem);
3993
3994         rc = lfsck_start_assistant(env, com, lsp);
3995
3996         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep done, start pos ["LPU64", "
3997                DFID", "LPX64"]: rc = %d\n",
3998                lfsck_lfsck2name(lfsck), pos->lp_oit_cookie,
3999                PFID(&pos->lp_dir_parent), pos->lp_dir_cookie, rc);
4000
4001         return rc;
4002 }
4003
4004 static int lfsck_namespace_exec_oit(const struct lu_env *env,
4005                                     struct lfsck_component *com,
4006                                     struct dt_object *obj)
4007 {
4008         struct lfsck_thread_info *info  = lfsck_env_info(env);
4009         struct lfsck_namespace   *ns    = com->lc_file_ram;
4010         struct lfsck_instance    *lfsck = com->lc_lfsck;
4011         const struct lu_fid      *fid   = lfsck_dto2fid(obj);
4012         struct lu_fid            *pfid  = &info->lti_fid2;
4013         struct lu_name           *cname = &info->lti_name;
4014         struct lu_seq_range      *range = &info->lti_range;
4015         struct dt_device         *dev   = lfsck->li_bottom;
4016         struct seq_server_site   *ss    =
4017                                 lu_site2seq(dev->dd_lu_dev.ld_site);
4018         struct linkea_data        ldata = { NULL };
4019         __u32                     idx   = lfsck_dev_idx(dev);
4020         int                       rc;
4021         ENTRY;
4022
4023         rc = lfsck_links_read(env, obj, &ldata);
4024         if (rc == -ENOENT)
4025                 GOTO(out, rc = 0);
4026
4027         /* -EINVAL means crashed linkEA, should be verified. */
4028         if (rc == -EINVAL) {
4029                 rc = lfsck_namespace_trace_update(env, com, fid,
4030                                                   LNTF_CHECK_LINKEA, true);
4031                 if (rc == 0) {
4032                         struct lustre_handle lh = { 0 };
4033
4034                         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
4035                                               MDS_INODELOCK_UPDATE |
4036                                               MDS_INODELOCK_XATTR, LCK_EX);
4037                         if (rc == 0) {
4038                                 rc = lfsck_namespace_links_remove(env, com,
4039                                                                   obj);
4040                                 lfsck_ibits_unlock(&lh, LCK_EX);
4041                         }
4042                 }
4043
4044                 GOTO(out, rc = (rc == -ENOENT ? 0 : rc));
4045         }
4046
4047         if (rc == -ENODATA) {
4048                 rc = lfsck_namespace_check_for_double_scan(env, com, obj);
4049
4050                 GOTO(out, rc);
4051         }
4052
4053         if (rc != 0)
4054                 GOTO(out, rc);
4055
4056         /* Record multiple-linked object. */
4057         if (ldata.ld_leh->leh_reccount > 1) {
4058                 rc = lfsck_namespace_trace_update(env, com, fid,
4059                                                   LNTF_CHECK_LINKEA, true);
4060
4061                 GOTO(out, rc);
4062         }
4063
4064         linkea_first_entry(&ldata);
4065         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
4066         if (!fid_is_sane(pfid)) {
4067                 rc = lfsck_namespace_trace_update(env, com, fid,
4068                                                   LNTF_CHECK_PARENT, true);
4069         } else {
4070                 fld_range_set_mdt(range);
4071                 rc = fld_local_lookup(env, ss->ss_server_fld,
4072                                       fid_seq(pfid), range);
4073                 if ((rc == -ENOENT) ||
4074                     (rc == 0 && range->lsr_index != idx))
4075                         rc = lfsck_namespace_trace_update(env, com, fid,
4076                                                 LNTF_CHECK_LINKEA, true);
4077                 else
4078                         rc = lfsck_namespace_check_for_double_scan(env, com,
4079                                                                    obj);
4080         }
4081
4082         GOTO(out, rc);
4083
4084 out:
4085         down_write(&com->lc_sem);
4086         com->lc_new_checked++;
4087         if (S_ISDIR(lfsck_object_type(obj)))
4088                 ns->ln_dirs_checked++;
4089         if (rc != 0)
4090                 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
4091         up_write(&com->lc_sem);
4092
4093         return rc;
4094 }
4095
4096 static int lfsck_namespace_exec_dir(const struct lu_env *env,
4097                                     struct lfsck_component *com,
4098                                     struct lu_dirent *ent, __u16 type)
4099 {
4100         struct lfsck_assistant_data     *lad     = com->lc_data;
4101         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4102         struct lfsck_namespace_req      *lnr;
4103         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
4104         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4105         struct ptlrpc_thread            *athread = &lad->lad_thread;
4106         struct l_wait_info               lwi     = { 0 };
4107         bool                             wakeup  = false;
4108
4109         l_wait_event(mthread->t_ctl_waitq,
4110                      bk->lb_async_windows == 0 ||
4111                      lad->lad_prefetched < bk->lb_async_windows ||
4112                      !thread_is_running(mthread) ||
4113                      thread_is_stopped(athread),
4114                      &lwi);
4115
4116         if (unlikely(!thread_is_running(mthread)) ||
4117                      thread_is_stopped(athread))
4118                 return 0;
4119
4120         if (unlikely(lfsck_is_dead_obj(lfsck->li_obj_dir)))
4121                 return 0;
4122
4123         lnr = lfsck_namespace_assistant_req_init(com->lc_lfsck, ent, type);
4124         if (IS_ERR(lnr)) {
4125                 struct lfsck_namespace *ns = com->lc_file_ram;
4126
4127                 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
4128                 return PTR_ERR(lnr);
4129         }
4130
4131         spin_lock(&lad->lad_lock);
4132         if (lad->lad_assistant_status < 0) {
4133                 spin_unlock(&lad->lad_lock);
4134                 lfsck_namespace_assistant_req_fini(env, &lnr->lnr_lar);
4135                 return lad->lad_assistant_status;
4136         }
4137
4138         list_add_tail(&lnr->lnr_lar.lar_list, &lad->lad_req_list);
4139         if (lad->lad_prefetched == 0)
4140                 wakeup = true;
4141
4142         lad->lad_prefetched++;
4143         spin_unlock(&lad->lad_lock);
4144         if (wakeup)
4145                 wake_up_all(&lad->lad_thread.t_ctl_waitq);
4146
4147         down_write(&com->lc_sem);
4148         com->lc_new_checked++;
4149         up_write(&com->lc_sem);
4150
4151         return 0;
4152 }
4153
4154 static int lfsck_namespace_post(const struct lu_env *env,
4155                                 struct lfsck_component *com,
4156                                 int result, bool init)
4157 {
4158         struct lfsck_instance   *lfsck = com->lc_lfsck;
4159         struct lfsck_namespace  *ns    = com->lc_file_ram;
4160         int                      rc;
4161         ENTRY;
4162
4163         lfsck_post_generic(env, com, &result);
4164
4165         down_write(&com->lc_sem);
4166         lfsck_namespace_release_lmv(env, com);
4167
4168         spin_lock(&lfsck->li_lock);
4169         if (!init)
4170                 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
4171         if (result > 0) {
4172                 ns->ln_status = LS_SCANNING_PHASE2;
4173                 ns->ln_flags |= LF_SCANNED_ONCE;
4174                 ns->ln_flags &= ~LF_UPGRADE;
4175                 list_del_init(&com->lc_link_dir);
4176                 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
4177         } else if (result == 0) {
4178                 if (lfsck->li_status != 0)
4179                         ns->ln_status = lfsck->li_status;
4180                 else
4181                         ns->ln_status = LS_STOPPED;
4182                 if (ns->ln_status != LS_PAUSED) {
4183                         list_del_init(&com->lc_link_dir);
4184                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
4185                 }
4186         } else {
4187                 ns->ln_status = LS_FAILED;
4188                 list_del_init(&com->lc_link_dir);
4189                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
4190         }
4191         spin_unlock(&lfsck->li_lock);
4192
4193         if (!init) {
4194                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4195                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4196                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
4197                 ns->ln_items_checked += com->lc_new_checked;
4198                 com->lc_new_checked = 0;
4199         }
4200
4201         rc = lfsck_namespace_store(env, com, false);
4202         up_write(&com->lc_sem);
4203
4204         CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n",
4205                lfsck_lfsck2name(lfsck), rc);
4206
4207         RETURN(rc);
4208 }
4209
4210 static int
4211 lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
4212                      struct seq_file *m)
4213 {
4214         struct lfsck_instance   *lfsck = com->lc_lfsck;
4215         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
4216         struct lfsck_namespace  *ns    = com->lc_file_ram;
4217         int                      rc;
4218
4219         down_read(&com->lc_sem);
4220         seq_printf(m, "name: lfsck_namespace\n"
4221                    "magic: %#x\n"
4222                    "version: %d\n"
4223                    "status: %s\n",
4224                    ns->ln_magic,
4225                    bk->lb_version,
4226                    lfsck_status2names(ns->ln_status));
4227
4228         rc = lfsck_bits_dump(m, ns->ln_flags, lfsck_flags_names, "flags");
4229         if (rc < 0)
4230                 goto out;
4231
4232         rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
4233         if (rc < 0)
4234                 goto out;
4235
4236         rc = lfsck_time_dump(m, ns->ln_time_last_complete,
4237                              "time_since_last_completed");
4238         if (rc < 0)
4239                 goto out;
4240
4241         rc = lfsck_time_dump(m, ns->ln_time_latest_start,
4242                              "time_since_latest_start");
4243         if (rc < 0)
4244                 goto out;
4245
4246         rc = lfsck_time_dump(m, ns->ln_time_last_checkpoint,
4247                              "time_since_last_checkpoint");
4248         if (rc < 0)
4249                 goto out;
4250
4251         rc = lfsck_pos_dump(m, &ns->ln_pos_latest_start,
4252                             "latest_start_position");
4253         if (rc < 0)
4254                 goto out;
4255
4256         rc = lfsck_pos_dump(m, &ns->ln_pos_last_checkpoint,
4257                             "last_checkpoint_position");
4258         if (rc < 0)
4259                 goto out;
4260
4261         rc = lfsck_pos_dump(m, &ns->ln_pos_first_inconsistent,
4262                             "first_failure_position");
4263         if (rc < 0)
4264                 goto out;
4265
4266         if (ns->ln_status == LS_SCANNING_PHASE1) {
4267                 struct lfsck_position pos;
4268                 const struct dt_it_ops *iops;
4269                 cfs_duration_t duration = cfs_time_current() -
4270                                           lfsck->li_time_last_checkpoint;
4271                 __u64 checked = ns->ln_items_checked + com->lc_new_checked;
4272                 __u64 speed = checked;
4273                 __u64 new_checked = com->lc_new_checked *
4274                                     msecs_to_jiffies(MSEC_PER_SEC);
4275                 __u32 rtime = ns->ln_run_time_phase1 +
4276                               cfs_duration_sec(duration + HALF_SEC);
4277
4278                 if (duration != 0)
4279                         do_div(new_checked, duration);
4280                 if (rtime != 0)
4281                         do_div(speed, rtime);
4282                 lfsck_namespace_dump_statistics(m, ns, checked,
4283                                                 ns->ln_objs_checked_phase2,
4284                                                 rtime, ns->ln_run_time_phase2);
4285
4286                 seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
4287                               "average_speed_phase2: N/A\n"
4288                               "real_time_speed_phase1: "LPU64" items/sec\n"
4289                               "real_time_speed_phase2: N/A\n",
4290                               speed,
4291                               new_checked);
4292
4293                 LASSERT(lfsck->li_di_oit != NULL);
4294
4295                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
4296
4297                 /* The low layer otable-based iteration position may NOT
4298                  * exactly match the namespace-based directory traversal
4299                  * cookie. Generally, it is not a serious issue. But the
4300                  * caller should NOT make assumption on that. */
4301                 pos.lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
4302                 if (!lfsck->li_current_oit_processed)
4303                         pos.lp_oit_cookie--;
4304
4305                 spin_lock(&lfsck->li_lock);
4306                 if (lfsck->li_di_dir != NULL) {
4307                         pos.lp_dir_cookie = lfsck->li_cookie_dir;
4308                         if (pos.lp_dir_cookie >= MDS_DIR_END_OFF) {
4309                                 fid_zero(&pos.lp_dir_parent);
4310                                 pos.lp_dir_cookie = 0;
4311                         } else {
4312                                 pos.lp_dir_parent =
4313                                         *lfsck_dto2fid(lfsck->li_obj_dir);
4314                         }
4315                 } else {
4316                         fid_zero(&pos.lp_dir_parent);
4317                         pos.lp_dir_cookie = 0;
4318                 }
4319                 spin_unlock(&lfsck->li_lock);
4320                 lfsck_pos_dump(m, &pos, "current_position");
4321         } else if (ns->ln_status == LS_SCANNING_PHASE2) {
4322                 cfs_duration_t duration = cfs_time_current() -
4323                                           lfsck->li_time_last_checkpoint;
4324                 __u64 checked = ns->ln_objs_checked_phase2 +
4325                                 com->lc_new_checked;
4326                 __u64 speed1 = ns->ln_items_checked;
4327                 __u64 speed2 = checked;
4328                 __u64 new_checked = com->lc_new_checked *
4329                                     msecs_to_jiffies(MSEC_PER_SEC);
4330                 __u32 rtime = ns->ln_run_time_phase2 +
4331                               cfs_duration_sec(duration + HALF_SEC);
4332
4333                 if (duration != 0)
4334                         do_div(new_checked, duration);
4335                 if (ns->ln_run_time_phase1 != 0)
4336                         do_div(speed1, ns->ln_run_time_phase1);
4337                 if (rtime != 0)
4338                         do_div(speed2, rtime);
4339                 lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked,
4340                                                 checked,
4341                                                 ns->ln_run_time_phase1, rtime);
4342
4343                 seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
4344                               "average_speed_phase2: "LPU64" objs/sec\n"
4345                               "real_time_speed_phase1: N/A\n"
4346                               "real_time_speed_phase2: "LPU64" objs/sec\n"
4347                               "current_position: "DFID"\n",
4348                               speed1,
4349                               speed2,
4350                               new_checked,
4351                               PFID(&ns->ln_fid_latest_scanned_phase2));
4352         } else {
4353                 __u64 speed1 = ns->ln_items_checked;
4354                 __u64 speed2 = ns->ln_objs_checked_phase2;
4355
4356                 if (ns->ln_run_time_phase1 != 0)
4357                         do_div(speed1, ns->ln_run_time_phase1);
4358                 if (ns->ln_run_time_phase2 != 0)
4359                         do_div(speed2, ns->ln_run_time_phase2);
4360                 lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked,
4361                                                 ns->ln_objs_checked_phase2,
4362                                                 ns->ln_run_time_phase1,
4363                                                 ns->ln_run_time_phase2);
4364
4365                 seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
4366                               "average_speed_phase2: "LPU64" objs/sec\n"
4367                               "real_time_speed_phase1: N/A\n"
4368                               "real_time_speed_phase2: N/A\n"
4369                               "current_position: N/A\n",
4370                               speed1,
4371                               speed2);
4372         }
4373 out:
4374         up_read(&com->lc_sem);
4375         return 0;
4376 }
4377
4378 static int lfsck_namespace_double_scan(const struct lu_env *env,
4379                                        struct lfsck_component *com)
4380 {
4381         struct lfsck_namespace          *ns     = com->lc_file_ram;
4382         struct lfsck_assistant_data     *lad    = com->lc_data;
4383         struct lfsck_tgt_descs          *ltds   = &com->lc_lfsck->li_mdt_descs;
4384         struct lfsck_tgt_desc           *ltd;
4385         struct lfsck_tgt_desc           *next;
4386         int                              rc;
4387
4388         rc = lfsck_double_scan_generic(env, com, ns->ln_status);
4389         if (thread_is_stopped(&lad->lad_thread)) {
4390                 LASSERT(list_empty(&lad->lad_req_list));
4391                 LASSERT(list_empty(&lad->lad_mdt_phase1_list));
4392
4393                 spin_lock(&ltds->ltd_lock);
4394                 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
4395                                          ltd_namespace_phase_list) {
4396                         list_del_init(&ltd->ltd_namespace_phase_list);
4397                 }
4398                 spin_unlock(&ltds->ltd_lock);
4399         }
4400
4401         return rc;
4402 }
4403
4404 static void lfsck_namespace_data_release(const struct lu_env *env,
4405                                          struct lfsck_component *com)
4406 {
4407         struct lfsck_assistant_data     *lad    = com->lc_data;
4408         struct lfsck_tgt_descs          *ltds   = &com->lc_lfsck->li_mdt_descs;
4409         struct lfsck_tgt_desc           *ltd;
4410         struct lfsck_tgt_desc           *next;
4411
4412         LASSERT(lad != NULL);
4413         LASSERT(thread_is_init(&lad->lad_thread) ||
4414                 thread_is_stopped(&lad->lad_thread));
4415         LASSERT(list_empty(&lad->lad_req_list));
4416
4417         com->lc_data = NULL;
4418         lfsck_namespace_release_lmv(env, com);
4419
4420         spin_lock(&ltds->ltd_lock);
4421         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
4422                                  ltd_namespace_phase_list) {
4423                 list_del_init(&ltd->ltd_namespace_phase_list);
4424         }
4425         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
4426                                  ltd_namespace_phase_list) {
4427                 list_del_init(&ltd->ltd_namespace_phase_list);
4428         }
4429         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_list,
4430                                  ltd_namespace_list) {
4431                 list_del_init(&ltd->ltd_namespace_list);
4432         }
4433         spin_unlock(&ltds->ltd_lock);
4434
4435         if (likely(lad->lad_bitmap != NULL))
4436                 CFS_FREE_BITMAP(lad->lad_bitmap);
4437
4438         OBD_FREE_PTR(lad);
4439 }
4440
4441 static void lfsck_namespace_quit(const struct lu_env *env,
4442                                  struct lfsck_component *com)
4443 {
4444         struct lfsck_assistant_data     *lad    = com->lc_data;
4445         struct lfsck_tgt_descs          *ltds   = &com->lc_lfsck->li_mdt_descs;
4446         struct lfsck_tgt_desc           *ltd;
4447         struct lfsck_tgt_desc           *next;
4448
4449         LASSERT(lad != NULL);
4450
4451         lfsck_quit_generic(env, com);
4452
4453         LASSERT(thread_is_init(&lad->lad_thread) ||
4454                 thread_is_stopped(&lad->lad_thread));
4455         LASSERT(list_empty(&lad->lad_req_list));
4456
4457         lfsck_namespace_release_lmv(env, com);
4458
4459         spin_lock(&ltds->ltd_lock);
4460         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
4461                                  ltd_namespace_phase_list) {
4462                 list_del_init(&ltd->ltd_namespace_phase_list);
4463         }
4464         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
4465                                  ltd_namespace_phase_list) {
4466                 list_del_init(&ltd->ltd_namespace_phase_list);
4467         }
4468         spin_unlock(&ltds->ltd_lock);
4469 }
4470
4471 static int lfsck_namespace_in_notify(const struct lu_env *env,
4472                                      struct lfsck_component *com,
4473                                      struct lfsck_request *lr,
4474                                      struct thandle *th)
4475 {
4476         struct lfsck_instance           *lfsck = com->lc_lfsck;
4477         struct lfsck_namespace          *ns    = com->lc_file_ram;
4478         struct lfsck_assistant_data     *lad   = com->lc_data;
4479         struct lfsck_tgt_descs          *ltds  = &lfsck->li_mdt_descs;
4480         struct lfsck_tgt_desc           *ltd;
4481         int                              rc;
4482         bool                             fail  = false;
4483         ENTRY;
4484
4485         switch (lr->lr_event) {
4486         case LE_SKIP_NLINK_DECLARE: {
4487                 struct dt_object        *obj;
4488                 struct lu_fid           *key   = &lfsck_env_info(env)->lti_fid3;
4489                 int                      idx;
4490                 __u8                     flags = 0;
4491
4492                 LASSERT(th != NULL);
4493
4494                 idx = lfsck_sub_trace_file_fid2idx(&lr->lr_fid);
4495                 obj = com->lc_sub_trace_objs[idx].lsto_obj;
4496                 fid_cpu_to_be(key, &lr->lr_fid);
4497                 mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
4498                 rc = dt_declare_delete(env, obj,
4499                                        (const struct dt_key *)key, th);
4500                 if (rc == 0)
4501                         rc = dt_declare_insert(env, obj,
4502                                                (const struct dt_rec *)&flags,
4503                                                (const struct dt_key *)key, th);
4504                 mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
4505
4506                 RETURN(rc);
4507         }
4508         case LE_SKIP_NLINK: {
4509                 struct dt_object        *obj;
4510                 struct lu_fid           *key   = &lfsck_env_info(env)->lti_fid3;
4511                 int                      idx;
4512                 __u8                     flags = 0;
4513                 bool                     exist = false;
4514                 ENTRY;
4515
4516                 LASSERT(th != NULL);
4517
4518                 idx = lfsck_sub_trace_file_fid2idx(&lr->lr_fid);
4519                 obj = com->lc_sub_trace_objs[idx].lsto_obj;
4520                 fid_cpu_to_be(key, &lr->lr_fid);
4521                 mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
4522                 rc = dt_lookup(env, obj, (struct dt_rec *)&flags,
4523                                (const struct dt_key *)key, BYPASS_CAPA);
4524                 if (rc == 0) {
4525                         if (flags & LNTF_SKIP_NLINK) {
4526                                 mutex_unlock(
4527                                 &com->lc_sub_trace_objs[idx].lsto_mutex);
4528
4529                                 RETURN(0);
4530                         }
4531
4532                         exist = true;
4533                 } else if (rc != -ENOENT) {
4534                         GOTO(log, rc);
4535                 }
4536
4537                 flags |= LNTF_SKIP_NLINK;
4538                 if (exist) {
4539                         rc = dt_delete(env, obj, (const struct dt_key *)key,
4540                                        th, BYPASS_CAPA);
4541                         if (rc != 0)
4542                                 GOTO(log, rc);
4543                 }
4544
4545                 rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
4546                                (const struct dt_key *)key, th, BYPASS_CAPA, 1);
4547
4548                 GOTO(log, rc);
4549
4550 log:
4551                 mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
4552                 CDEBUG(D_LFSCK, "%s: RPC service thread mark the "DFID
4553                        " to be skipped for namespace double scan: rc = %d\n",
4554                        lfsck_lfsck2name(com->lc_lfsck), PFID(&lr->lr_fid), rc);
4555
4556                 if (rc != 0)
4557                         /* If we cannot record this object in the LFSCK tracing,
4558                          * we have to mark the LFSC as LF_INCOMPLETE, then the
4559                          * LFSCK will skip nlink attribute verification for
4560                          * all objects. */
4561                         ns->ln_flags |= LF_INCOMPLETE;
4562
4563                 return 0;
4564         }
4565         case LE_SET_LMV_MASTER: {
4566                 struct dt_object        *obj;
4567
4568                 obj = lfsck_object_find_by_dev(env, lfsck->li_bottom,
4569                                                &lr->lr_fid);
4570                 if (IS_ERR(obj))
4571                         RETURN(PTR_ERR(obj));
4572
4573                 rc = lfsck_namespace_notify_lmv_master_local(env, com, obj);
4574                 lfsck_object_put(env, obj);
4575
4576                 RETURN(rc > 0 ? 0 : rc);
4577         }
4578         case LE_SET_LMV_SLAVE: {
4579                 if (!(lr->lr_flags & LEF_RECHECK_NAME_HASH))
4580                         ns->ln_striped_shards_repaired++;
4581
4582                 rc = lfsck_namespace_trace_update(env, com, &lr->lr_fid,
4583                                                   LNTF_RECHECK_NAME_HASH, true);
4584
4585                 RETURN(rc > 0 ? 0 : rc);
4586         }
4587         case LE_PHASE1_DONE:
4588         case LE_PHASE2_DONE:
4589         case LE_PEER_EXIT:
4590                 break;
4591         default:
4592                 RETURN(-EINVAL);
4593         }
4594
4595         CDEBUG(D_LFSCK, "%s: namespace LFSCK handles notify %u from MDT %x, "
4596                "status %d, flags %x\n", lfsck_lfsck2name(lfsck), lr->lr_event,
4597                lr->lr_index, lr->lr_status, lr->lr_flags2);
4598
4599         spin_lock(&ltds->ltd_lock);
4600         ltd = LTD_TGT(ltds, lr->lr_index);
4601         if (ltd == NULL) {
4602                 spin_unlock(&ltds->ltd_lock);
4603
4604                 RETURN(-ENXIO);
4605         }
4606
4607         list_del_init(&ltd->ltd_namespace_phase_list);
4608         switch (lr->lr_event) {
4609         case LE_PHASE1_DONE:
4610                 if (lr->lr_status <= 0) {
4611                         ltd->ltd_namespace_done = 1;
4612                         list_del_init(&ltd->ltd_namespace_list);
4613                         CDEBUG(D_LFSCK, "%s: MDT %x failed/stopped at "
4614                                "phase1 for namespace LFSCK: rc = %d.\n",
4615                                lfsck_lfsck2name(lfsck),
4616                                ltd->ltd_index, lr->lr_status);
4617                         ns->ln_flags |= LF_INCOMPLETE;
4618                         fail = true;
4619                         break;
4620                 }
4621
4622                 if (lr->lr_flags2 & LF_INCOMPLETE)
4623                         ns->ln_flags |= LF_INCOMPLETE;
4624
4625                 if (list_empty(&ltd->ltd_namespace_list))
4626                         list_add_tail(&ltd->ltd_namespace_list,
4627                                       &lad->lad_mdt_list);
4628                 list_add_tail(&ltd->ltd_namespace_phase_list,
4629                               &lad->lad_mdt_phase2_list);
4630                 break;
4631         case LE_PHASE2_DONE:
4632                 ltd->ltd_namespace_done = 1;
4633                 list_del_init(&ltd->ltd_namespace_list);
4634                 break;
4635         case LE_PEER_EXIT:
4636                 fail = true;
4637                 ltd->ltd_namespace_done = 1;
4638                 list_del_init(&ltd->ltd_namespace_list);
4639                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT)) {
4640                         CDEBUG(D_LFSCK,
4641                                "%s: the peer MDT %x exit namespace LFSCK\n",
4642                                lfsck_lfsck2name(lfsck), ltd->ltd_index);
4643                         ns->ln_flags |= LF_INCOMPLETE;
4644                 }
4645                 break;
4646         default:
4647                 break;
4648         }
4649         spin_unlock(&ltds->ltd_lock);
4650
4651         if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
4652                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
4653
4654                 memset(stop, 0, sizeof(*stop));
4655                 stop->ls_status = lr->lr_status;
4656                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
4657                 lfsck_stop(env, lfsck->li_bottom, stop);
4658         } else if (lfsck_phase2_next_ready(lad)) {
4659                 wake_up_all(&lad->lad_thread.t_ctl_waitq);
4660         }
4661
4662         RETURN(0);
4663 }
4664
4665 static int lfsck_namespace_query(const struct lu_env *env,
4666                                  struct lfsck_component *com)
4667 {
4668         struct lfsck_namespace *ns = com->lc_file_ram;
4669
4670         return ns->ln_status;
4671 }
4672
4673 static struct lfsck_operations lfsck_namespace_ops = {
4674         .lfsck_reset            = lfsck_namespace_reset,
4675         .lfsck_fail             = lfsck_namespace_fail,
4676         .lfsck_close_dir        = lfsck_namespace_close_dir,
4677         .lfsck_open_dir         = lfsck_namespace_open_dir,
4678         .lfsck_checkpoint       = lfsck_namespace_checkpoint,
4679         .lfsck_prep             = lfsck_namespace_prep,
4680         .lfsck_exec_oit         = lfsck_namespace_exec_oit,
4681         .lfsck_exec_dir         = lfsck_namespace_exec_dir,
4682         .lfsck_post             = lfsck_namespace_post,
4683         .lfsck_dump             = lfsck_namespace_dump,
4684         .lfsck_double_scan      = lfsck_namespace_double_scan,
4685         .lfsck_data_release     = lfsck_namespace_data_release,
4686         .lfsck_quit             = lfsck_namespace_quit,
4687         .lfsck_in_notify        = lfsck_namespace_in_notify,
4688         .lfsck_query            = lfsck_namespace_query,
4689 };
4690
4691 /**
4692  * Repair dangling name entry.
4693  *
4694  * For the name entry with dangling reference, we need to repare the
4695  * inconsistency according to the LFSCK sponsor's requirement:
4696  *
4697  * 1) Keep the inconsistency there and report the inconsistency case,
4698  *    then give the chance to the application to find related issues,
4699  *    and the users can make the decision about how to handle it with
4700  *    more human knownledge. (by default)
4701  *
4702  * 2) Re-create the missing MDT-object with the FID information.
4703  *
4704  * \param[in] env       pointer to the thread context
4705  * \param[in] com       pointer to the lfsck component
4706  * \param[in] child     pointer to the object corresponding to the dangling
4707  *                      name entry
4708  * \param[in] lnr       pointer to the namespace request that contains the
4709  *                      name's name, parent object, parent's LMV, and ect.
4710  *
4711  * \retval              positive number if no need to repair
4712  * \retval              zero for repaired successfully
4713  * \retval              negative error number on failure
4714  */
4715 int lfsck_namespace_repair_dangling(const struct lu_env *env,
4716                                     struct lfsck_component *com,
4717                                     struct dt_object *child,
4718                                     struct lfsck_namespace_req *lnr)
4719 {
4720         struct lfsck_thread_info        *info   = lfsck_env_info(env);
4721         struct lu_attr                  *la     = &info->lti_la;
4722         struct dt_allocation_hint       *hint   = &info->lti_hint;
4723         struct dt_object_format         *dof    = &info->lti_dof;
4724         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
4725         struct lmv_mds_md_v1            *lmv2   = &info->lti_lmv2;
4726         struct dt_object                *parent = lnr->lnr_obj;
4727         const struct lu_name            *cname;
4728         struct linkea_data               ldata  = { NULL };
4729         struct lustre_handle             lh     = { 0 };
4730         struct lu_buf                    linkea_buf;
4731         struct lu_buf                    lmv_buf;
4732         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4733         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
4734         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
4735         struct thandle                  *th     = NULL;
4736         int                              rc     = 0;
4737         __u16                            type   = lnr->lnr_type;
4738         bool                             create;
4739         ENTRY;
4740
4741         cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen);
4742         if (bk->lb_param & LPF_CREATE_MDTOBJ)
4743                 create = true;
4744         else
4745                 create = false;
4746
4747         if (!create || bk->lb_param & LPF_DRYRUN)
4748                 GOTO(log, rc = 0);
4749
4750         rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
4751         if (rc != 0)
4752                 GOTO(log, rc);
4753
4754         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
4755         if (rc != 0)
4756                 GOTO(log, rc);
4757
4758         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
4759                               MDS_INODELOCK_UPDATE, LCK_EX);
4760         if (rc != 0)
4761                 GOTO(log, rc);
4762
4763         rc = lfsck_namespace_check_exist(env, parent, child, lnr->lnr_name);
4764         if (rc != 0)
4765                 GOTO(log, rc);
4766
4767         th = dt_trans_create(env, dev);
4768         if (IS_ERR(th))
4769                 GOTO(log, rc = PTR_ERR(th));
4770
4771         /* Set the ctime as zero, then others can know it is created for
4772          * repairing dangling name entry by LFSCK. And if the LFSCK made
4773          * wrong decision and the real MDT-object has been found later,
4774          * then the LFSCK has chance to fix the incosistency properly. */
4775         memset(la, 0, sizeof(*la));
4776         la->la_mode = (type & S_IFMT) | 0600;
4777         la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
4778                         LA_ATIME | LA_MTIME | LA_CTIME;
4779
4780         child->do_ops->do_ah_init(env, hint, parent, child,
4781                                   la->la_mode & S_IFMT);
4782
4783         memset(dof, 0, sizeof(*dof));
4784         dof->dof_type = dt_mode_to_dft(type);
4785         /* If the target is a regular file, then the LFSCK will only create
4786          * the MDT-object without stripes (dof->dof_reg.striped = 0). related
4787          * OST-objects will be created when write open. */
4788
4789         /* 1a. create child. */
4790         rc = dt_declare_create(env, child, la, hint, dof, th);
4791         if (rc != 0)
4792                 GOTO(stop, rc);
4793
4794         if (S_ISDIR(type)) {
4795                 if (unlikely(!dt_try_as_dir(env, child)))
4796                         GOTO(stop, rc = -ENOTDIR);
4797
4798                 /* 2a. insert dot into child dir */
4799                 rec->rec_type = S_IFDIR;
4800                 rec->rec_fid = lfsck_dto2fid(child);
4801                 rc = dt_declare_insert(env, child,
4802                                        (const struct dt_rec *)rec,
4803                                        (const struct dt_key *)dot, th);
4804                 if (rc != 0)
4805                         GOTO(stop, rc);
4806
4807                 /* 3a. insert dotdot into child dir */
4808                 rec->rec_fid = lfsck_dto2fid(parent);
4809                 rc = dt_declare_insert(env, child,
4810                                        (const struct dt_rec *)rec,
4811                                        (const struct dt_key *)dotdot, th);
4812                 if (rc != 0)
4813                         GOTO(stop, rc);
4814
4815                 /* 4a. increase child nlink */
4816                 rc = dt_declare_ref_add(env, child, th);
4817                 if (rc != 0)
4818                         GOTO(stop, rc);
4819
4820                 /* 5a. generate slave LMV EA. */
4821                 if (lnr->lnr_lmv != NULL && lnr->lnr_lmv->ll_lmv_master) {
4822                         int idx;
4823
4824                         idx = lfsck_shard_name_to_index(env,
4825                                         lnr->lnr_name, lnr->lnr_namelen,
4826                                         type, lfsck_dto2fid(child));
4827                         if (unlikely(idx < 0))
4828                                 GOTO(stop, rc = idx);
4829
4830                         *lmv2 = lnr->lnr_lmv->ll_lmv;
4831                         lmv2->lmv_magic = LMV_MAGIC_STRIPE;
4832                         lmv2->lmv_master_mdt_index = idx;
4833
4834                         lfsck_lmv_header_cpu_to_le(lmv2, lmv2);
4835                         lfsck_buf_init(&lmv_buf, lmv2, sizeof(*lmv2));
4836                         rc = dt_declare_xattr_set(env, child, &lmv_buf,
4837                                                   XATTR_NAME_LMV, 0, th);
4838                         if (rc != 0)
4839                                 GOTO(stop, rc);
4840                 }
4841         }
4842
4843         /* 6a. insert linkEA for child */
4844         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
4845                        ldata.ld_leh->leh_len);
4846         rc = dt_declare_xattr_set(env, child, &linkea_buf,
4847                                   XATTR_NAME_LINK, 0, th);
4848         if (rc != 0)
4849                 GOTO(stop, rc);
4850
4851         rc = dt_trans_start(env, dev, th);
4852         if (rc != 0)
4853                 GOTO(stop, rc = (rc == -EEXIST ? 1 : rc));
4854
4855         dt_write_lock(env, child, 0);
4856         /* 1b. create child */
4857         rc = dt_create(env, child, la, hint, dof, th);
4858         if (rc != 0)
4859                 GOTO(unlock, rc = (rc == -EEXIST ? 1 : rc));
4860
4861         if (S_ISDIR(type)) {
4862                 if (unlikely(!dt_try_as_dir(env, child)))
4863                         GOTO(unlock, rc = -ENOTDIR);
4864
4865                 /* 2b. insert dot into child dir */
4866                 rec->rec_type = S_IFDIR;
4867                 rec->rec_fid = lfsck_dto2fid(child);
4868                 rc = dt_insert(env, child, (const struct dt_rec *)rec,
4869                                (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
4870                 if (rc != 0)
4871                         GOTO(unlock, rc);
4872
4873                 /* 3b. insert dotdot into child dir */
4874                 rec->rec_fid = lfsck_dto2fid(parent);
4875                 rc = dt_insert(env, child, (const struct dt_rec *)rec,
4876                                (const struct dt_key *)dotdot, th,
4877                                BYPASS_CAPA, 1);
4878                 if (rc != 0)
4879                         GOTO(unlock, rc);
4880
4881                 /* 4b. increase child nlink */
4882                 rc = dt_ref_add(env, child, th);
4883                 if (rc != 0)
4884                         GOTO(unlock, rc);
4885
4886                 /* 5b. generate slave LMV EA. */
4887                 if (lnr->lnr_lmv != NULL && lnr->lnr_lmv->ll_lmv_master) {
4888                         rc = dt_xattr_set(env, child, &lmv_buf, XATTR_NAME_LMV,
4889                                           0, th, BYPASS_CAPA);
4890                         if (rc != 0)
4891                                 GOTO(unlock, rc);
4892                 }
4893         }
4894
4895         /* 6b. insert linkEA for child. */
4896         rc = dt_xattr_set(env, child, &linkea_buf,
4897                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
4898
4899         GOTO(unlock, rc);
4900
4901 unlock:
4902         dt_write_unlock(env, child);
4903
4904 stop:
4905         dt_trans_stop(env, dev, th);
4906
4907 log:
4908         lfsck_ibits_unlock(&lh, LCK_EX);
4909         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found dangling "
4910                "reference for: parent "DFID", child "DFID", type %u, "
4911                "name %s. %s: rc = %d\n", lfsck_lfsck2name(lfsck),
4912                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
4913                type, cname->ln_name,
4914                create ? "Create the lost OST-object as required" :
4915                         "Keep the MDT-object there by default", rc);
4916
4917         if (rc <= 0) {
4918                 struct lfsck_namespace *ns = com->lc_file_ram;
4919
4920                 ns->ln_flags |= LF_INCONSISTENT;
4921         }
4922
4923         return rc;
4924 }
4925
4926 static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
4927                                                 struct lfsck_component *com,
4928                                                 struct lfsck_assistant_req *lar)
4929 {
4930         struct lfsck_thread_info   *info     = lfsck_env_info(env);
4931         struct lu_attr             *la       = &info->lti_la;
4932         struct lfsck_instance      *lfsck    = com->lc_lfsck;
4933         struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
4934         struct lfsck_namespace     *ns       = com->lc_file_ram;
4935         struct linkea_data          ldata    = { NULL };
4936         const struct lu_name       *cname;
4937         struct thandle             *handle   = NULL;
4938         struct lfsck_namespace_req *lnr      =
4939                         container_of0(lar, struct lfsck_namespace_req, lnr_lar);
4940         struct dt_object           *dir      = lnr->lnr_obj;
4941         struct dt_object           *obj      = NULL;
4942         const struct lu_fid        *pfid     = lfsck_dto2fid(dir);
4943         struct dt_device           *dev      = NULL;
4944         struct lustre_handle        lh       = { 0 };
4945         bool                        repaired = false;
4946         bool                        dtlocked = false;
4947         bool                        remove;
4948         bool                        newdata;
4949         bool                        log      = false;
4950         bool                        bad_hash = false;
4951         int                         idx      = 0;
4952         int                         count    = 0;
4953         int                         rc       = 0;
4954         enum lfsck_namespace_inconsistency_type type = LNIT_NONE;
4955         ENTRY;
4956
4957         la->la_nlink = 0;
4958         if (lnr->lnr_attr & LUDA_UPGRADE) {
4959                 ns->ln_flags |= LF_UPGRADE;
4960                 ns->ln_dirent_repaired++;
4961                 repaired = true;
4962         } else if (lnr->lnr_attr & LUDA_REPAIR) {
4963                 ns->ln_flags |= LF_INCONSISTENT;
4964                 ns->ln_dirent_repaired++;
4965                 repaired = true;
4966         }
4967
4968         if (unlikely(fid_is_zero(&lnr->lnr_fid))) {
4969                 if (strcmp(lnr->lnr_name, dotdot) != 0)
4970                         LBUG();
4971                 else
4972                         rc = lfsck_namespace_trace_update(env, com, pfid,
4973                                                 LNTF_CHECK_PARENT, true);
4974
4975                 GOTO(out, rc);
4976         }
4977
4978         if (unlikely(!fid_is_sane(&lnr->lnr_fid))) {
4979                 CDEBUG(D_LFSCK, "%s: dir scan find invalid FID "DFID
4980                        " for the name entry %.*s under "DFID"\n",
4981                        lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid),
4982                        lnr->lnr_namelen, lnr->lnr_name, PFID(pfid));
4983
4984                 if (strcmp(lnr->lnr_name, dotdot) != 0)
4985                         /* invalid FID means bad name entry, remove it. */
4986                         type = LNIT_BAD_DIRENT;
4987                 else
4988                         /* If the parent FID is invalid, we cannot remove
4989                          * the ".." entry directly. */
4990                         rc = lfsck_namespace_trace_update(env, com, pfid,
4991                                                 LNTF_CHECK_PARENT, true);
4992
4993                 GOTO(out, rc);
4994         }
4995
4996         if (unlikely(lnr->lnr_dir_cookie == MDS_DIR_END_OFF)) {
4997                 rc = lfsck_namespace_striped_dir_rescan(env, com, lnr);
4998
4999                 RETURN(rc);
5000         }
5001
5002         if (lnr->lnr_name[0] == '.' &&
5003             (lnr->lnr_namelen == 1 || fid_seq_is_dot(fid_seq(&lnr->lnr_fid))))
5004                 GOTO(out, rc = 0);
5005
5006         if (lnr->lnr_lmv != NULL && lnr->lnr_lmv->ll_lmv_master) {
5007                 rc = lfsck_namespace_handle_striped_master(env, com, lnr);
5008
5009                 RETURN(rc);
5010         }
5011
5012         idx = lfsck_find_mdt_idx_by_fid(env, lfsck, &lnr->lnr_fid);
5013         if (idx < 0)
5014                 GOTO(out, rc = idx);
5015
5016         if (idx == lfsck_dev_idx(lfsck->li_bottom)) {
5017                 if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0))
5018                         GOTO(out, rc = 0);
5019
5020                 dev = lfsck->li_next;
5021         } else {
5022                 struct lfsck_tgt_desc *ltd;
5023
5024                 /* Usually, some local filesystem consistency verification
5025                  * tools can guarantee the local namespace tree consistenct.
5026                  * So the LFSCK will only verify the remote directory. */
5027                 if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0)) {
5028                         rc = lfsck_namespace_trace_update(env, com, pfid,
5029                                                 LNTF_CHECK_PARENT, true);
5030
5031                         GOTO(out, rc);
5032                 }
5033
5034                 ltd = LTD_TGT(&lfsck->li_mdt_descs, idx);
5035                 if (unlikely(ltd == NULL)) {
5036                         CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which "
5037                                "did not join the namespace LFSCK\n",
5038                                lfsck_lfsck2name(lfsck), idx);
5039                         lfsck_lad_set_bitmap(env, com, idx);
5040
5041                         GOTO(out, rc = -ENODEV);
5042                 }
5043
5044                 dev = ltd->ltd_tgt;
5045         }
5046
5047         obj = lfsck_object_find_by_dev(env, dev, &lnr->lnr_fid);
5048         if (IS_ERR(obj))
5049                 GOTO(out, rc = PTR_ERR(obj));
5050
5051         cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen);
5052         if (dt_object_exists(obj) == 0) {
5053
5054 dangling:
5055                 rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
5056                 if (rc == 0) {
5057                         if (!lfsck_is_valid_slave_name_entry(env, lnr->lnr_lmv,
5058                                         lnr->lnr_name, lnr->lnr_namelen)) {
5059                                 type = LNIT_BAD_DIRENT;
5060
5061                                 GOTO(out, rc);
5062                         }
5063
5064                         type = LNIT_DANGLING;
5065                         rc = lfsck_namespace_repair_dangling(env, com,
5066                                                              obj, lnr);
5067                         if (rc == 0)
5068                                 repaired = true;
5069                 }
5070
5071                 GOTO(out, rc);
5072         }
5073
5074         if (!(bk->lb_param & LPF_DRYRUN) && repaired) {
5075
5076 again:
5077                 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
5078                                       MDS_INODELOCK_UPDATE |
5079                                       MDS_INODELOCK_XATTR, LCK_EX);
5080                 if (rc != 0)
5081                         GOTO(out, rc);
5082
5083                 handle = dt_trans_create(env, dev);
5084                 if (IS_ERR(handle))
5085                         GOTO(out, rc = PTR_ERR(handle));
5086
5087                 rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
5088                 if (rc != 0)
5089                         GOTO(stop, rc);
5090
5091                 rc = dt_trans_start(env, dev, handle);
5092                 if (rc != 0)
5093                         GOTO(stop, rc);
5094
5095                 dt_write_lock(env, obj, 0);
5096                 dtlocked = true;
5097         }
5098
5099         rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
5100         if (rc != 0)
5101                 GOTO(stop, rc);
5102
5103         rc = lfsck_links_read(env, obj, &ldata);
5104         if (unlikely(rc == -ENOENT)) {
5105                 if (handle != NULL) {
5106                         dt_write_unlock(env, obj);
5107                         dtlocked = false;
5108
5109                         dt_trans_stop(env, dev, handle);
5110                         handle = NULL;
5111
5112                         lfsck_ibits_unlock(&lh, LCK_EX);
5113                 }
5114
5115                 /* It may happen when the remote object has been removed,
5116                  * but the local MDT is not aware of that. */
5117                 goto dangling;
5118         } else if (rc == 0) {
5119                 count = ldata.ld_leh->leh_reccount;
5120                 rc = linkea_links_find(&ldata, cname, pfid);
5121                 if ((rc == 0) &&
5122                     (count == 1 || !S_ISDIR(lfsck_object_type(obj)))) {
5123                         if ((lfsck_object_type(obj) & S_IFMT) !=
5124                             lnr->lnr_type) {
5125                                 ns->ln_flags |= LF_INCONSISTENT;
5126                                 type = LNIT_BAD_TYPE;
5127                         }
5128
5129                         goto stop;
5130                 }
5131
5132                 ns->ln_flags |= LF_INCONSISTENT;
5133
5134                 /* If the name entry hash does not match the slave striped
5135                  * directory, and the name entry does not match also, then
5136                  * it is quite possible that name entry is corrupted. */
5137                 if (!lfsck_is_valid_slave_name_entry(env, lnr->lnr_lmv,
5138                                         lnr->lnr_name, lnr->lnr_namelen)) {
5139                         type = LNIT_BAD_DIRENT;
5140
5141                         GOTO(stop, rc = 0);
5142                 }
5143
5144                 /* If the file type stored in the name entry does not match
5145                  * the file type claimed by the object, and the object does
5146                  * not recognize the name entry, then it is quite possible
5147                  * that the name entry is corrupted. */
5148                 if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type) {
5149                         type = LNIT_BAD_DIRENT;
5150
5151                         GOTO(stop, rc = 0);
5152                 }
5153
5154                 /* For sub-dir object, we cannot make sure whether the sub-dir
5155                  * back references the parent via ".." name entry correctly or
5156                  * not in the LFSCK first-stage scanning. It may be that the
5157                  * (remote) sub-dir ".." name entry has no parent FID after
5158                  * file-level backup/restore and its linkEA may be wrong.
5159                  * So under such case, we should replace the linkEA according
5160                  * to current name entry. But this needs to be done during the
5161                  * LFSCK second-stage scanning. The LFSCK will record the name
5162                  * entry for further possible using. */
5163                 remove = false;
5164                 newdata = false;
5165                 goto nodata;
5166         } else if (unlikely(rc == -EINVAL)) {
5167                 if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type)
5168                         type = LNIT_BAD_TYPE;
5169
5170                 count = 1;
5171                 ns->ln_flags |= LF_INCONSISTENT;
5172                 /* The magic crashed, we are not sure whether there are more
5173                  * corrupt data in the linkea, so remove all linkea entries. */
5174                 remove = true;
5175                 newdata = true;
5176                 goto nodata;
5177         } else if (rc == -ENODATA) {
5178                 if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type)
5179                         type = LNIT_BAD_TYPE;
5180
5181                 count = 1;
5182                 ns->ln_flags |= LF_UPGRADE;
5183                 remove = false;
5184                 newdata = true;
5185
5186 nodata:
5187                 if (bk->lb_param & LPF_DRYRUN) {
5188                         ns->ln_linkea_repaired++;
5189                         repaired = true;
5190                         log = true;
5191                         goto stop;
5192                 }
5193
5194                 if (!lustre_handle_is_used(&lh))
5195                         goto again;
5196
5197                 if (remove) {
5198                         LASSERT(newdata);
5199
5200                         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
5201                                           BYPASS_CAPA);
5202                         if (rc != 0)
5203                                 GOTO(stop, rc);
5204                 }
5205
5206                 if (newdata) {
5207                         rc = linkea_data_new(&ldata,
5208                                         &lfsck_env_info(env)->lti_linkea_buf);
5209                         if (rc != 0)
5210                                 GOTO(stop, rc);
5211                 }
5212
5213                 rc = linkea_add_buf(&ldata, cname, pfid);
5214                 if (rc != 0)
5215                         GOTO(stop, rc);
5216
5217                 rc = lfsck_links_write(env, obj, &ldata, handle);
5218                 if (unlikely(rc == -ENOSPC) &&
5219                     S_ISREG(lfsck_object_type(obj)) && !dt_object_remote(obj)) {
5220                         if (handle != NULL) {
5221                                 LASSERT(dt_write_locked(env, obj));
5222
5223                                 dt_write_unlock(env, obj);
5224                                 dtlocked = false;
5225
5226                                 dt_trans_stop(env, dev, handle);
5227                                 handle = NULL;
5228
5229                                 lfsck_ibits_unlock(&lh, LCK_EX);
5230                         }
5231
5232                         rc = lfsck_namespace_trace_update(env, com,
5233                                         &lnr->lnr_fid, LNTF_SKIP_NLINK, true);
5234                         if (rc != 0)
5235                                 /* If we cannot record this object in the
5236                                  * LFSCK tracing, we have to mark the LFSCK
5237                                  * as LF_INCOMPLETE, then the LFSCK will
5238                                  * skip nlink attribute verification for
5239                                  * all objects. */
5240                                 ns->ln_flags |= LF_INCOMPLETE;
5241
5242                         GOTO(out, rc = 0);
5243                 }
5244
5245                 if (rc != 0)
5246                         GOTO(stop, rc);
5247
5248                 count = ldata.ld_leh->leh_reccount;
5249                 if (!S_ISDIR(lfsck_object_type(obj)) ||
5250                     !dt_object_remote(obj)) {
5251                         ns->ln_linkea_repaired++;
5252                         repaired = true;
5253                         log = true;
5254                 }
5255         } else {
5256                 GOTO(stop, rc);
5257         }
5258
5259 stop:
5260         if (dtlocked)
5261                 dt_write_unlock(env, obj);
5262
5263         if (handle != NULL && !IS_ERR(handle))
5264                 dt_trans_stop(env, dev, handle);
5265
5266 out:
5267         lfsck_ibits_unlock(&lh, LCK_EX);
5268
5269         if (!name_is_dot_or_dotdot(lnr->lnr_name, lnr->lnr_namelen) &&
5270             !lfsck_is_valid_slave_name_entry(env, lnr->lnr_lmv,
5271                                              lnr->lnr_name, lnr->lnr_namelen) &&
5272             type != LNIT_BAD_DIRENT) {
5273                 ns->ln_flags |= LF_INCONSISTENT;
5274
5275                 log = false;
5276                 rc = lfsck_namespace_repair_bad_name_hash(env, com, dir,
5277                                                 lnr->lnr_lmv, lnr->lnr_name);
5278                 if (rc >= 0)
5279                         bad_hash = true;
5280         }
5281
5282         if (rc >= 0) {
5283                 switch (type) {
5284                 case LNIT_BAD_TYPE:
5285                         log = false;
5286                         rc = lfsck_namespace_repair_dirent(env, com, dir,
5287                                         obj, lnr->lnr_name, lnr->lnr_name,
5288                                         lnr->lnr_type, true, false);
5289                         if (rc > 0)
5290                                 repaired = true;
5291                         break;
5292                 case LNIT_BAD_DIRENT:
5293                         log = false;
5294                         /* XXX: This is a bad dirent, we do not know whether
5295                          *      the original name entry reference a regular
5296                          *      file or a directory, then keep the parent's
5297                          *      nlink count unchanged here. */
5298                         rc = lfsck_namespace_repair_dirent(env, com, dir,
5299                                         obj, lnr->lnr_name, lnr->lnr_name,
5300                                         lnr->lnr_type, false, false);
5301                         if (rc > 0)
5302                                 repaired = true;
5303                         break;
5304                 default:
5305                         break;
5306                 }
5307
5308                 if (count == 1 && S_ISREG(lfsck_object_type(obj)))
5309                         dt_attr_get(env, obj, la, BYPASS_CAPA);
5310         }
5311
5312         down_write(&com->lc_sem);
5313         if (rc < 0) {
5314                 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle "
5315                        "the entry: "DFID", parent "DFID", name %.*s: rc = %d\n",
5316                        lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid),
5317                        PFID(lfsck_dto2fid(lnr->lnr_obj)),
5318                        lnr->lnr_namelen, lnr->lnr_name, rc);
5319
5320                 lfsck_namespace_record_failure(env, lfsck, ns);
5321                 if ((rc == -ENOTCONN || rc == -ESHUTDOWN || rc == -EREMCHG ||
5322                      rc == -ETIMEDOUT || rc == -EHOSTDOWN ||
5323                      rc == -EHOSTUNREACH || rc == -EINPROGRESS) &&
5324                     dev != NULL && dev != lfsck->li_next)
5325                         lfsck_lad_set_bitmap(env, com, idx);
5326
5327                 if (!(bk->lb_param & LPF_FAILOUT))
5328                         rc = 0;
5329         } else {
5330                 if (log)
5331                         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant "
5332                                "repaired the entry: "DFID", parent "DFID
5333                                ", name %.*s\n", lfsck_lfsck2name(lfsck),
5334                                PFID(&lnr->lnr_fid),
5335                                PFID(lfsck_dto2fid(lnr->lnr_obj)),
5336                                lnr->lnr_namelen, lnr->lnr_name);
5337
5338                 if (repaired) {
5339                         ns->ln_items_repaired++;
5340
5341                         switch (type) {
5342                         case LNIT_DANGLING:
5343                                 ns->ln_dangling_repaired++;
5344                                 break;
5345                         case LNIT_BAD_TYPE:
5346                                 ns->ln_bad_type_repaired++;
5347                                 break;
5348                         case LNIT_BAD_DIRENT:
5349                                 ns->ln_dirent_repaired++;
5350                                 break;
5351                         default:
5352                                 break;
5353                         }
5354
5355                         if (bk->lb_param & LPF_DRYRUN &&
5356                             lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
5357                                 lfsck_pos_fill(env, lfsck,
5358                                                &ns->ln_pos_first_inconsistent,
5359                                                false);
5360                 }
5361
5362                 if (bad_hash) {
5363                         ns->ln_name_hash_repaired++;
5364
5365                         /* Not count repeatedly. */
5366                         if (!repaired)
5367                                 ns->ln_items_repaired++;
5368
5369                         if (bk->lb_param & LPF_DRYRUN &&
5370                             lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
5371                                 lfsck_pos_fill(env, lfsck,
5372                                                &ns->ln_pos_first_inconsistent,
5373                                                false);
5374                 }
5375
5376                 rc = 0;
5377         }
5378
5379         if (count > 1 || la->la_nlink > 1)
5380                 ns->ln_mul_linked_checked++;
5381
5382         up_write(&com->lc_sem);
5383
5384         if (obj != NULL && !IS_ERR(obj))
5385                 lfsck_object_put(env, obj);
5386
5387         return rc;
5388 }
5389
5390 /**
5391  * Handle one orphan under the backend /lost+found directory
5392  *
5393  * Insert the orphan FID into the namespace LFSCK trace file for further
5394  * processing (via the subsequent namespace LFSCK second-stage scanning).
5395  * At the same time, remove the orphan name entry from backend /lost+found
5396  * directory. There is an interval between the orphan name entry removed
5397  * from the backend /lost+found directory and the orphan FID in the LFSCK
5398  * trace file handled. In such interval, the LFSCK can be reset, then
5399  * all the FIDs recorded in the namespace LFSCK trace file will be dropped.
5400  * To guarantee that the orphans can be found when LFSCK run next time
5401  * without e2fsck again, when remove the orphan name entry, the LFSCK
5402  * will set the orphan's ctime attribute as 1. Since normal applications
5403  * cannot change the object's ctime attribute as 1. Then when LFSCK run
5404  * next time, it can record the object (that ctime is 1) in the namespace
5405  * LFSCK trace file during the first-stage scanning.
5406  *
5407  * \param[in] env       pointer to the thread context
5408  * \param[in] com       pointer to the lfsck component
5409  * \param[in] parent    pointer to the object for the backend /lost+found
5410  * \param[in] ent       pointer to the name entry for the target under the
5411  *                      backend /lost+found
5412  *
5413  * \retval              positive for repaired
5414  * \retval              0 if needs to repair nothing
5415  * \retval              negative error number on failure
5416  */
5417 static int lfsck_namespace_scan_local_lpf_one(const struct lu_env *env,
5418                                               struct lfsck_component *com,
5419                                               struct dt_object *parent,
5420                                               struct lu_dirent *ent)
5421 {
5422         struct lfsck_thread_info        *info   = lfsck_env_info(env);
5423         struct lu_fid                   *key    = &info->lti_fid;
5424         struct lu_attr                  *la     = &info->lti_la;
5425         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5426         struct dt_object                *obj;
5427         struct dt_device                *dev    = lfsck->li_bottom;
5428         struct dt_object                *child  = NULL;
5429         struct thandle                  *th     = NULL;
5430         int                              idx;
5431         int                              rc     = 0;
5432         __u8                             flags  = 0;
5433         bool                             exist  = false;
5434         ENTRY;
5435
5436         child = lfsck_object_find_by_dev(env, dev, &ent->lde_fid);
5437         if (IS_ERR(child))
5438                 RETURN(PTR_ERR(child));
5439
5440         LASSERT(dt_object_exists(child));
5441         LASSERT(!dt_object_remote(child));
5442
5443         idx = lfsck_sub_trace_file_fid2idx(&ent->lde_fid);
5444         obj = com->lc_sub_trace_objs[idx].lsto_obj;
5445         fid_cpu_to_be(key, &ent->lde_fid);
5446         rc = dt_lookup(env, obj, (struct dt_rec *)&flags,
5447                        (const struct dt_key *)key, BYPASS_CAPA);
5448         if (rc == 0) {
5449                 exist = true;
5450                 flags |= LNTF_CHECK_ORPHAN;
5451         } else if (rc == -ENOENT) {
5452                 flags = LNTF_CHECK_ORPHAN;
5453         } else {
5454                 GOTO(out, rc);
5455         }
5456
5457         th = dt_trans_create(env, dev);
5458         if (IS_ERR(th))
5459                 GOTO(out, rc = PTR_ERR(th));
5460
5461         /* a1. remove name entry from backend /lost+found */
5462         rc = dt_declare_delete(env, parent,
5463                                (const struct dt_key *)ent->lde_name, th);
5464         if (rc != 0)
5465                 GOTO(stop, rc);
5466
5467         if (S_ISDIR(lfsck_object_type(child))) {
5468                 /* a2. decrease parent's nlink */
5469                 rc = dt_declare_ref_del(env, parent, th);
5470                 if (rc != 0)
5471                         GOTO(stop, rc);
5472         }
5473
5474         if (exist) {
5475                 /* a3. remove child's FID from the LFSCK trace file. */
5476                 rc = dt_declare_delete(env, obj,
5477                                        (const struct dt_key *)key, th);
5478                 if (rc != 0)
5479                         GOTO(stop, rc);
5480         } else {
5481                 /* a4. set child's ctime as 1 */
5482                 memset(la, 0, sizeof(*la));
5483                 la->la_ctime = 1;
5484                 la->la_valid = LA_CTIME;
5485                 rc = dt_declare_attr_set(env, child, la, th);
5486                 if (rc != 0)
5487                         GOTO(stop, rc);
5488         }
5489
5490         /* a5. insert child's FID into the LFSCK trace file. */
5491         rc = dt_declare_insert(env, obj, (const struct dt_rec *)&flags,
5492                                (const struct dt_key *)key, th);
5493         if (rc != 0)
5494                 GOTO(stop, rc);
5495
5496         rc = dt_trans_start_local(env, dev, th);
5497         if (rc != 0)
5498                 GOTO(stop, rc);
5499
5500         /* b1. remove name entry from backend /lost+found */
5501         rc = dt_delete(env, parent, (const struct dt_key *)ent->lde_name, th,
5502                        BYPASS_CAPA);
5503         if (rc != 0)
5504                 GOTO(stop, rc);
5505
5506         if (S_ISDIR(lfsck_object_type(child))) {
5507                 /* b2. decrease parent's nlink */
5508                 dt_write_lock(env, parent, 0);
5509                 rc = dt_ref_del(env, parent, th);
5510                 dt_write_unlock(env, parent);
5511                 if (rc != 0)
5512                         GOTO(stop, rc);
5513         }
5514
5515         if (exist) {
5516                 /* a3. remove child's FID from the LFSCK trace file. */
5517                 rc = dt_delete(env, obj, (const struct dt_key *)key, th,
5518                                BYPASS_CAPA);
5519                 if (rc != 0)
5520                         GOTO(stop, rc);
5521         } else {
5522                 /* b4. set child's ctime as 1 */
5523                 rc = dt_attr_set(env, child, la, th, BYPASS_CAPA);
5524                 if (rc != 0)
5525                         GOTO(stop, rc);
5526         }
5527
5528         /* b5. insert child's FID into the LFSCK trace file. */
5529         rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
5530                        (const struct dt_key *)key, th, BYPASS_CAPA, 1);
5531
5532         GOTO(stop, rc = (rc == 0 ? 1 : rc));
5533
5534 stop:
5535         dt_trans_stop(env, dev, th);
5536
5537 out:
5538         lu_object_put(env, &child->do_lu);
5539
5540         return rc;
5541 }
5542
5543 /**
5544  * Handle orphans under the backend /lost+found directory
5545  *
5546  * Some backend checker, such as e2fsck for ldiskfs may find some orphans
5547  * and put them under the backend /lost+found directory that is invisible
5548  * to client. The LFSCK will scan such directory, for the original client
5549  * visible orphans, add their fids into the namespace LFSCK trace file,
5550  * then the subsequent namespace LFSCK second-stage scanning can handle
5551  * them as other objects to be double scanned: either move back to normal
5552  * namespace, or to the global visible orphan directory:
5553  * /ROOT/.lustre/lost+found/MDTxxxx/
5554  *
5555  * \param[in] env       pointer to the thread context
5556  * \param[in] com       pointer to the lfsck component
5557  */
5558 static void lfsck_namespace_scan_local_lpf(const struct lu_env *env,
5559                                            struct lfsck_component *com)
5560 {
5561         struct lfsck_thread_info        *info   = lfsck_env_info(env);
5562         struct lu_dirent                *ent    =
5563                                         (struct lu_dirent *)info->lti_key;
5564         struct lu_seq_range             *range  = &info->lti_range;
5565         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5566         struct ptlrpc_thread            *thread = &lfsck->li_thread;
5567         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
5568         struct dt_device                *dev    = lfsck->li_bottom;
5569         struct lfsck_namespace          *ns     = com->lc_file_ram;
5570         struct dt_object                *parent;
5571         const struct dt_it_ops          *iops;
5572         struct dt_it                    *di;
5573         struct seq_server_site          *ss     =
5574                                         lu_site2seq(dev->dd_lu_dev.ld_site);
5575         __u64                            cookie;
5576         int                              rc     = 0;
5577         __u16                            type;
5578         ENTRY;
5579
5580         parent = lfsck_object_find_by_dev(env, dev, &LU_BACKEND_LPF_FID);
5581         if (IS_ERR(parent)) {
5582                 CERROR("%s: fail to find backend /lost+found: rc = %ld\n",
5583                        lfsck_lfsck2name(lfsck), PTR_ERR(parent));
5584                 RETURN_EXIT;
5585         }
5586
5587         /* It is normal that the /lost+found does not exist for ZFS backend. */
5588         if (!dt_object_exists(parent))
5589                 GOTO(out, rc = 0);
5590
5591         if (unlikely(!dt_try_as_dir(env, parent)))
5592                 GOTO(out, rc = -ENOTDIR);
5593
5594         CDEBUG(D_LFSCK, "%s: start to scan backend /lost+found\n",
5595                lfsck_lfsck2name(lfsck));
5596
5597         com->lc_new_scanned = 0;
5598         iops = &parent->do_index_ops->dio_it;
5599         di = iops->init(env, parent, LUDA_64BITHASH | LUDA_TYPE, BYPASS_CAPA);
5600         if (IS_ERR(di))
5601                 GOTO(out, rc = PTR_ERR(di));
5602
5603         rc = iops->load(env, di, 0);
5604         if (rc == 0)
5605                 rc = iops->next(env, di);
5606         else if (rc > 0)
5607                 rc = 0;
5608
5609         while (rc == 0) {
5610                 if (CFS_FAIL_TIMEOUT(OBD_FAIL_LFSCK_DELAY3, cfs_fail_val) &&
5611                     unlikely(!thread_is_running(thread)))
5612                         break;
5613
5614                 rc = iops->rec(env, di, (struct dt_rec *)ent,
5615                                LUDA_64BITHASH | LUDA_TYPE);
5616                 if (rc == 0)
5617                         rc = lfsck_unpack_ent(ent, &cookie, &type);
5618
5619                 if (unlikely(rc != 0)) {
5620                         CDEBUG(D_LFSCK, "%s: fail to iterate backend "
5621                                "/lost+found: rc = %d\n",
5622                                lfsck_lfsck2name(lfsck), rc);
5623
5624                         goto skip;
5625                 }
5626
5627                 /* skip dot and dotdot entries */
5628                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
5629                         goto next;
5630
5631                 if (!fid_seq_in_fldb(fid_seq(&ent->lde_fid)))
5632                         goto skip;
5633
5634                 if (fid_is_norm(&ent->lde_fid)) {
5635                         fld_range_set_mdt(range);
5636                         rc = fld_local_lookup(env, ss->ss_server_fld,
5637                                               fid_seq(&ent->lde_fid), range);
5638                         if (rc != 0)
5639                                 goto skip;
5640                 } else if (lfsck_dev_idx(dev) != 0) {
5641                         /* If the returned FID is IGIF, then there are three
5642                          * possible cases:
5643                          *
5644                          * 1) The object is upgraded from old Lustre-1.8 with
5645                          *    IGIF assigned to such object.
5646                          * 2) The object is a backend local object and is
5647                          *    invisible to client.
5648                          * 3) The object lost its LMV EA, and since there is
5649                          *    no FID-in-dirent for the orphan in the backend
5650                          *    /lost+found directory, then the low layer will
5651                          *    return IGIF for such object.
5652                          *
5653                          * For MDTx (x != 0), it is either case 2) or case 3),
5654                          * but from the LFSCK view, they are indistinguishable.
5655                          * To be safe, the LFSCK will keep it there and report
5656                          * some message, then the adminstrator can handle that
5657                          * furtherly.
5658                          *
5659                          * For MDT0, it is more possible the case 1). The LFSCK
5660                          * will handle the orphan as an upgraded object. */
5661                         CDEBUG(D_LFSCK, "%s: the orphan %.*s with IGIF "DFID
5662                                "in the backend /lost+found on the MDT %04x, "
5663                                "to be safe, skip it.\n",
5664                                lfsck_lfsck2name(lfsck), ent->lde_namelen,
5665                                ent->lde_name, PFID(&ent->lde_fid),
5666                                lfsck_dev_idx(dev));
5667                         goto skip;
5668                 }
5669
5670                 rc = lfsck_namespace_scan_local_lpf_one(env, com, parent, ent);
5671
5672 skip:
5673                 down_write(&com->lc_sem);
5674                 com->lc_new_scanned++;
5675                 ns->ln_local_lpf_scanned++;
5676                 if (rc > 0)
5677                         ns->ln_local_lpf_moved++;
5678                 else if (rc == 0)
5679                         ns->ln_local_lpf_skipped++;
5680                 else
5681                         ns->ln_local_lpf_failed++;
5682                 up_write(&com->lc_sem);
5683
5684                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
5685                         break;
5686
5687 next:
5688                 lfsck_control_speed_by_self(com);
5689                 if (unlikely(!thread_is_running(thread))) {
5690                         rc = 0;
5691                         break;
5692                 }
5693
5694                 rc = iops->next(env, di);
5695         }
5696
5697         iops->put(env, di);
5698         iops->fini(env, di);
5699
5700         EXIT;
5701
5702 out:
5703         CDEBUG(D_LFSCK, "%s: stop to scan backend /lost+found: rc = %d\n",
5704                lfsck_lfsck2name(lfsck), rc);
5705
5706         lu_object_put(env, &parent->do_lu);
5707 }
5708
5709 /**
5710  * Rescan the striped directory after the master LMV EA reset.
5711  *
5712  * Sometimes, the master LMV EA of the striped directory maybe lost, so when
5713  * the namespace LFSCK engine scan the striped directory for the first time,
5714  * it will be regarded as a normal directory. As the LFSCK processing, some
5715  * other LFSCK instance on other MDT will find the shard of this striped dir,
5716  * and find that the master MDT-object of the striped directory lost its LMV
5717  * EA, then such remote LFSCK instance will regenerate the master LMV EA and
5718  * notify the LFSCK instance on this MDT to rescan the striped directory.
5719  *
5720  * \param[in] env       pointer to the thread context
5721  * \param[in] com       pointer to the lfsck component
5722  * \param[in] llu       the lfsck_lmv_unit that contains the striped directory
5723  *                      to be rescanned.
5724  *
5725  * \retval              positive number for success
5726  * \retval              0 for LFSCK stopped/paused
5727  * \retval              negative error number on failure
5728  */
5729 static int lfsck_namespace_rescan_striped_dir(const struct lu_env *env,
5730                                               struct lfsck_component *com,
5731                                               struct lfsck_lmv_unit *llu)
5732 {
5733         struct lfsck_thread_info        *info   = lfsck_env_info(env);
5734         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5735         struct lfsck_assistant_data     *lad    = com->lc_data;
5736         struct dt_object                *dir;
5737         const struct dt_it_ops          *iops;
5738         struct dt_it                    *di;
5739         struct lu_dirent                *ent    =
5740                         (struct lu_dirent *)info->lti_key;
5741         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
5742         struct ptlrpc_thread            *thread = &lfsck->li_thread;
5743         struct lfsck_namespace_req      *lnr;
5744         struct lfsck_assistant_req      *lar;
5745         int                              rc;
5746         __u16                            type;
5747         ENTRY;
5748
5749         LASSERT(list_empty(&lad->lad_req_list));
5750
5751         lfsck->li_lmv = &llu->llu_lmv;
5752         lfsck->li_obj_dir = lfsck_object_get(llu->llu_obj);
5753         rc = lfsck_open_dir(env, lfsck, 0);
5754         if (rc != 0)
5755                 RETURN(rc);
5756
5757         dir = lfsck->li_obj_dir;
5758         di = lfsck->li_di_dir;
5759         iops = &dir->do_index_ops->dio_it;
5760         do {
5761                 rc = iops->rec(env, di, (struct dt_rec *)ent,
5762                                lfsck->li_args_dir);
5763                 if (rc == 0)
5764                         rc = lfsck_unpack_ent(ent, &lfsck->li_cookie_dir,
5765                                               &type);
5766
5767                 if (rc != 0) {
5768                         if (bk->lb_param & LPF_FAILOUT)
5769                                 GOTO(out, rc);
5770
5771                         goto next;
5772                 }
5773
5774                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
5775                         goto next;
5776
5777                 lnr = lfsck_namespace_assistant_req_init(lfsck, ent, type);
5778                 if (IS_ERR(lnr)) {
5779                         if (bk->lb_param & LPF_FAILOUT)
5780                                 GOTO(out, rc = PTR_ERR(lnr));
5781
5782                         goto next;
5783                 }
5784
5785                 lar = &lnr->lnr_lar;
5786                 rc = lfsck_namespace_assistant_handler_p1(env, com, lar);
5787                 lfsck_namespace_assistant_req_fini(env, lar);
5788                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
5789                         GOTO(out, rc);
5790
5791                 if (unlikely(!thread_is_running(thread)))
5792                         GOTO(out, rc = 0);
5793
5794 next:
5795                 rc = iops->next(env, di);
5796         } while (rc == 0);
5797
5798 out:
5799         lfsck_close_dir(env, lfsck, rc);
5800         if (rc <= 0)
5801                 RETURN(rc);
5802
5803         /* The close_dir() may insert a dummy lnr in the lad->lad_req_list. */
5804         if (list_empty(&lad->lad_req_list))
5805                 RETURN(1);
5806
5807         spin_lock(&lad->lad_lock);
5808         lar = list_entry(lad->lad_req_list.next, struct lfsck_assistant_req,
5809                           lar_list);
5810         list_del_init(&lar->lar_list);
5811         spin_unlock(&lad->lad_lock);
5812
5813         rc = lfsck_namespace_assistant_handler_p1(env, com, lar);
5814         lfsck_namespace_assistant_req_fini(env, lar);
5815
5816         RETURN(rc == 0 ? 1 : rc);
5817 }
5818
5819 static int
5820 lfsck_namespace_double_scan_one_trace_file(const struct lu_env *env,
5821                                            struct lfsck_component *com,
5822                                            struct dt_object *obj, bool first)
5823 {
5824         struct lfsck_instance   *lfsck  = com->lc_lfsck;
5825         struct ptlrpc_thread    *thread = &lfsck->li_thread;
5826         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
5827         struct lfsck_namespace  *ns     = com->lc_file_ram;
5828         const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
5829         struct dt_object        *target;
5830         struct dt_it            *di;
5831         struct dt_key           *key;
5832         struct lu_fid            fid;
5833         int                      rc;
5834         __u8                     flags  = 0;
5835         ENTRY;
5836
5837         di = iops->init(env, obj, 0, BYPASS_CAPA);
5838         if (IS_ERR(di))
5839                 RETURN(PTR_ERR(di));
5840
5841         if (first)
5842                 fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
5843         else
5844                 fid_zero(&fid);
5845         rc = iops->get(env, di, (const struct dt_key *)&fid);
5846         if (rc < 0)
5847                 GOTO(fini, rc);
5848
5849         if (first) {
5850                 /* The start one either has been processed or does not exist,
5851                  * skip it. */
5852                 rc = iops->next(env, di);
5853                 if (rc != 0)
5854                         GOTO(put, rc);
5855         }
5856
5857         do {
5858                 if (CFS_FAIL_TIMEOUT(OBD_FAIL_LFSCK_DELAY3, cfs_fail_val) &&
5859                     unlikely(!thread_is_running(thread)))
5860                         GOTO(put, rc = 0);
5861
5862                 key = iops->key(env, di);
5863                 if (IS_ERR(key)) {
5864                         rc = PTR_ERR(key);
5865                         if (rc == -ENOENT)
5866                                 GOTO(put, rc = 1);
5867
5868                         goto checkpoint;
5869                 }
5870
5871                 fid_be_to_cpu(&fid, (const struct lu_fid *)key);
5872                 if (!fid_is_sane(&fid)) {
5873                         rc = 0;
5874                         goto checkpoint;
5875                 }
5876
5877                 target = lfsck_object_find_by_dev(env, lfsck->li_bottom, &fid);
5878                 if (IS_ERR(target)) {
5879                         rc = PTR_ERR(target);
5880                         goto checkpoint;
5881                 }
5882
5883                 if (dt_object_exists(target)) {
5884                         rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
5885                         if (rc == 0) {
5886                                 rc = lfsck_namespace_double_scan_one(env, com,
5887                                                                 target, flags);
5888                                 if (rc == -ENOENT)
5889                                         rc = 0;
5890                         }
5891                 }
5892
5893                 lfsck_object_put(env, target);
5894
5895 checkpoint:
5896                 down_write(&com->lc_sem);
5897                 com->lc_new_checked++;
5898                 com->lc_new_scanned++;
5899                 if (rc >= 0 && fid_is_sane(&fid))
5900                         ns->ln_fid_latest_scanned_phase2 = fid;
5901                 if (rc > 0)
5902                         ns->ln_objs_repaired_phase2++;
5903                 else if (rc < 0)
5904                         ns->ln_objs_failed_phase2++;
5905                 up_write(&com->lc_sem);
5906
5907                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
5908                         GOTO(put, rc);
5909
5910                 if (unlikely(cfs_time_beforeq(com->lc_time_next_checkpoint,
5911                                               cfs_time_current())) &&
5912                     com->lc_new_checked != 0) {
5913                         down_write(&com->lc_sem);
5914                         ns->ln_run_time_phase2 +=
5915                                 cfs_duration_sec(cfs_time_current() +
5916                                 HALF_SEC - com->lc_time_last_checkpoint);
5917                         ns->ln_time_last_checkpoint = cfs_time_current_sec();
5918                         ns->ln_objs_checked_phase2 += com->lc_new_checked;
5919                         com->lc_new_checked = 0;
5920                         rc = lfsck_namespace_store(env, com, false);
5921                         up_write(&com->lc_sem);
5922                         if (rc != 0)
5923                                 GOTO(put, rc);
5924
5925                         com->lc_time_last_checkpoint = cfs_time_current();
5926                         com->lc_time_next_checkpoint =
5927                                 com->lc_time_last_checkpoint +
5928                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
5929                 }
5930
5931                 lfsck_control_speed_by_self(com);
5932                 if (unlikely(!thread_is_running(thread)))
5933                         GOTO(put, rc = 0);
5934
5935                 rc = iops->next(env, di);
5936         } while (rc == 0);
5937
5938         GOTO(put, rc);
5939
5940 put:
5941         iops->put(env, di);
5942
5943 fini:
5944         iops->fini(env, di);
5945
5946         return rc;
5947 }
5948
5949 static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
5950                                                 struct lfsck_component *com)
5951 {
5952         struct lfsck_instance   *lfsck  = com->lc_lfsck;
5953         struct lfsck_namespace  *ns     = com->lc_file_ram;
5954         int                      rc;
5955         int                      i;
5956         ENTRY;
5957
5958         while (!list_empty(&lfsck->li_list_lmv)) {
5959                 struct lfsck_lmv_unit *llu;
5960
5961                 spin_lock(&lfsck->li_lock);
5962                 llu = list_entry(lfsck->li_list_lmv.next,
5963                                  struct lfsck_lmv_unit, llu_link);
5964                 list_del_init(&llu->llu_link);
5965                 spin_unlock(&lfsck->li_lock);
5966
5967                 rc = lfsck_namespace_rescan_striped_dir(env, com, llu);
5968                 if (rc <= 0)
5969                         RETURN(rc);
5970         }
5971
5972         CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
5973                lfsck_lfsck2name(lfsck));
5974
5975         lfsck_namespace_scan_local_lpf(env, com);
5976
5977         com->lc_new_checked = 0;
5978         com->lc_new_scanned = 0;
5979         com->lc_time_last_checkpoint = cfs_time_current();
5980         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
5981                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
5982
5983         i = lfsck_sub_trace_file_fid2idx(&ns->ln_fid_latest_scanned_phase2);
5984         rc = lfsck_namespace_double_scan_one_trace_file(env, com,
5985                                 com->lc_sub_trace_objs[i].lsto_obj, true);
5986         while (rc > 0 && ++i < LFSCK_STF_COUNT)
5987                 rc = lfsck_namespace_double_scan_one_trace_file(env, com,
5988                                 com->lc_sub_trace_objs[i].lsto_obj, false);
5989
5990         CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan stop at the No. %d "
5991                "trace file: rc = %d\n", lfsck_lfsck2name(lfsck), i, rc);
5992
5993         RETURN(rc);
5994 }
5995
5996 static void lfsck_namespace_assistant_fill_pos(const struct lu_env *env,
5997                                                struct lfsck_component *com,
5998                                                struct lfsck_position *pos)
5999 {
6000         struct lfsck_assistant_data     *lad = com->lc_data;
6001         struct lfsck_namespace_req      *lnr;
6002
6003         if (list_empty(&lad->lad_req_list))
6004                 return;
6005
6006         lnr = list_entry(lad->lad_req_list.next,
6007                          struct lfsck_namespace_req,
6008                          lnr_lar.lar_list);
6009         pos->lp_oit_cookie = lnr->lnr_oit_cookie;
6010         pos->lp_dir_cookie = lnr->lnr_dir_cookie - 1;
6011         pos->lp_dir_parent = *lfsck_dto2fid(lnr->lnr_obj);
6012 }
6013
6014 static int lfsck_namespace_double_scan_result(const struct lu_env *env,
6015                                               struct lfsck_component *com,
6016                                               int rc)
6017 {
6018         struct lfsck_instance   *lfsck  = com->lc_lfsck;
6019         struct lfsck_namespace  *ns     = com->lc_file_ram;
6020
6021         down_write(&com->lc_sem);
6022         ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
6023                                 HALF_SEC - lfsck->li_time_last_checkpoint);
6024         ns->ln_time_last_checkpoint = cfs_time_current_sec();
6025         ns->ln_objs_checked_phase2 += com->lc_new_checked;
6026         com->lc_new_checked = 0;
6027
6028         if (rc > 0) {
6029                 if (ns->ln_flags & LF_INCOMPLETE)
6030                         ns->ln_status = LS_PARTIAL;
6031                 else
6032                         ns->ln_status = LS_COMPLETED;
6033                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN))
6034                         ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
6035                 ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
6036                 ns->ln_success_count++;
6037         } else if (rc == 0) {
6038                 if (lfsck->li_status != 0)
6039                         ns->ln_status = lfsck->li_status;
6040                 else
6041                         ns->ln_status = LS_STOPPED;
6042         } else {
6043                 ns->ln_status = LS_FAILED;
6044         }
6045
6046         rc = lfsck_namespace_store(env, com, false);
6047         up_write(&com->lc_sem);
6048
6049         return rc;
6050 }
6051
6052 static int
6053 lfsck_namespace_assistant_sync_failures_interpret(const struct lu_env *env,
6054                                                   struct ptlrpc_request *req,
6055                                                   void *args, int rc)
6056 {
6057         if (rc == 0) {
6058                 struct lfsck_async_interpret_args *laia = args;
6059                 struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
6060
6061                 ltd->ltd_synced_failures = 1;
6062         }
6063
6064         return 0;
6065 }
6066
6067 /**
6068  * Notify remote LFSCK instances about former failures.
6069  *
6070  * The local LFSCK instance has recorded which MDTs have ever failed to respond
6071  * some LFSCK verification requests (maybe because of network issues or the MDT
6072  * itself trouble). During the respond gap the MDT may missed some name entries
6073  * verification, then the MDT cannot know whether related MDT-objects have been
6074  * referenced by related name entries or not, then in the second-stage scanning,
6075  * these MDT-objects will be regarded as orphan, if the MDT-object contains bad
6076  * linkEA for back reference, then it will misguide the LFSCK to generate wrong
6077  * name entry for repairing the orphan.
6078  *
6079  * To avoid above trouble, when layout LFSCK finishes the first-stage scanning,
6080  * it will scan the bitmap for the ever failed MDTs, and notify them that they
6081  * have ever missed some name entries verification and should skip the handling
6082  * for orphan MDT-objects.
6083  *
6084  * \param[in] env       pointer to the thread context
6085  * \param[in] com       pointer to the lfsck component
6086  * \param[in] lr        pointer to the lfsck request
6087  */
6088 static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env,
6089                                                     struct lfsck_component *com,
6090                                                     struct lfsck_request *lr)
6091 {
6092         struct lfsck_async_interpret_args *laia  =
6093                                 &lfsck_env_info(env)->lti_laia2;
6094         struct lfsck_assistant_data       *lad   = com->lc_data;
6095         struct lfsck_namespace            *ns    = com->lc_file_ram;
6096         struct lfsck_instance             *lfsck = com->lc_lfsck;
6097         struct lfsck_tgt_descs            *ltds  = &lfsck->li_mdt_descs;
6098         struct lfsck_tgt_desc             *ltd;
6099         struct ptlrpc_request_set         *set;
6100         __u32                              idx;
6101         int                                rc    = 0;
6102         ENTRY;
6103
6104         if (!lad->lad_incomplete)
6105                 RETURN_EXIT;
6106
6107         set = ptlrpc_prep_set();
6108         if (set == NULL)
6109                 GOTO(out, rc = -ENOMEM);
6110
6111         lr->lr_flags2 = ns->ln_flags | LF_INCOMPLETE;
6112         memset(laia, 0, sizeof(*laia));
6113         lad->lad_touch_gen++;
6114
6115         down_read(&ltds->ltd_rw_sem);
6116         cfs_foreach_bit(lad->lad_bitmap, idx) {
6117                 ltd = LTD_TGT(ltds, idx);
6118                 LASSERT(ltd != NULL);
6119
6120                 laia->laia_ltd = ltd;
6121                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
6122                         lfsck_namespace_assistant_sync_failures_interpret,
6123                         laia, LFSCK_NOTIFY);
6124                 if (rc != 0)
6125                         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail "
6126                                "to sync failure with MDT %x: rc = %d\n",
6127                                lfsck_lfsck2name(lfsck), ltd->ltd_index, rc);
6128         }
6129         up_read(&ltds->ltd_rw_sem);
6130
6131         rc = ptlrpc_set_wait(set);
6132         ptlrpc_set_destroy(set);
6133
6134         GOTO(out, rc);
6135
6136 out:
6137         if (rc != 0)
6138                 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail "
6139                        "to sync failure with MDTs, and related MDTs "
6140                        "may handle orphan improperly: rc = %d\n",
6141                        lfsck_lfsck2name(lfsck), rc);
6142
6143         EXIT;
6144 }
6145
6146 struct lfsck_assistant_operations lfsck_namespace_assistant_ops = {
6147         .la_handler_p1          = lfsck_namespace_assistant_handler_p1,
6148         .la_handler_p2          = lfsck_namespace_assistant_handler_p2,
6149         .la_fill_pos            = lfsck_namespace_assistant_fill_pos,
6150         .la_double_scan_result  = lfsck_namespace_double_scan_result,
6151         .la_req_fini            = lfsck_namespace_assistant_req_fini,
6152         .la_sync_failures       = lfsck_namespace_assistant_sync_failures,
6153 };
6154
6155 /**
6156  * Verify the specified linkEA entry for the given directory object.
6157  * If the object has no such linkEA entry or it has more other linkEA
6158  * entries, then re-generate the linkEA with the given information.
6159  *
6160  * \param[in] env       pointer to the thread context
6161  * \param[in] dev       pointer to the dt_device
6162  * \param[in] obj       pointer to the dt_object to be handled
6163  * \param[in] cname     the name for the child in the parent directory
6164  * \param[in] pfid      the parent directory's FID for the linkEA
6165  *
6166  * \retval              0 for success
6167  * \retval              negative error number on failure
6168  */
6169 int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev,
6170                         struct dt_object *obj, const struct lu_name *cname,
6171                         const struct lu_fid *pfid)
6172 {
6173         struct linkea_data       ldata  = { NULL };
6174         struct lu_buf            linkea_buf;
6175         struct thandle          *th;
6176         int                      rc;
6177         int                      fl     = LU_XATTR_CREATE;
6178         bool                     dirty  = false;
6179         ENTRY;
6180
6181         LASSERT(S_ISDIR(lfsck_object_type(obj)));
6182
6183         rc = lfsck_links_read(env, obj, &ldata);
6184         if (rc == -ENODATA) {
6185                 dirty = true;
6186         } else if (rc == 0) {
6187                 fl = LU_XATTR_REPLACE;
6188                 if (ldata.ld_leh->leh_reccount != 1) {
6189                         dirty = true;
6190                 } else {
6191                         rc = linkea_links_find(&ldata, cname, pfid);
6192                         if (rc != 0)
6193                                 dirty = true;
6194                 }
6195         }
6196
6197         if (!dirty)
6198                 RETURN(rc);
6199
6200         rc = linkea_data_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf);
6201         if (rc != 0)
6202                 RETURN(rc);
6203
6204         rc = linkea_add_buf(&ldata, cname, pfid);
6205         if (rc != 0)
6206                 RETURN(rc);
6207
6208         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
6209                        ldata.ld_leh->leh_len);
6210         th = dt_trans_create(env, dev);
6211         if (IS_ERR(th))
6212                 RETURN(PTR_ERR(th));
6213
6214         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
6215                                   XATTR_NAME_LINK, fl, th);
6216         if (rc != 0)
6217                 GOTO(stop, rc);
6218
6219         rc = dt_trans_start_local(env, dev, th);
6220         if (rc != 0)
6221                 GOTO(stop, rc);
6222
6223         dt_write_lock(env, obj, 0);
6224         rc = dt_xattr_set(env, obj, &linkea_buf,
6225                           XATTR_NAME_LINK, fl, th, BYPASS_CAPA);
6226         dt_write_unlock(env, obj);
6227
6228         GOTO(stop, rc);
6229
6230 stop:
6231         dt_trans_stop(env, dev, th);
6232         return rc;
6233 }
6234
6235 /**
6236  * Get the name and parent directory's FID from the first linkEA entry.
6237  *
6238  * \param[in] env       pointer to the thread context
6239  * \param[in] obj       pointer to the object which get linkEA from
6240  * \param[out] name     pointer to the buffer to hold the name
6241  *                      in the first linkEA entry
6242  * \param[out] pfid     pointer to the buffer to hold the parent
6243  *                      directory's FID in the first linkEA entry
6244  *
6245  * \retval              0 for success
6246  * \retval              negative error number on failure
6247  */
6248 int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj,
6249                           char *name, struct lu_fid *pfid)
6250 {
6251         struct lu_name           *cname = &lfsck_env_info(env)->lti_name;
6252         struct linkea_data        ldata = { NULL };
6253         int                       rc;
6254
6255         rc = lfsck_links_read(env, obj, &ldata);
6256         if (rc != 0)
6257                 return rc;
6258
6259         linkea_first_entry(&ldata);
6260         if (ldata.ld_lee == NULL)
6261                 return -ENODATA;
6262
6263         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
6264         /* To guarantee the 'name' is terminated with '0'. */
6265         memcpy(name, cname->ln_name, cname->ln_namelen);
6266         name[cname->ln_namelen] = 0;
6267
6268         return 0;
6269 }
6270
6271 /**
6272  * Update the object's name entry with the given FID.
6273  *
6274  * \param[in] env       pointer to the thread context
6275  * \param[in] lfsck     pointer to the lfsck instance
6276  * \param[in] parent    pointer to the parent directory that holds
6277  *                      the name entry
6278  * \param[in] name      the name for the entry to be updated
6279  * \param[in] pfid      the new PFID for the name entry
6280  * \param[in] type      the type for the name entry to be updated
6281  *
6282  * \retval              0 for success
6283  * \retval              negative error number on failure
6284  */
6285 int lfsck_update_name_entry(const struct lu_env *env,
6286                             struct lfsck_instance *lfsck,
6287                             struct dt_object *parent, const char *name,
6288                             const struct lu_fid *pfid, __u32 type)
6289 {
6290         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
6291         struct dt_device        *dev    = lfsck->li_next;
6292         struct lustre_handle     lh     = { 0 };
6293         struct thandle          *th;
6294         int                      rc;
6295         bool                     exists = true;
6296         ENTRY;
6297
6298         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
6299                               MDS_INODELOCK_UPDATE, LCK_EX);
6300         if (rc != 0)
6301                 RETURN(rc);
6302
6303         th = dt_trans_create(env, dev);
6304         if (IS_ERR(th))
6305                 GOTO(unlock, rc = PTR_ERR(th));
6306
6307         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
6308         if (rc != 0)
6309                 GOTO(stop, rc);
6310
6311         rec->rec_type = type;
6312         rec->rec_fid = pfid;
6313         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
6314                                (const struct dt_key *)name, th);
6315         if (rc != 0)
6316                 GOTO(stop, rc);
6317
6318         rc = dt_declare_ref_add(env, parent, th);
6319         if (rc != 0)
6320                 GOTO(stop, rc);
6321
6322         rc = dt_trans_start(env, dev, th);
6323         if (rc != 0)
6324                 GOTO(stop, rc);
6325
6326         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
6327                        BYPASS_CAPA);
6328         if (rc == -ENOENT) {
6329                 exists = false;
6330                 rc = 0;
6331         }
6332
6333         if (rc != 0)
6334                 GOTO(stop, rc);
6335
6336         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
6337                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
6338         if (rc == 0 && S_ISDIR(type) && !exists) {
6339                 dt_write_lock(env, parent, 0);
6340                 rc = dt_ref_add(env, parent, th);
6341                 dt_write_unlock(env, parent);
6342         }
6343
6344         GOTO(stop, rc);
6345
6346 stop:
6347         dt_trans_stop(env, dev, th);
6348
6349 unlock:
6350         lfsck_ibits_unlock(&lh, LCK_EX);
6351
6352         CDEBUG(D_LFSCK, "%s: update name entry "DFID"/%s with the FID "DFID
6353                " and the type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
6354                PFID(lfsck_dto2fid(parent)), name, PFID(pfid), type, rc);
6355
6356         return rc;
6357 }
6358
6359 int lfsck_namespace_setup(const struct lu_env *env,
6360                           struct lfsck_instance *lfsck)
6361 {
6362         struct lfsck_component  *com;
6363         struct lfsck_namespace  *ns;
6364         struct dt_object        *root = NULL;
6365         struct dt_object        *obj;
6366         int                      i;
6367         int                      rc;
6368         ENTRY;
6369
6370         LASSERT(lfsck->li_master);
6371
6372         OBD_ALLOC_PTR(com);
6373         if (com == NULL)
6374                 RETURN(-ENOMEM);
6375
6376         INIT_LIST_HEAD(&com->lc_link);
6377         INIT_LIST_HEAD(&com->lc_link_dir);
6378         init_rwsem(&com->lc_sem);
6379         atomic_set(&com->lc_ref, 1);
6380         com->lc_lfsck = lfsck;
6381         com->lc_type = LFSCK_TYPE_NAMESPACE;
6382         com->lc_ops = &lfsck_namespace_ops;
6383         com->lc_data = lfsck_assistant_data_init(
6384                         &lfsck_namespace_assistant_ops,
6385                         LFSCK_NAMESPACE);
6386         if (com->lc_data == NULL)
6387                 GOTO(out, rc = -ENOMEM);
6388
6389         com->lc_file_size = sizeof(struct lfsck_namespace);
6390         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
6391         if (com->lc_file_ram == NULL)
6392                 GOTO(out, rc = -ENOMEM);
6393
6394         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
6395         if (com->lc_file_disk == NULL)
6396                 GOTO(out, rc = -ENOMEM);
6397
6398         for (i = 0; i < LFSCK_STF_COUNT; i++)
6399                 mutex_init(&com->lc_sub_trace_objs[i].lsto_mutex);
6400
6401         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
6402         if (IS_ERR(root))
6403                 GOTO(out, rc = PTR_ERR(root));
6404
6405         if (unlikely(!dt_try_as_dir(env, root)))
6406                 GOTO(out, rc = -ENOTDIR);
6407
6408         obj = local_file_find_or_create(env, lfsck->li_los, root,
6409                                         LFSCK_NAMESPACE,
6410                                         S_IFREG | S_IRUGO | S_IWUSR);
6411         if (IS_ERR(obj))
6412                 GOTO(out, rc = PTR_ERR(obj));
6413
6414         com->lc_obj = obj;
6415         rc = lfsck_namespace_load(env, com);
6416         if (rc == -ENODATA)
6417                 rc = lfsck_namespace_init(env, com);
6418         else if (rc < 0)
6419                 rc = lfsck_namespace_reset(env, com, true);
6420         else
6421                 rc = lfsck_namespace_load_sub_trace_files(env, com, false);
6422         if (rc != 0)
6423                 GOTO(out, rc);
6424
6425         ns = com->lc_file_ram;
6426         switch (ns->ln_status) {
6427         case LS_INIT:
6428         case LS_COMPLETED:
6429         case LS_FAILED:
6430         case LS_STOPPED:
6431                 spin_lock(&lfsck->li_lock);
6432                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
6433                 spin_unlock(&lfsck->li_lock);
6434                 break;
6435         default:
6436                 CERROR("%s: unknown lfsck_namespace status %d\n",
6437                        lfsck_lfsck2name(lfsck), ns->ln_status);
6438                 /* fall through */
6439         case LS_SCANNING_PHASE1:
6440         case LS_SCANNING_PHASE2:
6441                 /* No need to store the status to disk right now.
6442                  * If the system crashed before the status stored,
6443                  * it will be loaded back when next time. */
6444                 ns->ln_status = LS_CRASHED;
6445                 /* fall through */
6446         case LS_PAUSED:
6447         case LS_CRASHED:
6448                 spin_lock(&lfsck->li_lock);
6449                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
6450                 list_add_tail(&com->lc_link_dir, &lfsck->li_list_dir);
6451                 spin_unlock(&lfsck->li_lock);
6452                 break;
6453         }
6454
6455         GOTO(out, rc = 0);
6456
6457 out:
6458         if (root != NULL && !IS_ERR(root))
6459                 lu_object_put(env, &root->do_lu);
6460         if (rc != 0) {
6461                 lfsck_component_cleanup(env, com);
6462                 CERROR("%s: fail to init namespace LFSCK component: rc = %d\n",
6463                        lfsck_lfsck2name(lfsck), rc);
6464         }
6465         return rc;
6466 }