Whamcloud - gitweb
LU-5707 lfsck: store namespace LFSCK statistics info in new EA
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_namespace.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lustre/lustre_idl.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fid.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre/lustre_user.h>
41
42 #include "lfsck_internal.h"
43
/* Magic value kept in the ln_magic field of the namespace LFSCK trace file;
 * lfsck_namespace_load() rejects a loaded record whose magic differs. */
#define LFSCK_NAMESPACE_MAGIC   0xA0629D03

/* Positive (non-error) results returned by lfsck_namespace_check_exist()
 * when the checked name entry no longer matches the given object. */
enum lfsck_nameentry_check {
        LFSCK_NAMEENTRY_DEAD            = 1, /* The object has been unlinked. */
        LFSCK_NAMEENTRY_REMOVED         = 2, /* The entry has been removed. */
        LFSCK_NAMEENTRY_RECREATED       = 3, /* The entry has been recreated. */
};

/* Name string of the namespace LFSCK component. */
static const char lfsck_namespace_name[] = "lfsck_namespace";
53
54 static struct lfsck_namespace_req *
55 lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
56                                    struct lu_dirent *ent, __u16 type)
57 {
58         struct lfsck_namespace_req *lnr;
59         int                         size;
60
61         size = sizeof(*lnr) + (ent->lde_namelen & ~3) + 4;
62         OBD_ALLOC(lnr, size);
63         if (lnr == NULL)
64                 return ERR_PTR(-ENOMEM);
65
66         INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
67         lnr->lnr_obj = lfsck_object_get(lfsck->li_obj_dir);
68         lnr->lnr_lmv = lfsck_lmv_get(lfsck->li_lmv);
69         lnr->lnr_fid = ent->lde_fid;
70         lnr->lnr_oit_cookie = lfsck->li_pos_current.lp_oit_cookie;
71         lnr->lnr_dir_cookie = ent->lde_hash;
72         lnr->lnr_attr = ent->lde_attrs;
73         lnr->lnr_size = size;
74         lnr->lnr_type = type;
75         lnr->lnr_namelen = ent->lde_namelen;
76         memcpy(lnr->lnr_name, ent->lde_name, ent->lde_namelen);
77
78         return lnr;
79 }
80
81 static void lfsck_namespace_assistant_req_fini(const struct lu_env *env,
82                                                struct lfsck_assistant_req *lar)
83 {
84         struct lfsck_namespace_req *lnr =
85                         container_of0(lar, struct lfsck_namespace_req, lnr_lar);
86
87         if (lnr->lnr_lmv != NULL)
88                 lfsck_lmv_put(env, lnr->lnr_lmv);
89
90         lu_object_put(env, &lnr->lnr_obj->do_lu);
91         OBD_FREE(lnr, lnr->lnr_size);
92 }
93
/**
 * Convert a namespace LFSCK statistics record from little-endian to CPU
 * byte order, field by field.
 *
 * Positions and the FID are converted by their own helpers; every other
 * field is converted with le32_to_cpu() or le64_to_cpu(). This list must
 * stay in step with lfsck_namespace_cpu_to_le().
 *
 * \param[out] dst      the record to receive the CPU byte order values
 * \param[in] src       the little-endian record, e.g. as read from disk
 */
static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
                                      struct lfsck_namespace *src)
{
        dst->ln_magic = le32_to_cpu(src->ln_magic);
        dst->ln_status = le32_to_cpu(src->ln_status);
        dst->ln_flags = le32_to_cpu(src->ln_flags);
        dst->ln_success_count = le32_to_cpu(src->ln_success_count);
        dst->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
        dst->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
        dst->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
        dst->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
        dst->ln_time_last_checkpoint =
                                le64_to_cpu(src->ln_time_last_checkpoint);
        lfsck_position_le_to_cpu(&dst->ln_pos_latest_start,
                                 &src->ln_pos_latest_start);
        lfsck_position_le_to_cpu(&dst->ln_pos_last_checkpoint,
                                 &src->ln_pos_last_checkpoint);
        lfsck_position_le_to_cpu(&dst->ln_pos_first_inconsistent,
                                 &src->ln_pos_first_inconsistent);
        dst->ln_items_checked = le64_to_cpu(src->ln_items_checked);
        dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
        dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
        dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
        dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
        dst->ln_objs_repaired_phase2 =
                                le64_to_cpu(src->ln_objs_repaired_phase2);
        dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
        dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
        fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2,
                      &src->ln_fid_latest_scanned_phase2);
        dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
        dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
        dst->ln_mul_linked_checked = le64_to_cpu(src->ln_mul_linked_checked);
        dst->ln_mul_linked_repaired = le64_to_cpu(src->ln_mul_linked_repaired);
        dst->ln_unknown_inconsistency =
                                le64_to_cpu(src->ln_unknown_inconsistency);
        dst->ln_unmatched_pairs_repaired =
                                le64_to_cpu(src->ln_unmatched_pairs_repaired);
        dst->ln_dangling_repaired = le64_to_cpu(src->ln_dangling_repaired);
        dst->ln_mul_ref_repaired = le64_to_cpu(src->ln_mul_ref_repaired);
        dst->ln_bad_type_repaired = le64_to_cpu(src->ln_bad_type_repaired);
        dst->ln_lost_dirent_repaired =
                                le64_to_cpu(src->ln_lost_dirent_repaired);
        dst->ln_striped_dirs_scanned =
                                le64_to_cpu(src->ln_striped_dirs_scanned);
        dst->ln_striped_dirs_repaired =
                                le64_to_cpu(src->ln_striped_dirs_repaired);
        dst->ln_striped_dirs_failed =
                                le64_to_cpu(src->ln_striped_dirs_failed);
        dst->ln_striped_dirs_disabled =
                                le64_to_cpu(src->ln_striped_dirs_disabled);
        dst->ln_striped_dirs_skipped =
                                le64_to_cpu(src->ln_striped_dirs_skipped);
        dst->ln_striped_shards_scanned =
                                le64_to_cpu(src->ln_striped_shards_scanned);
        dst->ln_striped_shards_repaired =
                                le64_to_cpu(src->ln_striped_shards_repaired);
        dst->ln_striped_shards_failed =
                                le64_to_cpu(src->ln_striped_shards_failed);
        dst->ln_striped_shards_skipped =
                                le64_to_cpu(src->ln_striped_shards_skipped);
        dst->ln_name_hash_repaired = le64_to_cpu(src->ln_name_hash_repaired);
        dst->ln_local_lpf_scanned = le64_to_cpu(src->ln_local_lpf_scanned);
        dst->ln_local_lpf_moved = le64_to_cpu(src->ln_local_lpf_moved);
        dst->ln_local_lpf_skipped = le64_to_cpu(src->ln_local_lpf_skipped);
        dst->ln_local_lpf_failed = le64_to_cpu(src->ln_local_lpf_failed);
        dst->ln_bitmap_size = le32_to_cpu(src->ln_bitmap_size);
}
162
/**
 * Convert a namespace LFSCK statistics record from CPU byte order to
 * little-endian, field by field.
 *
 * Positions and the FID are converted by their own helpers; every other
 * field is converted with cpu_to_le32() or cpu_to_le64(). This list must
 * stay in step with lfsck_namespace_le_to_cpu().
 *
 * \param[out] dst      the record to receive the little-endian values,
 *                      e.g. the buffer to be written to disk
 * \param[in] src       the record in CPU byte order
 */
