Whamcloud - gitweb
6781598e91810ad2db6e94fce689d6dd0d4e2f55
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2015, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_namespace.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lustre/lustre_idl.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fid.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre/lustre_user.h>
41
42 #include "lfsck_internal.h"
43
#define LFSCK_NAMESPACE_MAGIC_V1        0xA0629D03
#define LFSCK_NAMESPACE_MAGIC_V2        0xA0621A0B

/*
 * For Lustre-2.x (x <= 6), the namespace LFSCK used
 * LFSCK_NAMESPACE_MAGIC_V1 as the trace file magic. After a downgrade to
 * such an old release, the old LFSCK will not recognize the new
 * LFSCK_NAMESPACE_MAGIC_V2 in the trace file, so it will reset the whole
 * LFSCK instead of failing to start. A similar reset happens when
 * upgrading from such an old release.
 */
#define LFSCK_NAMESPACE_MAGIC           LFSCK_NAMESPACE_MAGIC_V2
53
/*
 * Positive (non-error) results reported by lfsck_namespace_check_exist()
 * when the name entry no longer matches the expected object.
 */
enum lfsck_nameentry_check {
        /* The object has been unlinked. */
        LFSCK_NAMEENTRY_DEAD            = 1,
        /* The entry has been removed. */
        LFSCK_NAMEENTRY_REMOVED         = 2,
        /* The entry has been recreated. */
        LFSCK_NAMEENTRY_RECREATED       = 3,
};
59
60 static struct lfsck_namespace_req *
61 lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
62                                    struct lfsck_assistant_object *lso,
63                                    struct lu_dirent *ent, __u16 type)
64 {
65         struct lfsck_namespace_req *lnr;
66         int                         size;
67
68         size = sizeof(*lnr) + (ent->lde_namelen & ~3) + 4;
69         OBD_ALLOC(lnr, size);
70         if (lnr == NULL)
71                 return ERR_PTR(-ENOMEM);
72
73         INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
74         lnr->lnr_lar.lar_parent = lfsck_assistant_object_get(lso);
75         lnr->lnr_lmv = lfsck_lmv_get(lfsck->li_lmv);
76         lnr->lnr_fid = ent->lde_fid;
77         lnr->lnr_dir_cookie = ent->lde_hash;
78         lnr->lnr_attr = ent->lde_attrs;
79         lnr->lnr_size = size;
80         lnr->lnr_type = type;
81         lnr->lnr_namelen = ent->lde_namelen;
82         memcpy(lnr->lnr_name, ent->lde_name, ent->lde_namelen);
83
84         return lnr;
85 }
86
87 static void lfsck_namespace_assistant_req_fini(const struct lu_env *env,
88                                                struct lfsck_assistant_req *lar)
89 {
90         struct lfsck_namespace_req *lnr =
91                         container_of0(lar, struct lfsck_namespace_req, lnr_lar);
92
93         if (lnr->lnr_lmv != NULL)
94                 lfsck_lmv_put(env, lnr->lnr_lmv);
95
96         lfsck_assistant_object_put(env, lar->lar_parent);
97         OBD_FREE(lnr, lnr->lnr_size);
98 }
99
100 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
101                                       struct lfsck_namespace *src)
102 {
103         dst->ln_magic = le32_to_cpu(src->ln_magic);
104         dst->ln_status = le32_to_cpu(src->ln_status);
105         dst->ln_flags = le32_to_cpu(src->ln_flags);
106         dst->ln_success_count = le32_to_cpu(src->ln_success_count);
107         dst->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
108         dst->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
109         dst->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
110         dst->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
111         dst->ln_time_last_checkpoint =
112                                 le64_to_cpu(src->ln_time_last_checkpoint);
113         lfsck_position_le_to_cpu(&dst->ln_pos_latest_start,
114                                  &src->ln_pos_latest_start);
115         lfsck_position_le_to_cpu(&dst->ln_pos_last_checkpoint,
116                                  &src->ln_pos_last_checkpoint);
117         lfsck_position_le_to_cpu(&dst->ln_pos_first_inconsistent,
118                                  &src->ln_pos_first_inconsistent);
119         dst->ln_items_checked = le64_to_cpu(src->ln_items_checked);
120         dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
121         dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
122         dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
123         dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
124         dst->ln_objs_repaired_phase2 =
125                                 le64_to_cpu(src->ln_objs_repaired_phase2);
126         dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
127         dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
128         fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2,
129                       &src->ln_fid_latest_scanned_phase2);
130         dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
131         dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
132         dst->ln_mul_linked_checked = le64_to_cpu(src->ln_mul_linked_checked);
133         dst->ln_mul_linked_repaired = le64_to_cpu(src->ln_mul_linked_repaired);
134         dst->ln_unknown_inconsistency =
135                                 le64_to_cpu(src->ln_unknown_inconsistency);
136         dst->ln_unmatched_pairs_repaired =
137                                 le64_to_cpu(src->ln_unmatched_pairs_repaired);
138         dst->ln_dangling_repaired = le64_to_cpu(src->ln_dangling_repaired);
139         dst->ln_mul_ref_repaired = le64_to_cpu(src->ln_mul_ref_repaired);
140         dst->ln_bad_type_repaired = le64_to_cpu(src->ln_bad_type_repaired);
141         dst->ln_lost_dirent_repaired =
142                                 le64_to_cpu(src->ln_lost_dirent_repaired);
143         dst->ln_striped_dirs_scanned =
144                                 le64_to_cpu(src->ln_striped_dirs_scanned);
145         dst->ln_striped_dirs_repaired =
146                                 le64_to_cpu(src->ln_striped_dirs_repaired);
147         dst->ln_striped_dirs_failed =
148                                 le64_to_cpu(src->ln_striped_dirs_failed);
149         dst->ln_striped_dirs_disabled =
150                                 le64_to_cpu(src->ln_striped_dirs_disabled);
151         dst->ln_striped_dirs_skipped =
152                                 le64_to_cpu(src->ln_striped_dirs_skipped);
153         dst->ln_striped_shards_scanned =
154                                 le64_to_cpu(src->ln_striped_shards_scanned);
155         dst->ln_striped_shards_repaired =
156                                 le64_to_cpu(src->ln_striped_shards_repaired);
157         dst->ln_striped_shards_failed =
158                                 le64_to_cpu(src->ln_striped_shards_failed);
159         dst->ln_striped_shards_skipped =
160                                 le64_to_cpu(src->ln_striped_shards_skipped);
161         dst->ln_name_hash_repaired = le64_to_cpu(src->ln_name_hash_repaired);
162         dst->ln_local_lpf_scanned = le64_to_cpu(src->ln_local_lpf_scanned);
163         dst->ln_local_lpf_moved = le64_to_cpu(src->ln_local_lpf_moved);
164         dst->ln_local_lpf_skipped = le64_to_cpu(src->ln_local_lpf_skipped);
165         dst->ln_local_lpf_failed = le64_to_cpu(src->ln_local_lpf_failed);
166         dst->ln_bitmap_size = le32_to_cpu(src->ln_bitmap_size);
167 }
168
169 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
170                                       struct lfsck_namespace *src)
171 {
172         dst->ln_magic = cpu_to_le32(src->ln_magic);
173         dst->ln_status = cpu_to_le32(src->ln_status);
174         dst->ln_flags = cpu_to_le32(src->ln_flags);
175         dst->ln_success_count = cpu_to_le32(src->ln_success_count);
176         dst->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
177         dst->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
178         dst->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
179         dst->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
180         dst->ln_time_last_checkpoint =
181                                 cpu_to_le64(src->ln_time_last_checkpoint);
182         lfsck_position_cpu_to_le(&dst->ln_pos_latest_start,
183                                  &src->ln_pos_latest_start);
184         lfsck_position_cpu_to_le(&dst->ln_pos_last_checkpoint,
185                                  &src->ln_pos_last_checkpoint);
186         lfsck_position_cpu_to_le(&dst->ln_pos_first_inconsistent,
187                                  &src->ln_pos_first_inconsistent);
188         dst->ln_items_checked = cpu_to_le64(src->ln_items_checked);
189         dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
190         dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
191         dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
192         dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
193         dst->ln_objs_repaired_phase2 =
194                                 cpu_to_le64(src->ln_objs_repaired_phase2);
195         dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
196         dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
197         fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2,
198                       &src->ln_fid_latest_scanned_phase2);
199         dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
200         dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
201         dst->ln_mul_linked_checked = cpu_to_le64(src->ln_mul_linked_checked);
202         dst->ln_mul_linked_repaired = cpu_to_le64(src->ln_mul_linked_repaired);
203         dst->ln_unknown_inconsistency =
204                                 cpu_to_le64(src->ln_unknown_inconsistency);
205         dst->ln_unmatched_pairs_repaired =
206                                 cpu_to_le64(src->ln_unmatched_pairs_repaired);
207         dst->ln_dangling_repaired = cpu_to_le64(src->ln_dangling_repaired);
208         dst->ln_mul_ref_repaired = cpu_to_le64(src->ln_mul_ref_repaired);
209         dst->ln_bad_type_repaired = cpu_to_le64(src->ln_bad_type_repaired);
210         dst->ln_lost_dirent_repaired =
211                                 cpu_to_le64(src->ln_lost_dirent_repaired);
212         dst->ln_striped_dirs_scanned =
213                                 cpu_to_le64(src->ln_striped_dirs_scanned);
214         dst->ln_striped_dirs_repaired =
215                                 cpu_to_le64(src->ln_striped_dirs_repaired);
216         dst->ln_striped_dirs_failed =
217                                 cpu_to_le64(src->ln_striped_dirs_failed);
218         dst->ln_striped_dirs_disabled =
219                                 cpu_to_le64(src->ln_striped_dirs_disabled);
220         dst->ln_striped_dirs_skipped =
221                                 cpu_to_le64(src->ln_striped_dirs_skipped);
222         dst->ln_striped_shards_scanned =
223                                 cpu_to_le64(src->ln_striped_shards_scanned);
224         dst->ln_striped_shards_repaired =
225                                 cpu_to_le64(src->ln_striped_shards_repaired);
226         dst->ln_striped_shards_failed =
227                                 cpu_to_le64(src->ln_striped_shards_failed);
228         dst->ln_striped_shards_skipped =
229                                 cpu_to_le64(src->ln_striped_shards_skipped);
230         dst->ln_name_hash_repaired = cpu_to_le64(src->ln_name_hash_repaired);
231         dst->ln_local_lpf_scanned = cpu_to_le64(src->ln_local_lpf_scanned);
232         dst->ln_local_lpf_moved = cpu_to_le64(src->ln_local_lpf_moved);
233         dst->ln_local_lpf_skipped = cpu_to_le64(src->ln_local_lpf_skipped);
234         dst->ln_local_lpf_failed = cpu_to_le64(src->ln_local_lpf_failed);
235         dst->ln_bitmap_size = cpu_to_le32(src->ln_bitmap_size);
236 }
237
238 static void lfsck_namespace_record_failure(const struct lu_env *env,
239                                            struct lfsck_instance *lfsck,
240                                            struct lfsck_namespace *ns)
241 {
242         struct lfsck_position pos;
243
244         ns->ln_items_failed++;
245         lfsck_pos_fill(env, lfsck, &pos, false);
246         if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent) ||
247             lfsck_pos_is_eq(&pos, &ns->ln_pos_first_inconsistent) < 0) {
248                 ns->ln_pos_first_inconsistent = pos;
249
250                 CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
251                        "inconsistency at the pos [%llu, "DFID", %#llx]\n",
252                        lfsck_lfsck2name(lfsck),
253                        ns->ln_pos_first_inconsistent.lp_oit_cookie,
254                        PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
255                        ns->ln_pos_first_inconsistent.lp_dir_cookie);
256         }
257 }
258
259 /**
260  * Load the MDT bitmap from the lfsck_namespace trace file.
261  *
262  * \param[in] env       pointer to the thread context
263  * \param[in] com       pointer to the lfsck component
264  *
265  * \retval              0 for success
266  * \retval              negative error number on failure or data corruption
267  */
268 static int lfsck_namespace_load_bitmap(const struct lu_env *env,
269                                        struct lfsck_component *com)
270 {
271         struct dt_object                *obj    = com->lc_obj;
272         struct lfsck_assistant_data     *lad    = com->lc_data;
273         struct lfsck_namespace          *ns     = com->lc_file_ram;
274         struct cfs_bitmap                       *bitmap = lad->lad_bitmap;
275         ssize_t                          size;
276         __u32                            nbits;
277         int                              rc;
278         ENTRY;
279
280         if (com->lc_lfsck->li_mdt_descs.ltd_tgts_bitmap->size >
281             ns->ln_bitmap_size)
282                 nbits = com->lc_lfsck->li_mdt_descs.ltd_tgts_bitmap->size;
283         else
284                 nbits = ns->ln_bitmap_size;
285
286         if (unlikely(nbits < BITS_PER_LONG))
287                 nbits = BITS_PER_LONG;
288
289         if (nbits > bitmap->size) {
290                 __u32 new_bits = bitmap->size;
291                 struct cfs_bitmap *new_bitmap;
292
293                 while (new_bits < nbits)
294                         new_bits <<= 1;
295
296                 new_bitmap = CFS_ALLOCATE_BITMAP(new_bits);
297                 if (new_bitmap == NULL)
298                         RETURN(-ENOMEM);
299
300                 lad->lad_bitmap = new_bitmap;
301                 CFS_FREE_BITMAP(bitmap);
302                 bitmap = new_bitmap;
303         }
304
305         if (ns->ln_bitmap_size == 0) {
306                 lad->lad_incomplete = 0;
307                 CFS_RESET_BITMAP(bitmap);
308
309                 RETURN(0);
310         }
311
312         size = (ns->ln_bitmap_size + 7) >> 3;
313         rc = dt_xattr_get(env, obj,
314                           lfsck_buf_get(env, bitmap->data, size),
315                           XATTR_NAME_LFSCK_BITMAP);
316         if (rc != size)
317                 RETURN(rc >= 0 ? -EINVAL : rc);
318
319         if (cfs_bitmap_check_empty(bitmap))
320                 lad->lad_incomplete = 0;
321         else
322                 lad->lad_incomplete = 1;
323
324         RETURN(0);
325 }
326
327 /**
328  * Load namespace LFSCK statistics information from the trace file.
329  *
330  * \param[in] env       pointer to the thread context
331  * \param[in] com       pointer to the lfsck component
332  *
333  * \retval              0 for success
334  * \retval              negative error number on failure
335  */
336 static int lfsck_namespace_load(const struct lu_env *env,
337                                 struct lfsck_component *com)
338 {
339         int len = com->lc_file_size;
340         int rc;
341
342         rc = dt_xattr_get(env, com->lc_obj,
343                           lfsck_buf_get(env, com->lc_file_disk, len),
344                           XATTR_NAME_LFSCK_NAMESPACE);
345         if (rc == len) {
346                 struct lfsck_namespace *ns = com->lc_file_ram;
347
348                 lfsck_namespace_le_to_cpu(ns,
349                                 (struct lfsck_namespace *)com->lc_file_disk);
350                 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
351                         CDEBUG(D_LFSCK, "%s: invalid lfsck_namespace magic "
352                                "%#x != %#x\n", lfsck_lfsck2name(com->lc_lfsck),
353                                ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
354                         rc = -ESTALE;
355                 } else {
356                         rc = 0;
357                 }
358         } else if (rc != -ENODATA) {
359                 CDEBUG(D_LFSCK, "%s: fail to load lfsck_namespace, "
360                        "expected = %d: rc = %d\n",
361                        lfsck_lfsck2name(com->lc_lfsck), len, rc);
362                 if (rc >= 0)
363                         rc = -ESTALE;
364         }
365
366         return rc;
367 }
368
369 static int lfsck_namespace_store(const struct lu_env *env,
370                                  struct lfsck_component *com)
371 {
372         struct dt_object                *obj    = com->lc_obj;
373         struct lfsck_instance           *lfsck  = com->lc_lfsck;
374         struct lfsck_namespace          *ns     = com->lc_file_ram;
375         struct lfsck_assistant_data     *lad    = com->lc_data;
376         struct dt_device                *dev    = lfsck_obj2dev(obj);
377         struct cfs_bitmap               *bitmap = NULL;
378         struct thandle                  *handle;
379         __u32                            nbits  = 0;
380         int                              len    = com->lc_file_size;
381         int                              rc;
382         ENTRY;
383
384         if (lad != NULL) {
385                 bitmap = lad->lad_bitmap;
386                 nbits = bitmap->size;
387
388                 LASSERT(nbits > 0);
389                 LASSERTF((nbits & 7) == 0, "Invalid nbits %u\n", nbits);
390         }
391
392         ns->ln_bitmap_size = nbits;
393         lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
394                                   ns);
395         handle = dt_trans_create(env, dev);
396         if (IS_ERR(handle))
397                 GOTO(log, rc = PTR_ERR(handle));
398
399         rc = dt_declare_xattr_set(env, obj,
400                                   lfsck_buf_get(env, com->lc_file_disk, len),
401                                   XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
402         if (rc != 0)
403                 GOTO(out, rc);
404
405         if (bitmap != NULL) {
406                 rc = dt_declare_xattr_set(env, obj,
407                                 lfsck_buf_get(env, bitmap->data, nbits >> 3),
408                                 XATTR_NAME_LFSCK_BITMAP, 0, handle);
409                 if (rc != 0)
410                         GOTO(out, rc);
411         }
412
413         rc = dt_trans_start_local(env, dev, handle);
414         if (rc != 0)
415                 GOTO(out, rc);
416
417         rc = dt_xattr_set(env, obj,
418                           lfsck_buf_get(env, com->lc_file_disk, len),
419                           XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
420         if (rc == 0 && bitmap != NULL)
421                 rc = dt_xattr_set(env, obj,
422                                   lfsck_buf_get(env, bitmap->data, nbits >> 3),
423                                   XATTR_NAME_LFSCK_BITMAP, 0, handle);
424
425         GOTO(out, rc);
426
427 out:
428         dt_trans_stop(env, dev, handle);
429
430 log:
431         if (rc != 0)
432                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_namespace: rc = %d\n",
433                        lfsck_lfsck2name(lfsck), rc);
434         return rc;
435 }
436
437 static struct dt_object *
438 lfsck_namespace_load_one_trace_file(const struct lu_env *env,
439                                     struct lfsck_component *com,
440                                     struct dt_object *parent,
441                                     const char *name, bool reset)
442 {
443         struct lfsck_instance   *lfsck = com->lc_lfsck;
444         struct dt_object        *obj;
445         int                      rc;
446
447         if (reset) {
448                 rc = local_object_unlink(env, lfsck->li_bottom, parent, name);
449                 if (rc != 0 && rc != -ENOENT)
450                         return ERR_PTR(rc);
451         }
452
453         obj = local_index_find_or_create(env, lfsck->li_los, parent, name,
454                                          S_IFREG | S_IRUGO | S_IWUSR,
455                                          &dt_lfsck_features);
456
457         return obj;
458 }
459
460 static int lfsck_namespace_load_sub_trace_files(const struct lu_env *env,
461                                                 struct lfsck_component *com,
462                                                 bool reset)
463 {
464         char                            *name = lfsck_env_info(env)->lti_key;
465         struct lfsck_sub_trace_obj      *lsto;
466         struct dt_object                *obj;
467         int                              rc;
468         int                              i;
469
470         for (i = 0, lsto = &com->lc_sub_trace_objs[0];
471              i < LFSCK_STF_COUNT; i++, lsto++) {
472                 snprintf(name, NAME_MAX, "%s_%02d", LFSCK_NAMESPACE, i);
473                 if (lsto->lsto_obj != NULL) {
474                         if (!reset)
475                                 continue;
476
477                         lfsck_object_put(env, lsto->lsto_obj);
478                         lsto->lsto_obj = NULL;
479                 }
480
481                 obj = lfsck_namespace_load_one_trace_file(env, com,
482                                 com->lc_lfsck->li_lfsck_dir, name, reset);
483                 if (IS_ERR(obj))
484                         return PTR_ERR(obj);
485
486                 lsto->lsto_obj = obj;
487                 rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
488                 if (rc != 0)
489                         return rc;
490         }
491
492         return 0;
493 }
494
495 static int lfsck_namespace_init(const struct lu_env *env,
496                                 struct lfsck_component *com)
497 {
498         struct lfsck_namespace *ns = com->lc_file_ram;
499         int rc;
500
501         memset(ns, 0, sizeof(*ns));
502         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
503         ns->ln_status = LS_INIT;
504         down_write(&com->lc_sem);
505         rc = lfsck_namespace_store(env, com);
506         up_write(&com->lc_sem);
507         if (rc == 0)
508                 rc = lfsck_namespace_load_sub_trace_files(env, com, true);
509
510         return rc;
511 }
512
513 /**
514  * Update the namespace LFSCK trace file for the given @fid
515  *
516  * \param[in] env       pointer to the thread context
517  * \param[in] com       pointer to the lfsck component
518  * \param[in] fid       the fid which flags to be updated in the lfsck
519  *                      trace file
520  * \param[in] add       true if add new flags, otherwise remove flags
521  *
522  * \retval              0 for success or nothing to be done
523  * \retval              negative error number on failure
524  */
525 int lfsck_namespace_trace_update(const struct lu_env *env,
526                                  struct lfsck_component *com,
527                                  const struct lu_fid *fid,
528                                  const __u8 flags, bool add)
529 {
530         struct lfsck_instance   *lfsck  = com->lc_lfsck;
531         struct dt_object        *obj;
532         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid3;
533         struct dt_device        *dev;
534         struct thandle          *th     = NULL;
535         int                      idx;
536         int                      rc     = 0;
537         __u8                     old    = 0;
538         __u8                     new    = 0;
539         ENTRY;
540
541         LASSERT(flags != 0);
542
543         if (unlikely(!fid_is_sane(fid)))
544                 RETURN(0);
545
546         idx = lfsck_sub_trace_file_fid2idx(fid);
547         obj = com->lc_sub_trace_objs[idx].lsto_obj;
548         dev = lfsck_obj2dev(obj);
549         mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
550         fid_cpu_to_be(key, fid);
551         rc = dt_lookup(env, obj, (struct dt_rec *)&old,
552                        (const struct dt_key *)key);
553         if (rc == -ENOENT) {
554                 if (!add)
555                         GOTO(unlock, rc = 0);
556
557                 old = 0;
558                 new = flags;
559         } else if (rc == 0) {
560                 if (add) {
561                         if ((old & flags) == flags)
562                                 GOTO(unlock, rc = 0);
563
564                         new = old | flags;
565                 } else {
566                         if ((old & flags) == 0)
567                                 GOTO(unlock, rc = 0);
568
569                         new = old & ~flags;
570                 }
571         } else {
572                 GOTO(log, rc);
573         }
574
575         th = dt_trans_create(env, dev);
576         if (IS_ERR(th))
577                 GOTO(log, rc = PTR_ERR(th));
578
579         if (old != 0) {
580                 rc = dt_declare_delete(env, obj,
581                                        (const struct dt_key *)key, th);
582                 if (rc != 0)
583                         GOTO(log, rc);
584         }
585
586         if (new != 0) {
587                 rc = dt_declare_insert(env, obj,
588                                        (const struct dt_rec *)&new,
589                                        (const struct dt_key *)key, th);
590                 if (rc != 0)
591                         GOTO(log, rc);
592         }
593
594         rc = dt_trans_start_local(env, dev, th);
595         if (rc != 0)
596                 GOTO(log, rc);
597
598         if (old != 0) {
599                 rc = dt_delete(env, obj, (const struct dt_key *)key, th);
600                 if (rc != 0)
601                         GOTO(log, rc);
602         }
603
604         if (new != 0) {
605                 rc = dt_insert(env, obj, (const struct dt_rec *)&new,
606                                (const struct dt_key *)key, th, 1);
607                 if (rc != 0)
608                         GOTO(log, rc);
609         }
610
611         GOTO(log, rc);
612
613 log:
614         if (th != NULL && !IS_ERR(th))
615                 dt_trans_stop(env, dev, th);
616
617         CDEBUG(D_LFSCK, "%s: namespace LFSCK %s flags for "DFID" in the "
618                "trace file, flags %x, old %x, new %x: rc = %d\n",
619                lfsck_lfsck2name(lfsck), add ? "add" : "del", PFID(fid),
620                (__u32)flags, (__u32)old, (__u32)new, rc);
621
622 unlock:
623         mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
624
625         return rc;
626 }
627
628 int lfsck_namespace_check_exist(const struct lu_env *env,
629                                 struct dt_object *dir,
630                                 struct dt_object *obj, const char *name)
631 {
632         struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
633         int               rc;
634         ENTRY;
635
636         if (unlikely(lfsck_is_dead_obj(obj)))
637                 RETURN(LFSCK_NAMEENTRY_DEAD);
638
639         rc = dt_lookup(env, dir, (struct dt_rec *)fid,
640                        (const struct dt_key *)name);
641         if (rc == -ENOENT)
642                 RETURN(LFSCK_NAMEENTRY_REMOVED);
643
644         if (rc < 0)
645                 RETURN(rc);
646
647         if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
648                 RETURN(LFSCK_NAMEENTRY_RECREATED);
649
650         RETURN(0);
651 }
652
653 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
654                                             struct dt_object *obj,
655                                             struct thandle *handle)
656 {
657         int rc;
658
659         /* For destroying all invalid linkEA entries. */
660         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
661         if (rc == 0)
662                 /* For insert new linkEA entry. */
663                 rc = dt_declare_xattr_set(env, obj,
664                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
665                         XATTR_NAME_LINK, 0, handle);
666         return rc;
667 }
668
669 int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
670                        struct linkea_data *ldata)
671 {
672         int rc;
673
674         if (ldata->ld_buf->lb_buf == NULL)
675                 return -ENOMEM;
676
677         if (!dt_object_exists(obj))
678                 return -ENOENT;
679
680         rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK);
681         if (rc == -ERANGE) {
682                 /* Buf was too small, figure out what we need. */
683                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LINK);
684                 if (unlikely(rc == 0))
685                         return -ENODATA;
686
687                 if (rc < 0)
688                         return rc;
689
690                 lu_buf_realloc(ldata->ld_buf, rc);
691                 if (ldata->ld_buf->lb_buf == NULL)
692                         return -ENOMEM;
693
694                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK);
695         }
696
697         if (unlikely(rc == 0))
698                 return -ENODATA;
699
700         if (rc > 0)
701                 rc = linkea_init(ldata);
702
703         return rc;
704 }
705
706 /**
707  * Remove linkEA for the given object.
708  *
709  * The caller should take the ldlm lock before the calling.
710  *
711  * \param[in] env       pointer to the thread context
712  * \param[in] com       pointer to the lfsck component
713  * \param[in] obj       pointer to the dt_object to be handled
714  *
715  * \retval              0 for repaired cases
716  * \retval              negative error number on failure
717  */
718 static int lfsck_namespace_links_remove(const struct lu_env *env,
719                                         struct lfsck_component *com,
720                                         struct dt_object *obj)
721 {
722         struct lfsck_instance           *lfsck  = com->lc_lfsck;
723         struct dt_device                *dev    = lfsck_obj2dev(obj);
724         struct thandle                  *th     = NULL;
725         int                              rc     = 0;
726         ENTRY;
727
728         LASSERT(dt_object_remote(obj) == 0);
729
730         th = dt_trans_create(env, dev);
731         if (IS_ERR(th))
732                 GOTO(log, rc = PTR_ERR(th));
733
734         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, th);
735         if (rc != 0)
736                 GOTO(stop, rc);
737
738         rc = dt_trans_start_local(env, dev, th);
739         if (rc != 0)
740                 GOTO(stop, rc);
741
742         dt_write_lock(env, obj, 0);
743         if (unlikely(lfsck_is_dead_obj(obj)))
744                 GOTO(unlock, rc = -ENOENT);
745
746         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
747                 GOTO(unlock, rc = 0);
748
749         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th);
750
751         GOTO(unlock, rc);
752
753 unlock:
754         dt_write_unlock(env, obj);
755
756 stop:
757         dt_trans_stop(env, dev, th);
758
759 log:
760         CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
761                "for the object "DFID": rc = %d\n",
762                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
763
764         if (rc == 0) {
765                 struct lfsck_namespace *ns = com->lc_file_ram;
766
767                 ns->ln_flags |= LF_INCONSISTENT;
768         }
769
770         return rc;
771 }
772
773 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
774                              struct linkea_data *ldata, struct thandle *handle)
775 {
776         const struct lu_buf *buf = lfsck_buf_get_const(env,
777                                                        ldata->ld_buf->lb_buf,
778                                                        ldata->ld_leh->leh_len);
779
780         return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle);
781 }
782
783 static int lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata,
784                                                struct lu_name *cname,
785                                                struct lu_fid *pfid,
786                                                char *buf, const int buflen)
787 {
788         linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
789         if (unlikely(ldata->ld_reclen <= 0 ||
790                      ldata->ld_reclen + sizeof(struct link_ea_header) >
791                         ldata->ld_leh->leh_len ||
792                      cname->ln_namelen <= 0 ||
793                      cname->ln_namelen > NAME_MAX ||
794                      cname->ln_namelen >= buflen ||
795                      !fid_is_sane(pfid)))
796                 return -EINVAL;
797
798         /* To guarantee the 'name' is terminated with '0'. */
799         memcpy(buf, cname->ln_name, cname->ln_namelen);
800         buf[cname->ln_namelen] = 0;
801         cname->ln_name = buf;
802
803         return 0;
804 }
805
806 static void lfsck_linkea_del_buf(struct linkea_data *ldata,
807                                  const struct lu_name *lname)
808 {
809         LASSERT(ldata->ld_leh != NULL && ldata->ld_lee != NULL);
810
811         /* If current record is corrupted, all the subsequent
812          * records will be dropped. */
813         if (unlikely(ldata->ld_reclen <= 0 ||
814                      ldata->ld_reclen + sizeof(struct link_ea_header) >
815                         ldata->ld_leh->leh_len)) {
816                 void *ptr = ldata->ld_lee;
817
818                 ldata->ld_leh->leh_len = sizeof(struct link_ea_header);
819                 ldata->ld_leh->leh_reccount = 0;
820                 linkea_first_entry(ldata);
821                 while (ldata->ld_lee != NULL &&
822                        (char *)ldata->ld_lee < (char *)ptr) {
823                         int reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
824                                      ldata->ld_lee->lee_reclen[1];
825
826                         ldata->ld_leh->leh_len += reclen;
827                         ldata->ld_leh->leh_reccount++;
828                         ldata->ld_lee = (struct link_ea_entry *)
829                                         ((char *)ldata->ld_lee + reclen);
830                 }
831
832                 ldata->ld_lee = NULL;
833         } else {
834                 linkea_del_buf(ldata, lname);
835         }
836 }
837
838 static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata,
839                                                struct lu_name *cname,
840                                                struct lu_fid *pfid,
841                                                bool remove)
842 {
843         struct link_ea_entry    *oldlee;
844         int                      oldlen;
845         int                      repeated = 0;
846
847         oldlee = ldata->ld_lee;
848         oldlen = ldata->ld_reclen;
849         linkea_next_entry(ldata);
850         while (ldata->ld_lee != NULL) {
851                 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
852                                    ldata->ld_lee->lee_reclen[1];
853                 if (unlikely(ldata->ld_reclen == oldlen &&
854                              memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
855                         repeated++;
856                         if (!remove)
857                                 break;
858
859                         lfsck_linkea_del_buf(ldata, cname);
860                 } else {
861                         linkea_next_entry(ldata);
862                 }
863         }
864         ldata->ld_lee = oldlee;
865         ldata->ld_reclen = oldlen;
866
867         return repeated;
868 }
869
870 /**
871  * Insert orphan into .lustre/lost+found/MDTxxxx/ locally.
872  *
873  * Add the specified orphan MDT-object to the .lustre/lost+found/MDTxxxx/
874  * with the given type to generate the name, the detailed rules for name
875  * have been described as following.
876  *
877  * The function also generates the linkEA corresponding to the name entry
878  * under the .lustre/lost+found/MDTxxxx/ for the orphan MDT-object.
879  *
880  * \param[in] env       pointer to the thread context
881  * \param[in] com       pointer to the lfsck component
882  * \param[in] orphan    pointer to the orphan MDT-object
883  * \param[in] infix     additional information for the orphan name, such as
884  *                      the FID for original
885  * \param[in] type      the type for describing why the orphan MDT-object is
886  *                      created. The rules are as following:
887  *
888  *  type "D":           The MDT-object is a directory, it may knows its parent
889  *                      but because there is no valid linkEA, the LFSCK cannot
890  *                      know where to put it back to the namespace.
891  *  type "O":           The MDT-object has no linkEA, and there is no name
892  *                      entry that references the MDT-object.
893  *
894  *  type "S":           The orphan MDT-object is a shard of a striped directory
895  *
896  * \see lfsck_layout_recreate_parent() for more types.
897  *
898  * The orphan name will be like:
899  * ${FID}-${infix}-${type}-${conflict_version}
900  *
901  * \param[out] count    if some others inserted some linkEA entries by race,
902  *                      then return the linkEA entries count.
903  *
904  * \retval              positive number for repaired cases
905  * \retval              0 if needs to repair nothing
906  * \retval              negative error number on failure
907  */
908 static int lfsck_namespace_insert_orphan(const struct lu_env *env,
909                                          struct lfsck_component *com,
910                                          struct dt_object *orphan,
911                                          const char *infix, const char *type,
912                                          int *count)
913 {
914         struct lfsck_thread_info        *info   = lfsck_env_info(env);
915         struct lu_name                  *cname  = &info->lti_name;
916         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
917         struct lu_attr                  *la     = &info->lti_la2;
918         const struct lu_fid             *cfid   = lfsck_dto2fid(orphan);
919         const struct lu_fid             *pfid;
920         struct lu_fid                    tfid;
921         struct lfsck_instance           *lfsck  = com->lc_lfsck;
922         struct dt_device                *dev    = lfsck_obj2dev(orphan);
923         struct dt_object                *parent;
924         struct thandle                  *th     = NULL;
925         struct lfsck_lock_handle        *pllh   = &info->lti_llh;
926         struct lustre_handle             clh    = { 0 };
927         struct linkea_data               ldata2 = { NULL };
928         struct lu_buf                    linkea_buf;
929         int                              namelen;
930         int                              idx    = 0;
931         int                              rc     = 0;
932         bool                             exist  = false;
933         ENTRY;
934
935         cname->ln_name = NULL;
936         if (unlikely(lfsck->li_lpf_obj == NULL))
937                 GOTO(log, rc = -ENXIO);
938
939         parent = lfsck->li_lpf_obj;
940         pfid = lfsck_dto2fid(parent);
941
942 again:
943         do {
944                 namelen = snprintf(info->lti_key, NAME_MAX, DFID"%s-%s-%d",
945                                    PFID(cfid), infix, type, idx++);
946                 rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
947                                (const struct dt_key *)info->lti_key);
948                 if (rc != 0 && rc != -ENOENT)
949                         GOTO(log, rc);
950
951                 if (unlikely(rc == 0 && lu_fid_eq(cfid, &tfid)))
952                         exist = true;
953         } while (rc == 0 && !exist);
954
955         rc = lfsck_lock(env, lfsck, parent, info->lti_key, pllh,
956                         MDS_INODELOCK_UPDATE, LCK_PW);
957         if (rc != 0)
958                 GOTO(log, rc);
959
960         /* Re-check whether the name conflict with othrs after taken
961          * the ldlm lock. */
962         rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
963                        (const struct dt_key *)info->lti_key);
964         if (rc == 0) {
965                 if (!lu_fid_eq(cfid, &tfid)) {
966                         exist = false;
967                         lfsck_unlock(pllh);
968                         goto again;
969                 }
970
971                 exist = true;
972         } else if (rc != -ENOENT) {
973                 GOTO(log, rc);
974         } else {
975                 exist = false;
976         }
977
978         cname->ln_name = info->lti_key;
979         cname->ln_namelen = namelen;
980         rc = linkea_data_new(&ldata2, &info->lti_linkea_buf2);
981         if (rc != 0)
982                 GOTO(log, rc);
983
984         rc = linkea_add_buf(&ldata2, cname, pfid);
985         if (rc != 0)
986                 GOTO(log, rc);
987
988         rc = lfsck_ibits_lock(env, lfsck, orphan, &clh,
989                               MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP |
990                               MDS_INODELOCK_XATTR, LCK_EX);
991         if (rc != 0)
992                 GOTO(log, rc);
993
994         lfsck_buf_init(&linkea_buf, ldata2.ld_buf->lb_buf,
995                        ldata2.ld_leh->leh_len);
996         th = dt_trans_create(env, dev);
997         if (IS_ERR(th))
998                 GOTO(log, rc = PTR_ERR(th));
999
1000         if (S_ISDIR(lfsck_object_type(orphan))) {
1001                 rc = dt_declare_delete(env, orphan,
1002                                        (const struct dt_key *)dotdot, th);
1003                 if (rc != 0)
1004                         GOTO(stop, rc);
1005
1006                 rec->rec_type = S_IFDIR;
1007                 rec->rec_fid = pfid;
1008                 rc = dt_declare_insert(env, orphan, (const struct dt_rec *)rec,
1009                                        (const struct dt_key *)dotdot, th);
1010                 if (rc != 0)
1011                         GOTO(stop, rc);
1012         }
1013
1014         rc = dt_declare_xattr_set(env, orphan, &linkea_buf,
1015                                   XATTR_NAME_LINK, 0, th);
1016         if (rc != 0)
1017                 GOTO(stop, rc);
1018
1019         if (!exist) {
1020                 rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
1021                 rec->rec_fid = cfid;
1022                 rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1023                                        (const struct dt_key *)cname->ln_name,
1024                                        th);
1025                 if (rc != 0)
1026                         GOTO(stop, rc);
1027
1028                 if (S_ISDIR(rec->rec_type)) {
1029                         rc = dt_declare_ref_add(env, parent, th);
1030                         if (rc != 0)
1031                                 GOTO(stop, rc);
1032                 }
1033         }
1034
1035         memset(la, 0, sizeof(*la));
1036         la->la_ctime = cfs_time_current_sec();
1037         la->la_valid = LA_CTIME;
1038         rc = dt_declare_attr_set(env, orphan, la, th);
1039         if (rc != 0)
1040                 GOTO(stop, rc);
1041
1042         rc = dt_trans_start_local(env, dev, th);
1043         if (rc != 0)
1044                 GOTO(stop, rc);
1045
1046         dt_write_lock(env, orphan, 0);
1047         rc = lfsck_links_read2(env, orphan, &ldata2);
1048         if (likely((rc == -ENODATA) || (rc == -EINVAL) ||
1049                    (rc == 0 && ldata2.ld_leh != NULL &&
1050                     ldata2.ld_leh->leh_reccount == 0))) {
1051                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1052                         GOTO(unlock, rc = 1);
1053
1054                 if (S_ISDIR(lfsck_object_type(orphan))) {
1055                         rc = dt_delete(env, orphan,
1056                                        (const struct dt_key *)dotdot, th);
1057                         if (rc != 0)
1058                                 GOTO(unlock, rc);
1059
1060                         rec->rec_type = S_IFDIR;
1061                         rec->rec_fid = pfid;
1062                         rc = dt_insert(env, orphan, (const struct dt_rec *)rec,
1063                                        (const struct dt_key *)dotdot, th, 1);
1064                         if (rc != 0)
1065                                 GOTO(unlock, rc);
1066                 }
1067
1068                 rc = dt_xattr_set(env, orphan, &linkea_buf, XATTR_NAME_LINK, 0,
1069                                   th);
1070         } else {
1071                 if (rc == 0 && count != NULL)
1072                         *count = ldata2.ld_leh->leh_reccount;
1073
1074                 GOTO(unlock, rc);
1075         }
1076         dt_write_unlock(env, orphan);
1077
1078         if (rc == 0 && !exist) {
1079                 rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
1080                 rec->rec_fid = cfid;
1081                 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1082                                (const struct dt_key *)cname->ln_name, th, 1);
1083                 if (rc == 0 && S_ISDIR(rec->rec_type)) {
1084                         dt_write_lock(env, parent, 0);
1085                         rc = dt_ref_add(env, parent, th);
1086                         dt_write_unlock(env, parent);
1087                 }
1088         }
1089
1090         if (rc == 0)
1091                 rc = dt_attr_set(env, orphan, la, th);
1092
1093         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1094
1095 unlock:
1096         dt_write_unlock(env, orphan);
1097
1098 stop:
1099         dt_trans_stop(env, dev, th);
1100
1101 log:
1102         lfsck_ibits_unlock(&clh, LCK_EX);
1103         lfsck_unlock(pllh);
1104         CDEBUG(D_LFSCK, "%s: namespace LFSCK insert orphan for the "
1105                "object "DFID", name = %s: rc = %d\n",
1106                lfsck_lfsck2name(lfsck), PFID(cfid),
1107                cname->ln_name != NULL ? cname->ln_name : "<NULL>", rc);
1108
1109         if (rc != 0) {
1110                 struct lfsck_namespace *ns = com->lc_file_ram;
1111
1112                 ns->ln_flags |= LF_INCONSISTENT;
1113         }
1114
1115         return rc;
1116 }
1117
1118 /**
1119  * Add the specified name entry back to namespace.
1120  *
1121  * If there is a linkEA entry that back references a name entry under
1122  * some parent directory, but such parent directory does not have the
1123  * claimed name entry. On the other hand, the linkEA entries count is
1124  * not larger than the MDT-object's hard link count. Under such case,
1125  * it is quite possible that the name entry is lost. Then the LFSCK
1126  * should add the name entry back to the namespace.
1127  *
1128  * \param[in] env       pointer to the thread context
1129  * \param[in] com       pointer to the lfsck component
1130  * \param[in] parent    pointer to the directory under which the name entry
1131  *                      will be inserted into
1132  * \param[in] child     pointer to the object referenced by the name entry
1133  *                      that to be inserted into the parent
1134  * \param[in] name      the name for the child in the parent directory
1135  *
1136  * \retval              positive number for repaired cases
1137  * \retval              0 if nothing to be repaired
1138  * \retval              negative error number on failure
1139  */
1140 static int lfsck_namespace_insert_normal(const struct lu_env *env,
1141                                          struct lfsck_component *com,
1142                                          struct dt_object *parent,
1143                                          struct dt_object *child,
1144                                          const char *name)
1145 {
1146         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1147         struct lu_attr                  *la     = &info->lti_la;
1148         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1149         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1150         /* The child and its name may be on different MDTs. */
1151         const struct lu_fid             *pfid   = lfsck_dto2fid(parent);
1152         const struct lu_fid             *cfid   = lfsck_dto2fid(child);
1153         struct dt_device                *dev    = lfsck->li_next;
1154         struct thandle                  *th     = NULL;
1155         struct lfsck_lock_handle        *llh    = &info->lti_llh;
1156         int                              rc     = 0;
1157         ENTRY;
1158
1159         /* @parent/@child may be based on lfsck->li_bottom,
1160          * but here we need the object based on the lfsck->li_next. */
1161
1162         parent = lfsck_object_locate(dev, parent);
1163         if (IS_ERR(parent))
1164                 GOTO(log, rc = PTR_ERR(parent));
1165
1166         if (unlikely(!dt_try_as_dir(env, parent)))
1167                 GOTO(log, rc = -ENOTDIR);
1168
1169         child = lfsck_object_locate(dev, child);
1170         if (IS_ERR(child))
1171                 GOTO(log, rc = PTR_ERR(child));
1172
1173         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1174                 GOTO(log, rc = 1);
1175
1176         rc = lfsck_lock(env, lfsck, parent, name, llh,
1177                         MDS_INODELOCK_UPDATE, LCK_PW);
1178         if (rc != 0)
1179                 GOTO(log, rc);
1180
1181         th = dt_trans_create(env, dev);
1182         if (IS_ERR(th))
1183                 GOTO(unlock, rc = PTR_ERR(th));
1184
1185         rec->rec_type = lfsck_object_type(child) & S_IFMT;
1186         rec->rec_fid = cfid;
1187         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1188                                (const struct dt_key *)name, th);
1189         if (rc != 0)
1190                 GOTO(stop, rc);
1191
1192         if (S_ISDIR(rec->rec_type)) {
1193                 rc = dt_declare_ref_add(env, parent, th);
1194                 if (rc != 0)
1195                         GOTO(stop, rc);
1196         }
1197
1198         memset(la, 0, sizeof(*la));
1199         la->la_ctime = cfs_time_current_sec();
1200         la->la_valid = LA_CTIME;
1201         rc = dt_declare_attr_set(env, parent, la, th);
1202         if (rc != 0)
1203                 GOTO(stop, rc);
1204
1205         rc = dt_declare_attr_set(env, child, la, th);
1206         if (rc != 0)
1207                 GOTO(stop, rc);
1208
1209         rc = dt_trans_start_local(env, dev, th);
1210         if (rc != 0)
1211                 GOTO(stop, rc);
1212
1213         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1214                        (const struct dt_key *)name, th, 1);
1215         if (rc != 0)
1216                 GOTO(stop, rc);
1217
1218         if (S_ISDIR(rec->rec_type)) {
1219                 dt_write_lock(env, parent, 0);
1220                 rc = dt_ref_add(env, parent, th);
1221                 dt_write_unlock(env, parent);
1222                 if (rc != 0)
1223                         GOTO(stop, rc);
1224         }
1225
1226         la->la_ctime = cfs_time_current_sec();
1227         rc = dt_attr_set(env, parent, la, th);
1228         if (rc != 0)
1229                 GOTO(stop, rc);
1230
1231         rc = dt_attr_set(env, child, la, th);
1232
1233         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1234
1235 stop:
1236         dt_trans_stop(env, dev, th);
1237
1238 unlock:
1239         lfsck_unlock(llh);
1240
1241 log:
1242         CDEBUG(D_LFSCK, "%s: namespace LFSCK insert object "DFID" with "
1243                "the name %s and type %o to the parent "DFID": rc = %d\n",
1244                lfsck_lfsck2name(lfsck), PFID(cfid), name,
1245                lfsck_object_type(child) & S_IFMT, PFID(pfid), rc);
1246
1247         if (rc != 0) {
1248                 struct lfsck_namespace *ns = com->lc_file_ram;
1249
1250                 ns->ln_flags |= LF_INCONSISTENT;
1251                 if (rc > 0)
1252                         ns->ln_lost_dirent_repaired++;
1253         }
1254
1255         return rc;
1256 }
1257
1258 /**
1259  * Create the specified orphan directory.
1260  *
1261  * For the case that the parent MDT-object stored in some MDT-object's
1262  * linkEA entry is lost, the LFSCK will re-create the parent object as
1263  * an orphan and insert it into .lustre/lost+found/MDTxxxx/ directory
1264  * with the name ${FID}-P-${conflict_version}.
1265  *
1266  * \param[in] env       pointer to the thread context
1267  * \param[in] com       pointer to the lfsck component
1268  * \param[in] orphan    pointer to the orphan MDT-object to be created
1269  * \param[in] lmv       pointer to master LMV EA that will be set to the orphan
1270  *
1271  * \retval              positive number for repaired cases
1272  * \retval              negative error number on failure
1273  */
1274 static int lfsck_namespace_create_orphan_dir(const struct lu_env *env,
1275                                              struct lfsck_component *com,
1276                                              struct dt_object *orphan,
1277                                              struct lmv_mds_md_v1 *lmv)
1278 {
1279         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1280         struct lu_attr                  *la     = &info->lti_la;
1281         struct dt_allocation_hint       *hint   = &info->lti_hint;
1282         struct dt_object_format         *dof    = &info->lti_dof;
1283         struct lu_name                  *cname  = &info->lti_name2;
1284         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1285         struct lmv_mds_md_v1            *lmv2   = &info->lti_lmv2;
1286         const struct lu_fid             *cfid   = lfsck_dto2fid(orphan);
1287         struct lu_fid                    tfid;
1288         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1289         struct lfsck_namespace          *ns     = com->lc_file_ram;
1290         struct dt_device                *dev    = lfsck_obj2dev(orphan);
1291         struct dt_object                *parent = NULL;
1292         struct thandle                  *th     = NULL;
1293         struct lfsck_lock_handle        *llh    = &info->lti_llh;
1294         struct linkea_data               ldata  = { NULL };
1295         struct lu_buf                    linkea_buf;
1296         struct lu_buf                    lmv_buf;
1297         char                             name[32];
1298         int                              namelen;
1299         int                              idx    = 0;
1300         int                              rc     = 0;
1301         int                              rc1    = 0;
1302         ENTRY;
1303
1304         LASSERT(!dt_object_exists(orphan));
1305
1306         cname->ln_name = NULL;
1307         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1308                 GOTO(log, rc = 1);
1309
1310         if (dt_object_remote(orphan)) {
1311                 LASSERT(lfsck->li_lpf_root_obj != NULL);
1312
1313                 idx = lfsck_find_mdt_idx_by_fid(env, lfsck, cfid);
1314                 if (idx < 0)
1315                         GOTO(log, rc = idx);
1316
1317                 snprintf(name, 8, "MDT%04x", idx);
1318                 rc = dt_lookup(env, lfsck->li_lpf_root_obj,
1319                                (struct dt_rec *)&tfid,
1320                                (const struct dt_key *)name);
1321                 if (rc != 0)
1322                         GOTO(log, rc = (rc == -ENOENT ? -ENXIO : rc));
1323
1324                 parent = lfsck_object_find_bottom(env, lfsck, &tfid);
1325                 if (IS_ERR(parent))
1326                         GOTO(log, rc = PTR_ERR(parent));
1327
1328                 if (unlikely(!dt_try_as_dir(env, parent)))
1329                         GOTO(log, rc = -ENOTDIR);
1330         } else {
1331                 if (unlikely(lfsck->li_lpf_obj == NULL))
1332                         GOTO(log, rc = -ENXIO);
1333
1334                 parent = lfsck->li_lpf_obj;
1335         }
1336
1337         dev = lfsck_find_dev_by_fid(env, lfsck, cfid);
1338         if (IS_ERR(dev))
1339                 GOTO(log, rc = PTR_ERR(dev));
1340
1341         idx = 0;
1342
1343 again:
1344         do {
1345                 namelen = snprintf(name, 31, DFID"-P-%d",
1346                                    PFID(cfid), idx++);
1347                 rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
1348                                (const struct dt_key *)name);
1349                 if (rc != 0 && rc != -ENOENT)
1350                         GOTO(log, rc);
1351         } while (rc == 0);
1352
1353         rc = lfsck_lock(env, lfsck, parent, name, llh,
1354                         MDS_INODELOCK_UPDATE, LCK_PW);
1355         if (rc != 0)
1356                 GOTO(log, rc);
1357
1358         /* Re-check whether the name conflict with othrs after taken
1359          * the ldlm lock. */
1360         rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
1361                        (const struct dt_key *)name);
1362         if (unlikely(rc == 0)) {
1363                 lfsck_unlock(llh);
1364                 goto again;
1365         }
1366
1367         if (rc != -ENOENT)
1368                 GOTO(unlock1, rc);
1369
1370         cname->ln_name = name;
1371         cname->ln_namelen = namelen;
1372
1373         memset(la, 0, sizeof(*la));
1374         la->la_mode = S_IFDIR | 0700;
1375         la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
1376                        LA_ATIME | LA_MTIME | LA_CTIME;
1377
1378         orphan->do_ops->do_ah_init(env, hint, parent, orphan,
1379                                    la->la_mode & S_IFMT);
1380
1381         memset(dof, 0, sizeof(*dof));
1382         dof->dof_type = dt_mode_to_dft(S_IFDIR);
1383
1384         rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
1385         if (rc != 0)
1386                 GOTO(unlock1, rc);
1387
1388         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
1389         if (rc != 0)
1390                 GOTO(unlock1, rc);
1391
1392         th = dt_trans_create(env, dev);
1393         if (IS_ERR(th))
1394                 GOTO(unlock1, rc = PTR_ERR(th));
1395
1396         /* Sync the remote transaction to guarantee that the subsequent
1397          * lock against the @orphan can find the @orphan in time. */
1398         if (dt_object_remote(orphan))
1399                 th->th_sync = 1;
1400
1401         rc = dt_declare_create(env, orphan, la, hint, dof, th);
1402         if (rc != 0)
1403                 GOTO(stop, rc);
1404
1405         if (unlikely(!dt_try_as_dir(env, orphan)))
1406                 GOTO(stop, rc = -ENOTDIR);
1407
1408         rc = dt_declare_ref_add(env, orphan, th);
1409         if (rc != 0)
1410                 GOTO(stop, rc);
1411
1412         rec->rec_type = S_IFDIR;
1413         rec->rec_fid = cfid;
1414         rc = dt_declare_insert(env, orphan, (const struct dt_rec *)rec,
1415                                (const struct dt_key *)dot, th);
1416         if (rc != 0)
1417                 GOTO(stop, rc);
1418
1419         rec->rec_fid = lfsck_dto2fid(parent);
1420         rc = dt_declare_insert(env, orphan, (const struct dt_rec *)rec,
1421                                (const struct dt_key *)dotdot, th);
1422         if (rc != 0)
1423                 GOTO(stop, rc);
1424
1425         if (lmv != NULL) {
1426                 lmv->lmv_magic = LMV_MAGIC;
1427                 lmv->lmv_master_mdt_index = lfsck_dev_idx(lfsck);
1428                 lfsck_lmv_header_cpu_to_le(lmv2, lmv);
1429                 lfsck_buf_init(&lmv_buf, lmv2, sizeof(*lmv2));
1430                 rc = dt_declare_xattr_set(env, orphan, &lmv_buf,
1431                                           XATTR_NAME_LMV, 0, th);
1432                 if (rc != 0)
1433                         GOTO(stop, rc);
1434         }
1435
1436         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
1437                        ldata.ld_leh->leh_len);
1438         rc = dt_declare_xattr_set(env, orphan, &linkea_buf,
1439                                   XATTR_NAME_LINK, 0, th);
1440         if (rc != 0)
1441                 GOTO(stop, rc);
1442
1443         rec->rec_fid = cfid;
1444         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1445                                (const struct dt_key *)name, th);
1446         if (rc == 0)
1447                 rc = dt_declare_ref_add(env, parent, th);
1448
1449         if (rc != 0)
1450                 GOTO(stop, rc);
1451
1452         rc = dt_trans_start_local(env, dev, th);
1453         if (rc != 0)
1454                 GOTO(stop, rc);
1455
1456         dt_write_lock(env, orphan, 0);
1457         rc = dt_create(env, orphan, la, hint, dof, th);
1458         if (rc != 0)
1459                 GOTO(unlock2, rc);
1460
1461         rc = dt_ref_add(env, orphan, th);
1462         if (rc != 0)
1463                 GOTO(unlock2, rc);
1464
1465         rec->rec_fid = cfid;
1466         rc = dt_insert(env, orphan, (const struct dt_rec *)rec,
1467                        (const struct dt_key *)dot, th, 1);
1468         if (rc != 0)
1469                 GOTO(unlock2, rc);
1470
1471         rec->rec_fid = lfsck_dto2fid(parent);
1472         rc = dt_insert(env, orphan, (const struct dt_rec *)rec,
1473                        (const struct dt_key *)dotdot, th, 1);
1474         if (rc != 0)
1475                 GOTO(unlock2, rc);
1476
1477         if (lmv != NULL) {
1478                 rc = dt_xattr_set(env, orphan, &lmv_buf, XATTR_NAME_LMV, 0, th);
1479                 if (rc != 0)
1480                         GOTO(unlock2, rc);
1481         }
1482
1483         rc = dt_xattr_set(env, orphan, &linkea_buf,
1484                           XATTR_NAME_LINK, 0, th);
1485         dt_write_unlock(env, orphan);
1486         if (rc != 0)
1487                 GOTO(stop, rc);
1488
1489         rec->rec_fid = cfid;
1490         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1491                        (const struct dt_key *)name, th, 1);
1492         if (rc == 0) {
1493                 dt_write_lock(env, parent, 0);
1494                 rc = dt_ref_add(env, parent, th);
1495                 dt_write_unlock(env, parent);
1496         }
1497
1498         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1499
1500 unlock2:
1501         dt_write_unlock(env, orphan);
1502
1503 stop:
1504         rc1 = dt_trans_stop(env, dev, th);
1505         if (rc1 != 0 && rc > 0)
1506                 rc = rc1;
1507
1508 unlock1:
1509         lfsck_unlock(llh);
1510
1511 log:
1512         CDEBUG(D_LFSCK, "%s: namespace LFSCK create orphan dir for "
1513                "the object "DFID", name = %s: rc = %d\n",
1514                lfsck_lfsck2name(lfsck), PFID(cfid),
1515                cname->ln_name != NULL ? cname->ln_name : "<NULL>", rc);
1516
1517         if (parent != NULL && !IS_ERR(parent) && parent != lfsck->li_lpf_obj)
1518                 lfsck_object_put(env, parent);
1519
1520         if (rc != 0)
1521                 ns->ln_flags |= LF_INCONSISTENT;
1522
1523         return rc;
1524 }
1525
1526 /**
1527  * Remove the specified entry from the linkEA.
1528  *
1529  * Locate the linkEA entry with the given @cname and @pfid, then
1530  * remove this entry or the other entries those are repeated with
1531  * this entry.
1532  *
1533  * \param[in] env       pointer to the thread context
1534  * \param[in] com       pointer to the lfsck component
1535  * \param[in] obj       pointer to the dt_object to be handled
1536  * \param[in,out]ldata  pointer to the buffer that holds the linkEA
1537  * \param[in] cname     the name for the child in the parent directory
1538  * \param[in] pfid      the parent directory's FID for the linkEA
1539  * \param[in] next      if true, then remove the first found linkEA
1540  *                      entry, and move the ldata->ld_lee to next entry
1541  *
1542  * \retval              positive number for repaired cases
1543  * \retval              0 if nothing to be repaired
1544  * \retval              negative error number on failure
1545  */
1546 static int lfsck_namespace_shrink_linkea(const struct lu_env *env,
1547                                          struct lfsck_component *com,
1548                                          struct dt_object *obj,
1549                                          struct linkea_data *ldata,
1550                                          struct lu_name *cname,
1551                                          struct lu_fid *pfid,
1552                                          bool next)
1553 {
1554         struct lfsck_instance           *lfsck     = com->lc_lfsck;
1555         struct dt_device                *dev       = lfsck_obj2dev(obj);
1556         struct lfsck_bookmark           *bk        = &lfsck->li_bookmark_ram;
1557         struct thandle                  *th        = NULL;
1558         struct lustre_handle             lh        = { 0 };
1559         struct linkea_data               ldata_new = { NULL };
1560         struct lu_buf                    linkea_buf;
1561         int                              buflen    = 0;
1562         int                              rc        = 0;
1563         ENTRY;
1564
1565         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
1566                               MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1567                               LCK_EX);
1568         if (rc != 0)
1569                 GOTO(log, rc);
1570
1571         if (next)
1572                 lfsck_linkea_del_buf(ldata, cname);
1573         else
1574                 lfsck_namespace_filter_linkea_entry(ldata, cname, pfid,
1575                                                     true);
1576         if (ldata->ld_leh->leh_reccount > 0) {
1577                 lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
1578                                ldata->ld_leh->leh_len);
1579                 buflen = linkea_buf.lb_len;
1580         }
1581
1582 again:
1583         th = dt_trans_create(env, dev);
1584         if (IS_ERR(th))
1585                 GOTO(unlock1, rc = PTR_ERR(th));
1586
1587         if (buflen != 0)
1588                 rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1589                                           XATTR_NAME_LINK, 0, th);
1590         else
1591                 rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, th);
1592         if (rc != 0)
1593                 GOTO(stop, rc);
1594
1595         rc = dt_trans_start_local(env, dev, th);
1596         if (rc != 0)
1597                 GOTO(stop, rc);
1598
1599         dt_write_lock(env, obj, 0);
1600         if (unlikely(lfsck_is_dead_obj(obj)))
1601                 GOTO(unlock2, rc = -ENOENT);
1602
1603         rc = lfsck_links_read2(env, obj, &ldata_new);
1604         if (rc != 0)
1605                 GOTO(unlock2,
1606                      rc = (rc == -ENODATA ? 0 : rc));
1607
1608         /* The specified linkEA entry has been removed by race. */
1609         rc = linkea_links_find(&ldata_new, cname, pfid);
1610         if (rc != 0)
1611                 GOTO(unlock2, rc = 0);
1612
1613         if (bk->lb_param & LPF_DRYRUN)
1614                 GOTO(unlock2, rc = 1);
1615
1616         if (next)
1617                 lfsck_linkea_del_buf(&ldata_new, cname);
1618         else
1619                 lfsck_namespace_filter_linkea_entry(&ldata_new, cname, pfid,
1620                                                     true);
1621
1622         if (buflen < ldata_new.ld_leh->leh_len) {
1623                 dt_write_unlock(env, obj);
1624                 dt_trans_stop(env, dev, th);
1625                 lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
1626                                ldata_new.ld_leh->leh_len);
1627                 goto again;
1628         }
1629
1630         if (ldata_new.ld_leh->leh_reccount > 0) {
1631                 lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
1632                                ldata_new.ld_leh->leh_len);
1633                 rc = dt_xattr_set(env, obj, &linkea_buf,
1634                                   XATTR_NAME_LINK, 0, th);
1635         } else {
1636                 rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th);
1637         }
1638
1639         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
1640
1641 unlock2:
1642         dt_write_unlock(env, obj);
1643
1644 stop:
1645         dt_trans_stop(env, dev, th);
1646
1647 unlock1:
1648         lfsck_ibits_unlock(&lh, LCK_EX);
1649
1650 log:
1651         CDEBUG(D_LFSCK, "%s: namespace LFSCK remove %s linkEA entry "
1652                "for the object: "DFID", parent "DFID", name %.*s\n",
1653                lfsck_lfsck2name(lfsck), next ? "invalid" : "redundant",
1654                PFID(lfsck_dto2fid(obj)), PFID(pfid), cname->ln_namelen,
1655                cname->ln_name);
1656
1657         if (rc != 0) {
1658                 struct lfsck_namespace *ns = com->lc_file_ram;
1659
1660                 ns->ln_flags |= LF_INCONSISTENT;
1661         }
1662
1663         return rc;
1664 }
1665
1666 /**
1667  * Conditionally remove the specified entry from the linkEA.
1668  *
1669  * Take the parent lock firstly, then check whether the specified
1670  * name entry exists or not: if yes, do nothing; otherwise, call
1671  * lfsck_namespace_shrink_linkea() to remove the linkea entry.
1672  *
1673  * \param[in] env       pointer to the thread context
1674  * \param[in] com       pointer to the lfsck component
1675  * \param[in] parent    pointer to the parent directory
1676  * \param[in] child     pointer to the child object that holds the linkEA
1677  * \param[in,out]ldata  pointer to the buffer that holds the linkEA
1678  * \param[in] cname     the name for the child in the parent directory
1679  * \param[in] pfid      the parent directory's FID for the linkEA
1680  *
1681  * \retval              positive number for repaired cases
1682  * \retval              0 if nothing to be repaired
1683  * \retval              negative error number on failure
1684  */
1685 static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env,
1686                                               struct lfsck_component *com,
1687                                               struct dt_object *parent,
1688                                               struct dt_object *child,
1689                                               struct linkea_data *ldata,
1690                                               struct lu_name *cname,
1691                                               struct lu_fid *pfid)
1692 {
1693         struct lfsck_thread_info *info  = lfsck_env_info(env);
1694         struct lu_fid            *cfid  = &info->lti_fid3;
1695         struct lfsck_lock_handle *llh   = &info->lti_llh;
1696         int                       rc;
1697         ENTRY;
1698
1699         rc = lfsck_lock(env, com->lc_lfsck, parent, cname->ln_name, llh,
1700                         MDS_INODELOCK_UPDATE, LCK_PR);
1701         if (rc != 0)
1702                 RETURN(rc);
1703
1704         dt_read_lock(env, parent, 0);
1705         if (unlikely(lfsck_is_dead_obj(parent))) {
1706                 dt_read_unlock(env, parent);
1707                 lfsck_unlock(llh);
1708                 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
1709                                                    cname, pfid, true);
1710
1711                 RETURN(rc);
1712         }
1713
1714         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1715                        (const struct dt_key *)cname->ln_name);
1716         dt_read_unlock(env, parent);
1717
1718         /* It is safe to release the ldlm lock, because when the logic come
1719          * here, we have got all the needed information above whether the
1720          * linkEA entry is valid or not. It is not important that others
1721          * may add new linkEA entry after the ldlm lock released. If other
1722          * has removed the specified linkEA entry by race, then it is OK,
1723          * because the subsequent lfsck_namespace_shrink_linkea() can handle
1724          * such case. */
1725         lfsck_unlock(llh);
1726         if (rc == -ENOENT) {
1727                 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
1728                                                    cname, pfid, true);
1729
1730                 RETURN(rc);
1731         }
1732
1733         if (rc != 0)
1734                 RETURN(rc);
1735
1736         /* The LFSCK just found some internal status of cross-MDTs
1737          * create operation. That is normal. */
1738         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
1739                 linkea_next_entry(ldata);
1740
1741                 RETURN(0);
1742         }
1743
1744         rc = lfsck_namespace_shrink_linkea(env, com, child, ldata, cname,
1745                                            pfid, true);
1746
1747         RETURN(rc);
1748 }
1749
1750 /**
1751  * Conditionally replace name entry in the parent.
1752  *
1753  * As required, the LFSCK may re-create the lost MDT-object for dangling
1754  * name entry, but such repairing may be wrong because of bad FID in the
1755  * name entry. As the LFSCK processing, the real MDT-object may be found,
1756  * then the LFSCK should check whether the former re-created MDT-object
1757  * has been modified or not, if not, then destroy it and update the name
1758  * entry in the parent to reference the real MDT-object.
1759  *
1760  * \param[in] env       pointer to the thread context
1761  * \param[in] com       pointer to the lfsck component
1762  * \param[in] parent    pointer to the parent directory
1763  * \param[in] child     pointer to the MDT-object that may be the real
1764  *                      MDT-object corresponding to the name entry in parent
1765  * \param[in] cfid      the current FID in the name entry
1766  * \param[in] cname     contains the name of the child in the parent directory
1767  *
1768  * \retval              positive number for repaired cases
1769  * \retval              0 if nothing to be repaired
1770  * \retval              negative error number on failure
1771  */
1772 static int lfsck_namespace_replace_cond(const struct lu_env *env,
1773                                         struct lfsck_component *com,
1774                                         struct dt_object *parent,
1775                                         struct dt_object *child,
1776                                         const struct lu_fid *cfid,
1777                                         const struct lu_name *cname)
1778 {
1779         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1780         struct lu_attr                  *la     = &info->lti_la;
1781         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1782         struct lu_fid                    tfid;
1783         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1784         /* The child and its name may be on different MDTs. */
1785         struct dt_device                *dev    = lfsck->li_next;
1786         const char                      *name   = cname->ln_name;
1787         const struct lu_fid             *pfid   = lfsck_dto2fid(parent);
1788         struct dt_object                *cobj   = NULL;
1789         struct lfsck_lock_handle        *pllh   = &info->lti_llh;
1790         struct lustre_handle             clh    = { 0 };
1791         struct linkea_data               ldata  = { NULL };
1792         struct thandle                  *th     = NULL;
1793         bool                             exist  = true;
1794         int                              rc     = 0;
1795         ENTRY;
1796
1797         /* @parent/@child may be based on lfsck->li_bottom,
1798          * but here we need the object based on the lfsck->li_next. */
1799
1800         parent = lfsck_object_locate(dev, parent);
1801         if (IS_ERR(parent))
1802                 GOTO(log, rc = PTR_ERR(parent));
1803
1804         if (unlikely(!dt_try_as_dir(env, parent)))
1805                 GOTO(log, rc = -ENOTDIR);
1806
1807         rc = lfsck_lock(env, lfsck, parent, name, pllh,
1808                         MDS_INODELOCK_UPDATE, LCK_PW);
1809         if (rc != 0)
1810                 GOTO(log, rc);
1811
1812         if (!fid_is_sane(cfid)) {
1813                 exist = false;
1814                 goto replace;
1815         }
1816
1817         cobj = lfsck_object_find_by_dev(env, dev, cfid);
1818         if (IS_ERR(cobj)) {
1819                 rc = PTR_ERR(cobj);
1820                 if (rc == -ENOENT) {
1821                         exist = false;
1822                         goto replace;
1823                 }
1824
1825                 GOTO(log, rc);
1826         }
1827
1828         if (!dt_object_exists(cobj)) {
1829                 exist = false;
1830                 goto replace;
1831         }
1832
1833         rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
1834                        (const struct dt_key *)name);
1835         if (rc == -ENOENT) {
1836                 exist = false;
1837                 goto replace;
1838         }
1839
1840         if (rc != 0)
1841                 GOTO(log, rc);
1842
1843         /* Someone changed the name entry, cannot replace it. */
1844         if (!lu_fid_eq(cfid, &tfid))
1845                 GOTO(log, rc = 0);
1846
1847         /* lock the object to be destroyed. */
1848         rc = lfsck_ibits_lock(env, lfsck, cobj, &clh,
1849                               MDS_INODELOCK_UPDATE |
1850                               MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1851                               LCK_EX);
1852         if (rc != 0)
1853                 GOTO(log, rc);
1854
1855         if (unlikely(lfsck_is_dead_obj(cobj))) {
1856                 exist = false;
1857                 goto replace;
1858         }
1859
1860         rc = dt_attr_get(env, cobj, la);
1861         if (rc != 0)
1862                 GOTO(log, rc);
1863
1864         /* The object has been modified by other(s), or it is not created by
1865          * LFSCK, the two cases are indistinguishable. So cannot replace it. */
1866         if (la->la_ctime != 0)
1867                 GOTO(log, rc);
1868
1869         if (S_ISREG(la->la_mode)) {
1870                 rc = dt_xattr_get(env, cobj, &LU_BUF_NULL, XATTR_NAME_LOV);
1871                 /* If someone has created related OST-object(s),
1872                  * then keep it. */
1873                 if ((rc > 0) || (rc < 0 && rc != -ENODATA))
1874                         GOTO(log, rc = (rc > 0 ? 0 : rc));
1875         }
1876
1877 replace:
1878         dt_read_lock(env, child, 0);
1879         rc = lfsck_links_read2(env, child, &ldata);
1880         dt_read_unlock(env, child);
1881
1882         /* Someone changed the child, no need to replace. */
1883         if (rc == -ENODATA)
1884                 GOTO(log, rc = 0);
1885
1886         if (rc != 0)
1887                 GOTO(log, rc);
1888
1889         rc = linkea_links_find(&ldata, cname, pfid);
1890         /* Someone moved the child, no need to replace. */
1891         if (rc != 0)
1892                 GOTO(log, rc = 0);
1893
1894         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1895                 GOTO(log, rc = 1);
1896
1897         th = dt_trans_create(env, dev);
1898         if (IS_ERR(th))
1899                 GOTO(log, rc = PTR_ERR(th));
1900
1901         if (exist) {
1902                 rc = dt_declare_destroy(env, cobj, th);
1903                 if (rc != 0)
1904                         GOTO(stop, rc);
1905         }
1906
1907         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
1908         if (rc != 0)
1909                 GOTO(stop, rc);
1910
1911         rec->rec_type = S_IFDIR;
1912         rec->rec_fid = lfsck_dto2fid(child);
1913         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1914                                (const struct dt_key *)name, th);
1915         if (rc != 0)
1916                 GOTO(stop, rc);
1917
1918         rc = dt_trans_start_local(env, dev, th);
1919         if (rc != 0)
1920                 GOTO(stop, rc);
1921
1922         if (exist) {
1923                 rc = dt_destroy(env, cobj, th);
1924                 if (rc != 0)
1925                         GOTO(stop, rc);
1926         }
1927
1928         /* The old name entry maybe not exist. */
1929         rc = dt_delete(env, parent, (const struct dt_key *)name, th);
1930         if (rc != 0 && rc != -ENOENT)
1931                 GOTO(stop, rc);
1932
1933         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1934                        (const struct dt_key *)name, th, 1);
1935
1936         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1937
1938 stop:
1939         dt_trans_stop(env, dev, th);
1940
1941 log:
1942         lfsck_ibits_unlock(&clh, LCK_EX);
1943         lfsck_unlock(pllh);
1944
1945         if (cobj != NULL && !IS_ERR(cobj))
1946                 lfsck_object_put(env, cobj);
1947
1948         CDEBUG(D_LFSCK, "%s: namespace LFSCK conditionally destroy the "
1949                "object "DFID" because of conflict with the object "DFID
1950                " under the parent "DFID" with name %s: rc = %d\n",
1951                lfsck_lfsck2name(lfsck), PFID(cfid),
1952                PFID(lfsck_dto2fid(child)), PFID(pfid), name, rc);
1953
1954         return rc;
1955 }
1956
1957 /**
1958  * Overwrite the linkEA for the object with the given ldata.
1959  *
1960  * The caller should take the ldlm lock before the calling.
1961  *
1962  * \param[in] env       pointer to the thread context
1963  * \param[in] com       pointer to the lfsck component
1964  * \param[in] obj       pointer to the dt_object to be handled
1965  * \param[in] ldata     pointer to the new linkEA data
1966  *
1967  * \retval              positive number for repaired cases
1968  * \retval              0 if nothing to be repaired
1969  * \retval              negative error number on failure
1970  */
1971 int lfsck_namespace_rebuild_linkea(const struct lu_env *env,
1972                                    struct lfsck_component *com,
1973                                    struct dt_object *obj,
1974                                    struct linkea_data *ldata)
1975 {
1976         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1977         struct dt_device                *dev    = lfsck_obj2dev(obj);
1978         struct thandle                  *th     = NULL;
1979         struct lu_buf                    linkea_buf;
1980         int                              rc     = 0;
1981         ENTRY;
1982
1983         th = dt_trans_create(env, dev);
1984         if (IS_ERR(th))
1985                 GOTO(log, rc = PTR_ERR(th));
1986
1987         lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
1988                        ldata->ld_leh->leh_len);
1989         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1990                                   XATTR_NAME_LINK, 0, th);
1991         if (rc != 0)
1992                 GOTO(stop, rc);
1993
1994         rc = dt_trans_start_local(env, dev, th);
1995         if (rc != 0)
1996                 GOTO(stop, rc);
1997
1998         dt_write_lock(env, obj, 0);
1999         if (unlikely(lfsck_is_dead_obj(obj)))
2000                 GOTO(unlock, rc = 0);
2001
2002         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
2003                 GOTO(unlock, rc = 1);
2004
2005         rc = dt_xattr_set(env, obj, &linkea_buf,
2006                           XATTR_NAME_LINK, 0, th);
2007
2008         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
2009
2010 unlock:
2011         dt_write_unlock(env, obj);
2012
2013 stop:
2014         dt_trans_stop(env, dev, th);
2015
2016 log:
2017         CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild linkEA for the "
2018                "object "DFID": rc = %d\n",
2019                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
2020
2021         if (rc != 0) {
2022                 struct lfsck_namespace *ns = com->lc_file_ram;
2023
2024                 ns->ln_flags |= LF_INCONSISTENT;
2025         }
2026
2027         return rc;
2028 }
2029
2030 /**
2031  * Repair invalid name entry.
2032  *
2033  * If the name entry contains invalid information, such as bad file type
2034  * or (and) corrupted object FID, then either remove the name entry or
2035  * udpate the name entry with the given (right) information.
2036  *
2037  * \param[in] env       pointer to the thread context
2038  * \param[in] com       pointer to the lfsck component
2039  * \param[in] parent    pointer to the parent directory
2040  * \param[in] child     pointer to the object referenced by the name entry
2041  * \param[in] name      the old name of the child under the parent directory
2042  * \param[in] name2     the new name of the child under the parent directory
2043  * \param[in] type      the type claimed by the name entry
2044  * \param[in] update    update the name entry if true; otherwise, remove it
2045  * \param[in] dec       decrease the parent nlink count if true
2046  *
2047  * \retval              positive number for repaired successfully
2048  * \retval              0 if nothing to be repaired
2049  * \retval              negative error number on failure
2050  */
2051 int lfsck_namespace_repair_dirent(const struct lu_env *env,
2052                                   struct lfsck_component *com,
2053                                   struct dt_object *parent,
2054                                   struct dt_object *child,
2055                                   const char *name, const char *name2,
2056                                   __u16 type, bool update, bool dec)
2057 {
2058         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2059         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
2060         const struct lu_fid             *pfid   = lfsck_dto2fid(parent);
2061         const struct lu_fid             *cfid   = lfsck_dto2fid(child);
2062         struct lu_fid                    tfid;
2063         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2064         struct dt_device                *dev    = lfsck->li_next;
2065         struct thandle                  *th     = NULL;
2066         struct lfsck_lock_handle        *llh    = &info->lti_llh;
2067         struct lustre_handle             lh     = { 0 };
2068         int                              rc     = 0;
2069         ENTRY;
2070
2071         parent = lfsck_object_locate(dev, parent);
2072         if (IS_ERR(parent))
2073                 GOTO(log, rc = PTR_ERR(parent));
2074
2075         if (unlikely(!dt_try_as_dir(env, parent)))
2076                 GOTO(log, rc = -ENOTDIR);
2077
2078         if (!update || strcmp(name, name2) == 0)
2079                 rc = lfsck_lock(env, lfsck, parent, name, llh,
2080                                 MDS_INODELOCK_UPDATE, LCK_PW);
2081         else
2082                 rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
2083                                       MDS_INODELOCK_UPDATE, LCK_PW);
2084         if (rc != 0)
2085                 GOTO(log, rc);
2086
2087         th = dt_trans_create(env, dev);
2088         if (IS_ERR(th))
2089                 GOTO(unlock1, rc = PTR_ERR(th));
2090
2091         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
2092         if (rc != 0)
2093                 GOTO(stop, rc);
2094
2095         if (update) {
2096                 rec->rec_type = lfsck_object_type(child) & S_IFMT;
2097                 rec->rec_fid = cfid;
2098                 rc = dt_declare_insert(env, parent,
2099                                        (const struct dt_rec *)rec,
2100                                        (const struct dt_key *)name2, th);
2101                 if (rc != 0)
2102                         GOTO(stop, rc);
2103         }
2104
2105         if (dec && S_ISDIR(type)) {
2106                 rc = dt_declare_ref_del(env, parent, th);
2107                 if (rc != 0)
2108                         GOTO(stop, rc);
2109         }
2110
2111         rc = dt_trans_start_local(env, dev, th);
2112         if (rc != 0)
2113                 GOTO(stop, rc);
2114
2115
2116         dt_write_lock(env, parent, 0);
2117         rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
2118                        (const struct dt_key *)name);
2119         /* Someone has removed the bad name entry by race. */
2120         if (rc == -ENOENT)
2121                 GOTO(unlock2, rc = 0);
2122
2123         if (rc != 0)
2124                 GOTO(unlock2, rc);
2125
2126         /* Someone has removed the bad name entry and reused it for other
2127          * object by race. */
2128         if (!lu_fid_eq(&tfid, cfid))
2129                 GOTO(unlock2, rc = 0);
2130
2131         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
2132                 GOTO(unlock2, rc = 1);
2133
2134         rc = dt_delete(env, parent, (const struct dt_key *)name, th);
2135         if (rc != 0)
2136                 GOTO(unlock2, rc);
2137
2138         if (update) {
2139                 rc = dt_insert(env, parent,
2140                                (const struct dt_rec *)rec,
2141                                (const struct dt_key *)name2, th, 1);
2142                 if (rc != 0)
2143                         GOTO(unlock2, rc);
2144         }
2145
2146         if (dec && S_ISDIR(type)) {
2147                 rc = dt_ref_del(env, parent, th);
2148                 if (rc != 0)
2149                         GOTO(unlock2, rc);
2150         }
2151
2152         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
2153
2154 unlock2:
2155         dt_write_unlock(env, parent);
2156
2157 stop:
2158         dt_trans_stop(env, dev, th);
2159
2160         /* We are not sure whether the child will become orphan or not.
2161          * Record it in the LFSCK trace file for further checking in
2162          * the second-stage scanning. */
2163         if (!update && !dec && rc == 0)
2164                 lfsck_namespace_trace_update(env, com, cfid,
2165                                              LNTF_CHECK_LINKEA, true);
2166
2167 unlock1:
2168         /* It is harmless even if unlock the unused lock_handle */
2169         lfsck_ibits_unlock(&lh, LCK_PW);
2170         lfsck_unlock(llh);
2171
2172 log:
2173         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found bad name "
2174                "entry for: parent "DFID", child "DFID", name %s, type "
2175                "in name entry %o, type claimed by child %o. repair it "
2176                "by %s with new name2 %s: rc = %d\n",
2177                lfsck_lfsck2name(lfsck), PFID(pfid), PFID(cfid),
2178                name, type, update ? lfsck_object_type(child) : 0,
2179                update ? "updating" : "removing", name2, rc);
2180
2181         if (rc != 0) {
2182                 struct lfsck_namespace *ns = com->lc_file_ram;
2183
2184                 ns->ln_flags |= LF_INCONSISTENT;
2185         }
2186
2187         return rc;
2188 }
2189
2190 /**
2191  * Update the ".." name entry for the given object.
2192  *
2193  * The object's ".." is corrupted, this function will update the ".." name
2194  * entry with the given pfid, and the linkEA with the given ldata.
2195  *
2196  * The caller should take the ldlm lock before the calling.
2197  *
2198  * \param[in] env       pointer to the thread context
2199  * \param[in] com       pointer to the lfsck component
2200  * \param[in] obj       pointer to the dt_object to be handled
2201  * \param[in] pfid      the new fid for the object's ".." name entry
2202  * \param[in] cname     the name for the @obj in the parent directory
2203  *
2204  * \retval              positive number for repaired cases
2205  * \retval              0 if nothing to be repaired
2206  * \retval              negative error number on failure
2207  */
2208 static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env,
2209                                                   struct lfsck_component *com,
2210                                                   struct dt_object *obj,
2211                                                   const struct lu_fid *pfid,
2212                                                   struct lu_name *cname)
2213 {
2214         struct lfsck_thread_info *lti = lfsck_env_info(env);
2215         struct lfsck_instance *lfsck = com->lc_lfsck;
2216         struct dt_insert_rec *dotdot_rec = &lti->lti_dt_rec;
2217         struct dt_device *dev = lfsck_obj2dev(obj);
2218         const struct dt_key *key = (const struct dt_key *)dotdot;
2219         struct linkea_data links = { NULL };
2220         struct thandle *handle = NULL;
2221         struct lu_buf ea_buf;
2222         int rc = 0;
2223         ENTRY;
2224
2225         LASSERT(!dt_object_remote(obj));
2226         LASSERT(S_ISDIR(lfsck_object_type(obj)));
2227
2228         /* Build a fresh linkEA buffer holding the single <pfid, cname> pair. */
2229         rc = linkea_data_new(&links, &lti->lti_big_buf);
2230         if (rc == 0)
2231                 rc = linkea_add_buf(&links, cname, pfid);
2232         if (rc != 0)
2233                 GOTO(log, rc);
2234
2235         lfsck_buf_init(&ea_buf, links.ld_buf->lb_buf, links.ld_leh->leh_len);
2236
2237         handle = dt_trans_create(env, dev);
2238         if (IS_ERR(handle))
2239                 GOTO(log, rc = PTR_ERR(handle));
2240
2241         /* Declare the three updates: drop "..", re-add "..", set the linkEA. */
2242         rc = dt_declare_delete(env, obj, key, handle);
2243         if (rc != 0)
2244                 GOTO(stop, rc);
2245
2246         dotdot_rec->rec_fid = pfid;
2247         dotdot_rec->rec_type = S_IFDIR;
2248         rc = dt_declare_insert(env, obj, (const struct dt_rec *)dotdot_rec,
2249                                key, handle);
2250         if (rc != 0)
2251                 GOTO(stop, rc);
2252
2253         rc = dt_declare_xattr_set(env, obj, &ea_buf, XATTR_NAME_LINK, 0,
2254                                   handle);
2255         if (rc != 0)
2256                 GOTO(stop, rc);
2257
2258         rc = dt_trans_start_local(env, dev, handle);
2259         if (rc != 0)
2260                 GOTO(stop, rc);
2261
2262         dt_write_lock(env, obj, 0);
2263         if (unlikely(lfsck_is_dead_obj(obj)))
2264                 GOTO(unlock, rc = 0);
2265
2266         /* Dry-run: count the case as repaired without modifying the object. */
2267         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
2268                 GOTO(unlock, rc = 1);
2269
2270         /* The stale ".." entry may be missing, so the result is ignored. */
2271         dt_delete(env, obj, key, handle);
2272
2273         rc = dt_insert(env, obj, (const struct dt_rec *)dotdot_rec, key,
2274                        handle, 1);
2275         if (rc != 0)
2276                 GOTO(unlock, rc);
2277
2278         rc = dt_xattr_set(env, obj, &ea_buf, XATTR_NAME_LINK, 0, handle);
2279         if (rc == 0)
2280                 rc = 1;
2281         GOTO(unlock, rc);
2282
2283 unlock:
2284         dt_write_unlock(env, obj);
2285
2286 stop:
2287         dt_trans_stop(env, dev, handle);
2288
2289 log:
2290         CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild dotdot name entry for "
2291                "the object "DFID", new parent "DFID": rc = %d\n",
2292                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)),
2293                PFID(pfid), rc);
2294
2295         /* Any non-zero result, repaired or failed, marks LF_INCONSISTENT. */
2296         if (rc != 0)
2297                 ((struct lfsck_namespace *)com->lc_file_ram)->ln_flags |=
2298                         LF_INCONSISTENT;
2299
2300         return rc;
2301 }
2302
2303 /**
2304  * Turn the directory @obj into an orphan during Double Scan Directory.
2305  *
2306  * The linkEA of @obj, which nobody could validate, is removed and the ldlm
2307  * lock @lh is dropped. Unless the scan is flagged LF_INCOMPLETE, @obj is
2308  * then linked under .lustre/lost+found/MDTxxxx/ with a name of the form:
2309  * ${FID}-${PFID}-D-${conflict_version}
2310  *
2311  * The ldlm lock must be taken by the caller beforehand.
2312  *
2313  * \param[in] env       pointer to the thread context
2314  * \param[in] com       pointer to the lfsck component
2315  * \param[in] obj       pointer to the orphan object to be handled
2316  * \param[in] pfid      FID printed into the "-${PFID}" part of the new name
2317  * \param[in,out] lh    ldlm lock handler for @obj, released by this function
2318  * \param[out] type     set to LNIT_MUL_REF unless the linkEA removal fails
2319  *
2320  * \retval              positive number for repaired cases
2321  * \retval              0 if nothing to be repaired
2322  * \retval              negative error number on failure
2323  */
2324 static int
2325 lfsck_namespace_dsd_orphan(const struct lu_env *env,
2326                            struct lfsck_component *com,
2327                            struct dt_object *obj,
2328                            const struct lu_fid *pfid,
2329                            struct lustre_handle *lh,
2330                            enum lfsck_namespace_inconsistency_type *type)
2331 {
2332         struct lfsck_thread_info *lti = lfsck_env_info(env);
2333         struct lfsck_namespace *ns = com->lc_file_ram;
2334         int rc;
2335         ENTRY;
2336
2337         /* Drop the linkEA nobody recognizes, then give the lock back. */
2338         rc = lfsck_namespace_links_remove(env, com, obj);
2339         lfsck_ibits_unlock(lh, LCK_EX);
2340         /* -ENODATA is tolerated here; any other error is fatal. */
2341         if (rc == -ENODATA)
2342                 rc = 0;
2343         if (rc < 0)
2344                 RETURN(rc);
2345
2346         *type = LNIT_MUL_REF;
2347
2348         /* LF_INCOMPLETE: this MDT once failed to answer another MDT's
2349          * request to verify an object residing here, so a remote name
2350          * entry may still reference @obj under another name. Skip the
2351          * lost+found insertion; a later LFSCK run may resolve it. */
2352         if ((ns->ln_flags & LF_INCOMPLETE) != 0)
2353                 RETURN(0);
2354
2355         /* Without a trustworthy linkEA there is no way to tell which name
2356          * entry should reference this directory, so treat it as a pure
2357          * orphan whose name carries the "-<pfid>" suffix. */
2358         snprintf(lti->lti_tmpbuf, sizeof(lti->lti_tmpbuf), "-"DFID,
2359                  PFID(pfid));
2360         rc = lfsck_namespace_insert_orphan(env, com, obj, lti->lti_tmpbuf,
2361                                            "D", NULL);
2362         RETURN(rc);
2363 }
2364
2365 /**
2366  * Double Scan Directory object for single linkEA entry case.
2367  *
2368  * The given @child has a unique linkEA entry. If the linkEA entry is valid,
2369  * then check whether the name is in the namespace or not, if not, add the
2370  * missing name entry back to namespace. If the linkEA entry is invalid,
2371  * then remove it and insert the @child in the .lustre/lost+found/MDTxxxx/
2372  * as an orphan.
2373  *
2374  * \param[in] env       pointer to the thread context
2375  * \param[in] com       pointer to the lfsck component
2376  * \param[in] child     pointer to the directory to be double scanned
2377  * \param[in] pfid      the FID corresponding to the ".." entry
2378  * \param[in] ldata     pointer to the linkEA data for the given @child
2379  * \param[in,out] lh    ldlm lock handler for the given @child
2380  * \param[out] type     to tell the caller what the inconsistency is
2381  * \param[out] retry    may be NULL; otherwise set to true if an inconsistency
2382  *                      is found while @lh is not held, and nothing is repaired
2383  * \param[out] unknown  may be NULL; may be set if the ".." FID (@pfid) is zero
2384  *
2385  * \retval              positive number for repaired cases
2386  * \retval              0 if nothing to be repaired
2387  * \retval              negative error number on failure
2388  */
2389 static int
2390 lfsck_namespace_dsd_single(const struct lu_env *env,
2391                            struct lfsck_component *com,
2392                            struct dt_object *child,
2393                            const struct lu_fid *pfid,
2394                            struct linkea_data *ldata,
2395                            struct lustre_handle *lh,
2396                            enum lfsck_namespace_inconsistency_type *type,
2397                            bool *retry, bool *unknown)
2398 {
2399         struct lfsck_thread_info *info          = lfsck_env_info(env);
2400         struct lu_name           *cname         = &info->lti_name;
2401         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
2402         struct lu_fid             tfid;
2403         struct lfsck_namespace   *ns            = com->lc_file_ram;
2404         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2405         struct dt_object         *parent        = NULL;
2406         struct lmv_mds_md_v1     *lmv;
2407         int                       rc            = 0;
2408         ENTRY;
2409
2410         rc = lfsck_namespace_unpack_linkea_entry(ldata, cname, &tfid,
2411                                                  info->lti_key,
2412                                                  sizeof(info->lti_key));
2413         /* A unique linkEA entry that fails to unpack is handled as orphan. */
2414         if (rc != 0) {
2415                 if (!lustre_handle_is_used(lh) && retry != NULL)
2416                         *retry = true; /* NOTE(review): rc != 0 is kept */
2417                 else
2418                         rc = lfsck_namespace_dsd_orphan(env, com, child,
2419                                                         pfid, lh, type);
2420
2421                 GOTO(out, rc);
2422         }
2423
2424         parent = lfsck_object_find_bottom(env, lfsck, &tfid);
2425         if (IS_ERR(parent))
2426                 GOTO(out, rc = PTR_ERR(parent));
2427
2428         /* We trust the unique linkEA entry in spite of whether it matches the
2429          * ".." name entry or not. Because even if the linkEA entry is wrong
2430          * and the ".." name entry is right, we still cannot know via which
2431          * name entry the child will be referenced, since all known entries
2432          * have been verified during the first-stage scanning. */
2433         if (!dt_object_exists(parent)) {
2434                 /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT
2435                  * has ever tried to verify some remote MDT-object that resides
2436                  * on this MDT, but this MDT failed to respond such request. So
2437                  * means there may be some remote name entry on other MDT that
2438                  * references this object with another name, so we cannot know
2439                  * whether this linkEA is valid or not. So keep it there and
2440                  * maybe resolved when next LFSCK run. */
2441                 if (ns->ln_flags & LF_INCOMPLETE)
2442                         GOTO(out, rc = 0);
2443
2444                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2445                         *retry = true;
2446
2447                         GOTO(out, rc = 0);
2448                 }
2449
2450                 lfsck_ibits_unlock(lh, LCK_EX);
2451
2452 lost_parent:    /* also entered by goto from the dt_lookup() -ENOENT path */
2453                 lmv = &info->lti_lmv;
2454                 rc = lfsck_read_stripe_lmv(env, child, lmv);
2455                 if (rc != 0 && rc != -ENODATA)
2456                         GOTO(out, rc);
2457
2458                 if (rc == -ENODATA || lmv->lmv_magic != LMV_MAGIC_STRIPE) {
2459                         lmv = NULL;
2460                 } else if (lfsck_shard_name_to_index(env,
2461                                         cname->ln_name, cname->ln_namelen,
2462                                         S_IFDIR, cfid) < 0) {
2463                         /* It is an invalid name entry, we
2464                          * cannot trust the parent also. */
2465                         rc = lfsck_namespace_shrink_linkea(env, com, child,
2466                                                 ldata, cname, &tfid, true);
2467                         if (rc < 0)
2468                                 GOTO(out, rc);
2469
2470                         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
2471                                  "-"DFID, PFID(pfid));
2472                         rc = lfsck_namespace_insert_orphan(env, com, child,
2473                                                 info->lti_tmpbuf, "S", NULL);
2474
2475                         GOTO(out, rc);
2476                 }
2477
2478                 /* Create the lost parent as an orphan. */
2479                 rc = lfsck_namespace_create_orphan_dir(env, com, parent, lmv);
2480                 if (rc >= 0) {
2481                         /* Add the missing name entry to the parent. */
2482                         rc = lfsck_namespace_insert_normal(env, com, parent,
2483                                                         child, cname->ln_name);
2484                         if (unlikely(rc == -EEXIST)) {
2485                                 /* Unfortunately, someone reused the name
2486                                  * under the parent by race. So we have
2487                                  * to remove the linkEA entry from
2488                                  * current child object. It means that the
2489                                  * LFSCK cannot recover the system
2490                                  * totally back to its original status,
2491                                  * but it is necessary to make the
2492                                  * current system to be consistent. */
2493                                 rc = lfsck_namespace_shrink_linkea(env,
2494                                                 com, child, ldata,
2495                                                 cname, &tfid, true);
2496                                 if (rc >= 0) {
2497                                         snprintf(info->lti_tmpbuf,
2498                                                  sizeof(info->lti_tmpbuf),
2499                                                  "-"DFID, PFID(pfid));
2500                                         rc = lfsck_namespace_insert_orphan(env,
2501                                                 com, child, info->lti_tmpbuf,
2502                                                 "D", NULL);
2503                                 }
2504                         }
2505                 }
2506
2507                 GOTO(out, rc);
2508         } /* !dt_object_exists(parent) */
2509
2510         /* If the parent is not a directory, handle the child as an orphan. */
2511         if (unlikely(!dt_try_as_dir(env, parent))) {
2512                 if (!lustre_handle_is_used(lh) && retry != NULL)
2513                         *retry = true;
2514                 else
2515                         rc = lfsck_namespace_dsd_orphan(env, com, child,
2516                                                         pfid, lh, type);
2517
2518                 GOTO(out, rc);
2519         }
2520
2521         rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
2522                        (const struct dt_key *)cname->ln_name);
2523         if (rc == -ENOENT) {
2524                 /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT
2525                  * has ever tried to verify some remote MDT-object that resides
2526                  * on this MDT, but this MDT failed to respond such request. So
2527                  * means there may be some remote name entry on other MDT that
2528                  * references this object with another name, so we cannot know
2529                  * whether this linkEA is valid or not. So keep it there and
2530                  * maybe resolved when next LFSCK run. */
2531                 if (ns->ln_flags & LF_INCOMPLETE)
2532                         GOTO(out, rc = 0);
2533
2534                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2535                         *retry = true;
2536
2537                         GOTO(out, rc = 0);
2538                 }
2539
2540                 lfsck_ibits_unlock(lh, LCK_EX);
2541                 rc = lfsck_namespace_check_name(env, parent, child, cname);
2542                 if (rc == -ENOENT)
2543                         goto lost_parent;
2544
2545                 if (rc < 0)
2546                         GOTO(out, rc);
2547
2548                 /* It is an invalid name entry, drop it. */
2549                 if (unlikely(rc > 0)) {
2550                         rc = lfsck_namespace_shrink_linkea(env, com, child,
2551                                                 ldata, cname, &tfid, true);
2552                         if (rc >= 0) {
2553                                 snprintf(info->lti_tmpbuf,
2554                                          sizeof(info->lti_tmpbuf),
2555                                          "-"DFID, PFID(pfid));
2556                                 rc = lfsck_namespace_insert_orphan(env, com,
2557                                         child, info->lti_tmpbuf, "D", NULL);
2558                         }
2559
2560                         GOTO(out, rc);
2561                 }
2562
2563                 /* Add the missing name entry back to the namespace. */
2564                 rc = lfsck_namespace_insert_normal(env, com, parent, child,
2565                                                    cname->ln_name);
2566                 if (unlikely(rc == -ESTALE))
2567                         /* It may happen when the remote object has been
2568                          * removed, but the local MDT is not aware of that. */
2569                         goto lost_parent;
2570
2571                 if (unlikely(rc == -EEXIST)) {
2572                         /* Unfortunately, someone reused the name under the
2573                          * parent by race. So we have to remove the linkEA
2574                          * entry from current child object. It means that the
2575                          * LFSCK cannot recover the system totally back to
2576                          * its original status, but it is necessary to make
2577                          * the current system to be consistent.
2578                          *
2579                          * It also may be because of the LFSCK found some
2580                          * internal status of create operation. Under such
2581                          * case, nothing to be done. */
2582                         rc = lfsck_namespace_shrink_linkea_cond(env, com,
2583                                         parent, child, ldata, cname, &tfid);
2584                         if (rc >= 0) {
2585                                 snprintf(info->lti_tmpbuf,
2586                                          sizeof(info->lti_tmpbuf),
2587                                          "-"DFID, PFID(pfid));
2588                                 rc = lfsck_namespace_insert_orphan(env, com,
2589                                         child, info->lti_tmpbuf, "D", NULL);
2590                         }
2591                 }
2592
2593                 GOTO(out, rc);
2594         } /* rc == -ENOENT */
2595
2596         if (rc != 0)
2597                 GOTO(out, rc);
2598
2599         if (!lu_fid_eq(&tfid, cfid)) {
2600                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2601                         *retry = true;
2602
2603                         GOTO(out, rc = 0);
2604                 }
2605
2606                 lfsck_ibits_unlock(lh, LCK_EX);
2607                 /* The name entry references another MDT-object that
2608                  * may be created by the LFSCK for repairing dangling
2609                  * name entry. Try to replace it. */
2610                 rc = lfsck_namespace_replace_cond(env, com, parent, child,
2611                                                   &tfid, cname);
2612                 if (rc == 0)
2613                         rc = lfsck_namespace_dsd_orphan(env, com, child,
2614                                                         pfid, lh, type);
2615
2616                 GOTO(out, rc);
2617         }
2618
2619         /* A zero FID may be because the remote directory object has an invalid
2620          * or lost linkEA. In such case, the LFSCK on this MDT does not know
2621          * how to repair the inconsistency, but the namespace LFSCK on the MDT
2622          * where its name entry resides may have more information (name, FID) to
2623          * repair such inconsistency. So here, keep the inconsistency to avoid
2624          * some improper repairing. */
2625         if (fid_is_zero(pfid)) {
2626                 if (unknown)
2627                         *unknown = true;
2628
2629                 GOTO(out, rc = 0);
2630         }
2631
2632         /* The ".." name entry is wrong, update it. */
2633         if (!lu_fid_eq(pfid, lfsck_dto2fid(parent))) {
2634                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2635                         *retry = true;
2636
2637                         GOTO(out, rc = 0);
2638                 }
2639
2640                 *type = LNIT_UNMATCHED_PAIRS;
2641                 rc = lfsck_namespace_repair_unmatched_pairs(env, com, child,
2642                                                 lfsck_dto2fid(parent), cname);
2643         }
2644
2645         GOTO(out, rc);
2646
2647 out:
2648         if (parent != NULL && !IS_ERR(parent))
2649                 lfsck_object_put(env, parent);
2650
2651         return rc;
2652 }
2653
2654 /**
2655  * Double Scan Directory object for multiple linkEA entries case.
2656  *
2657  * The given @child has multiple linkEA entries. At most one linkEA entry
2658  * will be valid, all the others will be removed. Firstly, the function
2659  * will try to find out the linkEA entry for which the name entry exists under
2660  * the given parent (@pfid). If there is no linkEA entry that matches the given
2661  * ".." name entry, then it tries to find the first linkEA entry for which both
2662  * the parent and the name entry exist, to rebuild a new ".." name entry.
2663  *
2664  * \param[in] env       pointer to the thread context
2665  * \param[in] com       pointer to the lfsck component
2666  * \param[in] child     pointer to the directory to be double scanned
2667  * \param[in] pfid      the FID corresponding to the ".." entry
2668  * \param[in] ldata     pointer to the linkEA data for the given @child
2669  * \param[in,out] lh    ldlm lock handler for the given @child
2670  * \param[out] type     to tell the caller what the inconsistency is
2671  * \param[in] lpf       true if the ".." entry is under lost+found/MDTxxxx/
2672  * \param[out] unknown  handed to lfsck_namespace_dsd_single() which may set it
2673  *
2674  * \retval              positive number for repaired cases
2675  * \retval              0 if nothing to be repaired
2676  * \retval              negative error number on failure
2677  */
2678 static int
2679 lfsck_namespace_dsd_multiple(const struct lu_env *env,
2680                              struct lfsck_component *com,
2681                              struct dt_object *child,
2682                              const struct lu_fid *pfid,
2683                              struct linkea_data *ldata,
2684                              struct lustre_handle *lh,
2685                              enum lfsck_namespace_inconsistency_type *type,
2686                              bool lpf, bool *unknown)
2687 {
2688         struct lfsck_thread_info *info          = lfsck_env_info(env);
2689         struct lu_name           *cname         = &info->lti_name;
2690         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
2691         struct lu_fid            *pfid2         = &info->lti_fid3;
2692         struct lu_fid             tfid;
2693         struct lfsck_namespace   *ns            = com->lc_file_ram;
2694         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2695         struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
2696         struct dt_object         *parent        = NULL;
2697         struct linkea_data        ldata_new     = { NULL };
2698         int                       dirent_count  = 0;
2699         int                       rc            = 0;
2700         bool                      once          = true;
2701         ENTRY;
2702
2703 again:  /* re-entered after clearing @lpf and, later, @once (see the end) */
2704         while (ldata->ld_lee != NULL) {
2705                 rc = lfsck_namespace_unpack_linkea_entry(ldata, cname, &tfid,
2706                                                          info->lti_key,
2707                                                          sizeof(info->lti_key));
2708                 /* Drop invalid linkEA entry. */
2709                 if (rc != 0) {
2710                         lfsck_linkea_del_buf(ldata, cname);
2711                         continue;
2712                 }
2713
2714                 /* Drop repeated linkEA entries. */
2715                 lfsck_namespace_filter_linkea_entry(ldata, cname, &tfid, true);
2716
2717                 /* If current dotdot is the .lustre/lost+found/MDTxxxx/,
2718                  * then it is possible that: the directory object has ever
2719                  * been lost, but its name entry was there. In the former
2720                  * LFSCK run, during the first-stage scanning, the LFSCK
2721                  * found the dangling name entry, but it did not recreate
2722                  * the lost object, and when moved to the second-stage
2723                  * scanning, some children objects of the lost directory
2724                  * object were found, then the LFSCK recreated such lost
2725                  * directory object as an orphan.
2726                  *
2727                  * When the LFSCK runs again, if the dangling name is still
2728                  * there, the LFSCK should move the orphan directory object
2729                  * back to the normal namespace. */
2730                 if (!lpf && !lu_fid_eq(pfid, &tfid) && once) {
2731                         linkea_next_entry(ldata);
2732                         continue;
2733                 }
2734
2735                 parent = lfsck_object_find_bottom(env, lfsck, &tfid);
2736                 if (IS_ERR(parent))
2737                         RETURN(PTR_ERR(parent));
2738
2739                 if (!dt_object_exists(parent)) {
2740                         lfsck_object_put(env, parent);
2741                         if (ldata->ld_leh->leh_reccount > 1) {
2742                                 /* If it is NOT the last linkEA entry, then
2743                                  * there is still other chance to make the
2744                                  * child to be visible via other parent, then
2745                                  * remove this linkEA entry. */
2746                                 lfsck_linkea_del_buf(ldata, cname);
2747                                 continue;
2748                         }
2749
2750                         break;
2751                 }
2752
2753                 /* The linkEA entry with bad parent will be removed. */
2754                 if (unlikely(!dt_try_as_dir(env, parent))) {
2755                         lfsck_object_put(env, parent);
2756                         lfsck_linkea_del_buf(ldata, cname);
2757                         continue;
2758                 }
2759
2760                 rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
2761                                (const struct dt_key *)cname->ln_name);
2762                 *pfid2 = *lfsck_dto2fid(parent);
2763                 if (rc == -ENOENT) {
2764                         lfsck_object_put(env, parent);
2765                         linkea_next_entry(ldata);
2766                         continue;
2767                 }
2768
2769                 if (rc != 0) {
2770                         lfsck_object_put(env, parent);
2771
2772                         RETURN(rc);
2773                 }
2774
2775                 if (lu_fid_eq(&tfid, cfid)) {
2776                         lfsck_object_put(env, parent);
2777                         /* If the parent (that is declared via linkEA entry)
2778                          * directory contains the specified child, but such
2779                          * parent does not match the dotdot name entry, then
2780                          * trust the linkEA. */
2781                         if (!lu_fid_eq(pfid, pfid2)) {
2782                                 *type = LNIT_UNMATCHED_PAIRS;
2783                                 rc = lfsck_namespace_repair_unmatched_pairs(env,
2784                                                 com, child, pfid2, cname);
2785
2786                                 RETURN(rc);
2787                         }
2788
2789 rebuild:        /* also reached by goto when replace_cond returns > 0 */
2790                         /* It is the most common case that we find the
2791                          * name entry corresponding to the linkEA entry
2792                          * that matches the ".." name entry. */
2793                         rc = linkea_data_new(&ldata_new, &info->lti_big_buf);
2794                         if (rc != 0)
2795                                 RETURN(rc);
2796
2797                         rc = linkea_add_buf(&ldata_new, cname, pfid2);
2798                         if (rc != 0)
2799                                 RETURN(rc);
2800
2801                         rc = lfsck_namespace_rebuild_linkea(env, com, child,
2802                                                             &ldata_new);
2803                         if (rc < 0)
2804                                 RETURN(rc);
2805
2806                         lfsck_linkea_del_buf(ldata, cname);
2807                         linkea_first_entry(ldata);
2808                         /* There may be some invalid dangling name entries under
2809                          * other parent directories, remove all of them. */
2810                         while (ldata->ld_lee != NULL) {
2811                                 rc = lfsck_namespace_unpack_linkea_entry(ldata,
2812                                                 cname, &tfid, info->lti_key,
2813                                                 sizeof(info->lti_key));
2814                                 if (rc != 0)
2815                                         goto next;
2816
2817                                 parent = lfsck_object_find_bottom(env, lfsck,
2818                                                                   &tfid);
2819                                 if (IS_ERR(parent)) {
2820                                         rc = PTR_ERR(parent);
2821                                         if (rc != -ENOENT &&
2822                                             bk->lb_param & LPF_FAILOUT)
2823                                                 RETURN(rc);
2824
2825                                         goto next;
2826                                 }
2827
2828                                 if (!dt_object_exists(parent)) {
2829                                         lfsck_object_put(env, parent);
2830                                         goto next;
2831                                 }
2832
2833                                 rc = lfsck_namespace_repair_dirent(env, com,
2834                                         parent, child, cname->ln_name,
2835                                         cname->ln_name, S_IFDIR, false, true);
2836                                 lfsck_object_put(env, parent);
2837                                 if (rc < 0) {
2838                                         if (bk->lb_param & LPF_FAILOUT)
2839                                                 RETURN(rc);
2840
2841                                         goto next;
2842                                 }
2843
2844                                 dirent_count += rc;
2845
2846 next:
2847                                 lfsck_linkea_del_buf(ldata, cname);
2848                         }
2849
2850                         ns->ln_dirent_repaired += dirent_count;
2851                         /* NOTE(review): rc may hold a skipped entry's error. */
2852                         RETURN(rc);
2853                 } /* lu_fid_eq(&tfid, lfsck_dto2fid(child)) */
2854
2855                 lfsck_ibits_unlock(lh, LCK_EX);
2856                 /* The name entry references another MDT-object that may be
2857                  * created by the LFSCK for repairing dangling name entry.
2858                  * Try to replace it. */
2859                 rc = lfsck_namespace_replace_cond(env, com, parent, child,
2860                                                   &tfid, cname);
2861                 lfsck_object_put(env, parent);
2862                 if (rc < 0)
2863                         RETURN(rc);
2864
2865                 if (rc > 0)
2866                         goto rebuild;
2867
2868                 lfsck_linkea_del_buf(ldata, cname);
2869         } /* while (ldata->ld_lee != NULL) */
2870
2871         linkea_first_entry(ldata);
2872         if (ldata->ld_leh->leh_reccount == 1) {
2873                 rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata,
2874                                                 lh, type, NULL, unknown);
2875
2876                 RETURN(rc);
2877         }
2878
2879         /* All linkEA entries are invalid and removed, then handle the @child
2880          * as an orphan.*/
2881         if (ldata->ld_leh->leh_reccount == 0) {
2882                 rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, lh,
2883                                                 type);
2884
2885                 RETURN(rc);
2886         }
2887
2888         /* If the dangling name entry for the orphan directory object has
2889          * been removed, then just check whether the directory object is
2890          * still under the .lustre/lost+found/MDTxxxx/ or not. */
2891         if (lpf) {
2892                 lpf = false;
2893                 goto again;
2894         }
2895
2896         /* There is no linkEA entry that matches the ".." name entry. Find
2897          * the first linkEA entry that both parent and name entry exist to
2898          * rebuild a new ".." name entry. */
2899         if (once) {
2900                 once = false;
2901                 goto again;
2902         }
2903
2904         RETURN(rc);
2905 }
2906
2907 /**
2908  * Repair the object's nlink attribute.
2909  *
2910  * If all the known name entries have been verified, then the object's hard
2911  * link attribute should match the object's linkEA entries count unless the
2912  * object's has too much hard link to be recorded in the linkEA. Such cases
2913  * should have been marked in the LFSCK trace file. Otherwise, trust the
2914  * linkEA to update the object's nlink attribute.
2915  *
2916  * \param[in] env       pointer to the thread context
2917  * \param[in] com       pointer to the lfsck component
2918  * \param[in] obj       pointer to the dt_object to be handled
2919  * \param[in,out] la    pointer to buffer to object's attribute before
2920  *                      and after the repairing
2921  *
2922  * \retval              positive number for repaired cases
2923  * \retval              0 if nothing to be repaired
2924  * \retval              negative error number on failure
2925  */
2926 static int lfsck_namespace_repair_nlink(const struct lu_env *env,
2927                                         struct lfsck_component *com,
2928                                         struct dt_object *obj,
2929                                         struct lu_attr *la)
2930 {
2931         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2932         struct lu_fid                   *tfid   = &info->lti_fid3;
2933         struct lfsck_namespace          *ns     = com->lc_file_ram;
2934         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2935         struct dt_device                *dev    = lfsck_obj2dev(obj);
2936         const struct lu_fid             *cfid   = lfsck_dto2fid(obj);
2937         struct thandle                  *th     = NULL;
2938         struct linkea_data               ldata  = { NULL };
2939         struct lustre_handle             lh     = { 0 };
2940         __u32                            old    = la->la_nlink;
2941         int                              idx;
2942         int                              rc     = 0;
2943         __u8                             flags;
2944         ENTRY;
2945
2946         LASSERT(!dt_object_remote(obj));
2947
2948         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
2949                               MDS_INODELOCK_UPDATE, LCK_PW);
2950         if (rc != 0)
2951                 GOTO(log, rc);
2952
2953         th = dt_trans_create(env, dev);
2954         if (IS_ERR(th))
2955                 GOTO(log, rc = PTR_ERR(th));
2956
2957         la->la_valid = LA_NLINK;
2958         rc = dt_declare_attr_set(env, obj, la, th);
2959         if (rc != 0)
2960                 GOTO(stop, rc);
2961
2962         rc = dt_trans_start_local(env, dev, th);
2963         if (rc != 0)
2964                 GOTO(stop, rc);
2965
2966         dt_write_lock(env, obj, 0);
2967         /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT has
2968          * ever tried to verify some remote MDT-object that resides on this
2969          * MDT, but this MDT failed to respond such request. So means there
2970          * may be some remote name entry on other MDT that references this
2971          * object with another name, so we cannot know whether this linkEA
2972          * is valid or not. So keep it there and maybe resolved when next
2973          * LFSCK run. */
2974         if (ns->ln_flags & LF_INCOMPLETE)
2975                 GOTO(unlock, rc = 0);
2976
2977         fid_cpu_to_be(tfid, cfid);
2978         idx = lfsck_sub_trace_file_fid2idx(cfid);
2979         rc = dt_lookup(env, com->lc_sub_trace_objs[idx].lsto_obj,
2980                        (struct dt_rec *)&flags, (const struct dt_key *)tfid);
2981         if (rc != 0)
2982                 GOTO(unlock, rc);
2983
2984         if (flags & LNTF_SKIP_NLINK)
2985                 GOTO(unlock, rc = 0);
2986
2987         rc = dt_attr_get(env, obj, la);
2988         if (rc != 0)
2989                 GOTO(unlock, rc = (rc == -ENOENT ? 0 : rc));
2990
2991         rc = lfsck_links_read2(env, obj, &ldata);
2992         if (rc != 0)
2993                 GOTO(unlock, rc = (rc == -ENODATA ? 0 : rc));
2994
2995         if (la->la_nlink == ldata.ld_leh->leh_reccount ||
2996             unlikely(la->la_nlink == 0))
2997                 GOTO(unlock, rc = 0);
2998
2999         la->la_nlink = ldata.ld_leh->leh_reccount;
3000         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
3001                 GOTO(unlock, rc = 1);
3002
3003         rc = dt_attr_set(env, obj, la, th);
3004
3005         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
3006
3007 unlock:
3008         dt_write_unlock(env, obj);
3009
3010 stop:
3011         dt_trans_stop(env, dev, th);
3012
3013 log:
3014         lfsck_ibits_unlock(&lh, LCK_PW);
3015         CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired the object "DFID"'s "
3016                "nlink count from %u to %u: rc = %d\n",
3017                lfsck_lfsck2name(lfsck), PFID(cfid), old, la->la_nlink, rc);
3018
3019         if (rc != 0)
3020                 ns->ln_flags |= LF_INCONSISTENT;
3021
3022         return rc;
3023 }
3024
3025 /**
3026  * Double scan the directory object for namespace LFSCK.
3027  *
3028  * This function will verify the <parent, child> pairs in the namespace tree:
3029  * the parent references the child via some name entry that should be in the
3030  * child's linkEA entry, the child should back references the parent via its
3031  * ".." name entry.
3032  *
3033  * The LFSCK will scan every linkEA entry in turn until find out the first
3034  * matched pairs. If found, then all other linkEA entries will be dropped.
3035  * If all the linkEA entries cannot match the ".." name entry, then there
3036  * are serveral possible cases:
3037  *
3038  * 1) If there is only one linkEA entry, then trust it as long as the PFID
3039  *    in the linkEA entry is valid.
3040  *
3041  * 2) If there are multiple linkEA entries, then try to find the linkEA
3042  *    that matches the ".." name entry. If found, then all other entries
3043  *    are invalid; otherwise, it is quite possible that the ".." name entry
3044  *    is corrupted. Under such case, the LFSCK will rebuild the ".." name
3045  *    entry according to the first valid linkEA entry (both the parent and
3046  *    the name entry should exist).
3047  *
3048  * 3) If the directory object has no (valid) linkEA entry, then the
3049  *    directory object will be handled as pure orphan and inserted
3050  *    in the .lustre/lost+found/MDTxxxx/ with the name:
3051  *    ${self_FID}-${PFID}-D-${conflict_version}
3052  *
3053  * \param[in] env       pointer to the thread context
3054  * \param[in] com       pointer to the lfsck component
3055  * \param[in] child     pointer to the directory object to be handled
3056  * \param[in] flags     to indicate the specical checking on the @child
3057  *
3058  * \retval              positive number for repaired cases
3059  * \retval              0 if nothing to be repaired
3060  * \retval              negative error number on failure
3061  */
3062 static int lfsck_namespace_double_scan_dir(const struct lu_env *env,
3063                                            struct lfsck_component *com,
3064                                            struct dt_object *child, __u8 flags)
3065 {
3066         struct lfsck_thread_info *info          = lfsck_env_info(env);
3067         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
3068         struct lu_fid            *pfid          = &info->lti_fid2;
3069         struct lfsck_namespace   *ns            = com->lc_file_ram;
3070         struct lfsck_instance    *lfsck         = com->lc_lfsck;
3071         struct lustre_handle      lh            = { 0 };
3072         struct linkea_data        ldata         = { NULL };
3073         bool                      unknown       = false;
3074         bool                      lpf           = false;
3075         bool                      retry         = false;
3076         enum lfsck_namespace_inconsistency_type type = LNIT_BAD_LINKEA;
3077         int                       rc            = 0;
3078         ENTRY;
3079
3080         LASSERT(!dt_object_remote(child));
3081
3082         if (flags & LNTF_UNCERTAIN_LMV) {
3083                 if (flags & LNTF_RECHECK_NAME_HASH) {
3084                         rc = lfsck_namespace_scan_shard(env, com, child);
3085                         if (rc < 0)
3086                                 RETURN(rc);
3087
3088                         ns->ln_striped_shards_scanned++;
3089                 } else {
3090                         ns->ln_striped_shards_skipped++;
3091                 }
3092         }
3093
3094         flags &= ~(LNTF_RECHECK_NAME_HASH | LNTF_UNCERTAIN_LMV);
3095         if (flags == 0)
3096                 RETURN(0);
3097
3098         if (flags & (LNTF_CHECK_LINKEA | LNTF_CHECK_PARENT) &&
3099             !(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) {
3100                 CDEBUG(D_LFSCK, "%s: some MDT(s) maybe NOT take part in the"
3101                        "the namespace LFSCK, then the LFSCK cannot guarantee"
3102                        "all the name entries have been verified in first-stage"
3103                        "scanning. So have to skip orphan related handling for"
3104                        "the directory object "DFID" with remote name entry\n",
3105                        lfsck_lfsck2name(lfsck), PFID(cfid));
3106
3107                 RETURN(0);
3108         }
3109
3110         if (unlikely(!dt_try_as_dir(env, child)))
3111                 GOTO(out, rc = -ENOTDIR);
3112
3113         /* We only take ldlm lock on the @child when required. When the
3114          * logic comes here for the first time, it is always false. */
3115         if (0) {
3116
3117 lock:
3118                 rc = lfsck_ibits_lock(env, lfsck, child, &lh,
3119                                       MDS_INODELOCK_UPDATE |
3120                                       MDS_INODELOCK_XATTR, LCK_EX);
3121                 if (rc != 0)
3122                         GOTO(out, rc);
3123         }
3124
3125         dt_read_lock(env, child, 0);
3126         if (unlikely(lfsck_is_dead_obj(child))) {
3127                 dt_read_unlock(env, child);
3128
3129                 GOTO(out, rc = 0);
3130         }
3131
3132         rc = dt_lookup(env, child, (struct dt_rec *)pfid,
3133                        (const struct dt_key *)dotdot);
3134         if (rc != 0) {
3135                 if (rc != -ENOENT && rc != -ENODATA && rc != -EINVAL) {
3136                         dt_read_unlock(env, child);
3137
3138                         GOTO(out, rc);
3139                 }
3140
3141                 if (!lustre_handle_is_used(&lh)) {
3142                         dt_read_unlock(env, child);
3143                         goto lock;
3144                 }
3145
3146                 fid_zero(pfid);
3147         } else if (lfsck->li_lpf_obj != NULL &&
3148                    lu_fid_eq(pfid, lfsck_dto2fid(lfsck->li_lpf_obj))) {
3149                 lpf = true;
3150         } else if (unlikely(!fid_is_sane(pfid))) {
3151                 fid_zero(pfid);
3152         }
3153
3154         rc = lfsck_links_read(env, child, &ldata);
3155         dt_read_unlock(env, child);
3156         if (rc != 0) {
3157                 if (rc != -ENODATA && rc != -EINVAL)
3158                         GOTO(out, rc);
3159
3160                 if (!lustre_handle_is_used(&lh))
3161                         goto lock;
3162
3163                 if (rc == -EINVAL && !fid_is_zero(pfid)) {
3164                         /* Remove the corrupted linkEA. */
3165                         rc = lfsck_namespace_links_remove(env, com, child);
3166                         if (rc == 0)
3167                                 /* Here, because of the crashed linkEA, we
3168                                  * cannot know whether there is some parent
3169                                  * that references the child directory via
3170                                  * some name entry or not. So keep it there,
3171                                  * when the LFSCK run next time, if there is
3172                                  * some parent that references this object,
3173                                  * then the LFSCK can rebuild the linkEA;
3174                                  * otherwise, this object will be handled
3175                                  * as orphan as above. */
3176                                 unknown = true;
3177                 } else {
3178                         /* 1. If we have neither ".." nor linkEA,
3179                          *    then it is an orphan.
3180                          *
3181                          * 2. If we only have the ".." name entry,
3182                          *    but no parent references this child
3183                          *    directory, then handle it as orphan. */
3184                         lfsck_ibits_unlock(&lh, LCK_EX);
3185                         type = LNIT_MUL_REF;
3186
3187                         /* If the LFSCK is marked as LF_INCOMPLETE,
3188                          * then means some MDT has ever tried to
3189                          * verify some remote MDT-object that resides
3190                          * on this MDT, but this MDT failed to respond
3191                          * such request. So means there may be some
3192                          * remote name entry on other MDT that
3193                          * references this object with another name,
3194                          * so we cannot know whether this linkEA is
3195                          * valid or not. So keep it there and maybe
3196                          * resolved when next LFSCK run. */
3197                         if (ns->ln_flags & LF_INCOMPLETE)
3198                                 GOTO(out, rc = 0);
3199
3200                         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
3201                                  "-"DFID, PFID(pfid));
3202                         rc = lfsck_namespace_insert_orphan(env, com, child,
3203                                                 info->lti_tmpbuf, "D", NULL);
3204                 }
3205
3206                 GOTO(out, rc);
3207         } /* rc != 0 */
3208
3209         linkea_first_entry(&ldata);
3210         /* This is the most common case: the object has unique linkEA entry. */
3211         if (ldata.ld_leh->leh_reccount == 1) {
3212                 rc = lfsck_namespace_dsd_single(env, com, child, pfid, &ldata,
3213                                                 &lh, &type, &retry, &unknown);
3214                 if (retry) {
3215                         LASSERT(!lustre_handle_is_used(&lh));
3216
3217                         retry = false;
3218                         goto lock;
3219                 }
3220
3221                 GOTO(out, rc);
3222         }
3223
3224         if (!lustre_handle_is_used(&lh))
3225                 goto lock;
3226
3227         if (unlikely(ldata.ld_leh->leh_reccount == 0)) {
3228                 rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, &lh,
3229                                                 &type);
3230
3231                 GOTO(out, rc);
3232         }
3233
3234         /* When we come here, the cases usually like that:
3235          * 1) The directory object has a corrupted linkEA entry. During the
3236          *    first-stage scanning, the LFSCK cannot know such corruption,
3237          *    then it appends the right linkEA entry according to the found
3238          *    name entry after the bad one.
3239          *
3240          * 2) The directory object has a right linkEA entry. During the
3241          *    first-stage scanning, the LFSCK finds some bad name entry,
3242          *    but the LFSCK cannot aware that at that time, then it adds
3243          *    the bad linkEA entry for further processing. */
3244         rc = lfsck_namespace_dsd_multiple(env, com, child, pfid, &ldata,
3245                                           &lh, &type, lpf, &unknown);
3246
3247         GOTO(out, rc);
3248
3249 out:
3250         lfsck_ibits_unlock(&lh, LCK_EX);
3251         if (rc > 0) {
3252                 switch (type) {
3253                 case LNIT_BAD_LINKEA:
3254                         ns->ln_linkea_repaired++;
3255                         break;
3256                 case LNIT_UNMATCHED_PAIRS:
3257                         ns->ln_unmatched_pairs_repaired++;
3258                         break;
3259                 case LNIT_MUL_REF:
3260                         ns->ln_mul_ref_repaired++;
3261                         break;
3262                 default:
3263                         break;
3264                 }
3265         }
3266
3267         if (unknown)
3268                 ns->ln_unknown_inconsistency++;
3269
3270         return rc;
3271 }
3272
3273 /**
3274  * Double scan the MDT-object for namespace LFSCK.
3275  *
3276  * If the MDT-object contains invalid or repeated linkEA entries, then drop
3277  * those entries from the linkEA; if the linkEA becomes empty or the object
3278  * has no linkEA, then it is an orphan and will be added into the directory
3279  * .lustre/lost+found/MDTxxxx/; if the remote parent is lost, then recreate
3280  * the remote parent; if the name entry corresponding to some linkEA entry
3281  * is lost, then add the name entry back to the namespace.
3282  *
3283  * \param[in] env       pointer to the thread context
3284  * \param[in] com       pointer to the lfsck component
3285  * \param[in] child     pointer to the dt_object to be handled
3286  * \param[in] flags     some hints to indicate how the @child should be handled
3287  *
3288  * \retval              positive number for repaired cases
3289  * \retval              0 if nothing to be repaired
3290  * \retval              negative error number on failure
3291  */
3292 static int lfsck_namespace_double_scan_one(const struct lu_env *env,
3293                                            struct lfsck_component *com,
3294                                            struct dt_object *child, __u8 flags)
3295 {
3296         struct lfsck_thread_info *info     = lfsck_env_info(env);
3297         struct lu_attr           *la       = &info->lti_la;
3298         struct lu_name           *cname    = &info->lti_name;
3299         struct lu_fid            *pfid     = &info->lti_fid;
3300         struct lu_fid            *cfid     = &info->lti_fid2;
3301         struct lfsck_instance    *lfsck    = com->lc_lfsck;
3302         struct lfsck_namespace   *ns       = com->lc_file_ram;
3303         struct dt_object         *parent   = NULL;
3304         struct linkea_data        ldata    = { NULL };
3305         bool                      repaired = false;
3306         int                       count    = 0;
3307         int                       rc;
3308         ENTRY;
3309
3310         dt_read_lock(env, child, 0);
3311         if (unlikely(lfsck_is_dead_obj(child))) {
3312                 dt_read_unlock(env, child);
3313
3314                 RETURN(0);
3315         }
3316
3317         if (S_ISDIR(lfsck_object_type(child))) {
3318                 dt_read_unlock(env, child);
3319                 rc = lfsck_namespace_double_scan_dir(env, com, child, flags);
3320
3321                 RETURN(rc);
3322         }
3323
3324         rc = lfsck_links_read(env, child, &ldata);
3325         dt_read_unlock(env, child);
3326
3327         if (rc == -EINVAL) {
3328                 struct lustre_handle lh = { 0 };
3329
3330                 rc = lfsck_ibits_lock(env, com->lc_lfsck, child, &lh,
3331                                       MDS_INODELOCK_UPDATE |
3332                                       MDS_INODELOCK_XATTR, LCK_EX);
3333                 if (rc == 0) {
3334                         rc = lfsck_namespace_links_remove(env, com, child);
3335                         lfsck_ibits_unlock(&lh, LCK_EX);
3336                 }
3337
3338                 GOTO(out, rc = (rc == -ENOENT ? 0 : rc));
3339         }
3340
3341         if (rc != 0)
3342                 GOTO(out, rc);
3343
3344         linkea_first_entry(&ldata);
3345         while (ldata.ld_lee != NULL) {
3346                 rc = lfsck_namespace_unpack_linkea_entry(&ldata, cname, pfid,
3347                                                          info->lti_key,
3348                                                          sizeof(info->lti_key));
3349                 /* Invalid PFID in the linkEA entry. */
3350                 if (rc != 0) {
3351                         rc = lfsck_namespace_shrink_linkea(env, com, child,
3352                                                 &ldata, cname, pfid, true);
3353                         if (rc < 0)
3354                                 GOTO(out, rc);
3355
3356                         if (rc > 0)
3357                                 repaired = true;
3358
3359                         continue;
3360                 }
3361
3362                 rc = lfsck_namespace_filter_linkea_entry(&ldata, cname, pfid,
3363                                                          false);
3364                 /* Found repeated linkEA entries */
3365                 if (rc > 0) {
3366                         rc = lfsck_namespace_shrink_linkea(env, com, child,
3367                                                 &ldata, cname, pfid, false);
3368                         if (rc < 0)
3369                                 GOTO(out, rc);
3370
3371                         if (rc == 0)
3372                                 continue;
3373
3374                         repaired = true;
3375
3376                         /* fall through */
3377                 }
3378
3379                 parent = lfsck_object_find_bottom(env, lfsck, pfid);
3380                 if (IS_ERR(parent))
3381                         GOTO(out, rc = PTR_ERR(parent));
3382
3383                 if (!dt_object_exists(parent)) {
3384
3385 lost_parent:
3386                         if (ldata.ld_leh->leh_reccount > 1) {
3387                                 /* If it is NOT the last linkEA entry, then
3388                                  * there is still other chance to make the
3389                                  * child to be visible via other parent, then
3390                                  * remove this linkEA entry. */
3391                                 rc = lfsck_namespace_shrink_linkea(env, com,
3392                                         child, &ldata, cname, pfid, true);
3393                         } else {
3394                                 /* If the LFSCK is marked as LF_INCOMPLETE,
3395                                  * then means some MDT has ever tried to
3396                                  * verify some remote MDT-object that resides
3397                                  * on this MDT, but this MDT failed to respond
3398                                  * such request. So means there may be some
3399                                  * remote name entry on other MDT that
3400                                  * references this object with another name,
3401                                  * so we cannot know whether this linkEA is
3402                                  * valid or not. So keep it there and maybe
3403                                  * resolved when next LFSCK run. */
3404                                 if (ns->ln_flags & LF_INCOMPLETE) {
3405                                         lfsck_object_put(env, parent);
3406
3407                                         GOTO(out, rc = 0);
3408                                 }
3409
3410                                 /* Create the lost parent as an orphan. */
3411                                 rc = lfsck_namespace_create_orphan_dir(env, com,
3412                                                                 parent, NULL);
3413                                 if (rc < 0) {
3414                                         lfsck_object_put(env, parent);
3415
3416                                         GOTO(out, rc);
3417                                 }
3418
3419                                 if (rc > 0)
3420                                         repaired = true;
3421
3422                                 /* Add the missing name entry to the parent. */
3423                                 rc = lfsck_namespace_insert_normal(env, com,
3424                                                 parent, child, cname->ln_name);
3425                                 if (unlikely(rc == -EEXIST))
3426                                         /* Unfortunately, someone reused the
3427                                          * name under the parent by race. So we
3428                                          * have to remove the linkEA entry from
3429                                          * current child object. It means that
3430                                          * the LFSCK cannot recover the system
3431                                          * totally back to its original status,
3432                                          * but it is necessary to make the
3433                                          * current system to be consistent. */
3434                                         rc = lfsck_namespace_shrink_linkea(env,
3435                                                         com, child, &ldata,
3436                                                         cname, pfid, true);
3437                                 else
3438                                         linkea_next_entry(&ldata);
3439                         }
3440
3441                         lfsck_object_put(env, parent);