static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
                                      struct lfsck_namespace *src)
{
        dst->ln_magic = cpu_to_le32(src->ln_magic);
        dst->ln_status = cpu_to_le32(src->ln_status);
        dst->ln_flags = cpu_to_le32(src->ln_flags);
        dst->ln_success_count = cpu_to_le32(src->ln_success_count);
        dst->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
        dst->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
        dst->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
        dst->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
        dst->ln_time_last_checkpoint =
                                cpu_to_le64(src->ln_time_last_checkpoint);
        lfsck_position_cpu_to_le(&dst->ln_pos_latest_start,
                                 &src->ln_pos_latest_start);
        lfsck_position_cpu_to_le(&dst->ln_pos_last_checkpoint,
                                 &src->ln_pos_last_checkpoint);
        lfsck_position_cpu_to_le(&dst->ln_pos_first_inconsistent,
                                 &src->ln_pos_first_inconsistent);
        dst->ln_items_checked = cpu_to_le64(src->ln_items_checked);
        dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
        dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
        dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
        dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
        dst->ln_objs_repaired_phase2 =
                                cpu_to_le64(src->ln_objs_repaired_phase2);
        dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
        dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
        fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2,
                      &src->ln_fid_latest_scanned_phase2);
        dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
        dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
        dst->ln_mul_linked_checked = cpu_to_le64(src->ln_mul_linked_checked);
        dst->ln_mul_linked_repaired = cpu_to_le64(src->ln_mul_linked_repaired);
        dst->ln_unknown_inconsistency =
                                cpu_to_le64(src->ln_unknown_inconsistency);
        dst->ln_unmatched_pairs_repaired =
                                cpu_to_le64(src->ln_unmatched_pairs_repaired);
        dst->ln_dangling_repaired = cpu_to_le64(src->ln_dangling_repaired);
        dst->ln_mul_ref_repaired = cpu_to_le64(src->ln_mul_ref_repaired);
        dst->ln_bad_type_repaired = cpu_to_le64(src->ln_bad_type_repaired);
        dst->ln_lost_dirent_repaired =
                                cpu_to_le64(src->ln_lost_dirent_repaired);
        dst->ln_striped_dirs_scanned =
                                cpu_to_le64(src->ln_striped_dirs_scanned);
        dst->ln_striped_dirs_repaired =
                                cpu_to_le64(src->ln_striped_dirs_repaired);
        dst->ln_striped_dirs_failed =
                                cpu_to_le64(src->ln_striped_dirs_failed);
        dst->ln_striped_dirs_disabled =
                                cpu_to_le64(src->ln_striped_dirs_disabled);
        dst->ln_striped_dirs_skipped =
                                cpu_to_le64(src->ln_striped_dirs_skipped);
        dst->ln_striped_shards_scanned =
                                cpu_to_le64(src->ln_striped_shards_scanned);
        dst->ln_striped_shards_repaired =
                                cpu_to_le64(src->ln_striped_shards_repaired);
        dst->ln_striped_shards_failed =
                                cpu_to_le64(src->ln_striped_shards_failed);
        dst->ln_striped_shards_skipped =
                                cpu_to_le64(src->ln_striped_shards_skipped);
        dst->ln_name_hash_repaired = cpu_to_le64(src->ln_name_hash_repaired);
        dst->ln_local_lpf_scanned = cpu_to_le64(src->ln_local_lpf_scanned);
        dst->ln_local_lpf_moved = cpu_to_le64(src->ln_local_lpf_moved);
        dst->ln_local_lpf_skipped = cpu_to_le64(src->ln_local_lpf_skipped);
        dst->ln_local_lpf_failed = cpu_to_le64(src->ln_local_lpf_failed);
        dst->ln_bitmap_size = cpu_to_le32(src->ln_bitmap_size);
}
231
232 static void lfsck_namespace_record_failure(const struct lu_env *env,
233                                            struct lfsck_instance *lfsck,
234                                            struct lfsck_namespace *ns)
235 {
236         struct lfsck_position pos;
237
238         ns->ln_items_failed++;
239         lfsck_pos_fill(env, lfsck, &pos, false);
240         if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent) ||
241             lfsck_pos_is_eq(&pos, &ns->ln_pos_first_inconsistent) < 0) {
242                 ns->ln_pos_first_inconsistent = pos;
243
244                 CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
245                        "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n",
246                        lfsck_lfsck2name(lfsck),
247                        ns->ln_pos_first_inconsistent.lp_oit_cookie,
248                        PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
249                        ns->ln_pos_first_inconsistent.lp_dir_cookie);
250         }
251 }
252
253 /**
254  * Load the MDT bitmap from the lfsck_namespace trace file.
255  *
256  * \param[in] env       pointer to the thread context
257  * \param[in] com       pointer to the lfsck component
258  *
259  * \retval              0 for success
260  * \retval              negative error number on failure or data corruption
261  */
262 static int lfsck_namespace_load_bitmap(const struct lu_env *env,
263                                        struct lfsck_component *com)
264 {
265         struct dt_object                *obj    = com->lc_obj;
266         struct lfsck_assistant_data     *lad    = com->lc_data;
267         struct lfsck_namespace          *ns     = com->lc_file_ram;
268         cfs_bitmap_t                    *bitmap = lad->lad_bitmap;
269         ssize_t                          size;
270         __u32                            nbits;
271         int                              rc;
272         ENTRY;
273
274         if (com->lc_lfsck->li_mdt_descs.ltd_tgts_bitmap->size >
275             ns->ln_bitmap_size)
276                 nbits = com->lc_lfsck->li_mdt_descs.ltd_tgts_bitmap->size;
277         else
278                 nbits = ns->ln_bitmap_size;
279
280         if (unlikely(nbits < BITS_PER_LONG))
281                 nbits = BITS_PER_LONG;
282
283         if (nbits > bitmap->size) {
284                 __u32 new_bits = bitmap->size;
285                 cfs_bitmap_t *new_bitmap;
286
287                 while (new_bits < nbits)
288                         new_bits <<= 1;
289
290                 new_bitmap = CFS_ALLOCATE_BITMAP(new_bits);
291                 if (new_bitmap == NULL)
292                         RETURN(-ENOMEM);
293
294                 lad->lad_bitmap = new_bitmap;
295                 CFS_FREE_BITMAP(bitmap);
296                 bitmap = new_bitmap;
297         }
298
299         if (ns->ln_bitmap_size == 0) {
300                 lad->lad_incomplete = 0;
301                 CFS_RESET_BITMAP(bitmap);
302
303                 RETURN(0);
304         }
305
306         size = (ns->ln_bitmap_size + 7) >> 3;
307         rc = dt_xattr_get(env, obj,
308                           lfsck_buf_get(env, bitmap->data, size),
309                           XATTR_NAME_LFSCK_BITMAP, BYPASS_CAPA);
310         if (rc != size)
311                 RETURN(rc >= 0 ? -EINVAL : rc);
312
313         if (cfs_bitmap_check_empty(bitmap))
314                 lad->lad_incomplete = 0;
315         else
316                 lad->lad_incomplete = 1;
317
318         RETURN(0);
319 }
320
321 /**
322  * Load namespace LFSCK statistics information from the trace file.
323  *
324  * For old release (Lustre-2.6 or older), the statistics information was
325  * stored as XATTR_NAME_LFSCK_NAMESPACE_OLD EA. But in Lustre-2.7, we need
326  * more statistics information. To avoid confusing old MDT when downgrade,
327  * Lustre-2.7 stores the namespace LFSCK statistics information as new
328  * XATTR_NAME_LFSCK_NAMESPACE EA.
329  *
330  * \param[in] env       pointer to the thread context
331  * \param[in] com       pointer to the lfsck component
332  *
333  * \retval              0 for success
334  * \retval              negative error number on failure
335  */
336 static int lfsck_namespace_load(const struct lu_env *env,
337                                 struct lfsck_component *com)
338 {
339         int len = com->lc_file_size;
340         int rc;
341
342         rc = dt_xattr_get(env, com->lc_obj,
343                           lfsck_buf_get(env, com->lc_file_disk, len),
344                           XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
345         if (rc == len) {
346                 struct lfsck_namespace *ns = com->lc_file_ram;
347
348                 lfsck_namespace_le_to_cpu(ns,
349                                 (struct lfsck_namespace *)com->lc_file_disk);
350                 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
351                         CDEBUG(D_LFSCK, "%s: invalid lfsck_namespace magic "
352                                "%#x != %#x\n", lfsck_lfsck2name(com->lc_lfsck),
353                                ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
354                         rc = -ESTALE;
355                 } else {
356                         rc = 0;
357                 }
358         } else if (rc != -ENODATA) {
359                 CDEBUG(D_LFSCK, "%s: fail to load lfsck_namespace, "
360                        "expected = %d: rc = %d\n",
361                        lfsck_lfsck2name(com->lc_lfsck), len, rc);
362                 if (rc >= 0)
363                         rc = -ESTALE;
364         } else {
365                 /* Check whether it is old trace file or not.
366                  * If yes, it should be reset via returning -ESTALE. */
367                 rc = dt_xattr_get(env, com->lc_obj,
368                                   lfsck_buf_get(env, com->lc_file_disk, len),
369                                   XATTR_NAME_LFSCK_NAMESPACE_OLD, BYPASS_CAPA);
370                 if (rc >= 0)
371                         rc = -ESTALE;
372         }
373
374         return rc;
375 }
376
377 static int lfsck_namespace_store(const struct lu_env *env,
378                                  struct lfsck_component *com, bool init)
379 {
380         struct dt_object                *obj    = com->lc_obj;
381         struct lfsck_instance           *lfsck  = com->lc_lfsck;
382         struct lfsck_namespace          *ns     = com->lc_file_ram;
383         struct lfsck_assistant_data     *lad    = com->lc_data;
384         cfs_bitmap_t                    *bitmap = NULL;
385         struct thandle                  *handle;
386         __u32                            nbits  = 0;
387         int                              len    = com->lc_file_size;
388         int                              rc;
389 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0)
390         struct lu_buf            tbuf   = { &len, sizeof(len) };
391 #endif
392         ENTRY;
393
394         if (lad != NULL) {
395                 bitmap = lad->lad_bitmap;
396                 nbits = bitmap->size;
397
398                 LASSERT(nbits > 0);
399                 LASSERTF((nbits & 7) == 0, "Invalid nbits %u\n", nbits);
400         }
401
402         ns->ln_bitmap_size = nbits;
403         lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
404                                   ns);
405         handle = dt_trans_create(env, lfsck->li_bottom);
406         if (IS_ERR(handle))
407                 GOTO(log, rc = PTR_ERR(handle));
408
409         rc = dt_declare_xattr_set(env, obj,
410                                   lfsck_buf_get(env, com->lc_file_disk, len),
411                                   XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
412         if (rc != 0)
413                 GOTO(out, rc);
414
415         if (bitmap != NULL) {
416                 rc = dt_declare_xattr_set(env, obj,
417                                 lfsck_buf_get(env, bitmap->data, nbits >> 3),
418                                 XATTR_NAME_LFSCK_BITMAP, 0, handle);
419                 if (rc != 0)
420                         GOTO(out, rc);
421         }
422
423 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0)
424         /* To be compatible with old Lustre-2.x MDT (x <= 6), generate dummy
425          * XATTR_NAME_LFSCK_NAMESPACE_OLD EA, then when downgrade to Lustre-2.x,
426          * the old LFSCK will find "invalid" XATTR_NAME_LFSCK_NAMESPACE_OLD EA,
427          * then reset the namespace LFSCK trace file. */
428         if (init) {
429                 rc = dt_declare_xattr_set(env, obj, &tbuf,
430                                           XATTR_NAME_LFSCK_NAMESPACE_OLD,
431                                           LU_XATTR_CREATE, handle);
432                 if (rc != 0)
433                         GOTO(out, rc);
434         }
435 #endif
436
437         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
438         if (rc != 0)
439                 GOTO(out, rc);
440
441         rc = dt_xattr_set(env, obj,
442                           lfsck_buf_get(env, com->lc_file_disk, len),
443                           XATTR_NAME_LFSCK_NAMESPACE, 0, handle, BYPASS_CAPA);
444         if (rc == 0 && bitmap != NULL)
445                 rc = dt_xattr_set(env, obj,
446                                   lfsck_buf_get(env, bitmap->data, nbits >> 3),
447                                   XATTR_NAME_LFSCK_BITMAP, 0, handle,
448                                   BYPASS_CAPA);
449
450 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0)
451         if (rc == 0 && init)
452                 rc = dt_xattr_set(env, obj, &tbuf,
453                                   XATTR_NAME_LFSCK_NAMESPACE_OLD,
454                                   LU_XATTR_CREATE, handle, BYPASS_CAPA);
455 #endif
456
457         GOTO(out, rc);
458
459 out:
460         dt_trans_stop(env, lfsck->li_bottom, handle);
461
462 log:
463         if (rc != 0)
464                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_namespace: rc = %d\n",
465                        lfsck_lfsck2name(lfsck), rc);
466         return rc;
467 }
468
469 static int lfsck_namespace_init(const struct lu_env *env,
470                                 struct lfsck_component *com)
471 {
472         struct lfsck_namespace *ns = com->lc_file_ram;
473         int rc;
474
475         memset(ns, 0, sizeof(*ns));
476         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
477         ns->ln_status = LS_INIT;
478         down_write(&com->lc_sem);
479         rc = lfsck_namespace_store(env, com, true);
480         up_write(&com->lc_sem);
481         return rc;
482 }
483
484 /**
485  * Update the namespace LFSCK trace file for the given @fid
486  *
487  * \param[in] env       pointer to the thread context
488  * \param[in] com       pointer to the lfsck component
489  * \param[in] fid       the fid which flags to be updated in the lfsck
490  *                      trace file
491  * \param[in] add       true if add new flags, otherwise remove flags
492  *
493  * \retval              0 for succeed or nothing to be done
494  * \retval              negative error number on failure
495  */
496 int lfsck_namespace_trace_update(const struct lu_env *env,
497                                  struct lfsck_component *com,
498                                  const struct lu_fid *fid,
499                                  const __u8 flags, bool add)
500 {
501         struct lfsck_instance   *lfsck  = com->lc_lfsck;
502         struct dt_object        *obj    = com->lc_obj;
503         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid3;
504         struct dt_device        *dev    = lfsck->li_bottom;
505         struct thandle          *th     = NULL;
506         int                      rc     = 0;
507         __u8                     old    = 0;
508         __u8                     new    = 0;
509         ENTRY;
510
511         LASSERT(flags != 0);
512
513         down_write(&com->lc_sem);
514         fid_cpu_to_be(key, fid);
515         rc = dt_lookup(env, obj, (struct dt_rec *)&old,
516                        (const struct dt_key *)key, BYPASS_CAPA);
517         if (rc == -ENOENT) {
518                 if (!add)
519                         GOTO(unlock, rc = 0);
520
521                 old = 0;
522                 new = flags;
523         } else if (rc == 0) {
524                 if (add) {
525                         if ((old & flags) == flags)
526                                 GOTO(unlock, rc = 0);
527
528                         new = old | flags;
529                 } else {
530                         if ((old & flags) == 0)
531                                 GOTO(unlock, rc = 0);
532
533                         new = old & ~flags;
534                 }
535         } else {
536                 GOTO(log, rc);
537         }
538
539         th = dt_trans_create(env, dev);
540         if (IS_ERR(th))
541                 GOTO(log, rc = PTR_ERR(th));
542
543         if (old != 0) {
544                 rc = dt_declare_delete(env, obj,
545                                        (const struct dt_key *)key, th);
546                 if (rc != 0)
547                         GOTO(log, rc);
548         }
549
550         if (new != 0) {
551                 rc = dt_declare_insert(env, obj,
552                                        (const struct dt_rec *)&new,
553                                        (const struct dt_key *)key, th);
554                 if (rc != 0)
555                         GOTO(log, rc);
556         }
557
558         rc = dt_trans_start_local(env, dev, th);
559         if (rc != 0)
560                 GOTO(log, rc);
561
562         if (old != 0) {
563                 rc = dt_delete(env, obj, (const struct dt_key *)key,
564                                th, BYPASS_CAPA);
565                 if (rc != 0)
566                         GOTO(log, rc);
567         }
568
569         if (new != 0) {
570                 rc = dt_insert(env, obj, (const struct dt_rec *)&new,
571                                (const struct dt_key *)key, th, BYPASS_CAPA, 1);
572                 if (rc != 0)
573                         GOTO(log, rc);
574         }
575
576         GOTO(log, rc);
577
578 log:
579         if (th != NULL && !IS_ERR(th))
580                 dt_trans_stop(env, dev, th);
581
582         CDEBUG(D_LFSCK, "%s: namespace LFSCK %s flags for "DFID" in the "
583                "trace file, flags %x, old %x, new %x: rc = %d\n",
584                lfsck_lfsck2name(lfsck), add ? "add" : "del", PFID(fid),
585                (__u32)flags, (__u32)old, (__u32)new, rc);
586
587 unlock:
588         up_write(&com->lc_sem);
589
590         return rc;
591 }
592
593 int lfsck_namespace_check_exist(const struct lu_env *env,
594                                 struct dt_object *dir,
595                                 struct dt_object *obj, const char *name)
596 {
597         struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
598         int               rc;
599         ENTRY;
600
601         if (unlikely(lfsck_is_dead_obj(obj)))
602                 RETURN(LFSCK_NAMEENTRY_DEAD);
603
604         rc = dt_lookup(env, dir, (struct dt_rec *)fid,
605                        (const struct dt_key *)name, BYPASS_CAPA);
606         if (rc == -ENOENT)
607                 RETURN(LFSCK_NAMEENTRY_REMOVED);
608
609         if (rc < 0)
610                 RETURN(rc);
611
612         if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
613                 RETURN(LFSCK_NAMEENTRY_RECREATED);
614
615         RETURN(0);
616 }
617
618 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
619                                             struct dt_object *obj,
620                                             struct thandle *handle)
621 {
622         int rc;
623
624         /* For destroying all invalid linkEA entries. */
625         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
626         if (rc != 0)
627                 return rc;
628
629         /* For insert new linkEA entry. */
630         rc = dt_declare_xattr_set(env, obj,
631                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
632                         XATTR_NAME_LINK, 0, handle);
633         return rc;
634 }
635
636 int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
637                        struct linkea_data *ldata)
638 {
639         int rc;
640
641         if (ldata->ld_buf->lb_buf == NULL)
642                 return -ENOMEM;
643
644         if (!dt_object_exists(obj))
645                 return -ENOENT;
646
647         rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
648         if (rc == -ERANGE) {
649                 /* Buf was too small, figure out what we need. */
650                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LINK,
651                                   BYPASS_CAPA);
652                 if (rc <= 0)
653                         return rc;
654
655                 lu_buf_realloc(ldata->ld_buf, rc);
656                 if (ldata->ld_buf->lb_buf == NULL)
657                         return -ENOMEM;
658
659                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
660                                   BYPASS_CAPA);
661         }
662
663         if (rc > 0)
664                 rc = linkea_init(ldata);
665
666         return rc;
667 }
668
669 /**
670  * Remove linkEA for the given object.
671  *
672  * The caller should take the ldlm lock before the calling.
673  *
674  * \param[in] env       pointer to the thread context
675  * \param[in] com       pointer to the lfsck component
676  * \param[in] obj       pointer to the dt_object to be handled
677  *
678  * \retval              0 for repaired cases
679  * \retval              negative error number on failure
680  */
681 static int lfsck_namespace_links_remove(const struct lu_env *env,
682                                         struct lfsck_component *com,
683                                         struct dt_object *obj)
684 {
685         struct lfsck_instance           *lfsck  = com->lc_lfsck;
686         struct dt_device                *dev    = lfsck->li_bottom;
687         struct thandle                  *th     = NULL;
688         int                              rc     = 0;
689         ENTRY;
690
691         LASSERT(dt_object_remote(obj) == 0);
692
693         th = dt_trans_create(env, dev);
694         if (IS_ERR(th))
695                 GOTO(log, rc = PTR_ERR(th));
696
697         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, th);
698         if (rc != 0)
699                 GOTO(stop, rc);
700
701         rc = dt_trans_start_local(env, dev, th);
702         if (rc != 0)
703                 GOTO(stop, rc);
704
705         dt_write_lock(env, obj, 0);
706         if (unlikely(lfsck_is_dead_obj(obj)))
707                 GOTO(unlock, rc = -ENOENT);
708
709         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
710                 GOTO(unlock, rc = 0);
711
712         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th, BYPASS_CAPA);
713
714         GOTO(unlock, rc);
715
716 unlock:
717         dt_write_unlock(env, obj);
718
719 stop:
720         dt_trans_stop(env, dev, th);
721
722 log:
723         CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
724                "for the object "DFID": rc = %d\n",
725                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
726
727         if (rc == 0) {
728                 struct lfsck_namespace *ns = com->lc_file_ram;
729
730                 ns->ln_flags |= LF_INCONSISTENT;
731         }
732
733         return rc;
734 }
735
736 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
737                              struct linkea_data *ldata, struct thandle *handle)
738 {
739         const struct lu_buf *buf = lfsck_buf_get_const(env,
740                                                        ldata->ld_buf->lb_buf,
741                                                        ldata->ld_leh->leh_len);
742
743         return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle,
744                             BYPASS_CAPA);
745 }
746
747 static void lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata,
748                                                 struct lu_name *cname,
749                                                 struct lu_fid *pfid,
750                                                 char *buf)
751 {
752         linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
753         /* To guarantee the 'name' is terminated with '0'. */
754         memcpy(buf, cname->ln_name, cname->ln_namelen);
755         buf[cname->ln_namelen] = 0;
756         cname->ln_name = buf;
757 }
758
759 static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata,
760                                                struct lu_name *cname,
761                                                struct lu_fid *pfid,
762                                                bool remove)
763 {
764         struct link_ea_entry    *oldlee;
765         int                      oldlen;
766         int                      repeated = 0;
767
768         oldlee = ldata->ld_lee;
769         oldlen = ldata->ld_reclen;
770         linkea_next_entry(ldata);
771         while (ldata->ld_lee != NULL) {
772                 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
773                                    ldata->ld_lee->lee_reclen[1];
774                 if (unlikely(ldata->ld_reclen == oldlen &&
775                              memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
776                         repeated++;
777                         if (!remove)
778                                 break;
779
780                         linkea_del_buf(ldata, cname);
781                 } else {
782                         linkea_next_entry(ldata);
783                 }
784         }
785         ldata->ld_lee = oldlee;
786         ldata->ld_reclen = oldlen;
787
788         return repeated;
789 }
790
791 /**
792  * Insert orphan into .lustre/lost+found/MDTxxxx/ locally.
793  *
794  * Add the specified orphan MDT-object to the .lustre/lost+found/MDTxxxx/
795  * with the given type to generate the name, the detailed rules for name
796  * have been described as following.
797  *
798  * The function also generates the linkEA corresponding to the name entry
799  * under the .lustre/lost+found/MDTxxxx/ for the orphan MDT-object.
800  *
801  * \param[in] env       pointer to the thread context
802  * \param[in] com       pointer to the lfsck component
803  * \param[in] orphan    pointer to the orphan MDT-object
804  * \param[in] infix     additional information for the orphan name, such as
805  *                      the FID for original
806  * \param[in] type      the type for describing why the orphan MDT-object is
807  *                      created. The rules are as following:
808  *
809  *  type "D":           The MDT-object is a directory, it may knows its parent
810  *                      but because there is no valid linkEA, the LFSCK cannot
811  *                      know where to put it back to the namespace.
812  *  type "O":           The MDT-object has no linkEA, and there is no name
813  *                      entry that references the MDT-object.
814  *
815  *  type "S":           The orphan MDT-object is a shard of a striped directory
816  *
817  * \see lfsck_layout_recreate_parent() for more types.
818  *
819  * The orphan name will be like:
820  * ${FID}-${infix}-${type}-${conflict_version}
821  *
822  * \param[out] count    if some others inserted some linkEA entries by race,
823  *                      then return the linkEA entries count.
824  *
825  * \retval              positive number for repaired cases
826  * \retval              0 if needs to repair nothing
827  * \retval              negative error number on failure
828  */
829 static int lfsck_namespace_insert_orphan(const struct lu_env *env,
830                                          struct lfsck_component *com,
831                                          struct dt_object *orphan,
832                                          const char *infix, const char *type,
833                                          int *count)
834 {
835         struct lfsck_thread_info        *info   = lfsck_env_info(env);
836         struct lu_name                  *cname  = &info->lti_name;
837         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
838         struct lu_fid                   *tfid   = &info->lti_fid5;
839         struct lu_attr                  *la     = &info->lti_la3;
840         const struct lu_fid             *cfid   = lfsck_dto2fid(orphan);
841         const struct lu_fid             *pfid;
842         struct lfsck_instance           *lfsck  = com->lc_lfsck;
843         struct dt_device                *dev    = lfsck->li_bottom;
844         struct dt_object                *parent;
845         struct thandle                  *th     = NULL;
846         struct lustre_handle             plh    = { 0 };
847         struct lustre_handle             clh    = { 0 };
848         struct linkea_data               ldata  = { 0 };
849         struct lu_buf                    linkea_buf;
850         int                              namelen;
851         int                              idx    = 0;
852         int                              rc     = 0;
853         bool                             exist  = false;
854         ENTRY;
855
856         cname->ln_name = NULL;
857         /* Create .lustre/lost+found/MDTxxxx when needed. */
858         if (unlikely(lfsck->li_lpf_obj == NULL)) {
859                 rc = lfsck_create_lpf(env, lfsck);
860                 if (rc != 0)
861                         GOTO(log, rc);
862         }
863
864         parent = lfsck->li_lpf_obj;
865         pfid = lfsck_dto2fid(parent);
866
867         /* Hold update lock on the parent to prevent others to access. */
868         rc = lfsck_ibits_lock(env, lfsck, parent, &plh,
869                               MDS_INODELOCK_UPDATE, LCK_EX);
870         if (rc != 0)
871                 GOTO(log, rc);
872
873         do {
874                 namelen = snprintf(info->lti_key, NAME_MAX, DFID"%s-%s-%d",
875                                    PFID(cfid), infix, type, idx++);
876                 rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
877                                (const struct dt_key *)info->lti_key,
878                                BYPASS_CAPA);
879                 if (rc != 0 && rc != -ENOENT)
880                         GOTO(log, rc);
881
882                 if (unlikely(rc == 0 && lu_fid_eq(cfid, tfid)))
883                         exist = true;
884         } while (rc == 0 && !exist);
885
886         cname->ln_name = info->lti_key;
887         cname->ln_namelen = namelen;
888         rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
889         if (rc != 0)
890                 GOTO(log, rc);
891
892         rc = linkea_add_buf(&ldata, cname, pfid);
893         if (rc != 0)
894                 GOTO(log, rc);
895
896         rc = lfsck_ibits_lock(env, lfsck, orphan, &clh,
897                               MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP,
898                               LCK_EX);
899         if (rc != 0)
900                 GOTO(log, rc);
901
902         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
903                        ldata.ld_leh->leh_len);
904         th = dt_trans_create(env, dev);
905         if (IS_ERR(th))
906                 GOTO(log, rc = PTR_ERR(th));
907
908         if (S_ISDIR(lfsck_object_type(orphan))) {
909                 rc = dt_declare_delete(env, orphan,
910                                        (const struct dt_key *)dotdot, th);
911                 if (rc != 0)
912                         GOTO(stop, rc);
913
914                 rec->rec_type = S_IFDIR;
915                 rec->rec_fid = pfid;
916                 rc = dt_declare_insert(env, orphan, (const struct dt_rec *)rec,
917                                        (const struct dt_key *)dotdot, th);
918                 if (rc != 0)
919                         GOTO(stop, rc);
920         }
921
922         rc = dt_declare_xattr_set(env, orphan, &linkea_buf,
923                                   XATTR_NAME_LINK, 0, th);
924         if (rc != 0)
925                 GOTO(stop, rc);
926
927         if (!exist) {
928                 rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
929                 rec->rec_fid = cfid;
930                 rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
931                                        (const struct dt_key *)cname->ln_name,
932                                        th);
933                 if (rc != 0)
934                         GOTO(stop, rc);
935
936                 if (S_ISDIR(rec->rec_type)) {
937                         rc = dt_declare_ref_add(env, parent, th);
938                         if (rc != 0)
939                                 GOTO(stop, rc);
940                 }
941         }
942
943         memset(la, 0, sizeof(*la));
944         la->la_ctime = cfs_time_current_sec();
945         la->la_valid = LA_CTIME;
946         rc = dt_declare_attr_set(env, orphan, la, th);
947         if (rc != 0)
948                 GOTO(stop, rc);
949
950         rc = dt_trans_start_local(env, dev, th);
951         if (rc != 0)
952                 GOTO(stop, rc);
953
954         dt_write_lock(env, orphan, 0);
955         rc = lfsck_links_read(env, orphan, &ldata);
956         if (likely((rc == -ENODATA) || (rc == -EINVAL) ||
957                    (rc == 0 && ldata.ld_leh->leh_reccount == 0))) {
958                 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
959                         GOTO(unlock, rc = 1);
960
961                 if (S_ISDIR(lfsck_object_type(orphan))) {
962                         rc = dt_delete(env, orphan,
963                                        (const struct dt_key *)dotdot, th,
964                                        BYPASS_CAPA);
965                         if (rc != 0)
966                                 GOTO(unlock, rc);
967
968                         rec->rec_type = S_IFDIR;
969                         rec->rec_fid = pfid;
970                         rc = dt_insert(env, orphan, (const struct dt_rec *)rec,
971                                        (const struct dt_key *)dotdot, th,
972                                        BYPASS_CAPA, 1);
973                         if (rc != 0)
974                                 GOTO(unlock, rc);
975                 }
976
977                 rc = dt_xattr_set(env, orphan, &linkea_buf, XATTR_NAME_LINK, 0,
978                                   th, BYPASS_CAPA);
979         } else {
980                 if (rc == 0 && count != NULL)
981                         *count = ldata.ld_leh->leh_reccount;
982
983                 GOTO(unlock, rc);
984         }
985         dt_write_unlock(env, orphan);
986
987         if (rc == 0 && !exist) {
988                 rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
989                 rec->rec_fid = cfid;
990                 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
991                                (const struct dt_key *)cname->ln_name,
992                                th, BYPASS_CAPA, 1);
993                 if (rc == 0 && S_ISDIR(rec->rec_type)) {
994                         dt_write_lock(env, parent, 0);
995                         rc = dt_ref_add(env, parent, th);
996                         dt_write_unlock(env, parent);
997                 }
998         }
999
1000         if (rc != 0)
1001                 GOTO(unlock, rc);
1002
1003         rc = dt_attr_set(env, orphan, la, th, BYPASS_CAPA);
1004
1005         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1006
1007 unlock:
1008         dt_write_unlock(env, orphan);
1009
1010 stop:
1011         dt_trans_stop(env, dev, th);
1012
1013 log:
1014         lfsck_ibits_unlock(&clh, LCK_EX);
1015         lfsck_ibits_unlock(&plh, LCK_EX);
1016         CDEBUG(D_LFSCK, "%s: namespace LFSCK insert orphan for the "
1017                "object "DFID", name = %s: rc = %d\n",
1018                lfsck_lfsck2name(lfsck), PFID(cfid),
1019                cname->ln_name != NULL ? cname->ln_name : "<NULL>", rc);
1020
1021         if (rc != 0) {
1022                 struct lfsck_namespace *ns = com->lc_file_ram;
1023
1024                 ns->ln_flags |= LF_INCONSISTENT;
1025         }
1026
1027         return rc;
1028 }
1029
1030 /**
1031  * Add the specified name entry back to namespace.
1032  *
1033  * If there is a linkEA entry that back references a name entry under
1034  * some parent directory, but such parent directory does not have the
1035  * claimed name entry. On the other hand, the linkEA entries count is
1036  * not larger than the MDT-object's hard link count. Under such case,
1037  * it is quite possible that the name entry is lost. Then the LFSCK
1038  * should add the name entry back to the namespace.
1039  *
1040  * \param[in] env       pointer to the thread context
1041  * \param[in] com       pointer to the lfsck component
1042  * \param[in] parent    pointer to the directory under which the name entry
1043  *                      will be inserted into
1044  * \param[in] child     pointer to the object referenced by the name entry
1045  *                      that to be inserted into the parent
1046  * \param[in] name      the name for the child in the parent directory
1047  *
1048  * \retval              positive number for repaired cases
1049  * \retval              0 if nothing to be repaired
1050  * \retval              negative error number on failure
1051  */
1052 static int lfsck_namespace_insert_normal(const struct lu_env *env,
1053                                          struct lfsck_component *com,
1054                                          struct dt_object *parent,
1055                                          struct dt_object *child,
1056                                          const char *name)
1057 {
1058         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1059         struct lu_attr                  *la     = &info->lti_la;
1060         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1061         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1062         struct dt_device                *dev    = lfsck->li_next;
1063         struct thandle                  *th     = NULL;
1064         struct lustre_handle             lh     = { 0 };
1065         int                              rc     = 0;
1066         ENTRY;
1067
1068         if (unlikely(!dt_try_as_dir(env, parent)))
1069                 GOTO(log, rc = -ENOTDIR);
1070
1071         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1072                 GOTO(log, rc = 1);
1073
1074         /* Hold update lock on the parent to prevent others to access. */
1075         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
1076                               MDS_INODELOCK_UPDATE, LCK_EX);
1077         if (rc != 0)
1078                 GOTO(log, rc);
1079
1080         th = dt_trans_create(env, dev);
1081         if (IS_ERR(th))
1082                 GOTO(unlock, rc = PTR_ERR(th));
1083
1084         rec->rec_type = lfsck_object_type(child) & S_IFMT;
1085         rec->rec_fid = lfsck_dto2fid(child);
1086         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1087                                (const struct dt_key *)name, th);
1088         if (rc != 0)
1089                 GOTO(stop, rc);
1090
1091         if (S_ISDIR(rec->rec_type)) {
1092                 rc = dt_declare_ref_add(env, parent, th);
1093                 if (rc != 0)
1094                         GOTO(stop, rc);
1095         }
1096
1097         memset(la, 0, sizeof(*la));
1098         la->la_ctime = cfs_time_current_sec();
1099         la->la_valid = LA_CTIME;
1100         rc = dt_declare_attr_set(env, parent, la, th);
1101         if (rc != 0)
1102                 GOTO(stop, rc);
1103
1104         rc = dt_declare_attr_set(env, child, la, th);
1105         if (rc != 0)
1106                 GOTO(stop, rc);
1107
1108         rc = dt_trans_start_local(env, dev, th);
1109         if (rc != 0)
1110                 GOTO(stop, rc);
1111
1112         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1113                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
1114         if (rc != 0)
1115                 GOTO(stop, rc);
1116
1117         if (S_ISDIR(rec->rec_type)) {
1118                 dt_write_lock(env, parent, 0);
1119                 rc = dt_ref_add(env, parent, th);
1120                 dt_write_unlock(env, parent);
1121                 if (rc != 0)
1122                         GOTO(stop, rc);
1123         }
1124
1125         la->la_ctime = cfs_time_current_sec();
1126         rc = dt_attr_set(env, parent, la, th, BYPASS_CAPA);
1127         if (rc != 0)
1128                 GOTO(stop, rc);
1129
1130         rc = dt_attr_set(env, child, la, th, BYPASS_CAPA);
1131
1132         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1133
1134 stop:
1135         dt_trans_stop(env, dev, th);
1136
1137 unlock:
1138         lfsck_ibits_unlock(&lh, LCK_EX);
1139
1140 log:
1141         CDEBUG(D_LFSCK, "%s: namespace LFSCK insert object "DFID" with "
1142                "the name %s and type %o to the parent "DFID": rc = %d\n",
1143                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), name,
1144                lfsck_object_type(child) & S_IFMT,
1145                PFID(lfsck_dto2fid(parent)), rc);
1146
1147         if (rc != 0) {
1148                 struct lfsck_namespace *ns = com->lc_file_ram;
1149
1150                 ns->ln_flags |= LF_INCONSISTENT;
1151                 if (rc > 0)
1152                         ns->ln_lost_dirent_repaired++;
1153         }
1154
1155         return rc;
1156 }
1157
1158 /**
1159  * Create the specified orphan MDT-object on remote MDT.
1160  *
1161  * The LFSCK instance on this MDT will send LFSCK RPC to remote MDT to
1162  * ask the remote LFSCK instance to create the specified orphan object
1163  * under .lustre/lost+found/MDTxxxx/ directory with the name:
1164  * ${FID}-P-${conflict_version}.
1165  *
1166  * \param[in] env       pointer to the thread context
1167  * \param[in] com       pointer to the lfsck component
1168  * \param[in] orphan    pointer to the orphan MDT-object
1169  * \param[in] type      the orphan's type to be created
1170  *
1171  *  type "P":           The orphan object to be created was a parent directory
1172  *                      of some MDT-object which linkEA shows that the @orphan
1173  *                      object is missing.
1174  *
1175  * \see lfsck_layout_recreate_parent() for more types.
1176  *
1177  * \param[in] lmv       pointer to master LMV EA that will be set to the orphan
1178  *
1179  * \retval              positive number for repaired cases
1180  * \retval              0 if needs to repair nothing
1181  * \retval              negative error number on failure
1182  */
1183 static int lfsck_namespace_create_orphan_remote(const struct lu_env *env,
1184                                                 struct lfsck_component *com,
1185                                                 struct dt_object *orphan,
1186                                                 __u32 type,
1187                                                 struct lmv_mds_md_v1 *lmv)
1188 {
1189         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1190         struct lfsck_request            *lr     = &info->lti_lr;
1191         struct lu_seq_range             *range  = &info->lti_range;
1192         const struct lu_fid             *fid    = lfsck_dto2fid(orphan);
1193         struct lfsck_namespace          *ns     = com->lc_file_ram;
1194         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1195         struct seq_server_site          *ss     =
1196                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
1197         struct lfsck_tgt_desc           *ltd    = NULL;
1198         struct ptlrpc_request           *req    = NULL;
1199         int                              rc;
1200         ENTRY;
1201
1202         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1203                 GOTO(out, rc = 1);
1204
1205         fld_range_set_mdt(range);
1206         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
1207         if (rc != 0)
1208                 GOTO(out, rc);
1209
1210         ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, range->lsr_index);
1211         if (ltd == NULL) {
1212                 ns->ln_flags |= LF_INCOMPLETE;
1213
1214                 GOTO(out, rc = -ENODEV);
1215         }
1216
1217         req = ptlrpc_request_alloc(class_exp2cliimp(ltd->ltd_exp),
1218                                    &RQF_LFSCK_NOTIFY);
1219         if (req == NULL)
1220                 GOTO(out, rc = -ENOMEM);
1221
1222         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
1223         if (rc != 0) {
1224                 ptlrpc_request_free(req);
1225
1226                 GOTO(out, rc);
1227         }
1228
1229         lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1230         memset(lr, 0, sizeof(*lr));
1231         lr->lr_event = LE_CREATE_ORPHAN;
1232         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1233         lr->lr_active = LFSCK_TYPE_NAMESPACE;
1234         lr->lr_fid = *fid;
1235         lr->lr_type = type;
1236         if (lmv != NULL) {
1237                 lr->lr_hash_type = lmv->lmv_hash_type;
1238                 lr->lr_stripe_count = lmv->lmv_stripe_count;
1239                 lr->lr_layout_version = lmv->lmv_layout_version;
1240                 memcpy(lr->lr_pool_name, lmv->lmv_pool_name,
1241                        sizeof(lr->lr_pool_name));
1242         }
1243
1244         ptlrpc_request_set_replen(req);
1245         rc = ptlrpc_queue_wait(req);
1246         ptlrpc_req_finished(req);
1247
1248         if (rc == 0)
1249                 rc = 1;
1250         else if (rc == -EEXIST)
1251                 rc = 0;
1252
1253         GOTO(out, rc);
1254
1255 out:
1256         CDEBUG(D_LFSCK, "%s: namespace LFSCK create object "
1257                DFID" on the MDT %x remotely: rc = %d\n",
1258                lfsck_lfsck2name(lfsck), PFID(fid),
1259                ltd != NULL ? ltd->ltd_index : -1, rc);
1260
1261         if (ltd != NULL)
1262                 lfsck_tgt_put(ltd);
1263
1264         return rc;
1265 }
1266
1267 /**
1268  * Create the specified orphan MDT-object locally.
1269  *
1270  * For the case that the parent MDT-object stored in some MDT-object's
1271  * linkEA entry is lost, the LFSCK will re-create the parent object as
1272  * an orphan and insert it into .lustre/lost+found/MDTxxxx/ directory
1273  * with the name ${FID}-P-${conflict_version}.
1274  *
1275  * \param[in] env       pointer to the thread context
1276  * \param[in] com       pointer to the lfsck component
1277  * \param[in] orphan    pointer to the orphan MDT-object to be created
1278  * \param[in] type      the orphan's type to be created
1279  *
1280  *  type "P":           The orphan object to be created was a parent directory
1281  *                      of some MDT-object which linkEA shows that the @orphan
1282  *                      object is missing.
1283  *
1284  * \see lfsck_layout_recreate_parent() for more types.
1285  *
1286  * \param[in] lmv       pointer to master LMV EA that will be set to the orphan
1287  *
1288  * \retval              positive number for repaired cases
1289  * \retval              negative error number on failure
1290  */
1291 static int lfsck_namespace_create_orphan_local(const struct lu_env *env,
1292                                                struct lfsck_component *com,
1293                                                struct dt_object *orphan,
1294                                                __u32 type,
1295                                                struct lmv_mds_md_v1 *lmv)
1296 {
1297         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1298         struct lu_attr                  *la     = &info->lti_la;
1299         struct dt_allocation_hint       *hint   = &info->lti_hint;
1300         struct dt_object_format         *dof    = &info->lti_dof;
1301         struct lu_name                  *cname  = &info->lti_name2;
1302         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1303         struct lu_fid                   *tfid   = &info->lti_fid;
1304         struct lmv_mds_md_v1            *lmv2   = &info->lti_lmv2;
1305         const struct lu_fid             *cfid   = lfsck_dto2fid(orphan);
1306         const struct lu_fid             *pfid;
1307         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1308         struct dt_device                *dev    = lfsck->li_bottom;
1309         struct dt_object                *parent = NULL;
1310         struct dt_object                *child  = NULL;
1311         struct thandle                  *th     = NULL;
1312         struct lustre_handle             lh     = { 0 };
1313         struct linkea_data               ldata  = { 0 };
1314         struct lu_buf                    linkea_buf;
1315         struct lu_buf                    lmv_buf;
1316         char                             name[32];
1317         int                              namelen;
1318         int                              idx    = 0;
1319         int                              rc     = 0;
1320         ENTRY;
1321
1322         LASSERT(!dt_object_exists(orphan));
1323         LASSERT(!dt_object_remote(orphan));
1324
1325         /* @orphan maybe not attached to lfsck->li_bottom */
1326         child = lfsck_object_find_by_dev(env, dev, cfid);
1327         if (IS_ERR(child))
1328                 GOTO(log, rc = PTR_ERR(child));
1329
1330         cname->ln_name = NULL;
1331         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1332                 GOTO(log, rc = 1);
1333
1334         /* Create .lustre/lost+found/MDTxxxx when needed. */
1335         if (unlikely(lfsck->li_lpf_obj == NULL)) {
1336                 rc = lfsck_create_lpf(env, lfsck);
1337                 if (rc != 0)
1338                         GOTO(log, rc);
1339         }
1340
1341         parent = lfsck->li_lpf_obj;
1342         pfid = lfsck_dto2fid(parent);
1343
1344         /* Hold update lock on the parent to prevent others to access. */
1345         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
1346                               MDS_INODELOCK_UPDATE, LCK_EX);
1347         if (rc != 0)
1348                 GOTO(log, rc);
1349
1350         do {
1351                 namelen = snprintf(name, 31, DFID"-P-%d",
1352                                    PFID(cfid), idx++);
1353                 rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
1354                                (const struct dt_key *)name, BYPASS_CAPA);
1355                 if (rc != 0 && rc != -ENOENT)
1356                         GOTO(unlock1, rc);
1357         } while (rc == 0);
1358
1359         cname->ln_name = name;
1360         cname->ln_namelen = namelen;
1361
1362         memset(la, 0, sizeof(*la));
1363         la->la_mode = type | (S_ISDIR(type) ? 0700 : 0600);
1364         la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
1365                        LA_ATIME | LA_MTIME | LA_CTIME;
1366
1367         child->do_ops->do_ah_init(env, hint, parent, child,
1368                                   la->la_mode & S_IFMT);
1369
1370         memset(dof, 0, sizeof(*dof));
1371         dof->dof_type = dt_mode_to_dft(type);
1372
1373         rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
1374         if (rc != 0)
1375                 GOTO(unlock1, rc);
1376
1377         rc = linkea_add_buf(&ldata, cname, pfid);
1378         if (rc != 0)
1379                 GOTO(unlock1, rc);
1380
1381         th = dt_trans_create(env, dev);
1382         if (IS_ERR(th))
1383                 GOTO(unlock1, rc = PTR_ERR(th));
1384
1385         rc = dt_declare_create(env, child, la, hint, dof, th);
1386         if (rc == 0 && S_ISDIR(type))
1387                 rc = dt_declare_ref_add(env, child, th);
1388
1389         if (rc != 0)
1390                 GOTO(stop, rc);
1391
1392         if (lmv != NULL) {
1393                 lmv->lmv_magic = LMV_MAGIC;
1394                 lmv->lmv_master_mdt_index = lfsck_dev_idx(dev);
1395                 lfsck_lmv_header_cpu_to_le(lmv2, lmv);
1396                 lfsck_buf_init(&lmv_buf, lmv2, sizeof(*lmv2));
1397                 rc = dt_declare_xattr_set(env, child, &lmv_buf,
1398                                           XATTR_NAME_LMV, 0, th);
1399                 if (rc != 0)
1400                         GOTO(stop, rc);
1401         }
1402
1403         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
1404                        ldata.ld_leh->leh_len);
1405         rc = dt_declare_xattr_set(env, child, &linkea_buf,
1406                                   XATTR_NAME_LINK, 0, th);
1407         if (rc != 0)
1408                 GOTO(stop, rc);
1409
1410         rec->rec_type = type;
1411         rec->rec_fid = cfid;
1412         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1413                                (const struct dt_key *)name, th);
1414         if (rc == 0 && S_ISDIR(type))
1415                 rc = dt_declare_ref_add(env, parent, th);
1416
1417         if (rc != 0)
1418                 GOTO(stop, rc);
1419
1420         rc = dt_trans_start_local(env, dev, th);
1421         if (rc != 0)
1422                 GOTO(stop, rc);
1423
1424         dt_write_lock(env, child, 0);
1425         rc = dt_create(env, child, la, hint, dof, th);
1426         if (rc != 0)
1427                 GOTO(unlock2, rc);
1428
1429         if (S_ISDIR(type)) {
1430                 if (unlikely(!dt_try_as_dir(env, child)))
1431                         GOTO(unlock2, rc = -ENOTDIR);
1432
1433                 rec->rec_type = S_IFDIR;
1434                 rec->rec_fid = cfid;
1435                 rc = dt_insert(env, child, (const struct dt_rec *)rec,
1436                                (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
1437                 if (rc != 0)
1438                         GOTO(unlock2, rc);
1439
1440                 rec->rec_fid = pfid;
1441                 rc = dt_insert(env, child, (const struct dt_rec *)rec,
1442                                (const struct dt_key *)dotdot, th,
1443                                BYPASS_CAPA, 1);
1444                 if (rc != 0)
1445                         GOTO(unlock2, rc);
1446
1447                 rc = dt_ref_add(env, child, th);
1448                 if (rc != 0)
1449                         GOTO(unlock2, rc);
1450         }
1451
1452         if (lmv != NULL) {
1453                 rc = dt_xattr_set(env, child, &lmv_buf, XATTR_NAME_LMV, 0,
1454                                   th, BYPASS_CAPA);
1455                 if (rc != 0)
1456                         GOTO(unlock2, rc);
1457         }
1458
1459         rc = dt_xattr_set(env, child, &linkea_buf,
1460                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1461         dt_write_unlock(env, child);
1462         if (rc != 0)
1463                 GOTO(stop, rc);
1464
1465         rec->rec_type = type;
1466         rec->rec_fid = cfid;
1467         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1468                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
1469         if (rc == 0 && S_ISDIR(type)) {
1470                 dt_write_lock(env, parent, 0);
1471                 rc = dt_ref_add(env, parent, th);
1472                 dt_write_unlock(env, parent);
1473         }
1474
1475         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1476
1477 unlock2:
1478         dt_write_unlock(env, child);
1479
1480 stop:
1481         dt_trans_stop(env, dev, th);
1482
1483 unlock1:
1484         lfsck_ibits_unlock(&lh, LCK_EX);
1485
1486 log:
1487         CDEBUG(D_LFSCK, "%s: namespace LFSCK create orphan locally for "
1488                "the object "DFID", name = %s, type %o: rc = %d\n",
1489                lfsck_lfsck2name(lfsck), PFID(cfid),
1490                cname->ln_name != NULL ? cname->ln_name : "<NULL>", type, rc);
1491
1492         if (child != NULL && !IS_ERR(child))
1493                 lfsck_object_put(env, child);
1494
1495         return rc;
1496 }
1497
1498 /**
1499  * Create the specified orphan MDT-object.
1500  *
1501  * For the case that the parent MDT-object stored in some MDT-object's
1502  * linkEA entry is lost, the LFSCK will re-create the parent object as
1503  * an orphan and insert it into .lustre/lost+found/MDTxxxx/ directory
1504  * with the name: ${FID}-P-${conflict_version}.
      *
      * This function is only a dispatcher: it always asks for an S_IFDIR
      * object and hands the work to lfsck_namespace_create_orphan_remote()
      * when dt_object_remote(@orphan) is true, or to
      * lfsck_namespace_create_orphan_local() otherwise. Any non-zero result
      * (repaired or failed) sets LF_INCONSISTENT in com->lc_file_ram.
1505  *
1506  * \param[in] env       pointer to the thread context
1507  * \param[in] com       pointer to the lfsck component
1508  * \param[in] orphan    pointer to the orphan MDT-object
1509  *
1510  *  type "P":           The orphan object to be created was a parent directory
1511  *                      of some MDT-object whose linkEA shows that the @orphan
1512  *                      object is missing.
1513  *
1514  * \see lfsck_layout_recreate_parent() for more types.
1515  *
1516  * \param[in] lmv       pointer to master LMV EA that will be set to the orphan
1517  *
1518  * \retval              positive number for repaired cases
1519  * \retval              0 if nothing needs to be repaired
1520  * \retval              negative error number on failure
1521  */
1522 static int lfsck_namespace_create_orphan(const struct lu_env *env,
1523                                          struct lfsck_component *com,
1524                                          struct dt_object *orphan,
1525                                          struct lmv_mds_md_v1 *lmv)
1526 {
1527         struct lfsck_namespace *ns = com->lc_file_ram;
1528         int                     rc;
1529
             /* Pick the remote or the local creation path for the orphan. */
1530         if (dt_object_remote(orphan))
1531                 rc = lfsck_namespace_create_orphan_remote(env, com, orphan,
1532                                                           S_IFDIR, lmv);
1533         else
1534                 rc = lfsck_namespace_create_orphan_local(env, com, orphan,
1535                                                          S_IFDIR, lmv);
1536
             /* Both "repaired" (rc > 0) and "failed" (rc < 0) mark the
              * namespace as having been found inconsistent. */
1537         if (rc != 0)
1538                 ns->ln_flags |= LF_INCONSISTENT;
1539
1540         return rc;
1541 }
1542
1543 /**
1544  * Remove the specified entry from the linkEA.
1545  *
1546  * Locate the linkEA entry with the given @cname and @pfid, then
1547  * remove this entry or the other entries that are duplicates of
1548  * this entry.
      *
      * The caller's @ldata is edited first and is only used to size the
      * xattr declaration. The linkEA is then re-read from @obj under the
      * object write lock, edited the same way, and that fresh copy is what
      * gets written back. If the fresh copy turns out to be larger than the
      * declared buffer, the transaction is stopped and re-declared with the
      * larger size. Note that @ldata is edited even in dry-run mode.
      *
      * Any non-zero result (repaired or failed) sets LF_INCONSISTENT in
      * com->lc_file_ram.
1549  *
1550  * \param[in] env       pointer to the thread context
1551  * \param[in] com       pointer to the lfsck component
1552  * \param[in] obj       pointer to the dt_object to be handled
1553  * \param[in,out]ldata  pointer to the buffer that holds the linkEA
1554  * \param[in] cname     the name for the child in the parent directory
1555  * \param[in] pfid      the parent directory's FID for the linkEA
1556  * \param[in] next      if true, then remove the first found linkEA
1557  *                      entry, and move the ldata->ld_lee to next entry
1558  *
1559  * \retval              positive number for repaired cases
1560  * \retval              0 if nothing to be repaired
1561  * \retval              negative error number on failure
1562  */
1563 static int lfsck_namespace_shrink_linkea(const struct lu_env *env,
1564                                          struct lfsck_component *com,
1565                                          struct dt_object *obj,
1566                                          struct linkea_data *ldata,
1567                                          struct lu_name *cname,
1568                                          struct lu_fid *pfid,
1569                                          bool next)
1570 {
1571         struct lfsck_instance           *lfsck     = com->lc_lfsck;
1572         struct dt_device                *dev       = lfsck->li_bottom;
1573         struct lfsck_bookmark           *bk        = &lfsck->li_bookmark_ram;
1574         struct thandle                  *th        = NULL;
1575         struct lustre_handle             lh        = { 0 };
1576         struct linkea_data               ldata_new = { 0 };
1577         struct lu_buf                    linkea_buf;
1578         int                              rc        = 0;
1579         ENTRY;
1580
1581         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
1582                               MDS_INODELOCK_UPDATE |
1583                               MDS_INODELOCK_XATTR, LCK_EX);
1584         if (rc != 0)
1585                 GOTO(log, rc);
1586
             /* Edit the caller's copy first; its resulting length is the
              * initial size used for the xattr declaration below. */
1587         if (next)
1588                 linkea_del_buf(ldata, cname);
1589         else
1590                 lfsck_namespace_filter_linkea_entry(ldata, cname, pfid,
1591                                                     true);
1592         lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
1593                        ldata->ld_leh->leh_len);
1594
1595 again:
1596         th = dt_trans_create(env, dev);
1597         if (IS_ERR(th))
1598                 GOTO(unlock1, rc = PTR_ERR(th));
1599
1600         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1601                                   XATTR_NAME_LINK, 0, th);
1602         if (rc != 0)
1603                 GOTO(stop, rc);
1604
1605         rc = dt_trans_start_local(env, dev, th);
1606         if (rc != 0)
1607                 GOTO(stop, rc);
1608
1609         dt_write_lock(env, obj, 0);
1610         if (unlikely(lfsck_is_dead_obj(obj)))
1611                 GOTO(unlock2, rc = -ENOENT);
1612
             /* Re-read the on-disk linkEA now that the object is locked. */
1613         rc = lfsck_links_read2(env, obj, &ldata_new);
1614         if (rc != 0)
1615                 GOTO(unlock2, rc);
1616
1617         /* If the specified linkEA entry is gone, it was removed by race. */
1618         rc = linkea_links_find(&ldata_new, cname, pfid);
1619         if (rc != 0)
1620                 GOTO(unlock2, rc = 0);
1621
             /* Dry-run: report the entry as repairable without writing. */
1622         if (bk->lb_param & LPF_DRYRUN)
1623                 GOTO(unlock2, rc = 1);
1624
1625         if (next)
1626                 linkea_del_buf(&ldata_new, cname);
1627         else
1628                 lfsck_namespace_filter_linkea_entry(&ldata_new, cname, pfid,
1629                                                     true);
1630
             /* The fresh linkEA is larger than what was declared: drop the
              * object lock and the transaction, then declare again with the
              * larger size. */
1631         if (linkea_buf.lb_len < ldata_new.ld_leh->leh_len) {
1632                 dt_write_unlock(env, obj);
1633                 dt_trans_stop(env, dev, th);
1634                 lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
1635                                ldata_new.ld_leh->leh_len);
1636                 goto again;
1637         }
1638
             /* Write back the freshly read and shrunk copy, not @ldata. */
1639         lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
1640                        ldata_new.ld_leh->leh_len);
1641         rc = dt_xattr_set(env, obj, &linkea_buf,
1642                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1643
1644         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
1645
1646 unlock2:
1647         dt_write_unlock(env, obj);
1648
1649 stop:
1650         dt_trans_stop(env, dev, th);
1651
1652 unlock1:
1653         lfsck_ibits_unlock(&lh, LCK_EX);
1654
1655 log:
             /* NOTE(review): this message does not print rc, unlike the
              * other repair helpers visible in this file. */
1656         CDEBUG(D_LFSCK, "%s: namespace LFSCK remove %s linkEA entry "
1657                "for the object: "DFID", parent "DFID", name %.*s\n",
1658                lfsck_lfsck2name(lfsck), next ? "invalid" : "redundant",
1659                PFID(lfsck_dto2fid(obj)), PFID(pfid), cname->ln_namelen,
1660                cname->ln_name);
1661
1662         if (rc != 0) {
1663                 struct lfsck_namespace *ns = com->lc_file_ram;
1664
1665                 ns->ln_flags |= LF_INCONSISTENT;
1666         }
1667
1668         return rc;
1669 }
1670
1671 /**
1672  * Conditionally remove the specified entry from the linkEA.
1673  *
1674  * Take the parent lock first, then look up @cname in @parent:
1675  * - if the parent is dead, or the name entry does not exist, or the name
1676  *   entry references an object other than @child, then call
      *   lfsck_namespace_shrink_linkea() (with next = true) to remove the
      *   linkEA entry;
      * - if the name entry references @child, keep the linkEA entry, move
      *   @ldata to its next entry and return 0;
      * - if the lookup fails with any other error, return that error.
1677  *
1678  * \param[in] env       pointer to the thread context
1679  * \param[in] com       pointer to the lfsck component
1680  * \param[in] parent    pointer to the parent directory
1681  * \param[in] child     pointer to the child object that holds the linkEA
1682  * \param[in,out]ldata  pointer to the buffer that holds the linkEA
1683  * \param[in] cname     the name for the child in the parent directory
1684  * \param[in] pfid      the parent directory's FID for the linkEA
1685  *
1686  * \retval              positive number for repaired cases
1687  * \retval              0 if nothing to be repaired
1688  * \retval              negative error number on failure
1689  */
1690 static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env,
1691                                               struct lfsck_component *com,
1692                                               struct dt_object *parent,
1693                                               struct dt_object *child,
1694                                               struct linkea_data *ldata,
1695                                               struct lu_name *cname,
1696                                               struct lu_fid *pfid)
1697 {
1698         struct lu_fid           *cfid   = &lfsck_env_info(env)->lti_fid3;
1699         struct lustre_handle     lh     = { 0 };
1700         int                      rc;
1701         ENTRY;
1702
1703         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
1704                               MDS_INODELOCK_UPDATE, LCK_EX);
1705         if (rc != 0)
1706                 RETURN(rc);
1707
1708         dt_read_lock(env, parent, 0);
             /* A dead parent cannot hold the name entry any more, so the
              * linkEA entry is removed without a lookup. */
1709         if (unlikely(lfsck_is_dead_obj(parent))) {
1710                 dt_read_unlock(env, parent);
1711                 lfsck_ibits_unlock(&lh, LCK_EX);
1712                 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
1713                                                    cname, pfid, true);
1714
1715                 RETURN(rc);
1716         }
1717
             /* On success cfid receives the FID stored in the name entry. */
1718         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1719                        (const struct dt_key *)cname->ln_name,
1720                        BYPASS_CAPA);
1721         dt_read_unlock(env, parent);
1722
1723         /* It is safe to release the ldlm lock, because when the logic comes
1724          * here, we have got all the needed information above whether the
1725          * linkEA entry is valid or not. It is not important that others
1726          * may add new linkEA entry after the ldlm lock released. If another
1727          * has removed the specified linkEA entry by race, then it is OK,
1728          * because the subsequent lfsck_namespace_shrink_linkea() can handle
1729          * such case. */
1730         lfsck_ibits_unlock(&lh, LCK_EX);
             /* No such name entry under the parent: drop the linkEA entry. */
1731         if (rc == -ENOENT) {
1732                 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
1733                                                    cname, pfid, true);
1734
1735                 RETURN(rc);
1736         }
1737
1738         if (rc != 0)
1739                 RETURN(rc);
1740
1741         /* The LFSCK just found some internal status of cross-MDTs
1742          * create operation. That is normal. */
1743         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
1744                 linkea_next_entry(ldata);
1745
1746                 RETURN(0);
1747         }
1748
             /* The name exists but references another object, so this
              * linkEA entry does not match it: remove it. */
1749         rc = lfsck_namespace_shrink_linkea(env, com, child, ldata, cname,
1750                                            pfid, true);
1751
1752         RETURN(rc);
1753 }
1754
1755 /**
1756  * Conditionally replace name entry in the parent.
1757  *
1758  * As required, the LFSCK may re-create the lost MDT-object for dangling
1759  * name entry, but such repairing may be wrong because of bad FID in the
1760  * name entry. As the LFSCK processing, the real MDT-object may be found,
1761  * then the LFSCK should check whether the former re-created MDT-object
1762  * has been modified or not, if not, then destroy it and update the name
1763  * entry in the parent to reference the real MDT-object.
      *
      * The replacement is skipped (returning 0) when the name entry no
      * longer holds @cfid, when the object referenced by @cfid has a
      * non-zero ctime, when it is a regular file that already has a LOV EA,
      * when @child has no linkEA, or when @child's linkEA has no entry for
      * @cname under @parent. If @cfid is insane, or the object or the name
      * entry does not exist, nothing is destroyed and only the name entry is
      * (re)inserted.
1764  *
1765  * \param[in] env       pointer to the thread context
1766  * \param[in] com       pointer to the lfsck component
1767  * \param[in] parent    pointer to the parent directory
1768  * \param[in] child     pointer to the MDT-object that may be the real
1769  *                      MDT-object corresponding to the name entry in parent
1770  * \param[in] cfid      the current FID in the name entry
1771  * \param[in] cname     contains the name of the child in the parent directory
1772  *
1773  * \retval              positive number for repaired cases
1774  * \retval              0 if nothing to be repaired
1775  * \retval              negative error number on failure
1776  */
1777 static int lfsck_namespace_replace_cond(const struct lu_env *env,
1778                                         struct lfsck_component *com,
1779                                         struct dt_object *parent,
1780                                         struct dt_object *child,
1781                                         const struct lu_fid *cfid,
1782                                         const struct lu_name *cname)
1783 {
1784         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1785         struct lu_fid                   *tfid   = &info->lti_fid5;
1786         struct lu_attr                  *la     = &info->lti_la;
1787         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1788         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1789         struct dt_device                *dev    = lfsck->li_next;
1790         const char                      *name   = cname->ln_name;
1791         struct dt_object                *obj    = NULL;
1792         struct lustre_handle             plh    = { 0 };
1793         struct lustre_handle             clh    = { 0 };
1794         struct linkea_data               ldata  = { 0 };
1795         struct thandle                  *th     = NULL;
             /* exist: true while the object referenced by cfid still has to
              * be destroyed as part of the replacement. */
1796         bool                             exist  = true;
1797         int                              rc     = 0;
1798         ENTRY;
1799
1800         rc = lfsck_ibits_lock(env, lfsck, parent, &plh,
1801                               MDS_INODELOCK_UPDATE, LCK_EX);
1802         if (rc != 0)
1803                 GOTO(log, rc);
1804
1805         if (!fid_is_sane(cfid)) {
1806                 exist = false;
1807                 goto replace;
1808         }
1809
1810         obj = lfsck_object_find(env, lfsck, cfid);
1811         if (IS_ERR(obj)) {
1812                 rc = PTR_ERR(obj);
1813                 if (rc == -ENOENT) {
1814                         exist = false;
1815                         goto replace;
1816                 }
1817
1818                 GOTO(log, rc);
1819         }
1820
1821         if (!dt_object_exists(obj)) {
1822                 exist = false;
1823                 goto replace;
1824         }
1825
             /* Read the FID currently stored under the name into tfid. */
1826         rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
1827                        (const struct dt_key *)name, BYPASS_CAPA);
1828         if (rc == -ENOENT) {
1829                 exist = false;
1830                 goto replace;
1831         }
1832
1833         if (rc != 0)
1834                 GOTO(log, rc);
1835
1836         /* Someone changed the name entry, cannot replace it. */
1837         if (!lu_fid_eq(cfid, tfid))
1838                 GOTO(log, rc = 0);
1839
1840         /* lock the object to be destroyed. */
1841         rc = lfsck_ibits_lock(env, lfsck, obj, &clh,
1842                               MDS_INODELOCK_UPDATE |
1843                               MDS_INODELOCK_XATTR, LCK_EX);
1844         if (rc != 0)
1845                 GOTO(log, rc);
1846
1847         if (unlikely(lfsck_is_dead_obj(obj))) {
1848                 exist = false;
1849                 goto replace;
1850         }
1851
1852         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
1853         if (rc != 0)
1854                 GOTO(log, rc);
1855
1856         /* The object has been modified by other(s), or it is not created by
1857          * LFSCK, the two cases are indistinguishable. So cannot replace it.
              * rc is 0 here, so this reports "nothing to be repaired". */
1858         if (la->la_ctime != 0)
1859                 GOTO(log, rc);
1860
1861         if (S_ISREG(la->la_mode)) {
1862                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
1863                                   BYPASS_CAPA);
1864                 /* If someone has created related OST-object(s),
1865                  * then keep it. Errors other than -ENODATA are returned. */
1866                 if ((rc > 0) || (rc < 0 && rc != -ENODATA))
1867                         GOTO(log, rc = (rc > 0 ? 0 : rc));
1868         }
1869
1870 replace:
1871         dt_read_lock(env, child, 0);
1872         rc = lfsck_links_read2(env, child, &ldata);
1873         dt_read_unlock(env, child);
1874
1875         /* Someone changed the child, no need to replace. */
1876         if (rc == -ENODATA)
1877                 GOTO(log, rc = 0);
1878
1879         if (rc != 0)
1880                 GOTO(log, rc);
1881
1882         rc = linkea_links_find(&ldata, cname, lfsck_dto2fid(parent));
1883         /* Someone moved the child, no need to replace. */
1884         if (rc != 0)
1885                 GOTO(log, rc = 0);
1886
             /* Dry-run: report as repairable without creating a transaction. */
1887         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1888                 GOTO(log, rc = 1);
1889
1890         th = dt_trans_create(env, dev);
1891         if (IS_ERR(th))
1892                 GOTO(log, rc = PTR_ERR(th));
1893
1894         if (exist) {
1895                 rc = dt_declare_destroy(env, obj, th);
1896                 if (rc != 0)
1897                         GOTO(stop, rc);
1898         }
1899
1900         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
1901         if (rc != 0)
1902                 GOTO(stop, rc);
1903
             /* NOTE(review): the new name entry is always typed S_IFDIR;
              * presumably callers only pass directories as @child - confirm. */
1904         rec->rec_type = S_IFDIR;
1905         rec->rec_fid = lfsck_dto2fid(child);
1906         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1907                                (const struct dt_key *)name, th);
1908         if (rc != 0)
1909                 GOTO(stop, rc);
1910
1911         rc = dt_trans_start(env, dev, th);
1912         if (rc != 0)
1913                 GOTO(stop, rc);
1914
1915         if (exist) {
1916                 rc = dt_destroy(env, obj, th);
1917                 if (rc != 0)
1918                         GOTO(stop, rc);
1919         }
1920
1921         /* The old name entry may not exist, so the result of the delete
              * is deliberately ignored. */
1922         dt_delete(env, parent, (const struct dt_key *)name, th,
1923                   BYPASS_CAPA);
1924
1925         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1926                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
1927
1928         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1929
1930 stop:
1931         dt_trans_stop(env, dev, th);
1932
1933 log:
             /* clh is still zero-initialized when the second lock was never
              * taken. NOTE(review): this relies on lfsck_ibits_unlock()
              * tolerating an unused handle - confirm. */
1934         lfsck_ibits_unlock(&clh, LCK_EX);
1935         lfsck_ibits_unlock(&plh, LCK_EX);
1936         if (obj != NULL && !IS_ERR(obj))
1937                 lfsck_object_put(env, obj);
1938
1939         CDEBUG(D_LFSCK, "%s: namespace LFSCK conditionally destroy the "
1940                "object "DFID" because of conflict with the object "DFID
1941                " under the parent "DFID" with name %s: rc = %d\n",
1942                lfsck_lfsck2name(lfsck), PFID(cfid),
1943                PFID(lfsck_dto2fid(child)), PFID(lfsck_dto2fid(parent)),
1944                name, rc);
1945
1946         return rc;
1947 }
1948
1949 /**
1950  * Overwrite the linkEA for the object with the given ldata.
1951  *
1952  * The caller should take the ldlm lock before the calling.
      *
      * @obj must be a local object (asserted). The transaction is created
      * and started before the dead-object and dry-run checks, so both of
      * those early exits still go through dt_trans_stop(). Any non-zero
      * result (repaired or failed) sets LF_INCONSISTENT in com->lc_file_ram.
1953  *
1954  * \param[in] env       pointer to the thread context
1955  * \param[in] com       pointer to the lfsck component
1956  * \param[in] obj       pointer to the dt_object to be handled
1957  * \param[in] ldata     pointer to the new linkEA data
1958  *
1959  * \retval              positive number for repaired cases
1960  * \retval              0 if nothing to be repaired
1961  * \retval              negative error number on failure
1962  */
1963 int lfsck_namespace_rebuild_linkea(const struct lu_env *env,
1964                                    struct lfsck_component *com,
1965                                    struct dt_object *obj,
1966                                    struct linkea_data *ldata)
1967 {
1968         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1969         struct dt_device                *dev    = lfsck->li_bottom;
1970         struct thandle                  *th     = NULL;
1971         struct lu_buf                    linkea_buf;
1972         int                              rc     = 0;
1973         ENTRY;
1974
1975         LASSERT(!dt_object_remote(obj));
1976
1977         th = dt_trans_create(env, dev);
1978         if (IS_ERR(th))
1979                 GOTO(log, rc = PTR_ERR(th));
1980
             /* Wrap the caller's linkEA buffer, using the length recorded
              * in its header, as the xattr value to be written. */
1981         lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
1982                        ldata->ld_leh->leh_len);
1983         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1984                                   XATTR_NAME_LINK, 0, th);
1985         if (rc != 0)
1986                 GOTO(stop, rc);
1987
1988         rc = dt_trans_start_local(env, dev, th);
1989         if (rc != 0)
1990                 GOTO(stop, rc);
1991
1992         dt_write_lock(env, obj, 0);
             /* A dead object needs no linkEA: nothing to repair. */
1993         if (unlikely(lfsck_is_dead_obj(obj)))
1994                 GOTO(unlock, rc = 0);
1995
             /* Dry-run: report as repairable without writing the xattr. */
1996         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1997                 GOTO(unlock, rc = 1);
1998
1999         rc = dt_xattr_set(env, obj, &linkea_buf,
2000                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
2001
2002         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
2003
2004 unlock:
2005         dt_write_unlock(env, obj);
2006
2007 stop:
2008         dt_trans_stop(env, dev, th);
2009
2010 log:
2011         CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild linkEA for the "
2012                "object "DFID": rc = %d\n",
2013                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
2014
2015         if (rc != 0) {
2016                 struct lfsck_namespace *ns = com->lc_file_ram;
2017
2018                 ns->ln_flags |= LF_INCONSISTENT;
2019         }
2020
2021         return rc;
2022 }
2023
2024 /**
2025  * Repair invalid name entry.
2026  *
2027  * If the name entry contains invalid information, such as bad file type
2028  * or (and) corrupted object FID, then either remove the name entry or
2029  * update the name entry with the given (right) information.
      *
      * The entry is only touched if, under the parent's write lock, @name
      * still exists in @parent and still references @child; otherwise the
      * function returns 0. Any non-zero result (repaired or failed) sets
      * LF_INCONSISTENT in com->lc_file_ram.
2030  *
2031  * \param[in] env       pointer to the thread context
2032  * \param[in] com       pointer to the lfsck component
2033  * \param[in] parent    pointer to the parent directory
2034  * \param[in] child     pointer to the object referenced by the name entry
2035  * \param[in] name      the old name of the child under the parent directory
2036  * \param[in] name2     the new name of the child under the parent directory
2037  * \param[in] type      the type claimed by the name entry
2038  * \param[in] update    update the name entry if true; otherwise, remove it
2039  * \param[in] dec       decrease the parent nlink count if true
2040  *
2041  * \retval              positive number for repaired successfully
2042  * \retval              0 if nothing to be repaired
2043  * \retval              negative error number on failure
2044  */
2045 int lfsck_namespace_repair_dirent(const struct lu_env *env,
2046                                   struct lfsck_component *com,
2047                                   struct dt_object *parent,
2048                                   struct dt_object *child,
2049                                   const char *name, const char *name2,
2050                                   __u16 type, bool update, bool dec)
2051 {
2052         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2053         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
2054         const struct lu_fid             *cfid   = lfsck_dto2fid(child);
2055         struct lu_fid                   *tfid   = &info->lti_fid5;
2056         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2057         struct dt_device                *dev    = lfsck->li_next;
2058         struct thandle                  *th     = NULL;
2059         struct lustre_handle             lh     = { 0 };
2060         int                              rc     = 0;
2061         ENTRY;
2062
2063         if (unlikely(!dt_try_as_dir(env, parent)))
2064                 GOTO(log, rc = -ENOTDIR);
2065
2066         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
2067                               MDS_INODELOCK_UPDATE, LCK_EX);
2068         if (rc != 0)
2069                 GOTO(log, rc);
2070
2071         th = dt_trans_create(env, dev);
2072         if (IS_ERR(th))
2073                 GOTO(unlock1, rc = PTR_ERR(th));
2074
2075         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
2076         if (rc != 0)
2077                 GOTO(stop, rc);
2078
             /* The record filled in here (the child's own type and FID) is
              * reused by the dt_insert() below. */
2079         if (update) {
2080                 rec->rec_type = lfsck_object_type(child) & S_IFMT;
2081                 rec->rec_fid = cfid;
2082                 rc = dt_declare_insert(env, parent,
2083                                        (const struct dt_rec *)rec,
2084                                        (const struct dt_key *)name2, th);
2085                 if (rc != 0)
2086                         GOTO(stop, rc);
2087         }
2088
2089         if (dec) {
2090                 rc = dt_declare_ref_del(env, parent, th);
2091                 if (rc != 0)
2092                         GOTO(stop, rc);
2093         }
2094
2095         rc = dt_trans_start(env, dev, th);
2096         if (rc != 0)
2097                 GOTO(stop, rc);
2098
2099         dt_write_lock(env, parent, 0);
             /* Re-check the name entry under the lock; tfid receives the FID
              * that the entry currently holds. */
2100         rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
2101                        (const struct dt_key *)name, BYPASS_CAPA);
2102         /* Someone has removed the bad name entry by race. */
2103         if (rc == -ENOENT)
2104                 GOTO(unlock2, rc = 0);
2105
2106         if (rc != 0)
2107                 GOTO(unlock2, rc);
2108
2109         /* Someone has removed the bad name entry and reused it for other
2110          * object by race. */
2111         if (!lu_fid_eq(tfid, cfid))
2112                 GOTO(unlock2, rc = 0);
2113
             /* Dry-run: report as repairable without modifying the parent. */
2114         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
2115                 GOTO(unlock2, rc = 1);
2116
2117         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
2118                        BYPASS_CAPA);
2119         if (rc != 0)
2120                 GOTO(unlock2, rc);
2121
2122         if (update) {
2123                 rc = dt_insert(env, parent,
2124                                (const struct dt_rec *)rec,
2125                                (const struct dt_key *)name2, th,
2126                                BYPASS_CAPA, 1);
2127                 if (rc != 0)
2128                         GOTO(unlock2, rc);
2129         }
2130
2131         if (dec) {
2132                 rc = dt_ref_del(env, parent, th);
2133                 if (rc != 0)
2134                         GOTO(unlock2, rc);
2135         }
2136
2137         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
2138
2139 unlock2:
2140         dt_write_unlock(env, parent);
2141
2142 stop:
2143         dt_trans_stop(env, dev, th);
2144
2145         /* We are not sure whether the child will become orphan or not.
2146          * Record it in the LFSCK trace file for further checking in
2147          * the second-stage scanning.
              *
              * NOTE(review): a successful removal leaves rc == 1 at this
              * point, so this only runs on the rc == 0 paths above (entry
              * already removed or reused by race) - confirm that this is
              * the intended condition. */
2148         if (!update && !dec && rc == 0)
2149                 lfsck_namespace_trace_update(env, com, cfid,
2150                                              LNTF_CHECK_LINKEA, true);
2151
2152 unlock1:
2153         lfsck_ibits_unlock(&lh, LCK_EX);
2154
2155 log:
             /* The child's own type is only printed for the update case;
              * 0 is printed when the entry is being removed. */
2156         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found bad name "
2157                "entry for: parent "DFID", child "DFID", name %s, type "
2158                "in name entry %o, type claimed by child %o. repair it "
2159                "by %s with new name2 %s: rc = %d\n", lfsck_lfsck2name(lfsck),
2160                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
2161                name, type, update ? lfsck_object_type(child) : 0,
2162                update ? "updating" : "removing", name2, rc);
2163
2164         if (rc != 0) {
2165                 struct lfsck_namespace *ns = com->lc_file_ram;
2166
2167                 ns->ln_flags |= LF_INCONSISTENT;
2168         }
2169
2170         return rc;
2171 }
2172
2173 /**
2174  * Update the ".." name entry for the given object.
2175  *
2176  * The object's ".." is corrupted, this function will update the ".." name
2177  * entry with the given pfid, and the linkEA with the given ldata.
2178  *
2179  * The caller should take the ldlm lock before the calling.
2180  *
2181  * \param[in] env       pointer to the thread context
2182  * \param[in] com       pointer to the lfsck component
2183  * \param[in] obj       pointer to the dt_object to be handled
2184  * \param[in] pfid      the new fid for the object's ".." name entry
2185  * \param[in] cname     the name for the @obj in the parent directory
2186  *
2187  * \retval              positive number for repaired cases
2188  * \retval              0 if nothing to be repaired
2189  * \retval              negative error number on failure
2190  */
2191 static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env,
2192                                                   struct lfsck_component *com,
2193                                                   struct dt_object *obj,
2194                                                   const struct lu_fid *pfid,
2195                                                   struct lu_name *cname)
2196 {
2197         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2198         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
2199         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2200         struct dt_device                *dev    = lfsck->li_bottom;
2201         struct thandle                  *th     = NULL;
2202         struct linkea_data               ldata  = { 0 };
2203         struct lu_buf                    linkea_buf;
2204         int                              rc     = 0;
2205         ENTRY;
2206
             /* This path only handles a local directory object. */
2207         LASSERT(!dt_object_remote(obj));
2208         LASSERT(S_ISDIR(lfsck_object_type(obj)));
2209
             /* Prepare linkEA data in the per-thread big buffer that holds
              * just the <cname, pfid> pair; it is written to @obj below. */
2210         rc = linkea_data_new(&ldata, &info->lti_big_buf);
2211         if (rc != 0)
2212                 GOTO(log, rc);
2213
2214         rc = linkea_add_buf(&ldata, cname, pfid);
2215         if (rc != 0)
2216                 GOTO(log, rc);
2217
2218         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
2219                        ldata.ld_leh->leh_len);
2220
             /* Declare phase: the ".." delete, the ".." re-insert and the
              * linkEA xattr update are all declared before the transaction
              * is started. */
2221         th = dt_trans_create(env, dev);
2222         if (IS_ERR(th))
2223                 GOTO(log, rc = PTR_ERR(th));
2224
2225         rc = dt_declare_delete(env, obj, (const struct dt_key *)dotdot, th);
2226         if (rc != 0)
2227                 GOTO(stop, rc);
2228
2229         rec->rec_type = S_IFDIR;
2230         rec->rec_fid = pfid;
2231         rc = dt_declare_insert(env, obj, (const struct dt_rec *)rec,
2232                                (const struct dt_key *)dotdot, th);
2233         if (rc != 0)
2234                 GOTO(stop, rc);
2235
2236         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
2237                                   XATTR_NAME_LINK, 0, th);
2238         if (rc != 0)
2239                 GOTO(stop, rc);
2240
2241         rc = dt_trans_start_local(env, dev, th);
2242         if (rc != 0)
2243                 GOTO(stop, rc);
2244
             /* The object state is re-checked under the write lock. */
2245         dt_write_lock(env, obj, 0);
             /* A dead object needs no repair: return 0. */
2246         if (unlikely(lfsck_is_dead_obj(obj)))
2247                 GOTO(unlock, rc = 0);
2248
             /* Dry-run mode: report the case as repaired (1) without
              * executing any of the modifications below. */
2249         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
2250                 GOTO(unlock, rc = 1);
2251
2252         /* The old ".." name entry may not exist, so the return value of dt_delete() is deliberately ignored. */
2253         dt_delete(env, obj, (const struct dt_key *)dotdot, th,
2254                   BYPASS_CAPA);
2255
2256         rc = dt_insert(env, obj, (const struct dt_rec *)rec,
2257                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
2258         if (rc != 0)
2259                 GOTO(unlock, rc);
2260
2261         rc = dt_xattr_set(env, obj, &linkea_buf,
2262                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
2263
             /* A successful xattr update is reported as 1 (repaired); any
              * other value is passed through unchanged. */
2264         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
2265
2266 unlock:
2267         dt_write_unlock(env, obj);
2268
2269 stop:
2270         dt_trans_stop(env, dev, th);
2271
2272 log:
2273         CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild dotdot name entry for "
2274                "the object "DFID", new parent "DFID": rc = %d\n",
2275                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)),
2276                PFID(pfid), rc);
2277
             /* Both the repaired (rc > 0) and the failed (rc < 0) outcomes
              * flag the namespace LFSCK file as LF_INCONSISTENT. */
2278         if (rc != 0) {
2279                 struct lfsck_namespace *ns = com->lc_file_ram;
2280
2281                 ns->ln_flags |= LF_INCONSISTENT;
2282         }
2283
2284         return rc;
2285 }
2286
2287 /**
2288  * Handle orphan @obj during Double Scan Directory.
2289  *
2290  * Remove the @obj's current (invalid) linkEA entries, and insert
2291  * it in the directory .lustre/lost+found/MDTxxxx/ with the name:
2292  * ${FID}-${PFID}-D-${conflict_version}
2293  *
2294  * The caller should take the ldlm lock before the call. This function
      * releases it (LCK_EX) right after the linkEA removal, whether or not
      * the removal succeeded.
2295  *
2296  * \param[in] env       pointer to the thread context
2297  * \param[in] com       pointer to the lfsck component
2298  * \param[in] obj       pointer to the orphan object to be handled
2299  * \param[in] pfid      the FID used for the ${PFID} part of the orphan name
2300  * \param[in,out] lh    ldlm lock handler for the given @obj, unlocked here
2301  * \param[out] type     set to LNIT_MUL_REF unless the linkEA removal fails
      *                      with an error other than -ENODATA
2302  *
2303  * \retval              positive number for repaired cases
2304  * \retval              0 if nothing to be repaired, or if the LFSCK is
      *                      marked as LF_INCOMPLETE
2305  * \retval              negative error number on failure
2306  */
2307 static int
2308 lfsck_namespace_dsd_orphan(const struct lu_env *env,
2309                            struct lfsck_component *com,
2310                            struct dt_object *obj,
2311                            const struct lu_fid *pfid,
2312                            struct lustre_handle *lh,
2313                            enum lfsck_namespace_inconsistency_type *type)
2314 {
2315         struct lfsck_thread_info *info = lfsck_env_info(env);
2316         struct lfsck_namespace   *ns   = com->lc_file_ram;
2317         int                       rc;
2318         ENTRY;
2319
2320         /* Remove the unrecognized linkEA. */
2321         rc = lfsck_namespace_links_remove(env, com, obj);
             /* Drop the lock before looking at the result, so that it is
              * released on the error return below as well. */
2322         lfsck_ibits_unlock(lh, LCK_EX);
             /* -ENODATA is tolerated and processing continues. */
2323         if (rc < 0 && rc != -ENODATA)
2324                 RETURN(rc);
2325
2326         *type = LNIT_MUL_REF;
2327
2328         /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT has
2329          * ever tried to verify some remote MDT-object that resides on this
2330          * MDT, but this MDT failed to respond such request. So means there
2331          * may be some remote name entry on other MDT that references this
2332          * object with another name, so we cannot know whether this linkEA
2333          * is valid or not. So keep it there and maybe resolved when next
2334          * LFSCK run. */
2335         if (ns->ln_flags & LF_INCOMPLETE)
2336                 RETURN(0);
2337
2338         /* The unique linkEA is invalid, even if the ".." name entry may be
2339          * valid, we still cannot know via which name entry this directory
2340          * will be referenced. Then handle it as pure orphan. */
             /* The per-thread lti_tmpbuf receives the name suffix built
              * from @pfid; it is passed on together with the "D" tag. */
2341         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
2342                  "-"DFID, PFID(pfid));
2343         rc = lfsck_namespace_insert_orphan(env, com, obj,
2344                                            info->lti_tmpbuf, "D", NULL);
2345
2346         RETURN(rc);
2347 }
2348
2349 /**
2350  * Double Scan Directory object for single linkEA entry case.
2351  *
2352  * The given @child has unique linkEA entry. If the linkEA entry is valid,
2353  * then check whether the name is in the namespace or not, if not, add the
2354  * missing name entry back to namespace. If the linkEA entry is invalid,
2355  * then remove it and insert the @child in the .lustre/lost+found/MDTxxxx/
2356  * as an orphan.
2357  *
2358  * \param[in] env       pointer to the thread context
2359  * \param[in] com       pointer to the lfsck component
2360  * \param[in] child     pointer to the directory to be double scanned
2361  * \param[in] pfid      the FID corresponding to the ".." entry
2362  * \param[in] ldata     pointer to the linkEA data for the given @child
2363  * \param[in,out] lh    ldlm lock handler for the given @child
2364  * \param[out] type     to tell the caller what the inconsistency is
2365  * \param[out] retry    if found inconsistency, but the caller does not hold
2366  *                      ldlm lock on the @child, then set @retry as true;
      *                      may be NULL, in which case no retry is requested
2367  *
2368  * \retval              positive number for repaired cases
2369  * \retval              0 if nothing to be repaired
2370  * \retval              negative error number on failure
2371  */
2372 static int
2373 lfsck_namespace_dsd_single(const struct lu_env *env,
2374                            struct lfsck_component *com,
2375                            struct dt_object *child,
2376                            const struct lu_fid *pfid,
2377                            struct linkea_data *ldata,
2378                            struct lustre_handle *lh,
2379                            enum lfsck_namespace_inconsistency_type *type,
2380                            bool *retry)
2381 {
2382         struct lfsck_thread_info *info          = lfsck_env_info(env);
2383         struct lu_name           *cname         = &info->lti_name;
2384         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
2385         struct lu_fid            *tfid          = &info->lti_fid3;
2386         struct lfsck_namespace   *ns            = com->lc_file_ram;
2387         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2388         struct dt_object         *parent        = NULL;
2389         struct lmv_mds_md_v1     *lmv;
2390         int                       rc            = 0;
2391         ENTRY;
2392
             /* Unpack the linkEA entry into cname (the name) and tfid (the
              * parent FID that is looked up below). */
2393         lfsck_namespace_unpack_linkea_entry(ldata, cname, tfid, info->lti_key);
2394         /* The unique linkEA entry with bad parent will be handled as orphan. */
2395         if (!fid_is_sane(tfid)) {
                     /* If the lock handle is not in use and a @retry slot
                      * was given, only request a retry; otherwise handle
                      * the @child as an orphan right away. */
2396                 if (!lustre_handle_is_used(lh) && retry != NULL)
2397                         *retry = true;
2398                 else
2399                         rc = lfsck_namespace_dsd_orphan(env, com, child,
2400                                                         pfid, lh, type);
2401
2402                 GOTO(out, rc);
2403         }
2404
2405         parent = lfsck_object_find_bottom(env, lfsck, tfid);
2406         if (IS_ERR(parent))
2407                 GOTO(out, rc = PTR_ERR(parent));
2408
2409         /* We trust the unique linkEA entry in spite of whether it matches the
2410          * ".." name entry or not. Because even if the linkEA entry is wrong
2411          * and the ".." name entry is right, we still cannot know via which
2412          * name entry the child will be referenced, since all known entries
2413          * have been verified during the first-stage scanning. */
2414         if (!dt_object_exists(parent)) {
2415                 /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT
2416                  * has ever tried to verify some remote MDT-object that resides
2417                  * on this MDT, but this MDT failed to respond such request. So
2418                  * means there may be some remote name entry on other MDT that
2419                  * references this object with another name, so we cannot know
2420                  * whether this linkEA is valid or not. So keep it there and
2421                  * maybe resolved when next LFSCK run. */
2422                 if (ns->ln_flags & LF_INCOMPLETE)
2423                         GOTO(out, rc = 0);
2424
2425                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2426                         *retry = true;
2427
2428                         GOTO(out, rc = 0);
2429                 }
2430
2431                 lfsck_ibits_unlock(lh, LCK_EX);
2432
                     /* This label is also entered by goto from the name
                      * entry handling further below (name check returning
                      * -ENOENT, or the insert returning -ESTALE). */
2433 lost_parent:
2434                 lmv = &info->lti_lmv;
2435                 rc = lfsck_read_stripe_lmv(env, child, lmv);
2436                 if (rc != 0 && rc != -ENODATA)
2437                         GOTO(out, rc);
2438
                     /* lmv is cleared to NULL when there is no LMV or its
                      * magic is not LMV_MAGIC_STRIPE. Otherwise it is kept
                      * only if the shard name is valid, and is then handed
                      * to the parent creation below. */
2439                 if (rc == -ENODATA || lmv->lmv_magic != LMV_MAGIC_STRIPE) {
2440                         lmv = NULL;
2441                 } else if (lfsck_shard_name_to_index(env,
2442                                         cname->ln_name, cname->ln_namelen,
2443                                         S_IFDIR, cfid) < 0) {
2444                         /* It is an invalid name entry, we
2445                          * cannot trust the parent also. */
2446                         rc = lfsck_namespace_shrink_linkea(env, com, child,
2447                                                 ldata, cname, tfid, true);
2448                         if (rc < 0)
2449                                 GOTO(out, rc);
2450
2451                         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
2452                                  "-"DFID, PFID(pfid));
2453                         rc = lfsck_namespace_insert_orphan(env, com, child,
2454                                                 info->lti_tmpbuf, "S", NULL);
2455
2456                         GOTO(out, rc);
2457                 }
2458
2459                 /* Create the lost parent as an orphan. */
2460                 rc = lfsck_namespace_create_orphan(env, com, parent, lmv);
2461                 if (rc >= 0) {
2462                         /* Add the missing name entry to the parent. */
2463                         rc = lfsck_namespace_insert_normal(env, com, parent,
2464                                                         child, cname->ln_name);
2465                         if (unlikely(rc == -EEXIST)) {
2466                                 /* Unfortunately, someone reused the name
2467                                  * under the parent by race. So we have
2468                                  * to remove the linkEA entry from
2469                                  * current child object. It means that the
2470                                  * LFSCK cannot recover the system
2471                                  * totally back to its original status,
2472                                  * but it is necessary to make the
2473                                  * current system to be consistent. */
2474                                 rc = lfsck_namespace_shrink_linkea(env,
2475                                                 com, child, ldata,
2476                                                 cname, tfid, true);
2477                                 if (rc >= 0) {
2478                                         snprintf(info->lti_tmpbuf,
2479                                                  sizeof(info->lti_tmpbuf),
2480                                                  "-"DFID, PFID(pfid));
2481                                         rc = lfsck_namespace_insert_orphan(env,
2482                                                 com, child, info->lti_tmpbuf,
2483                                                 "D", NULL);
2484                                 }
2485                         }
2486                 }
2487
2488                 GOTO(out, rc);
2489         }
2490
2491         /* The unique linkEA entry with bad parent will be handled as orphan. */
2492         if (unlikely(!dt_try_as_dir(env, parent))) {
2493                 if (!lustre_handle_is_used(lh) && retry != NULL)
2494                         *retry = true;
2495                 else
2496                         rc = lfsck_namespace_dsd_orphan(env, com, child,
2497                                                         pfid, lh, type);
2498
2499                 GOTO(out, rc);
2500         }
2501
             /* tfid is reused as the record buffer for the lookup. After a
              * successful lookup it is compared against cfid below. */
2502         rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
2503                        (const struct dt_key *)cname->ln_name, BYPASS_CAPA);
2504         if (rc == -ENOENT) {
2505                 /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT
2506                  * has ever tried to verify some remote MDT-object that resides
2507                  * on this MDT, but this MDT failed to respond such request. So
2508                  * means there may be some remote name entry on other MDT that
2509                  * references this object with another name, so we cannot know
2510                  * whether this linkEA is valid or not. So keep it there and
2511                  * maybe resolved when next LFSCK run. */
2512                 if (ns->ln_flags & LF_INCOMPLETE)
2513                         GOTO(out, rc = 0);
2514
2515                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2516                         *retry = true;
2517
2518                         GOTO(out, rc = 0);
2519                 }
2520
2521                 lfsck_ibits_unlock(lh, LCK_EX);
                     /* The result is used three ways: -ENOENT goes to the
                      * lost parent handling, a positive value marks the
                      * name entry as invalid, zero falls through to the
                      * insert below. */
2522                 rc = lfsck_namespace_check_name(env, parent, child, cname);
2523                 if (rc == -ENOENT)
2524                         goto lost_parent;
2525
2526                 if (rc < 0)
2527                         GOTO(out, rc);
2528
2529                 /* It is an invalid name entry, drop it. */
2530                 if (unlikely(rc > 0)) {
2531                         rc = lfsck_namespace_shrink_linkea(env, com, child,
2532                                                 ldata, cname, tfid, true);
2533                         if (rc >= 0) {
2534                                 snprintf(info->lti_tmpbuf,
2535                                          sizeof(info->lti_tmpbuf),
2536                                          "-"DFID, PFID(pfid));
2537                                 rc = lfsck_namespace_insert_orphan(env, com,
2538                                         child, info->lti_tmpbuf, "D", NULL);
2539                         }
2540
2541                         GOTO(out, rc);
2542                 }
2543
2544                 /* Add the missing name entry back to the namespace. */
2545                 rc = lfsck_namespace_insert_normal(env, com, parent, child,
2546                                                    cname->ln_name);
2547                 if (unlikely(rc == -ESTALE))
2548                         /* It may happen when the remote object has been
2549                          * removed, but the local MDT is not aware of that. */
2550                         goto lost_parent;
2551
2552                 if (unlikely(rc == -EEXIST)) {
2553                         /* Unfortunately, someone reused the name under the
2554                          * parent by race. So we have to remove the linkEA
2555                          * entry from current child object. It means that the
2556                          * LFSCK cannot recover the system totally back to
2557                          * its original status, but it is necessary to make
2558                          * the current system to be consistent.
2559                          *
2560                          * It also may be because of the LFSCK found some
2561                          * internal status of create operation. Under such
2562                          * case, nothing to be done. */
2563                         rc = lfsck_namespace_shrink_linkea_cond(env, com,
2564                                         parent, child, ldata, cname, tfid);
2565                         if (rc >= 0) {
2566                                 snprintf(info->lti_tmpbuf,
2567                                          sizeof(info->lti_tmpbuf),
2568                                          "-"DFID, PFID(pfid));
2569                                 rc = lfsck_namespace_insert_orphan(env, com,
2570                                         child, info->lti_tmpbuf, "D", NULL);
2571                         }
2572                 }
2573
2574                 GOTO(out, rc);
2575         }
2576
             /* Any other lookup failure is returned to the caller as is. */
2577         if (rc != 0)
2578                 GOTO(out, rc);
2579
2580         if (!lu_fid_eq(tfid, cfid)) {
2581                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2582                         *retry = true;
2583
2584                         GOTO(out, rc = 0);
2585                 }
2586
2587                 lfsck_ibits_unlock(lh, LCK_EX);
2588                 /* The name entry references another MDT-object that
2589                  * may be created by the LFSCK for repairing dangling
2590                  * name entry. Try to replace it. */
2591                 rc = lfsck_namespace_replace_cond(env, com, parent, child,
2592                                                   tfid, cname);
                     /* A zero return falls back to the orphan handling;
                      * any other value is returned as is. */
2593                 if (rc == 0)
2594                         rc = lfsck_namespace_dsd_orphan(env, com, child,
2595                                                         pfid, lh, type);
2596
2597                 GOTO(out, rc);
2598         }
2599
             /* Nothing more is verified when @pfid is zero. */
2600         if (fid_is_zero(pfid))
2601                 GOTO(out, rc = 0);
2602
2603         /* The ".." name entry is wrong, update it. */
2604         if (!lu_fid_eq(pfid, lfsck_dto2fid(parent))) {
2605                 if (!lustre_handle_is_used(lh) && retry != NULL) {
2606                         *retry = true;
2607
2608                         GOTO(out, rc = 0);
2609                 }
2610
2611                 *type = LNIT_UNMATCHED_PAIRS;
2612                 rc = lfsck_namespace_repair_unmatched_pairs(env, com, child,
2613                                                 lfsck_dto2fid(parent), cname);
2614         }
2615
2616         GOTO(out, rc);
2617
2618 out:
             /* Release the parent reference if one was obtained. */
2619         if (parent != NULL && !IS_ERR(parent))
2620                 lfsck_object_put(env, parent);
2621
2622         return rc;
2623 }
2624
2625 /**
2626  * Double Scan Directory object for multiple linkEA entries case.
2627  *
2628  * The given @child has multiple linkEA entries. There is at most one linkEA
2629  * entry will be valid, all the others will be removed. Firstly, the function
2630  * will try to find out the linkEA entry for which the name entry exists under
2631  * the given parent (@pfid). If there is no linkEA entry that matches the given
2632  * ".." name entry, then tries to find out the first linkEA entry that both the
2633  * parent and the name entry exist to rebuild a new ".." name entry.
2634  *
2635  * \param[in] env       pointer to the thread context
2636  * \param[in] com       pointer to the lfsck component
2637  * \param[in] child     pointer to the directory to be double scanned
2638  * \param[in] pfid      the FID corresponding to the ".." entry
2639  * \param[in] ldata     pointer to the linkEA data for the given @child
2640  * \param[in,out] lh    ldlm lock handler for the given @child
2641  * \param[out] type     to tell the caller what the inconsistency is
2642  * \param[in] lpf       true if the ".." entry is under lost+found/MDTxxxx/
2643  *
2644  * \retval              positive number for repaired cases
2645  * \retval              0 if nothing to be repaired
2646  * \retval              negative error number on failure
2647  */
2648 static int
2649 lfsck_namespace_dsd_multiple(const struct lu_env *env,
2650                              struct lfsck_component *com,
2651                              struct dt_object *child,
2652                              const struct lu_fid *pfid,
2653                              struct linkea_data *ldata,
2654                              struct lustre_handle *lh,
2655                              enum lfsck_namespace_inconsistency_type *type,
2656                              bool lpf)
2657 {
2658         struct lfsck_thread_info *info          = lfsck_env_info(env);
2659         struct lu_name           *cname         = &info->lti_name;
2660         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
2661         struct lu_fid            *tfid          = &info->lti_fid3;
2662         struct lu_fid            *pfid2         = &info->lti_fid4;
2663         struct lfsck_namespace   *ns            = com->lc_file_ram;
2664         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2665         struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
2666         struct dt_object         *parent        = NULL;
2667         struct linkea_data        ldata_new     = { 0 };
2668         int                       dirent_count  = 0;
2669         int                       linkea_count  = 0;
2670         int                       rc            = 0;
2671         bool                      once          = true;
2672         ENTRY;
2673
             /* The scan below can be repeated: if @lpf is set it is cleared
              * after the first pass; while once is true (and lpf is false)
              * only entries whose parent FID equals @pfid are examined; the
              * last pass, with once cleared, examines every remaining
              * entry. */
2674 again:
2675         while (ldata->ld_lee != NULL) {
2676                 lfsck_namespace_unpack_linkea_entry(ldata, cname, tfid,
2677                                                     info->lti_key);
2678                 /* Drop repeated linkEA entries. */
2679                 lfsck_namespace_filter_linkea_entry(ldata, cname, tfid, true);
2680                 /* Drop invalid linkEA entry. */
2681                 if (!fid_is_sane(tfid)) {
2682                         linkea_del_buf(ldata, cname);
2683                         linkea_count++;
2684                         continue;
2685                 }
2686
2687                 /* If current dotdot is the .lustre/lost+found/MDTxxxx/,
2688                  * then it is possible that: the directory object has ever
2689                  * been lost, but its name entry was there. In the former
2690                  * LFSCK run, during the first-stage scanning, the LFSCK
2691                  * found the dangling name entry, but it did not recreate
2692                  * the lost object, and when moved to the second-stage
2693                  * scanning, some children objects of the lost directory
2694                  * object were found, then the LFSCK recreated such lost
2695                  * directory object as an orphan.
2696                  *
2697                  * When the LFSCK runs again, if the dangling name is still
2698                  * there, the LFSCK should move the orphan directory object
2699                  * back to the normal namespace. */
                     /* Skip, without removing it, any entry whose parent is
                      * not @pfid while in the restricted pass. */
2700                 if (!lpf && !lu_fid_eq(pfid, tfid) && once) {
2701                         linkea_next_entry(ldata);
2702                         continue;
2703                 }
2704
2705                 parent = lfsck_object_find_bottom(env, lfsck, tfid);
2706                 if (IS_ERR(parent))
2707                         RETURN(PTR_ERR(parent));
2708
2709                 if (!dt_object_exists(parent)) {
2710                         lfsck_object_put(env, parent);
2711                         if (ldata->ld_leh->leh_reccount > 1) {
2712                                 /* If it is NOT the last linkEA entry, then
2713                                  * there is still other chance to make the
2714                                  * child to be visible via other parent, then
2715                                  * remove this linkEA entry. */
2716                                 linkea_del_buf(ldata, cname);
2717                                 linkea_count++;
2718                                 continue;
2719                         }
2720
                             /* Last remaining entry: leave the loop so that
                              * the single-entry handling below takes over. */
2721                         break;
2722                 }
2723
2724                 /* The linkEA entry with bad parent will be removed. */
2725                 if (unlikely(!dt_try_as_dir(env, parent))) {
2726                         lfsck_object_put(env, parent);
2727                         linkea_del_buf(ldata, cname);
2728                         linkea_count++;
2729                         continue;
2730                 }
2731
                     /* tfid is reused as the lookup record buffer, so the
                      * parent FID is saved in pfid2; the parent reference
                      * itself is dropped before pfid2 is used. */
2732                 rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
2733                                (const struct dt_key *)cname->ln_name,
2734                                BYPASS_CAPA);
2735                 *pfid2 = *lfsck_dto2fid(parent);
2736                 if (rc == -ENOENT) {
2737                         lfsck_object_put(env, parent);
2738                         linkea_next_entry(ldata);
2739                         continue;
2740                 }
2741
2742                 if (rc != 0) {
2743                         lfsck_object_put(env, parent);
2744
2745                         RETURN(rc);
2746                 }
2747
                     /* The name entry under this parent references @child. */
2748                 if (lu_fid_eq(tfid, cfid)) {
2749                         lfsck_object_put(env, parent);
2750                         if (!lu_fid_eq(pfid, pfid2)) {
2751                                 *type = LNIT_UNMATCHED_PAIRS;
2752                                 rc = lfsck_namespace_repair_unmatched_pairs(env,
2753                                                 com, child, pfid2, cname);
2754
2755                                 RETURN(rc);
2756                         }
2757
                             /* Also entered by goto when the replace_cond
                              * call near the end of the loop returns a
                              * positive value. */
2758 rebuild:
2759                         /* It is the most common case that we find the
2760                          * name entry corresponding to the linkEA entry
2761                          * that matches the ".." name entry. */
2762                         rc = linkea_data_new(&ldata_new, &info->lti_big_buf);
2763                         if (rc != 0)
2764                                 RETURN(rc);
2765
2766                         rc = linkea_add_buf(&ldata_new, cname, pfid2);
2767                         if (rc != 0)
2768                                 RETURN(rc);
2769
2770                         rc = lfsck_namespace_rebuild_linkea(env, com, child,
2771                                                             &ldata_new);
2772                         if (rc < 0)
2773                                 RETURN(rc);
2774
2775                         linkea_del_buf(ldata, cname);
2776                         linkea_count++;
2777                         linkea_first_entry(ldata);
2778                         /* There may be some invalid dangling name entries under
2779                          * other parent directories, remove all of them. */
                             /* Every iteration ends at the next label, which
                              * deletes the current entry from @ldata, so the
                              * loop runs until @ldata has no entry left. */
2780                         while (ldata->ld_lee != NULL) {
2781                                 lfsck_namespace_unpack_linkea_entry(ldata,
2782                                                 cname, tfid, info->lti_key);
2783                                 if (!fid_is_sane(tfid))
2784                                         goto next;
2785
2786                                 parent = lfsck_object_find_bottom(env, lfsck,
2787                                                                   tfid);
2788                                 if (IS_ERR(parent)) {
2789                                         rc = PTR_ERR(parent);
                                             /* Only with LPF_FAILOUT set, and
                                              * for errors other than -ENOENT,
                                              * is the scan aborted. */
2790                                         if (rc != -ENOENT &&
2791                                             bk->lb_param & LPF_FAILOUT)
2792                                                 RETURN(rc);
2793
2794                                         goto next;
2795                                 }
2796
2797                                 if (!dt_object_exists(parent)) {
2798                                         lfsck_object_put(env, parent);
2799                                         goto next;
2800                                 }
2801
2802                                 rc = lfsck_namespace_repair_dirent(env, com,
2803                                         parent, child, cname->ln_name,
2804                                         cname->ln_name, S_IFDIR, false, true);
2805                                 lfsck_object_put(env, parent);
2806                                 if (rc < 0) {
2807                                         if (bk->lb_param & LPF_FAILOUT)
2808                                                 RETURN(rc);
2809
2810                                         goto next;
2811                                 }
2812
                                     /* Non-negative results are summed up and
                                      * added to ln_dirent_repaired below. */
2813                                 dirent_count += rc;
2814
2815 next:
2816                                 linkea_del_buf(ldata, cname);
2817                         }
2818
2819                         ns->ln_dirent_repaired += dirent_count;
2820
                             /* NOTE(review): rc here is whatever the last
                              * step above left in it, which may be a negative
                              * value from an entry that was skipped without
                              * LPF_FAILOUT -- confirm this is intended. */
2821                         RETURN(rc);
2822                 }
2823
2824                 lfsck_ibits_unlock(lh, LCK_EX);
2825                 /* The name entry references another MDT-object that may be
2826                  * created by the LFSCK for repairing dangling name entry.
2827                  * Try to replace it. */
2828                 rc = lfsck_namespace_replace_cond(env, com, parent, child,
2829                                                   tfid, cname);
2830                 lfsck_object_put(env, parent);
2831                 if (rc < 0)
2832                         RETURN(rc);
2833
2834                 if (rc > 0)
2835                         goto rebuild;
2836
                     /* replace_cond returned 0: drop this linkEA entry. */
2837                 linkea_del_buf(ldata, cname);
2838         }
2839
             /* Rewind to the first entry and decide by how many entries are
              * left in @ldata. */
2840         linkea_first_entry(ldata);
2841         if (ldata->ld_leh->leh_reccount == 1) {
                     /* @retry is passed as NULL, so no retry is requested. */
2842                 rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata,
2843                                                 lh, type, NULL);
2844
                     /* If nothing was repaired, @pfid is zero and entries
                      * were dropped during the scan, write the shrunken
                      * linkEA back. */
2845                 if (rc == 0 && fid_is_zero(pfid) && linkea_count > 0)
2846                         rc = lfsck_namespace_rebuild_linkea(env, com, child,
2847                                                             ldata);
2848
2849                 RETURN(rc);
2850         }
2851
2852         /* All linkEA entries are invalid and removed, then handle the @child
2853          * as an orphan.*/
2854         if (ldata->ld_leh->leh_reccount == 0) {
2855                 rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, lh,
2856                                                 type);
2857
2858                 RETURN(rc);
2859         }
2860
2861         /* If the dangling name entry for the orphan directory object has
2862          * been removed, then just check whether the directory object is
2863          * still under the .lustre/lost+found/MDTxxxx/ or not. */
2864         if (lpf) {
2865                 lpf = false;
2866                 goto again;
2867         }
2868
2869         /* There is no linkEA entry that matches the ".." name entry. Find
2870          * the first linkEA entry that both parent and name entry exist to
2871          * rebuild a new ".." name entry. */
2872         if (once) {
2873                 once = false;
2874                 goto again;
2875         }
2876
2877         RETURN(rc);
2878 }
2879
2880 /**
2881  * Repair the object's nlink attribute.
2882  *
2883  * If all the known name entries have been verified, then the object's hard
2884  * link count should match the object's linkEA entries count unless the
2885  * object has too many hard links to be recorded in the linkEA. Such cases
2886  * should have been marked in the LFSCK trace file. Otherwise, trust the
2887  * linkEA to update the object's nlink attribute.
2888  *
2889  * \param[in] env       pointer to the thread context
2890  * \param[in] com       pointer to the lfsck component
2891  * \param[in] obj       pointer to the dt_object to be handled
2892  * \param[in,out] nlink pointer to buffer to object's hard link count before
2893  *                      and after the repairing
2894  *
2895  * \retval              1 if repaired (or would be repaired in dry-run mode)
2896  * \retval              0 if nothing to be repaired or the check is skipped
2897  * \retval              negative error number on failure
2898  */
2899 static int lfsck_namespace_repair_nlink(const struct lu_env *env,
2900                                         struct lfsck_component *com,
2901                                         struct dt_object *obj, __u32 *nlink)
2902 {
2903         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2904         struct lu_attr                  *la     = &info->lti_la3;
2905         struct lu_fid                   *tfid   = &info->lti_fid3;
2906         struct lfsck_namespace          *ns     = com->lc_file_ram;
2907         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2908         struct dt_device                *dev    = lfsck->li_bottom;
2909         const struct lu_fid             *cfid   = lfsck_dto2fid(obj);
2910         struct dt_object                *child  = NULL;
2911         struct thandle                  *th     = NULL;
2912         struct linkea_data               ldata  = { 0 };
2913         struct lustre_handle             lh     = { 0 };
2914         __u32                            old    = *nlink;
2915         int                              rc     = 0;
2916         __u8                             flags;
2917         ENTRY;
2918
             /* The caller must pass an object that is not remote and whose
              * type is a regular file; both conditions are asserted. */
2919         LASSERT(!dt_object_remote(obj));
2920         LASSERT(S_ISREG(lfsck_object_type(obj)));
2921
             /* Find the object by its FID on the bottom device; a valid child
              * is released with lfsck_object_put() at the log label. */
2922         child = lfsck_object_find_by_dev(env, dev, cfid);
2923         if (IS_ERR(child))
2924                 GOTO(log, rc = PTR_ERR(child));
2925
             /* Take the UPDATE | XATTR inode bits lock in LCK_EX mode; it is
              * released at the log label. */
2926         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
2927                               MDS_INODELOCK_UPDATE |
2928                               MDS_INODELOCK_XATTR, LCK_EX);
2929         if (rc != 0)
2930                 GOTO(log, rc);
2931
2932         th = dt_trans_create(env, dev);
2933         if (IS_ERR(th))
2934                 GOTO(log, rc = PTR_ERR(th));
2935
             /* Only the nlink attribute is declared for this transaction.
              * la->la_nlink itself is filled in later, after the linkEA has
              * been read. */
2936         la->la_valid = LA_NLINK;
2937         rc = dt_declare_attr_set(env, child, la, th);
2938         if (rc != 0)
2939                 GOTO(stop, rc);
2940
2941         rc = dt_trans_start_local(env, dev, th);
2942         if (rc != 0)
2943                 GOTO(stop, rc);
2944
2945         dt_write_lock(env, child, 0);
2946         /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT has
2947          * ever tried to verify some remote MDT-object that resides on this
2948          * MDT, but this MDT failed to respond such request. So means there
2949          * may be some remote name entry on other MDT that references this
2950          * object with another name, so we cannot know whether this linkEA
2951          * is valid or not. So keep it there and maybe resolved when next
2952          * LFSCK run. */
2953         if (ns->ln_flags & LF_INCOMPLETE)
2954                 GOTO(unlock, rc = 0);
2955
             /* Look up the record for this object in com->lc_obj, keyed by
              * the FID converted with fid_cpu_to_be(); the record is read
              * into flags. Any lookup failure is returned as is. */
2956         fid_cpu_to_be(tfid, cfid);
2957         rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)&flags,
2958                        (const struct dt_key *)tfid, BYPASS_CAPA);
2959         if (rc != 0)
2960                 GOTO(unlock, rc);
2961
             /* Objects flagged with LNTF_SKIP_NLINK are left untouched.
              * NOTE(review): presumably this is the mark for objects with
              * too many hard links for the linkEA that the function header
              * mentions - confirm. */
2962         if (flags & LNTF_SKIP_NLINK)
2963                 GOTO(unlock, rc = 0);
2964
             /* -ENODATA is treated as nothing to repair (presumably the
              * object has no linkEA to compare against - confirm); any
              * other read failure is returned to the caller. */
2965         rc = lfsck_links_read2(env, child, &ldata);
2966         if (rc == -ENODATA)
2967                 GOTO(unlock, rc = 0);
2968
2969         if (rc != 0)
2970                 GOTO(unlock, rc);
2971
             /* The nlink already matches the linkEA record count. */
2972         if (*nlink == ldata.ld_leh->leh_reccount)
2973                 GOTO(unlock, rc = 0);
2974
             /* NOTE(review): the caller-visible *nlink is overwritten here,
              * before the attribute is written. In dry-run mode, or if
              * dt_attr_set() fails below, *nlink therefore holds the linkEA
              * record count rather than the value that was passed in. */
2975         la->la_nlink = *nlink = ldata.ld_leh->leh_reccount;
             /* Dry-run mode: report the case as repaired (rc = 1) without
              * calling dt_attr_set(). The transaction started above is
              * still stopped at the stop label. */
2976         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
2977                 GOTO(unlock, rc = 1);
2978
2979         rc = dt_attr_set(env, child, la, th, BYPASS_CAPA);
2980
             /* Map success to 1 (repaired); pass errors through unchanged. */
2981         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
2982
2983 unlock:
2984         dt_write_unlock(env, child);
2985
2986 stop:
2987         dt_trans_stop(env, dev, th);
2988
2989 log:
             /* This label is also reached when the lock was never taken (lh
              * is still zero-initialized).
              * NOTE(review): assumes lfsck_ibits_unlock() tolerates an
              * unused handle - confirm. */
2990         lfsck_ibits_unlock(&lh, LCK_EX);
2991         if (child != NULL && !IS_ERR(child))
2992                 lfsck_object_put(env, child);
2993
             /* Logged on every path, including when nothing was repaired
              * (rc == 0) and on failure (rc < 0). */
2994         CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired the object "DFID"'s "
2995                "nlink count from %u to %u: rc = %d\n",
2996                lfsck_lfsck2name(lfsck), PFID(cfid), old, *nlink, rc);
2997
             /* Both a repair (rc > 0) and a failure (rc < 0) set the
              * LF_INCONSISTENT flag. */
2998         if (rc != 0)
2999                 ns->ln_flags |= LF_INCONSISTENT;
3000
             /* NOTE(review): ENTRY above is paired with a plain return here
              * rather than RETURN() - confirm whether this is intentional. */
3001         return rc;
3002 }
3003
3004 /**
3005  * Double scan the directory object for namespace LFSCK.
3006  *
3007  * This function will verify the <parent, child> pairs in the namespace tree:
3008  * the parent references the child via some name entry that should be in the
3009  * child's linkEA entry, the child should back references the parent via its
3010  * ".." name entry.
3011  *
3012  * The LFSCK will scan every linkEA entry in turn until find out the first
3013  * matched pairs. If found, then all other linkEA entries will be dropped.
3014  * If all the linkEA entries cannot match the ".." name entry, then there
3015  * are several possible cases:
3016  *
3017  * 1) If there is only one linkEA entry, then trust it as long as the PFID
3018  *    in the linkEA entry is valid.
3019  *
3020  * 2) If there are multiple linkEA entries, then try to find the linkEA
3021  *    that matches the ".." name entry. If found, then all other entries
3022  *    are invalid; otherwise, it is quite possible that the ".." name entry
3023  *    is corrupted. Under such case, the LFSCK will rebuild the ".." name
3024  *    entry according to the first valid linkEA entry (both the parent and
3025  *    the name entry should exist).
3026  *
3027  * 3) If the directory object has no (valid) linkEA entry, then the
3028  *    directory object will be handled as pure orphan and inserted
3029  *    in the .lustre/lost+found/MDTxxxx/ with the name:
3030  *    ${self_FID}-${PFID}-D-${conflict_version}
3031  *
3032  * \param[in] env       pointer to the thread context
3033  * \param[in] com       pointer to the lfsck component
3034  * \param[in] child     pointer to the directory object to be handled