Whamcloud - gitweb
6e29717e5c0ffe32c25381cffb76c45d49e89079
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2014, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_namespace.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lustre/lustre_idl.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fid.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre/lustre_user.h>
41
42 #include "lfsck_internal.h"
43
44 #define LFSCK_NAMESPACE_MAGIC_V1        0xA0629D03
45 #define LFSCK_NAMESPACE_MAGIC_V2        0xA0621A0B
46
/* For Lustre-2.x (x <= 6), the namespace LFSCK used LFSCK_NAMESPACE_MAGIC_V1
 * as the trace file magic. After downgrading to such an old release, the old
 * LFSCK will not recognize the new LFSCK_NAMESPACE_MAGIC_V2 in the trace
 * file, so it will reset the whole LFSCK rather than fail to start. A similar
 * reset happens when upgrading from such an old release. */
52 #define LFSCK_NAMESPACE_MAGIC           LFSCK_NAMESPACE_MAGIC_V2
53
/* Positive result codes returned by lfsck_namespace_check_exist() when the
 * name entry no longer refers to the expected object. */
enum lfsck_nameentry_check {
	/* The object has been unlinked. */
	LFSCK_NAMEENTRY_DEAD		= 1,
	/* The entry has been removed. */
	LFSCK_NAMEENTRY_REMOVED		= 2,
	/* The entry has been recreated. */
	LFSCK_NAMEENTRY_RECREATED	= 3,
};
59
60 static struct lfsck_namespace_req *
61 lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
62                                    struct lu_dirent *ent, __u16 type)
63 {
64         struct lfsck_namespace_req *lnr;
65         int                         size;
66
67         size = sizeof(*lnr) + (ent->lde_namelen & ~3) + 4;
68         OBD_ALLOC(lnr, size);
69         if (lnr == NULL)
70                 return ERR_PTR(-ENOMEM);
71
72         INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
73         lnr->lnr_obj = lfsck_object_get(lfsck->li_obj_dir);
74         lnr->lnr_lmv = lfsck_lmv_get(lfsck->li_lmv);
75         lnr->lnr_fid = ent->lde_fid;
76         lnr->lnr_oit_cookie = lfsck->li_pos_current.lp_oit_cookie;
77         lnr->lnr_dir_cookie = ent->lde_hash;
78         lnr->lnr_attr = ent->lde_attrs;
79         lnr->lnr_size = size;
80         lnr->lnr_type = type;
81         lnr->lnr_namelen = ent->lde_namelen;
82         memcpy(lnr->lnr_name, ent->lde_name, ent->lde_namelen);
83
84         return lnr;
85 }
86
87 static void lfsck_namespace_assistant_req_fini(const struct lu_env *env,
88                                                struct lfsck_assistant_req *lar)
89 {
90         struct lfsck_namespace_req *lnr =
91                         container_of0(lar, struct lfsck_namespace_req, lnr_lar);
92
93         if (lnr->lnr_lmv != NULL)
94                 lfsck_lmv_put(env, lnr->lnr_lmv);
95
96         lu_object_put(env, &lnr->lnr_obj->do_lu);
97         OBD_FREE(lnr, lnr->lnr_size);
98 }
99
100 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
101                                       struct lfsck_namespace *src)
102 {
103         dst->ln_magic = le32_to_cpu(src->ln_magic);
104         dst->ln_status = le32_to_cpu(src->ln_status);
105         dst->ln_flags = le32_to_cpu(src->ln_flags);
106         dst->ln_success_count = le32_to_cpu(src->ln_success_count);
107         dst->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
108         dst->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
109         dst->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
110         dst->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
111         dst->ln_time_last_checkpoint =
112                                 le64_to_cpu(src->ln_time_last_checkpoint);
113         lfsck_position_le_to_cpu(&dst->ln_pos_latest_start,
114                                  &src->ln_pos_latest_start);
115         lfsck_position_le_to_cpu(&dst->ln_pos_last_checkpoint,
116                                  &src->ln_pos_last_checkpoint);
117         lfsck_position_le_to_cpu(&dst->ln_pos_first_inconsistent,
118                                  &src->ln_pos_first_inconsistent);
119         dst->ln_items_checked = le64_to_cpu(src->ln_items_checked);
120         dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
121         dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
122         dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
123         dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
124         dst->ln_objs_repaired_phase2 =
125                                 le64_to_cpu(src->ln_objs_repaired_phase2);
126         dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
127         dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
128         fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2,
129                       &src->ln_fid_latest_scanned_phase2);
130         dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
131         dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
132         dst->ln_mul_linked_checked = le64_to_cpu(src->ln_mul_linked_checked);
133         dst->ln_mul_linked_repaired = le64_to_cpu(src->ln_mul_linked_repaired);
134         dst->ln_unknown_inconsistency =
135                                 le64_to_cpu(src->ln_unknown_inconsistency);
136         dst->ln_unmatched_pairs_repaired =
137                                 le64_to_cpu(src->ln_unmatched_pairs_repaired);
138         dst->ln_dangling_repaired = le64_to_cpu(src->ln_dangling_repaired);
139         dst->ln_mul_ref_repaired = le64_to_cpu(src->ln_mul_ref_repaired);
140         dst->ln_bad_type_repaired = le64_to_cpu(src->ln_bad_type_repaired);
141         dst->ln_lost_dirent_repaired =
142                                 le64_to_cpu(src->ln_lost_dirent_repaired);
143         dst->ln_striped_dirs_scanned =
144                                 le64_to_cpu(src->ln_striped_dirs_scanned);
145         dst->ln_striped_dirs_repaired =
146                                 le64_to_cpu(src->ln_striped_dirs_repaired);
147         dst->ln_striped_dirs_failed =
148                                 le64_to_cpu(src->ln_striped_dirs_failed);
149         dst->ln_striped_dirs_disabled =
150                                 le64_to_cpu(src->ln_striped_dirs_disabled);
151         dst->ln_striped_dirs_skipped =
152                                 le64_to_cpu(src->ln_striped_dirs_skipped);
153         dst->ln_striped_shards_scanned =
154                                 le64_to_cpu(src->ln_striped_shards_scanned);
155         dst->ln_striped_shards_repaired =
156                                 le64_to_cpu(src->ln_striped_shards_repaired);
157         dst->ln_striped_shards_failed =
158                                 le64_to_cpu(src->ln_striped_shards_failed);
159         dst->ln_striped_shards_skipped =
160                                 le64_to_cpu(src->ln_striped_shards_skipped);
161         dst->ln_name_hash_repaired = le64_to_cpu(src->ln_name_hash_repaired);
162         dst->ln_local_lpf_scanned = le64_to_cpu(src->ln_local_lpf_scanned);
163         dst->ln_local_lpf_moved = le64_to_cpu(src->ln_local_lpf_moved);
164         dst->ln_local_lpf_skipped = le64_to_cpu(src->ln_local_lpf_skipped);
165         dst->ln_local_lpf_failed = le64_to_cpu(src->ln_local_lpf_failed);
166         dst->ln_bitmap_size = le32_to_cpu(src->ln_bitmap_size);
167 }
168
169 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
170                                       struct lfsck_namespace *src)
171 {
172         dst->ln_magic = cpu_to_le32(src->ln_magic);
173         dst->ln_status = cpu_to_le32(src->ln_status);
174         dst->ln_flags = cpu_to_le32(src->ln_flags);
175         dst->ln_success_count = cpu_to_le32(src->ln_success_count);
176         dst->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
177         dst->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
178         dst->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
179         dst->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
180         dst->ln_time_last_checkpoint =
181                                 cpu_to_le64(src->ln_time_last_checkpoint);
182         lfsck_position_cpu_to_le(&dst->ln_pos_latest_start,
183                                  &src->ln_pos_latest_start);
184         lfsck_position_cpu_to_le(&dst->ln_pos_last_checkpoint,
185                                  &src->ln_pos_last_checkpoint);
186         lfsck_position_cpu_to_le(&dst->ln_pos_first_inconsistent,
187                                  &src->ln_pos_first_inconsistent);
188         dst->ln_items_checked = cpu_to_le64(src->ln_items_checked);
189         dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
190         dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
191         dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
192         dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
193         dst->ln_objs_repaired_phase2 =
194                                 cpu_to_le64(src->ln_objs_repaired_phase2);
195         dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
196         dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
197         fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2,
198                       &src->ln_fid_latest_scanned_phase2);
199         dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
200         dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
201         dst->ln_mul_linked_checked = cpu_to_le64(src->ln_mul_linked_checked);
202         dst->ln_mul_linked_repaired = cpu_to_le64(src->ln_mul_linked_repaired);
203         dst->ln_unknown_inconsistency =
204                                 cpu_to_le64(src->ln_unknown_inconsistency);
205         dst->ln_unmatched_pairs_repaired =
206                                 cpu_to_le64(src->ln_unmatched_pairs_repaired);
207         dst->ln_dangling_repaired = cpu_to_le64(src->ln_dangling_repaired);
208         dst->ln_mul_ref_repaired = cpu_to_le64(src->ln_mul_ref_repaired);
209         dst->ln_bad_type_repaired = cpu_to_le64(src->ln_bad_type_repaired);
210         dst->ln_lost_dirent_repaired =
211                                 cpu_to_le64(src->ln_lost_dirent_repaired);
212         dst->ln_striped_dirs_scanned =
213                                 cpu_to_le64(src->ln_striped_dirs_scanned);
214         dst->ln_striped_dirs_repaired =
215                                 cpu_to_le64(src->ln_striped_dirs_repaired);
216         dst->ln_striped_dirs_failed =
217                                 cpu_to_le64(src->ln_striped_dirs_failed);
218         dst->ln_striped_dirs_disabled =
219                                 cpu_to_le64(src->ln_striped_dirs_disabled);
220         dst->ln_striped_dirs_skipped =
221                                 cpu_to_le64(src->ln_striped_dirs_skipped);
222         dst->ln_striped_shards_scanned =
223                                 cpu_to_le64(src->ln_striped_shards_scanned);
224         dst->ln_striped_shards_repaired =
225                                 cpu_to_le64(src->ln_striped_shards_repaired);
226         dst->ln_striped_shards_failed =
227                                 cpu_to_le64(src->ln_striped_shards_failed);
228         dst->ln_striped_shards_skipped =
229                                 cpu_to_le64(src->ln_striped_shards_skipped);
230         dst->ln_name_hash_repaired = cpu_to_le64(src->ln_name_hash_repaired);
231         dst->ln_local_lpf_scanned = cpu_to_le64(src->ln_local_lpf_scanned);
232         dst->ln_local_lpf_moved = cpu_to_le64(src->ln_local_lpf_moved);
233         dst->ln_local_lpf_skipped = cpu_to_le64(src->ln_local_lpf_skipped);
234         dst->ln_local_lpf_failed = cpu_to_le64(src->ln_local_lpf_failed);
235         dst->ln_bitmap_size = cpu_to_le32(src->ln_bitmap_size);
236 }
237
238 static void lfsck_namespace_record_failure(const struct lu_env *env,
239                                            struct lfsck_instance *lfsck,
240                                            struct lfsck_namespace *ns)
241 {
242         struct lfsck_position pos;
243
244         ns->ln_items_failed++;
245         lfsck_pos_fill(env, lfsck, &pos, false);
246         if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent) ||
247             lfsck_pos_is_eq(&pos, &ns->ln_pos_first_inconsistent) < 0) {
248                 ns->ln_pos_first_inconsistent = pos;
249
250                 CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
251                        "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n",
252                        lfsck_lfsck2name(lfsck),
253                        ns->ln_pos_first_inconsistent.lp_oit_cookie,
254                        PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
255                        ns->ln_pos_first_inconsistent.lp_dir_cookie);
256         }
257 }
258
259 /**
260  * Load the MDT bitmap from the lfsck_namespace trace file.
261  *
262  * \param[in] env       pointer to the thread context
263  * \param[in] com       pointer to the lfsck component
264  *
265  * \retval              0 for success
266  * \retval              negative error number on failure or data corruption
267  */
268 static int lfsck_namespace_load_bitmap(const struct lu_env *env,
269                                        struct lfsck_component *com)
270 {
271         struct dt_object                *obj    = com->lc_obj;
272         struct lfsck_assistant_data     *lad    = com->lc_data;
273         struct lfsck_namespace          *ns     = com->lc_file_ram;
274         cfs_bitmap_t                    *bitmap = lad->lad_bitmap;
275         ssize_t                          size;
276         __u32                            nbits;
277         int                              rc;
278         ENTRY;
279
280         if (com->lc_lfsck->li_mdt_descs.ltd_tgts_bitmap->size >
281             ns->ln_bitmap_size)
282                 nbits = com->lc_lfsck->li_mdt_descs.ltd_tgts_bitmap->size;
283         else
284                 nbits = ns->ln_bitmap_size;
285
286         if (unlikely(nbits < BITS_PER_LONG))
287                 nbits = BITS_PER_LONG;
288
289         if (nbits > bitmap->size) {
290                 __u32 new_bits = bitmap->size;
291                 cfs_bitmap_t *new_bitmap;
292
293                 while (new_bits < nbits)
294                         new_bits <<= 1;
295
296                 new_bitmap = CFS_ALLOCATE_BITMAP(new_bits);
297                 if (new_bitmap == NULL)
298                         RETURN(-ENOMEM);
299
300                 lad->lad_bitmap = new_bitmap;
301                 CFS_FREE_BITMAP(bitmap);
302                 bitmap = new_bitmap;
303         }
304
305         if (ns->ln_bitmap_size == 0) {
306                 lad->lad_incomplete = 0;
307                 CFS_RESET_BITMAP(bitmap);
308
309                 RETURN(0);
310         }
311
312         size = (ns->ln_bitmap_size + 7) >> 3;
313         rc = dt_xattr_get(env, obj,
314                           lfsck_buf_get(env, bitmap->data, size),
315                           XATTR_NAME_LFSCK_BITMAP, BYPASS_CAPA);
316         if (rc != size)
317                 RETURN(rc >= 0 ? -EINVAL : rc);
318
319         if (cfs_bitmap_check_empty(bitmap))
320                 lad->lad_incomplete = 0;
321         else
322                 lad->lad_incomplete = 1;
323
324         RETURN(0);
325 }
326
327 /**
328  * Load namespace LFSCK statistics information from the trace file.
329  *
330  * For old release (Lustre-2.6 or older), the statistics information was
331  * stored as XATTR_NAME_LFSCK_NAMESPACE_OLD EA. But in Lustre-2.7, we need
332  * more statistics information. To avoid confusing old MDT when downgrade,
333  * Lustre-2.7 stores the namespace LFSCK statistics information as new
334  * XATTR_NAME_LFSCK_NAMESPACE EA.
335  *
336  * \param[in] env       pointer to the thread context
337  * \param[in] com       pointer to the lfsck component
338  *
339  * \retval              0 for success
340  * \retval              negative error number on failure
341  */
342 static int lfsck_namespace_load(const struct lu_env *env,
343                                 struct lfsck_component *com)
344 {
345         int len = com->lc_file_size;
346         int rc;
347
348         rc = dt_xattr_get(env, com->lc_obj,
349                           lfsck_buf_get(env, com->lc_file_disk, len),
350                           XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
351         if (rc == len) {
352                 struct lfsck_namespace *ns = com->lc_file_ram;
353
354                 lfsck_namespace_le_to_cpu(ns,
355                                 (struct lfsck_namespace *)com->lc_file_disk);
356                 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
357                         CDEBUG(D_LFSCK, "%s: invalid lfsck_namespace magic "
358                                "%#x != %#x\n", lfsck_lfsck2name(com->lc_lfsck),
359                                ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
360                         rc = -ESTALE;
361                 } else {
362                         rc = 0;
363                 }
364         } else if (rc != -ENODATA) {
365                 CDEBUG(D_LFSCK, "%s: fail to load lfsck_namespace, "
366                        "expected = %d: rc = %d\n",
367                        lfsck_lfsck2name(com->lc_lfsck), len, rc);
368                 if (rc >= 0)
369                         rc = -ESTALE;
370         } else {
371                 /* Check whether it is old trace file or not.
372                  * If yes, it should be reset via returning -ESTALE. */
373                 rc = dt_xattr_get(env, com->lc_obj,
374                                   lfsck_buf_get(env, com->lc_file_disk, len),
375                                   XATTR_NAME_LFSCK_NAMESPACE_OLD, BYPASS_CAPA);
376                 if (rc >= 0)
377                         rc = -ESTALE;
378         }
379
380         return rc;
381 }
382
383 static int lfsck_namespace_store(const struct lu_env *env,
384                                  struct lfsck_component *com, bool init)
385 {
386         struct dt_object                *obj    = com->lc_obj;
387         struct lfsck_instance           *lfsck  = com->lc_lfsck;
388         struct lfsck_namespace          *ns     = com->lc_file_ram;
389         struct lfsck_assistant_data     *lad    = com->lc_data;
390         cfs_bitmap_t                    *bitmap = NULL;
391         struct thandle                  *handle;
392         __u32                            nbits  = 0;
393         int                              len    = com->lc_file_size;
394         int                              rc;
395 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0)
396         struct lu_buf            tbuf   = { &len, sizeof(len) };
397 #endif
398         ENTRY;
399
400         if (lad != NULL) {
401                 bitmap = lad->lad_bitmap;
402                 nbits = bitmap->size;
403
404                 LASSERT(nbits > 0);
405                 LASSERTF((nbits & 7) == 0, "Invalid nbits %u\n", nbits);
406         }
407
408         ns->ln_bitmap_size = nbits;
409         lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
410                                   ns);
411         handle = dt_trans_create(env, lfsck->li_bottom);
412         if (IS_ERR(handle))
413                 GOTO(log, rc = PTR_ERR(handle));
414
415         rc = dt_declare_xattr_set(env, obj,
416                                   lfsck_buf_get(env, com->lc_file_disk, len),
417                                   XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
418         if (rc != 0)
419                 GOTO(out, rc);
420
421         if (bitmap != NULL) {
422                 rc = dt_declare_xattr_set(env, obj,
423                                 lfsck_buf_get(env, bitmap->data, nbits >> 3),
424                                 XATTR_NAME_LFSCK_BITMAP, 0, handle);
425                 if (rc != 0)
426                         GOTO(out, rc);
427         }
428
429 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0)
430         /* To be compatible with old Lustre-2.x MDT (x <= 6), generate dummy
431          * XATTR_NAME_LFSCK_NAMESPACE_OLD EA, then when downgrade to Lustre-2.x,
432          * the old LFSCK will find "invalid" XATTR_NAME_LFSCK_NAMESPACE_OLD EA,
433          * then reset the namespace LFSCK trace file. */
434         if (init) {
435                 rc = dt_declare_xattr_set(env, obj, &tbuf,
436                                           XATTR_NAME_LFSCK_NAMESPACE_OLD,
437                                           LU_XATTR_CREATE, handle);
438                 if (rc != 0)
439                         GOTO(out, rc);
440         }
441 #endif
442
443         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
444         if (rc != 0)
445                 GOTO(out, rc);
446
447         rc = dt_xattr_set(env, obj,
448                           lfsck_buf_get(env, com->lc_file_disk, len),
449                           XATTR_NAME_LFSCK_NAMESPACE, 0, handle, BYPASS_CAPA);
450         if (rc == 0 && bitmap != NULL)
451                 rc = dt_xattr_set(env, obj,
452                                   lfsck_buf_get(env, bitmap->data, nbits >> 3),
453                                   XATTR_NAME_LFSCK_BITMAP, 0, handle,
454                                   BYPASS_CAPA);
455
456 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0)
457         if (rc == 0 && init)
458                 rc = dt_xattr_set(env, obj, &tbuf,
459                                   XATTR_NAME_LFSCK_NAMESPACE_OLD,
460                                   LU_XATTR_CREATE, handle, BYPASS_CAPA);
461 #endif
462
463         GOTO(out, rc);
464
465 out:
466         dt_trans_stop(env, lfsck->li_bottom, handle);
467
468 log:
469         if (rc != 0)
470                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_namespace: rc = %d\n",
471                        lfsck_lfsck2name(lfsck), rc);
472         return rc;
473 }
474
475 static struct dt_object *
476 lfsck_namespace_load_one_trace_file(const struct lu_env *env,
477                                     struct lfsck_component *com,
478                                     struct dt_object *parent,
479                                     const char *name,
480                                     const struct dt_index_features *ft,
481                                     bool reset)
482 {
483         struct lfsck_instance   *lfsck = com->lc_lfsck;
484         struct dt_object        *obj;
485         int                      rc;
486
487         if (reset) {
488                 rc = local_object_unlink(env, lfsck->li_bottom, parent, name);
489                 if (rc != 0 && rc != -ENOENT)
490                         return ERR_PTR(rc);
491         }
492
493         if (ft != NULL)
494                 obj = local_index_find_or_create(env, lfsck->li_los, parent,
495                                         name, S_IFREG | S_IRUGO | S_IWUSR, ft);
496         else
497                 obj = local_file_find_or_create(env, lfsck->li_los, parent,
498                                         name, S_IFREG | S_IRUGO | S_IWUSR);
499
500         return obj;
501 }
502
503 static int lfsck_namespace_load_sub_trace_files(const struct lu_env *env,
504                                                 struct lfsck_component *com,
505                                                 bool reset)
506 {
507         char                            *name = lfsck_env_info(env)->lti_key;
508         struct lfsck_sub_trace_obj      *lsto;
509         struct dt_object                *obj;
510         int                              rc;
511         int                              i;
512
513         for (i = 0, lsto = &com->lc_sub_trace_objs[0];
514              i < LFSCK_STF_COUNT; i++, lsto++) {
515                 snprintf(name, NAME_MAX, "%s_%02d", LFSCK_NAMESPACE, i);
516                 if (lsto->lsto_obj != NULL) {
517                         if (!reset)
518                                 continue;
519
520                         lu_object_put(env, &lsto->lsto_obj->do_lu);
521                         lsto->lsto_obj = NULL;
522                 }
523
524                 obj = lfsck_namespace_load_one_trace_file(env, com,
525                                         com->lc_lfsck->li_lfsck_dir,
526                                         name, &dt_lfsck_features, reset);
527                 if (IS_ERR(obj))
528                         return PTR_ERR(obj);
529
530                 lsto->lsto_obj = obj;
531                 rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
532                 if (rc != 0)
533                         return rc;
534         }
535
536         return 0;
537 }
538
539 static int lfsck_namespace_init(const struct lu_env *env,
540                                 struct lfsck_component *com)
541 {
542         struct lfsck_namespace *ns = com->lc_file_ram;
543         int rc;
544
545         memset(ns, 0, sizeof(*ns));
546         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
547         ns->ln_status = LS_INIT;
548         down_write(&com->lc_sem);
549         rc = lfsck_namespace_store(env, com, true);
550         up_write(&com->lc_sem);
551         if (rc == 0)
552                 rc = lfsck_namespace_load_sub_trace_files(env, com, true);
553
554         return rc;
555 }
556
557 /**
558  * Update the namespace LFSCK trace file for the given @fid
559  *
560  * \param[in] env       pointer to the thread context
561  * \param[in] com       pointer to the lfsck component
562  * \param[in] fid       the fid which flags to be updated in the lfsck
563  *                      trace file
564  * \param[in] add       true if add new flags, otherwise remove flags
565  *
566  * \retval              0 for succeed or nothing to be done
567  * \retval              negative error number on failure
568  */
569 int lfsck_namespace_trace_update(const struct lu_env *env,
570                                  struct lfsck_component *com,
571                                  const struct lu_fid *fid,
572                                  const __u8 flags, bool add)
573 {
574         struct lfsck_instance   *lfsck  = com->lc_lfsck;
575         struct dt_object        *obj;
576         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid3;
577         struct dt_device        *dev    = lfsck->li_bottom;
578         struct thandle          *th     = NULL;
579         int                      idx;
580         int                      rc     = 0;
581         __u8                     old    = 0;
582         __u8                     new    = 0;
583         ENTRY;
584
585         LASSERT(flags != 0);
586
587         if (unlikely(!fid_is_sane(fid)))
588                 RETURN(0);
589
590         idx = lfsck_sub_trace_file_fid2idx(fid);
591         obj = com->lc_sub_trace_objs[idx].lsto_obj;
592         mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
593         fid_cpu_to_be(key, fid);
594         rc = dt_lookup(env, obj, (struct dt_rec *)&old,
595                        (const struct dt_key *)key, BYPASS_CAPA);
596         if (rc == -ENOENT) {
597                 if (!add)
598                         GOTO(unlock, rc = 0);
599
600                 old = 0;
601                 new = flags;
602         } else if (rc == 0) {
603                 if (add) {
604                         if ((old & flags) == flags)
605                                 GOTO(unlock, rc = 0);
606
607                         new = old | flags;
608                 } else {
609                         if ((old & flags) == 0)
610                                 GOTO(unlock, rc = 0);
611
612                         new = old & ~flags;
613                 }
614         } else {
615                 GOTO(log, rc);
616         }
617
618         th = dt_trans_create(env, dev);
619         if (IS_ERR(th))
620                 GOTO(log, rc = PTR_ERR(th));
621
622         if (old != 0) {
623                 rc = dt_declare_delete(env, obj,
624                                        (const struct dt_key *)key, th);
625                 if (rc != 0)
626                         GOTO(log, rc);
627         }
628
629         if (new != 0) {
630                 rc = dt_declare_insert(env, obj,
631                                        (const struct dt_rec *)&new,
632                                        (const struct dt_key *)key, th);
633                 if (rc != 0)
634                         GOTO(log, rc);
635         }
636
637         rc = dt_trans_start_local(env, dev, th);
638         if (rc != 0)
639                 GOTO(log, rc);
640
641         if (old != 0) {
642                 rc = dt_delete(env, obj, (const struct dt_key *)key,
643                                th, BYPASS_CAPA);
644                 if (rc != 0)
645                         GOTO(log, rc);
646         }
647
648         if (new != 0) {
649                 rc = dt_insert(env, obj, (const struct dt_rec *)&new,
650                                (const struct dt_key *)key, th, BYPASS_CAPA, 1);
651                 if (rc != 0)
652                         GOTO(log, rc);
653         }
654
655         GOTO(log, rc);
656
657 log:
658         if (th != NULL && !IS_ERR(th))
659                 dt_trans_stop(env, dev, th);
660
661         CDEBUG(D_LFSCK, "%s: namespace LFSCK %s flags for "DFID" in the "
662                "trace file, flags %x, old %x, new %x: rc = %d\n",
663                lfsck_lfsck2name(lfsck), add ? "add" : "del", PFID(fid),
664                (__u32)flags, (__u32)old, (__u32)new, rc);
665
666 unlock:
667         mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
668
669         return rc;
670 }
671
672 int lfsck_namespace_check_exist(const struct lu_env *env,
673                                 struct dt_object *dir,
674                                 struct dt_object *obj, const char *name)
675 {
676         struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
677         int               rc;
678         ENTRY;
679
680         if (unlikely(lfsck_is_dead_obj(obj)))
681                 RETURN(LFSCK_NAMEENTRY_DEAD);
682
683         rc = dt_lookup(env, dir, (struct dt_rec *)fid,
684                        (const struct dt_key *)name, BYPASS_CAPA);
685         if (rc == -ENOENT)
686                 RETURN(LFSCK_NAMEENTRY_REMOVED);
687
688         if (rc < 0)
689                 RETURN(rc);
690
691         if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
692                 RETURN(LFSCK_NAMEENTRY_RECREATED);
693
694         RETURN(0);
695 }
696
697 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
698                                             struct dt_object *obj,
699                                             struct thandle *handle)
700 {
701         int rc;
702
703         /* For destroying all invalid linkEA entries. */
704         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
705         if (rc != 0)
706                 return rc;
707
708         /* For insert new linkEA entry. */
709         rc = dt_declare_xattr_set(env, obj,
710                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
711                         XATTR_NAME_LINK, 0, handle);
712         return rc;
713 }
714
715 int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
716                        struct linkea_data *ldata)
717 {
718         int rc;
719
720         if (ldata->ld_buf->lb_buf == NULL)
721                 return -ENOMEM;
722
723         if (!dt_object_exists(obj))
724                 return -ENOENT;
725
726         rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
727         if (rc == -ERANGE) {
728                 /* Buf was too small, figure out what we need. */
729                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LINK,
730                                   BYPASS_CAPA);
731                 if (rc <= 0)
732                         return rc;
733
734                 lu_buf_realloc(ldata->ld_buf, rc);
735                 if (ldata->ld_buf->lb_buf == NULL)
736                         return -ENOMEM;
737
738                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
739                                   BYPASS_CAPA);
740         }
741
742         if (rc > 0)
743                 rc = linkea_init(ldata);
744
745         return rc;
746 }
747
748 /**
749  * Remove linkEA for the given object.
750  *
751  * The caller should take the ldlm lock before the calling.
752  *
753  * \param[in] env       pointer to the thread context
754  * \param[in] com       pointer to the lfsck component
755  * \param[in] obj       pointer to the dt_object to be handled
756  *
757  * \retval              0 for repaired cases
758  * \retval              negative error number on failure
759  */
760 static int lfsck_namespace_links_remove(const struct lu_env *env,
761                                         struct lfsck_component *com,
762                                         struct dt_object *obj)
763 {
764         struct lfsck_instance           *lfsck  = com->lc_lfsck;
765         struct dt_device                *dev    = lfsck->li_bottom;
766         struct thandle                  *th     = NULL;
767         int                              rc     = 0;
768         ENTRY;
769
770         LASSERT(dt_object_remote(obj) == 0);
771
772         th = dt_trans_create(env, dev);
773         if (IS_ERR(th))
774                 GOTO(log, rc = PTR_ERR(th));
775
776         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, th);
777         if (rc != 0)
778                 GOTO(stop, rc);
779
780         rc = dt_trans_start_local(env, dev, th);
781         if (rc != 0)
782                 GOTO(stop, rc);
783
784         dt_write_lock(env, obj, 0);
785         if (unlikely(lfsck_is_dead_obj(obj)))
786                 GOTO(unlock, rc = -ENOENT);
787
788         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
789                 GOTO(unlock, rc = 0);
790
791         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th, BYPASS_CAPA);
792
793         GOTO(unlock, rc);
794
795 unlock:
796         dt_write_unlock(env, obj);
797
798 stop:
799         dt_trans_stop(env, dev, th);
800
801 log:
802         CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
803                "for the object "DFID": rc = %d\n",
804                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
805
806         if (rc == 0) {
807                 struct lfsck_namespace *ns = com->lc_file_ram;
808
809                 ns->ln_flags |= LF_INCONSISTENT;
810         }
811
812         return rc;
813 }
814
815 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
816                              struct linkea_data *ldata, struct thandle *handle)
817 {
818         const struct lu_buf *buf = lfsck_buf_get_const(env,
819                                                        ldata->ld_buf->lb_buf,
820                                                        ldata->ld_leh->leh_len);
821
822         return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle,
823                             BYPASS_CAPA);
824 }
825
826 static void lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata,
827                                                 struct lu_name *cname,
828                                                 struct lu_fid *pfid,
829                                                 char *buf)
830 {
831         linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
832         /* To guarantee the 'name' is terminated with '0'. */
833         memcpy(buf, cname->ln_name, cname->ln_namelen);
834         buf[cname->ln_namelen] = 0;
835         cname->ln_name = buf;
836 }
837
838 static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata,
839                                                struct lu_name *cname,
840                                                struct lu_fid *pfid,
841                                                bool remove)
842 {
843         struct link_ea_entry    *oldlee;
844         int                      oldlen;
845         int                      repeated = 0;
846
847         oldlee = ldata->ld_lee;
848         oldlen = ldata->ld_reclen;
849         linkea_next_entry(ldata);
850         while (ldata->ld_lee != NULL) {
851                 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
852                                    ldata->ld_lee->lee_reclen[1];
853                 if (unlikely(ldata->ld_reclen == oldlen &&
854                              memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
855                         repeated++;
856                         if (!remove)
857                                 break;
858
859                         linkea_del_buf(ldata, cname);
860                 } else {
861                         linkea_next_entry(ldata);
862                 }
863         }
864         ldata->ld_lee = oldlee;
865         ldata->ld_reclen = oldlen;
866
867         return repeated;
868 }
869
/**
 * Insert orphan into .lustre/lost+found/MDTxxxx/ locally.
 *
 * Add the specified orphan MDT-object to the .lustre/lost+found/MDTxxxx/
 * with the given type to generate the name, the detailed rules for name
 * have been described as following.
 *
 * The function also generates the linkEA corresponding to the name entry
 * under the .lustre/lost+found/MDTxxxx/ for the orphan MDT-object.
 *
 * \param[in] env       pointer to the thread context
 * \param[in] com       pointer to the lfsck component
 * \param[in] orphan    pointer to the orphan MDT-object
 * \param[in] infix     additional information for the orphan name, such as
 *                      the FID for original
 * \param[in] type      the type for describing why the orphan MDT-object is
 *                      created. The rules are as following:
 *
 *  type "D":           The MDT-object is a directory, it may know its parent
 *                      but because there is no valid linkEA, the LFSCK cannot
 *                      know where to put it back to the namespace.
 *  type "O":           The MDT-object has no linkEA, and there is no name
 *                      entry that references the MDT-object.
 *
 *  type "S":           The orphan MDT-object is a shard of a striped directory
 *
 * \see lfsck_layout_recreate_parent() for more types.
 *
 * The orphan name will be like:
 * ${FID}-${infix}-${type}-${conflict_version}
 *
 * NOTE(review): the format string used below is DFID"%s-%s-%d", i.e. the
 * @infix is appended directly after the FID without a separator. Presumably
 * a non-empty @infix carries its own leading '-'; confirm against callers.
 *
 * \param[out] count    if some others inserted some linkEA entries by race,
 *                      then return the linkEA entries count.
 *
 * \retval              positive number for repaired cases (also returned
 *                      in dry-run mode when a repair would be needed)
 * \retval              0 if needs to repair nothing
 * \retval              negative error number on failure
 */
static int lfsck_namespace_insert_orphan(const struct lu_env *env,
                                         struct lfsck_component *com,
                                         struct dt_object *orphan,
                                         const char *infix, const char *type,
                                         int *count)
{
        struct lfsck_thread_info        *info   = lfsck_env_info(env);
        struct lu_name                  *cname  = &info->lti_name;
        struct dt_insert_rec            *rec    = &info->lti_dt_rec;
        struct lu_attr                  *la     = &info->lti_la3;
        const struct lu_fid             *cfid   = lfsck_dto2fid(orphan);
        const struct lu_fid             *pfid;
        struct lu_fid                    tfid;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
        struct dt_device                *dev    = lfsck->li_bottom;
        struct dt_object                *parent;
        struct thandle                  *th     = NULL;
        struct lustre_handle             plh    = { 0 };
        struct lustre_handle             clh    = { 0 };
        struct linkea_data               ldata  = { NULL };
        struct lu_buf                    linkea_buf;
        int                              namelen;
        int                              idx    = 0;
        int                              rc     = 0;
        bool                             exist  = false;
        ENTRY;

        /* The final CDEBUG prints "<NULL>" until a name has been chosen. */
        cname->ln_name = NULL;
        if (unlikely(lfsck->li_lpf_obj == NULL))
                GOTO(log, rc = -ENXIO);

        parent = lfsck->li_lpf_obj;
        pfid = lfsck_dto2fid(parent);

        /* Hold update lock on the parent to prevent others to access. */
        rc = lfsck_ibits_lock(env, lfsck, parent, &plh,
                              MDS_INODELOCK_UPDATE, LCK_EX);
        if (rc != 0)
                GOTO(log, rc);

        /* Probe candidate names with an increasing conflict version until
         * either the name is free (-ENOENT) or it already references the
         * orphan itself (exist == true). */
        do {
                namelen = snprintf(info->lti_key, NAME_MAX, DFID"%s-%s-%d",
                                   PFID(cfid), infix, type, idx++);
                rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
                               (const struct dt_key *)info->lti_key,
                               BYPASS_CAPA);
                if (rc != 0 && rc != -ENOENT)
                        GOTO(log, rc);

                if (unlikely(rc == 0 && lu_fid_eq(cfid, &tfid)))
                        exist = true;
        } while (rc == 0 && !exist);

        /* Build a fresh linkEA with the single entry <name, parent FID>. */
        cname->ln_name = info->lti_key;
        cname->ln_namelen = namelen;
        rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
        if (rc != 0)
                GOTO(log, rc);

        rc = linkea_add_buf(&ldata, cname, pfid);
        if (rc != 0)
                GOTO(log, rc);

        rc = lfsck_ibits_lock(env, lfsck, orphan, &clh,
                              MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP,
                              LCK_EX);
        if (rc != 0)
                GOTO(log, rc);

        lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
                       ldata.ld_leh->leh_len);
        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
                GOTO(log, rc = PTR_ERR(th));

        /* Declare phase: for a directory the ".." entry is replaced, so
         * both its removal and its re-insertion are declared. */
        if (S_ISDIR(lfsck_object_type(orphan))) {
                rc = dt_declare_delete(env, orphan,
                                       (const struct dt_key *)dotdot, th);
                if (rc != 0)
                        GOTO(stop, rc);

                rec->rec_type = S_IFDIR;
                rec->rec_fid = pfid;
                rc = dt_declare_insert(env, orphan, (const struct dt_rec *)rec,
                                       (const struct dt_key *)dotdot, th);
                if (rc != 0)
                        GOTO(stop, rc);
        }

        rc = dt_declare_xattr_set(env, orphan, &linkea_buf,
                                  XATTR_NAME_LINK, 0, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* The name entry is only inserted if it is not there yet. */
        if (!exist) {
                rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
                rec->rec_fid = cfid;
                rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
                                       (const struct dt_key *)cname->ln_name,
                                       th);
                if (rc != 0)
                        GOTO(stop, rc);

                if (S_ISDIR(rec->rec_type)) {
                        rc = dt_declare_ref_add(env, parent, th);
                        if (rc != 0)
                                GOTO(stop, rc);
                }
        }

        memset(la, 0, sizeof(*la));
        la->la_ctime = cfs_time_current_sec();
        la->la_valid = LA_CTIME;
        rc = dt_declare_attr_set(env, orphan, la, th);
        if (rc != 0)
                GOTO(stop, rc);

        rc = dt_trans_start_local(env, dev, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* Execute phase. The linkEA is re-read under the object lock:
         * the repair only goes on if the orphan has no usable linkEA
         * (no data, invalid, or zero records). Otherwise nothing is
         * changed and the existing record count is reported via @count. */
        dt_write_lock(env, orphan, 0);
        rc = lfsck_links_read(env, orphan, &ldata);
        if (likely((rc == -ENODATA) || (rc == -EINVAL) ||
                   (rc == 0 && ldata.ld_leh->leh_reccount == 0))) {
                /* Dry-run: report "would repair" without modification. */
                if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
                        GOTO(unlock, rc = 1);

                if (S_ISDIR(lfsck_object_type(orphan))) {
                        rc = dt_delete(env, orphan,
                                       (const struct dt_key *)dotdot, th,
                                       BYPASS_CAPA);
                        if (rc != 0)
                                GOTO(unlock, rc);

                        rec->rec_type = S_IFDIR;
                        rec->rec_fid = pfid;
                        rc = dt_insert(env, orphan, (const struct dt_rec *)rec,
                                       (const struct dt_key *)dotdot, th,
                                       BYPASS_CAPA, 1);
                        if (rc != 0)
                                GOTO(unlock, rc);
                }

                rc = dt_xattr_set(env, orphan, &linkea_buf, XATTR_NAME_LINK, 0,
                                  th, BYPASS_CAPA);
        } else {
                if (rc == 0 && count != NULL)
                        *count = ldata.ld_leh->leh_reccount;

                GOTO(unlock, rc);
        }
        /* Only the repair branch falls through to here; rc holds the
         * result of dt_xattr_set() and is checked below. */
        dt_write_unlock(env, orphan);

        if (rc == 0 && !exist) {
                rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
                rec->rec_fid = cfid;
                rc = dt_insert(env, parent, (const struct dt_rec *)rec,
                               (const struct dt_key *)cname->ln_name,
                               th, BYPASS_CAPA, 1);
                if (rc == 0 && S_ISDIR(rec->rec_type)) {
                        dt_write_lock(env, parent, 0);
                        rc = dt_ref_add(env, parent, th);
                        dt_write_unlock(env, parent);
                }
        }

        if (rc == 0)
                rc = dt_attr_set(env, orphan, la, th, BYPASS_CAPA);

        /* Success is reported as 1 (repaired). */
        GOTO(stop, rc = (rc == 0 ? 1 : rc));

unlock:
        dt_write_unlock(env, orphan);

stop:
        dt_trans_stop(env, dev, th);

log:
        /* Both handles start zeroed, so this label is also reached with
         * one or both locks never taken. */
        lfsck_ibits_unlock(&clh, LCK_EX);
        lfsck_ibits_unlock(&plh, LCK_EX);
        CDEBUG(D_LFSCK, "%s: namespace LFSCK insert orphan for the "
               "object "DFID", name = %s: rc = %d\n",
               lfsck_lfsck2name(lfsck), PFID(cfid),
               cname->ln_name != NULL ? cname->ln_name : "<NULL>", rc);

        /* Any non-zero result (repaired or failed) marks the namespace
         * as inconsistent. */
        if (rc != 0) {
                struct lfsck_namespace *ns = com->lc_file_ram;

                ns->ln_flags |= LF_INCONSISTENT;
        }

        return rc;
}
1102
1103 /**
1104  * Add the specified name entry back to namespace.
1105  *
1106  * If there is a linkEA entry that back references a name entry under
1107  * some parent directory, but such parent directory does not have the
1108  * claimed name entry. On the other hand, the linkEA entries count is
1109  * not larger than the MDT-object's hard link count. Under such case,
1110  * it is quite possible that the name entry is lost. Then the LFSCK
1111  * should add the name entry back to the namespace.
1112  *
1113  * \param[in] env       pointer to the thread context
1114  * \param[in] com       pointer to the lfsck component
1115  * \param[in] parent    pointer to the directory under which the name entry
1116  *                      will be inserted into
1117  * \param[in] child     pointer to the object referenced by the name entry
1118  *                      that to be inserted into the parent
1119  * \param[in] name      the name for the child in the parent directory
1120  *
1121  * \retval              positive number for repaired cases
1122  * \retval              0 if nothing to be repaired
1123  * \retval              negative error number on failure
1124  */
1125 static int lfsck_namespace_insert_normal(const struct lu_env *env,
1126                                          struct lfsck_component *com,
1127                                          struct dt_object *parent,
1128                                          struct dt_object *child,
1129                                          const char *name)
1130 {
1131         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1132         struct lu_attr                  *la     = &info->lti_la;
1133         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1134         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1135         struct dt_device                *dev    = lfsck->li_next;
1136         struct thandle                  *th     = NULL;
1137         struct lustre_handle             lh     = { 0 };
1138         int                              rc     = 0;
1139         ENTRY;
1140
1141         if (unlikely(!dt_try_as_dir(env, parent)))
1142                 GOTO(log, rc = -ENOTDIR);
1143
1144         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1145                 GOTO(log, rc = 1);
1146
1147         /* Hold update lock on the parent to prevent others to access. */
1148         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
1149                               MDS_INODELOCK_UPDATE, LCK_EX);
1150         if (rc != 0)
1151                 GOTO(log, rc);
1152
1153         th = dt_trans_create(env, dev);
1154         if (IS_ERR(th))
1155                 GOTO(unlock, rc = PTR_ERR(th));
1156
1157         rec->rec_type = lfsck_object_type(child) & S_IFMT;
1158         rec->rec_fid = lfsck_dto2fid(child);
1159         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1160                                (const struct dt_key *)name, th);
1161         if (rc != 0)
1162                 GOTO(stop, rc);
1163
1164         if (S_ISDIR(rec->rec_type)) {
1165                 rc = dt_declare_ref_add(env, parent, th);
1166                 if (rc != 0)
1167                         GOTO(stop, rc);
1168         }
1169
1170         memset(la, 0, sizeof(*la));
1171         la->la_ctime = cfs_time_current_sec();
1172         la->la_valid = LA_CTIME;
1173         rc = dt_declare_attr_set(env, parent, la, th);
1174         if (rc != 0)
1175                 GOTO(stop, rc);
1176
1177         rc = dt_declare_attr_set(env, child, la, th);
1178         if (rc != 0)
1179                 GOTO(stop, rc);
1180
1181         rc = dt_trans_start_local(env, dev, th);
1182         if (rc != 0)
1183                 GOTO(stop, rc);
1184
1185         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1186                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
1187         if (rc != 0)
1188                 GOTO(stop, rc);
1189
1190         if (S_ISDIR(rec->rec_type)) {
1191                 dt_write_lock(env, parent, 0);
1192                 rc = dt_ref_add(env, parent, th);
1193                 dt_write_unlock(env, parent);
1194                 if (rc != 0)
1195                         GOTO(stop, rc);
1196         }
1197
1198         la->la_ctime = cfs_time_current_sec();
1199         rc = dt_attr_set(env, parent, la, th, BYPASS_CAPA);
1200         if (rc != 0)
1201                 GOTO(stop, rc);
1202
1203         rc = dt_attr_set(env, child, la, th, BYPASS_CAPA);
1204
1205         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1206
1207 stop:
1208         dt_trans_stop(env, dev, th);
1209
1210 unlock:
1211         lfsck_ibits_unlock(&lh, LCK_EX);
1212
1213 log:
1214         CDEBUG(D_LFSCK, "%s: namespace LFSCK insert object "DFID" with "
1215                "the name %s and type %o to the parent "DFID": rc = %d\n",
1216                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), name,
1217                lfsck_object_type(child) & S_IFMT,
1218                PFID(lfsck_dto2fid(parent)), rc);
1219
1220         if (rc != 0) {
1221                 struct lfsck_namespace *ns = com->lc_file_ram;
1222
1223                 ns->ln_flags |= LF_INCONSISTENT;
1224                 if (rc > 0)
1225                         ns->ln_lost_dirent_repaired++;
1226         }
1227
1228         return rc;
1229 }
1230
1231 /**
1232  * Create the specified orphan directory.
1233  *
1234  * For the case that the parent MDT-object stored in some MDT-object's
1235  * linkEA entry is lost, the LFSCK will re-create the parent object as
1236  * an orphan and insert it into .lustre/lost+found/MDTxxxx/ directory
1237  * with the name ${FID}-P-${conflict_version}.
1238  *
1239  * \param[in] env       pointer to the thread context
1240  * \param[in] com       pointer to the lfsck component
1241  * \param[in] orphan    pointer to the orphan MDT-object to be created
1242  * \param[in] lmv       pointer to master LMV EA that will be set to the orphan
1243  *
1244  * \retval              positive number for repaired cases
1245  * \retval              negative error number on failure
1246  */
1247 static int lfsck_namespace_create_orphan_dir(const struct lu_env *env,
1248                                              struct lfsck_component *com,
1249                                              struct dt_object *orphan,
1250                                              struct lmv_mds_md_v1 *lmv)
1251 {
1252         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1253         struct lu_attr                  *la     = &info->lti_la;
1254         struct dt_allocation_hint       *hint   = &info->lti_hint;
1255         struct dt_object_format         *dof    = &info->lti_dof;
1256         struct lu_name                  *cname  = &info->lti_name2;
1257         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1258         struct lmv_mds_md_v1            *lmv2   = &info->lti_lmv2;
1259         const struct lu_fid             *cfid   = lfsck_dto2fid(orphan);
1260         struct lu_fid                    tfid;
1261         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1262         struct lfsck_namespace          *ns     = com->lc_file_ram;
1263         struct dt_device                *dev;
1264         struct dt_object                *parent = NULL;
1265         struct dt_object                *child  = NULL;
1266         struct thandle                  *th     = NULL;
1267         struct lustre_handle             lh     = { 0 };
1268         struct linkea_data               ldata  = { NULL };
1269         struct lu_buf                    linkea_buf;
1270         struct lu_buf                    lmv_buf;
1271         char                             name[32];
1272         int                              namelen;
1273         int                              idx    = 0;
1274         int                              rc     = 0;
1275         ENTRY;
1276
1277         LASSERT(!dt_object_exists(orphan));
1278
1279         cname->ln_name = NULL;
1280         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1281                 GOTO(log, rc = 1);
1282
1283         if (dt_object_remote(orphan)) {
1284                 LASSERT(lfsck->li_lpf_root_obj != NULL);
1285
1286                 idx = lfsck_find_mdt_idx_by_fid(env, lfsck, cfid);
1287                 if (idx < 0)
1288                         GOTO(log, rc = idx);
1289
1290                 snprintf(name, 8, "MDT%04x", idx);
1291                 rc = dt_lookup(env, lfsck->li_lpf_root_obj,
1292                                (struct dt_rec *)&tfid,
1293                                (const struct dt_key *)name, BYPASS_CAPA);
1294                 if (rc != 0)
1295                         GOTO(log, rc = (rc == -ENOENT ? -ENXIO : rc));
1296
1297                 parent = lfsck_object_find_bottom(env, lfsck, &tfid);
1298                 if (IS_ERR(parent))
1299                         GOTO(log, rc = PTR_ERR(parent));
1300
1301                 if (unlikely(!dt_try_as_dir(env, parent)))
1302                         GOTO(log, rc = -ENOTDIR);
1303         } else {
1304                 if (unlikely(lfsck->li_lpf_obj == NULL))
1305                         GOTO(log, rc = -ENXIO);
1306
1307                 parent = lfsck->li_lpf_obj;
1308         }
1309
1310         dev = lfsck_find_dev_by_fid(env, lfsck, cfid);
1311         if (IS_ERR(dev))
1312                 GOTO(log, rc = PTR_ERR(dev));
1313
1314         child = lfsck_object_find_by_dev(env, dev, cfid);
1315         if (IS_ERR(child))
1316                 GOTO(log, rc = PTR_ERR(child));
1317
1318         /* Hold update lock on the parent to prevent others to access. */
1319         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
1320                               MDS_INODELOCK_UPDATE, LCK_EX);
1321         if (rc != 0)
1322                 GOTO(log, rc);
1323
1324         idx = 0;
1325         do {
1326                 namelen = snprintf(name, 31, DFID"-P-%d",
1327                                    PFID(cfid), idx++);
1328                 rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
1329                                (const struct dt_key *)name, BYPASS_CAPA);
1330                 if (rc != 0 && rc != -ENOENT)
1331                         GOTO(unlock1, rc);
1332         } while (rc == 0);
1333
1334         cname->ln_name = name;
1335         cname->ln_namelen = namelen;
1336
1337         memset(la, 0, sizeof(*la));
1338         la->la_mode = S_IFDIR | 0700;
1339         la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
1340                        LA_ATIME | LA_MTIME | LA_CTIME;
1341
1342         child->do_ops->do_ah_init(env, hint, parent, child,
1343                                   la->la_mode & S_IFMT);
1344
1345         memset(dof, 0, sizeof(*dof));
1346         dof->dof_type = dt_mode_to_dft(S_IFDIR);
1347
1348         rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
1349         if (rc != 0)
1350                 GOTO(unlock1, rc);
1351
1352         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
1353         if (rc != 0)
1354                 GOTO(unlock1, rc);
1355
1356         th = dt_trans_create(env, dev);
1357         if (IS_ERR(th))
1358                 GOTO(unlock1, rc = PTR_ERR(th));
1359
1360         /* Sync the remote transaction to guarantee that the subsequent
1361          * lock against the @orphan can find the @orphan in time. */
1362         if (dt_object_remote(orphan))
1363                 th->th_sync = 1;
1364
1365         rc = dt_declare_create(env, child, la, hint, dof, th);
1366         if (rc != 0)
1367                 GOTO(stop, rc);
1368
1369         if (unlikely(!dt_try_as_dir(env, child)))
1370                 GOTO(stop, rc = -ENOTDIR);
1371
1372         rec->rec_type = S_IFDIR;
1373         rec->rec_fid = cfid;
1374         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
1375                                (const struct dt_key *)dot, th);
1376         if (rc != 0)
1377                 GOTO(stop, rc);
1378
1379         rec->rec_fid = lfsck_dto2fid(parent);
1380         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
1381                                (const struct dt_key *)dotdot, th);
1382         if (rc == 0)
1383                 rc = dt_declare_ref_add(env, child, th);
1384
1385         if (rc != 0)
1386                 GOTO(stop, rc);
1387
1388         rc = dt_declare_ref_add(env, child, th);
1389         if (rc != 0)
1390                 GOTO(stop, rc);
1391
1392         if (lmv != NULL) {
1393                 lmv->lmv_magic = LMV_MAGIC;
1394                 lmv->lmv_master_mdt_index = lfsck_dev_idx(dev);
1395                 lfsck_lmv_header_cpu_to_le(lmv2, lmv);
1396                 lfsck_buf_init(&lmv_buf, lmv2, sizeof(*lmv2));
1397                 rc = dt_declare_xattr_set(env, child, &lmv_buf,
1398                                           XATTR_NAME_LMV, 0, th);
1399                 if (rc != 0)
1400                         GOTO(stop, rc);
1401         }
1402
1403         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
1404                        ldata.ld_leh->leh_len);
1405         rc = dt_declare_xattr_set(env, child, &linkea_buf,
1406                                   XATTR_NAME_LINK, 0, th);
1407         if (rc != 0)
1408                 GOTO(stop, rc);
1409
1410         rec->rec_fid = cfid;
1411         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1412                                (const struct dt_key *)name, th);
1413         if (rc == 0)
1414                 rc = dt_declare_ref_add(env, parent, th);
1415
1416         if (rc != 0)
1417                 GOTO(stop, rc);
1418
1419         rc = dt_trans_start_local(env, dev, th);
1420         if (rc != 0)
1421                 GOTO(stop, rc);
1422
1423         dt_write_lock(env, child, 0);
1424         rc = dt_create(env, child, la, hint, dof, th);
1425         if (rc != 0)
1426                 GOTO(unlock2, rc);
1427
1428         rec->rec_fid = cfid;
1429         rc = dt_insert(env, child, (const struct dt_rec *)rec,
1430                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
1431         if (rc != 0)
1432                 GOTO(unlock2, rc);
1433
1434         rec->rec_fid = lfsck_dto2fid(parent);
1435         rc = dt_insert(env, child, (const struct dt_rec *)rec,
1436                        (const struct dt_key *)dotdot, th,
1437                        BYPASS_CAPA, 1);
1438         if (rc != 0)
1439                 GOTO(unlock2, rc);
1440
1441         rc = dt_ref_add(env, child, th);
1442         if (rc != 0)
1443                 GOTO(unlock2, rc);
1444
1445         if (lmv != NULL) {
1446                 rc = dt_xattr_set(env, child, &lmv_buf, XATTR_NAME_LMV, 0,
1447                                   th, BYPASS_CAPA);
1448                 if (rc != 0)
1449                         GOTO(unlock2, rc);
1450         }
1451
1452         rc = dt_xattr_set(env, child, &linkea_buf,
1453                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1454         dt_write_unlock(env, child);
1455         if (rc != 0)
1456                 GOTO(stop, rc);
1457
1458         rec->rec_fid = cfid;
1459         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1460                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
1461         if (rc == 0) {
1462                 dt_write_lock(env, parent, 0);
1463                 rc = dt_ref_add(env, parent, th);
1464                 dt_write_unlock(env, parent);
1465         }
1466
1467         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1468
1469 unlock2:
1470         dt_write_unlock(env, child);
1471
1472 stop:
1473         dt_trans_stop(env, dev, th);
1474
1475 unlock1:
1476         lfsck_ibits_unlock(&lh, LCK_EX);
1477
1478 log:
1479         CDEBUG(D_LFSCK, "%s: namespace LFSCK create orphan dir for "
1480                "the object "DFID", name = %s: rc = %d\n",
1481                lfsck_lfsck2name(lfsck), PFID(cfid),
1482                cname->ln_name != NULL ? cname->ln_name : "<NULL>", rc);
1483
1484         if (child != NULL && !IS_ERR(child))
1485                 lfsck_object_put(env, child);
1486
1487         if (parent != NULL && !IS_ERR(parent) && parent != lfsck->li_lpf_obj)
1488                 lfsck_object_put(env, parent);
1489
1490         if (rc != 0)
1491                 ns->ln_flags |= LF_INCONSISTENT;
1492
1493         return rc;
1494 }
1495
1496 /**
1497  * Remove the specified entry from the linkEA.
1498  *
1499  * Locate the linkEA entry with the given @cname and @pfid, then
1500  * remove this entry (@next is true), or the other entries that are
1501  * repeated with this entry (@next is false).
      *
      * The caller's @ldata is shrunk first, and its resulting length is used
      * to declare the xattr update. The linkEA is then read again from @obj
      * under the object's write lock, shrunk in the same way, and written
      * back. If the freshly shrunk linkEA is larger than what was declared,
      * the transaction is stopped and restarted with the larger size.
      *
      * Any non-zero result (failure or repair) sets LF_INCONSISTENT in the
      * namespace flags of @com.
1502  *
1503  * \param[in] env       pointer to the thread context
1504  * \param[in] com       pointer to the lfsck component
1505  * \param[in] obj       pointer to the dt_object to be handled
1506  * \param[in,out]ldata  pointer to the buffer that holds the linkEA
1507  * \param[in] cname     the name for the child in the parent directory
1508  * \param[in] pfid      the parent directory's FID for the linkEA
1509  * \param[in] next      if true, then remove the first found linkEA
1510  *                      entry, and move the ldata->ld_lee to next entry
1511  *
1512  * \retval              positive number for repaired cases (also returned
      *                      in dryrun mode, without modifying the object)
1513  * \retval              0 if nothing to be repaired (the entry has been
      *                      removed by others in the meantime)
1514  * \retval              negative error number on failure (-ENOENT if the
      *                      object is dead)
1515  */
1516 static int lfsck_namespace_shrink_linkea(const struct lu_env *env,
1517                                          struct lfsck_component *com,
1518                                          struct dt_object *obj,
1519                                          struct linkea_data *ldata,
1520                                          struct lu_name *cname,
1521                                          struct lu_fid *pfid,
1522                                          bool next)
1523 {
1524         struct lfsck_instance           *lfsck     = com->lc_lfsck;
1525         struct dt_device                *dev       = lfsck->li_bottom;
1526         struct lfsck_bookmark           *bk        = &lfsck->li_bookmark_ram;
1527         struct thandle                  *th        = NULL;
1528         struct lustre_handle             lh        = { 0 };
1529         struct linkea_data               ldata_new = { NULL };
1530         struct lu_buf                    linkea_buf;
1531         int                              rc        = 0;
1532         ENTRY;
1533
1534         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
1535                               MDS_INODELOCK_UPDATE |
1536                               MDS_INODELOCK_XATTR, LCK_EX);
1537         if (rc != 0)
1538                 GOTO(log, rc);
1539
             /* Shrink the caller's copy first: its length after the removal
              * is the size used for the xattr declaration below. */
1540         if (next)
1541                 linkea_del_buf(ldata, cname);
1542         else
1543                 lfsck_namespace_filter_linkea_entry(ldata, cname, pfid,
1544                                                     true);
1545         lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
1546                        ldata->ld_leh->leh_len);
1547
1548 again:
1549         th = dt_trans_create(env, dev);
1550         if (IS_ERR(th))
1551                 GOTO(unlock1, rc = PTR_ERR(th));
1552
1553         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1554                                   XATTR_NAME_LINK, 0, th);
1555         if (rc != 0)
1556                 GOTO(stop, rc);
1557
1558         rc = dt_trans_start_local(env, dev, th);
1559         if (rc != 0)
1560                 GOTO(stop, rc);
1561
1562         dt_write_lock(env, obj, 0);
1563         if (unlikely(lfsck_is_dead_obj(obj)))
1564                 GOTO(unlock2, rc = -ENOENT);
1565
             /* Work on a fresh copy of the linkEA read under the write lock,
              * not on the caller's possibly stale @ldata. */
1566         rc = lfsck_links_read2(env, obj, &ldata_new);
1567         if (rc != 0)
1568                 GOTO(unlock2, rc);
1569
1570         /* The specified linkEA entry has been removed by race. */
1571         rc = linkea_links_find(&ldata_new, cname, pfid);
1572         if (rc != 0)
1573                 GOTO(unlock2, rc = 0);
1574
1575         if (bk->lb_param & LPF_DRYRUN)
1576                 GOTO(unlock2, rc = 1);
1577
1578         if (next)
1579                 linkea_del_buf(&ldata_new, cname);
1580         else
1581                 lfsck_namespace_filter_linkea_entry(&ldata_new, cname, pfid,
1582                                                     true);
1583
             /* The shrunk linkEA is larger than the size declared for this
              * transaction: stop it and retry with the larger size. */
1584         if (linkea_buf.lb_len < ldata_new.ld_leh->leh_len) {
1585                 dt_write_unlock(env, obj);
1586                 dt_trans_stop(env, dev, th);
1587                 lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
1588                                ldata_new.ld_leh->leh_len);
1589                 goto again;
1590         }
1591
1592         lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
1593                        ldata_new.ld_leh->leh_len);
1594         rc = dt_xattr_set(env, obj, &linkea_buf,
1595                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1596
1597         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
1598
1599 unlock2:
1600         dt_write_unlock(env, obj);
1601
1602 stop:
1603         dt_trans_stop(env, dev, th);
1604
1605 unlock1:
1606         lfsck_ibits_unlock(&lh, LCK_EX);
1607
1608 log:
             /* Report the result code as well, so that the debug log shows
              * whether the removal failed, was skipped, or was done. */
1609         CDEBUG(D_LFSCK, "%s: namespace LFSCK remove %s linkEA entry "
1610                "for the object: "DFID", parent "DFID", name %.*s: rc = %d\n",
1611                lfsck_lfsck2name(lfsck), next ? "invalid" : "redundant",
1612                PFID(lfsck_dto2fid(obj)), PFID(pfid), cname->ln_namelen,
1613                cname->ln_name, rc);
1614
             /* Both failures (rc < 0) and repairs (rc > 0) are recorded. */
1615         if (rc != 0) {
1616                 struct lfsck_namespace *ns = com->lc_file_ram;
1617
1618                 ns->ln_flags |= LF_INCONSISTENT;
1619         }
1620
1621         return rc;
1622 }
1623
1624 /**
1625  * Conditionally remove the specified entry from the linkEA.
1626  *
1627  * Take the parent lock first, then look up the name entry @cname in
1628  * the parent: if it exists and references @child, the linkEA entry is
1629  * valid, so only move @ldata to the next entry; otherwise (the parent
      * is dead, the name entry does not exist, or it references another
      * object) call lfsck_namespace_shrink_linkea() to remove the linkEA
      * entry. The parent lock is released before the linkEA is shrunk.
1630  *
1631  * \param[in] env       pointer to the thread context
1632  * \param[in] com       pointer to the lfsck component
1633  * \param[in] parent    pointer to the parent directory
1634  * \param[in] child     pointer to the child object that holds the linkEA
1635  * \param[in,out]ldata  pointer to the buffer that holds the linkEA
1636  * \param[in] cname     the name for the child in the parent directory
1637  * \param[in] pfid      the parent directory's FID for the linkEA
1638  *
1639  * \retval              positive number for repaired cases
1640  * \retval              0 if nothing to be repaired
1641  * \retval              negative error number on failure
1642  */
1643 static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env,
1644                                               struct lfsck_component *com,
1645                                               struct dt_object *parent,
1646                                               struct dt_object *child,
1647                                               struct linkea_data *ldata,
1648                                               struct lu_name *cname,
1649                                               struct lu_fid *pfid)
1650 {
             /* Per-thread scratch FID that receives the lookup result. */
1651         struct lu_fid           *cfid   = &lfsck_env_info(env)->lti_fid3;
1652         struct lustre_handle     lh     = { 0 };
1653         int                      rc;
1654         ENTRY;
1655
1656         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
1657                               MDS_INODELOCK_UPDATE, LCK_EX);
1658         if (rc != 0)
1659                 RETURN(rc);
1660
1661         dt_read_lock(env, parent, 0);
             /* A dead parent cannot hold the name entry: drop both locks
              * and remove the linkEA entry. */
1662         if (unlikely(lfsck_is_dead_obj(parent))) {
1663                 dt_read_unlock(env, parent);
1664                 lfsck_ibits_unlock(&lh, LCK_EX);
1665                 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
1666                                                    cname, pfid, true);
1667
1668                 RETURN(rc);
1669         }
1670
             /* NOTE(review): @parent is used as an index object here without
              * a dt_try_as_dir() call in this function; presumably the
              * caller has prepared it - confirm. */
1671         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1672                        (const struct dt_key *)cname->ln_name,
1673                        BYPASS_CAPA);
1674         dt_read_unlock(env, parent);
1675
1676         /* It is safe to release the ldlm lock, because when the logic come
1677          * here, we have got all the needed information above whether the
1678          * linkEA entry is valid or not. It is not important that others
1679          * may add new linkEA entry after the ldlm lock released. If other
1680          * has removed the specified linkEA entry by race, then it is OK,
1681          * because the subsequent lfsck_namespace_shrink_linkea() can handle
1682          * such case. */
1683         lfsck_ibits_unlock(&lh, LCK_EX);
             /* No such name under the parent: the linkEA entry is invalid. */
1684         if (rc == -ENOENT) {
1685                 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
1686                                                    cname, pfid, true);
1687
1688                 RETURN(rc);
1689         }
1690
             /* Any other lookup failure is returned to the caller as is. */
1691         if (rc != 0)
1692                 RETURN(rc);
1693
1694         /* The LFSCK just found some internal status of cross-MDTs
1695          * create operation. That is normal. */
1696         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
1697                 linkea_next_entry(ldata);
1698
1699                 RETURN(0);
1700         }
1701
             /* The name entry references another object, so the linkEA entry
              * on @child is invalid. */
1702         rc = lfsck_namespace_shrink_linkea(env, com, child, ldata, cname,
1703                                            pfid, true);
1704
1705         RETURN(rc);
1706 }
1707
1708 /**
1709  * Conditionally replace name entry in the parent.
1710  *
1711  * As required, the LFSCK may re-create the lost MDT-object for dangling
1712  * name entry, but such repairing may be wrong because of bad FID in the
1713  * name entry. As the LFSCK processing, the real MDT-object may be found,
1714  * then the LFSCK should check whether the former re-created MDT-object
1715  * has been modified or not, if not, then destroy it and update the name
1716  * entry in the parent to reference the real MDT-object.
      *
      * The replacement is skipped (0 is returned) when the name entry now
      * references a FID other than @cfid, when the object with @cfid has a
      * non-zero ctime, when it is a regular file with a LOV EA, or when the
      * linkEA of @child has no entry for @cname under @parent.
      *
      * Unlike the other repair helpers in this file, this function does not
      * set LF_INCONSISTENT itself.
1717  *
1718  * \param[in] env       pointer to the thread context
1719  * \param[in] com       pointer to the lfsck component
1720  * \param[in] parent    pointer to the parent directory
1721  * \param[in] child     pointer to the MDT-object that may be the real
1722  *                      MDT-object corresponding to the name entry in parent
1723  * \param[in] cfid      the current FID in the name entry
1724  * \param[in] cname     contains the name of the child in the parent directory
1725  *
1726  * \retval              positive number for repaired cases (also returned
      *                      in dryrun mode, without modifying anything)
1727  * \retval              0 if nothing to be repaired
1728  * \retval              negative error number on failure
1729  */
1730 static int lfsck_namespace_replace_cond(const struct lu_env *env,
1731                                         struct lfsck_component *com,
1732                                         struct dt_object *parent,
1733                                         struct dt_object *child,
1734                                         const struct lu_fid *cfid,
1735                                         const struct lu_name *cname)
1736 {
1737         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1738         struct lu_attr                  *la     = &info->lti_la;
1739         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
1740         struct lu_fid                    tfid;
1741         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1742         struct dt_device                *dev    = lfsck->li_next;
1743         const char                      *name   = cname->ln_name;
1744         struct dt_object                *obj    = NULL;
1745         struct lustre_handle             plh    = { 0 };
1746         struct lustre_handle             clh    = { 0 };
1747         struct linkea_data               ldata  = { NULL };
1748         struct thandle                  *th     = NULL;
             /* Whether the object with @cfid must be destroyed as part of
              * the replacement; cleared when there is nothing to destroy. */
1749         bool                             exist  = true;
1750         int                              rc     = 0;
1751         ENTRY;
1752
1753         rc = lfsck_ibits_lock(env, lfsck, parent, &plh,
1754                               MDS_INODELOCK_UPDATE, LCK_EX);
1755         if (rc != 0)
1756                 GOTO(log, rc);
1757
             /* An insane FID cannot reference any object: only the name
              * entry needs to be replaced. */
1758         if (!fid_is_sane(cfid)) {
1759                 exist = false;
1760                 goto replace;
1761         }
1762
1763         obj = lfsck_object_find(env, lfsck, cfid);
1764         if (IS_ERR(obj)) {
1765                 rc = PTR_ERR(obj);
1766                 if (rc == -ENOENT) {
1767                         exist = false;
1768                         goto replace;
1769                 }
1770
1771                 GOTO(log, rc);
1772         }
1773
1774         if (!dt_object_exists(obj)) {
1775                 exist = false;
1776                 goto replace;
1777         }
1778
             /* Check what the name entry references right now. */
1779         rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
1780                        (const struct dt_key *)name, BYPASS_CAPA);
1781         if (rc == -ENOENT) {
1782                 exist = false;
1783                 goto replace;
1784         }
1785
1786         if (rc != 0)
1787                 GOTO(log, rc);
1788
1789         /* Someone changed the name entry, cannot replace it. */
1790         if (!lu_fid_eq(cfid, &tfid))
1791                 GOTO(log, rc = 0);
1792
1793         /* lock the object to be destroyed. */
1794         rc = lfsck_ibits_lock(env, lfsck, obj, &clh,
1795                               MDS_INODELOCK_UPDATE |
1796                               MDS_INODELOCK_XATTR, LCK_EX);
1797         if (rc != 0)
1798                 GOTO(log, rc);
1799
1800         if (unlikely(lfsck_is_dead_obj(obj))) {
1801                 exist = false;
1802                 goto replace;
1803         }
1804
1805         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
1806         if (rc != 0)
1807                 GOTO(log, rc);
1808
1809         /* The object has been modified by other(s), or it is not created by
1810          * LFSCK, the two cases are indistinguishable. So cannot replace it. */
             /* rc is 0 here, i.e. "nothing to be repaired". */
1811         if (la->la_ctime != 0)
1812                 GOTO(log, rc);
1813
1814         if (S_ISREG(la->la_mode)) {
                     /* Probe for the LOV EA with a NULL buffer: only the
                      * return code is of interest. */
1815                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
1816                                   BYPASS_CAPA);
1817                 /* If someone has created related OST-object(s),
1818                  * then keep it. */
1819                 if ((rc > 0) || (rc < 0 && rc != -ENODATA))
1820                         GOTO(log, rc = (rc > 0 ? 0 : rc));
1821         }
1822
1823 replace:
             /* The replacement is only valid if @child still claims, in its
              * linkEA, to be named @cname under @parent. */
1824         dt_read_lock(env, child, 0);
1825         rc = lfsck_links_read2(env, child, &ldata);
1826         dt_read_unlock(env, child);
1827
1828         /* Someone changed the child, no need to replace. */
1829         if (rc == -ENODATA)
1830                 GOTO(log, rc = 0);
1831
1832         if (rc != 0)
1833                 GOTO(log, rc);
1834
1835         rc = linkea_links_find(&ldata, cname, lfsck_dto2fid(parent));
1836         /* Someone moved the child, no need to replace. */
1837         if (rc != 0)
1838                 GOTO(log, rc = 0);
1839
1840         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1841                 GOTO(log, rc = 1);
1842
1843         th = dt_trans_create(env, dev);
1844         if (IS_ERR(th))
1845                 GOTO(log, rc = PTR_ERR(th));
1846
1847         if (exist) {
1848                 rc = dt_declare_destroy(env, obj, th);
1849                 if (rc != 0)
1850                         GOTO(stop, rc);
1851         }
1852
1853         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
1854         if (rc != 0)
1855                 GOTO(stop, rc);
1856
             /* NOTE(review): the new name entry is always typed S_IFDIR,
              * whatever the type of @child - presumably callers only pass
              * directories; confirm. */
1857         rec->rec_type = S_IFDIR;
1858         rec->rec_fid = lfsck_dto2fid(child);
1859         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1860                                (const struct dt_key *)name, th);
1861         if (rc != 0)
1862                 GOTO(stop, rc);
1863
1864         rc = dt_trans_start(env, dev, th);
1865         if (rc != 0)
1866                 GOTO(stop, rc);
1867
1868         if (exist) {
1869                 rc = dt_destroy(env, obj, th);
1870                 if (rc != 0)
1871                         GOTO(stop, rc);
1872         }
1873
1874         /* The old name entry maybe not exist. */
             /* So the result of the deletion is deliberately ignored. */
1875         dt_delete(env, parent, (const struct dt_key *)name, th,
1876                   BYPASS_CAPA);
1877
1878         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1879                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
1880
1881         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1882
1883 stop:
1884         dt_trans_stop(env, dev, th);
1885
1886 log:
             /* Both handles are released unconditionally, although @clh (and,
              * on the first failure, @plh) may never have been locked.
              * NOTE(review): this relies on lfsck_ibits_unlock() accepting a
              * zero-initialized handle - confirm. */
1887         lfsck_ibits_unlock(&clh, LCK_EX);
1888         lfsck_ibits_unlock(&plh, LCK_EX);
1889         if (obj != NULL && !IS_ERR(obj))
1890                 lfsck_object_put(env, obj);
1891
1892         CDEBUG(D_LFSCK, "%s: namespace LFSCK conditionally destroy the "
1893                "object "DFID" because of conflict with the object "DFID
1894                " under the parent "DFID" with name %s: rc = %d\n",
1895                lfsck_lfsck2name(lfsck), PFID(cfid),
1896                PFID(lfsck_dto2fid(child)), PFID(lfsck_dto2fid(parent)),
1897                name, rc);
1898
1899         return rc;
1900 }
1901
1902 /**
1903  * Overwrite the linkEA for the object with the given ldata.
1904  *
      * The whole content of @ldata is written as the object's linkEA xattr
      * in a local transaction, under the object's write lock. Any non-zero
      * result (failure or repair) sets LF_INCONSISTENT in the namespace
      * flags of @com.
      *
1905  * The caller should take the ldlm lock before the calling.
1906  *
1907  * \param[in] env       pointer to the thread context
1908  * \param[in] com       pointer to the lfsck component
1909  * \param[in] obj       pointer to the dt_object to be handled
1910  * \param[in] ldata     pointer to the new linkEA data
1911  *
1912  * \retval              positive number for repaired cases (also returned
      *                      in dryrun mode, without modifying the object)
1913  * \retval              0 if nothing to be repaired (the object is dead)
1914  * \retval              negative error number on failure
1915  */
1916 int lfsck_namespace_rebuild_linkea(const struct lu_env *env,
1917                                    struct lfsck_component *com,
1918                                    struct dt_object *obj,
1919                                    struct linkea_data *ldata)
1920 {
1921         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1922         struct dt_device                *dev    = lfsck->li_bottom;
1923         struct thandle                  *th     = NULL;
1924         struct lu_buf                    linkea_buf;
1925         int                              rc     = 0;
1926         ENTRY;
1927
1928         th = dt_trans_create(env, dev);
1929         if (IS_ERR(th))
1930                 GOTO(log, rc = PTR_ERR(th));
1931
             /* The xattr value is the linkEA buffer, limited to the length
              * recorded in the linkEA header. */
1932         lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
1933                        ldata->ld_leh->leh_len);
1934         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1935                                   XATTR_NAME_LINK, 0, th);
1936         if (rc != 0)
1937                 GOTO(stop, rc);
1938
1939         rc = dt_trans_start_local(env, dev, th);
1940         if (rc != 0)
1941                 GOTO(stop, rc);
1942
1943         dt_write_lock(env, obj, 0);
             /* Nothing to repair on an object that is already dead. */
1944         if (unlikely(lfsck_is_dead_obj(obj)))
1945                 GOTO(unlock, rc = 0);
1946
             /* Dryrun: report the inconsistency as repaired without writing. */
1947         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1948                 GOTO(unlock, rc = 1);
1949
1950         rc = dt_xattr_set(env, obj, &linkea_buf,
1951                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1952
1953         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
1954
1955 unlock:
1956         dt_write_unlock(env, obj);
1957
1958 stop:
1959         dt_trans_stop(env, dev, th);
1960
1961 log:
1962         CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild linkEA for the "
1963                "object "DFID": rc = %d\n",
1964                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
1965
             /* Both failures (rc < 0) and repairs (rc > 0) are recorded. */
1966         if (rc != 0) {
1967                 struct lfsck_namespace *ns = com->lc_file_ram;
1968
1969                 ns->ln_flags |= LF_INCONSISTENT;
1970         }
1971
1972         return rc;
1973 }
1974
1975 /**
1976  * Repair invalid name entry.
1977  *
1978  * If the name entry contains invalid information, such as bad file type
1979  * or (and) corrupted object FID, then either remove the name entry or
1980  * update the name entry with the given (right) information.
      *
      * The entry @name is only touched if, under the parent's write lock, it
      * still exists and still references @child. When updating, the entry is
      * deleted and re-inserted as @name2 with the type taken from @child.
      * Any non-zero result (failure or repair) sets LF_INCONSISTENT in the
      * namespace flags of @com.
1981  *
1982  * \param[in] env       pointer to the thread context
1983  * \param[in] com       pointer to the lfsck component
1984  * \param[in] parent    pointer to the parent directory
1985  * \param[in] child     pointer to the object referenced by the name entry
1986  * \param[in] name      the old name of the child under the parent directory
1987  * \param[in] name2     the new name of the child under the parent directory
      *                      (only used for insertion when @update is true)
1988  * \param[in] type      the type claimed by the name entry (only used in
      *                      the debug message)
1989  * \param[in] update    update the name entry if true; otherwise, remove it
1990  * \param[in] dec       decrease the parent nlink count if true
1991  *
1992  * \retval              positive number for repaired successfully (also
      *                      returned in dryrun mode, without modification)
1993  * \retval              0 if nothing to be repaired
1994  * \retval              negative error number on failure (-ENOTDIR if
      *                      @parent cannot be used as a directory)
1995  */
1996 int lfsck_namespace_repair_dirent(const struct lu_env *env,
1997                                   struct lfsck_component *com,
1998                                   struct dt_object *parent,
1999                                   struct dt_object *child,
2000                                   const char *name, const char *name2,
2001                                   __u16 type, bool update, bool dec)
2002 {
2003         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
2004         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
2005         struct lu_fid            tfid;
2006         struct lfsck_instance   *lfsck  = com->lc_lfsck;
2007         struct dt_device        *dev    = lfsck->li_next;
2008         struct thandle          *th     = NULL;
2009         struct lustre_handle     lh     = { 0 };
2010         int                      rc     = 0;
2011         ENTRY;
2012
2013         if (unlikely(!dt_try_as_dir(env, parent)))
2014                 GOTO(log, rc = -ENOTDIR);
2015
2016         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
2017                               MDS_INODELOCK_UPDATE, LCK_EX);
2018         if (rc != 0)
2019                 GOTO(log, rc);
2020
2021         th = dt_trans_create(env, dev);
2022         if (IS_ERR(th))
2023                 GOTO(unlock1, rc = PTR_ERR(th));
2024
2025         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
2026         if (rc != 0)
2027                 GOTO(stop, rc);
2028
2029         if (update) {
                     /* The corrected entry takes its type from the child
                      * object, not from the (bad) name entry. */
2030                 rec->rec_type = lfsck_object_type(child) & S_IFMT;
2031                 rec->rec_fid = cfid;
2032                 rc = dt_declare_insert(env, parent,
2033                                        (const struct dt_rec *)rec,
2034                                        (const struct dt_key *)name2, th);
2035                 if (rc != 0)
2036                         GOTO(stop, rc);
2037         }
2038
2039         if (dec) {
2040                 rc = dt_declare_ref_del(env, parent, th);
2041                 if (rc != 0)
2042                         GOTO(stop, rc);
2043         }
2044
2045         rc = dt_trans_start(env, dev, th);
2046         if (rc != 0)
2047                 GOTO(stop, rc);
2048
2049         dt_write_lock(env, parent, 0);
             /* Re-check the name entry under the write lock before any
              * modification. */
2050         rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
2051                        (const struct dt_key *)name, BYPASS_CAPA);
2052         /* Someone has removed the bad name entry by race. */
2053         if (rc == -ENOENT)
2054                 GOTO(unlock2, rc = 0);
2055
2056         if (rc != 0)
2057                 GOTO(unlock2, rc);
2058
2059         /* Someone has removed the bad name entry and reused it for other
2060          * object by race. */
2061         if (!lu_fid_eq(&tfid, cfid))
2062                 GOTO(unlock2, rc = 0);
2063
             /* Dryrun: report the inconsistency as repaired without writing. */
2064         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
2065                 GOTO(unlock2, rc = 1);
2066
2067         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
2068                        BYPASS_CAPA);
2069         if (rc != 0)
2070                 GOTO(unlock2, rc);
2071
2072         if (update) {
2073                 rc = dt_insert(env, parent,
2074                                (const struct dt_rec *)rec,
2075                                (const struct dt_key *)name2, th,
2076                                BYPASS_CAPA, 1);
2077                 if (rc != 0)
2078                         GOTO(unlock2, rc);
2079         }
2080
2081         if (dec) {
2082                 rc = dt_ref_del(env, parent, th);
2083                 if (rc != 0)
2084                         GOTO(unlock2, rc);
2085         }
2086
2087         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
2088
2089 unlock2:
2090         dt_write_unlock(env, parent);
2091
2092 stop:
2093         dt_trans_stop(env, dev, th);
2094
2095         /* We are not sure whether the child will become orphan or not.
2096          * Record it in the LFSCK trace file for further checking in
2097          * the second-stage scanning. */
             /* NOTE(review): rc == 0 is only reached here when the entry was
              * found already removed or reused by others; a removal done by
              * this call ends with rc == 1 and is not recorded - confirm
              * that this is intended. */
2098         if (!update && !dec && rc == 0)
2099                 lfsck_namespace_trace_update(env, com, cfid,
2100                                              LNTF_CHECK_LINKEA, true);
2101
2102 unlock1:
2103         lfsck_ibits_unlock(&lh, LCK_EX);
2104
2105 log:
2106         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found bad name "
2107                "entry for: parent "DFID", child "DFID", name %s, type "
2108                "in name entry %o, type claimed by child %o. repair it "
2109                "by %s with new name2 %s: rc = %d\n", lfsck_lfsck2name(lfsck),
2110                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
2111                name, type, update ? lfsck_object_type(child) : 0,
2112                update ? "updating" : "removing", name2, rc);
2113
             /* Both failures (rc < 0) and repairs (rc > 0) are recorded. */
2114         if (rc != 0) {
2115                 struct lfsck_namespace *ns = com->lc_file_ram;
2116
2117                 ns->ln_flags |= LF_INCONSISTENT;
2118         }
2119
2120         return rc;
2121 }
2122
2123 /**
2124  * Update the ".." name entry for the given object.
2125  *
2126  * The object's ".." is corrupted, this function will update the ".." name
2127  * entry with the given pfid, and overwrite the linkEA with a newly built
      * one that holds the entry for (@cname, @pfid). Both changes are made in
      * one local transaction under the object's write lock. Any non-zero
      * result (failure or repair) sets LF_INCONSISTENT in the namespace flags
      * of @com.
2128  *
2129  * The caller should take the ldlm lock before the calling.
2130  *
2131  * \param[in] env       pointer to the thread context
2132  * \param[in] com       pointer to the lfsck component
2133  * \param[in] obj       pointer to the dt_object to be handled; it must be
      *                      a local directory (asserted)
2134  * \param[in] pfid      the new fid for the object's ".." name entry
2135  * \param[in] cname     the name for the @obj in the parent directory
2136  *
2137  * \retval              positive number for repaired cases (also returned
      *                      in dryrun mode, without modifying the object)
2138  * \retval              0 if nothing to be repaired (the object is dead)
2139  * \retval              negative error number on failure
2140  */
2141 static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env,
2142                                                   struct lfsck_component *com,
2143                                                   struct dt_object *obj,
2144                                                   const struct lu_fid *pfid,
2145                                                   struct lu_name *cname)
2146 {
2147         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2148         struct dt_insert_rec            *rec    = &info->lti_dt_rec;
2149         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2150         struct dt_device                *dev    = lfsck->li_bottom;
2151         struct thandle                  *th     = NULL;
2152         struct linkea_data               ldata  = { NULL };
2153         struct lu_buf                    linkea_buf;
2154         int                              rc     = 0;
2155         ENTRY;
2156
2157         LASSERT(!dt_object_remote(obj));
2158         LASSERT(S_ISDIR(lfsck_object_type(obj)));
2159
             /* Build the new linkEA in the per-thread big buffer and add the
              * (@cname, @pfid) entry to it. */
2160         rc = linkea_data_new(&ldata, &info->lti_big_buf);
2161         if (rc != 0)
2162                 GOTO(log, rc);
2163
2164         rc = linkea_add_buf(&ldata, cname, pfid);
2165         if (rc != 0)
2166                 GOTO(log, rc);
2167
2168         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
2169                        ldata.ld_leh->leh_len);
2170
2171         th = dt_trans_create(env, dev);
2172         if (IS_ERR(th))
2173                 GOTO(log, rc = PTR_ERR(th));
2174
2175         rc = dt_declare_delete(env, obj, (const struct dt_key *)dotdot, th);
2176         if (rc != 0)
2177                 GOTO(stop, rc);
2178
2179         rec->rec_type = S_IFDIR;
2180         rec->rec_fid = pfid;
2181         rc = dt_declare_insert(env, obj, (const struct dt_rec *)rec,
2182                                (const struct dt_key *)dotdot, th);
2183         if (rc != 0)
2184                 GOTO(stop, rc);
2185
2186         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
2187                                   XATTR_NAME_LINK, 0, th);
2188         if (rc != 0)
2189                 GOTO(stop, rc);
2190
2191         rc = dt_trans_start_local(env, dev, th);
2192         if (rc != 0)
2193                 GOTO(stop, rc);
2194
2195         dt_write_lock(env, obj, 0);
             /* Nothing to repair on an object that is already dead. */
2196         if (unlikely(lfsck_is_dead_obj(obj)))
2197                 GOTO(unlock, rc = 0);
2198
             /* Dryrun: report the inconsistency as repaired without writing. */
2199         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
2200                 GOTO(unlock, rc = 1);
2201
2202         /* The old ".." name entry maybe not exist. */
             /* So the result of the deletion is deliberately ignored. */
2203         dt_delete(env, obj, (const struct dt_key *)dotdot, th,
2204                   BYPASS_CAPA);
2205
2206         rc = dt_insert(env, obj, (const struct dt_rec *)rec,
2207                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
2208         if (rc != 0)
2209                 GOTO(unlock, rc);
2210
2211         rc = dt_xattr_set(env, obj, &linkea_buf,
2212                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
2213
2214         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
2215
2216 unlock:
2217         dt_write_unlock(env, obj);
2218
2219 stop:
2220         dt_trans_stop(env, dev, th);
2221
2222 log:
2223         CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild dotdot name entry for "
2224                "the object "DFID", new parent "DFID": rc = %d\n",
2225                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)),
2226                PFID(pfid), rc);
2227
             /* Both failures (rc < 0) and repairs (rc > 0) are recorded. */
2228         if (rc != 0) {
2229                 struct lfsck_namespace *ns = com->lc_file_ram;
2230
2231                 ns->ln_flags |= LF_INCONSISTENT;
2232         }
2233
2234         return rc;
2235 }
2236
2237 /**
2238  * Handle orphan @obj during Double Scan Directory.
2239  *
2240  * Remove the @obj's current (invalid) linkEA entries, and insert
2241  * it in the directory .lustre/lost+found/MDTxxxx/ with the name:
2242  * ${FID}-${PFID}-D-${conflict_version}
2243  *
2244  * The caller should take the ldlm lock before the calling.
2245  *
2246  * \param[in] env       pointer to the thread context
2247  * \param[in] com       pointer to the lfsck component
2248  * \param[in] obj       pointer to the orphan object to be handled
2249  * \param[in] pfid      the new fid for the object's ".." name entry
2250  * \param[in,out] lh    ldlm lock handler for the given @obj
2251  * \param[out] type     to tell the caller what the inconsistency is
2252  *
2253  * \retval              positive number for repaired cases
2254  * \retval              0 if nothing to be repaired
2255  * \retval              negative error number on failure
2256  */
2257 static int
2258 lfsck_namespace_dsd_orphan(const struct lu_env *env,
2259                            struct lfsck_component *com,
2260                            struct dt_object *obj,
2261                            const struct lu_fid *pfid,
2262                            struct lustre_handle *lh,
2263                            enum lfsck_namespace_inconsistency_type *type)
2264 {
2265         struct lfsck_thread_info *info = lfsck_env_info(env);
2266         struct lfsck_namespace   *ns   = com->lc_file_ram;
2267         int                       rc;
2268         ENTRY;
2269
2270         /* Remove the unrecognized linkEA. */
2271         rc = lfsck_namespace_links_remove(env, com, obj);
2272         lfsck_ibits_unlock(lh, LCK_EX);
2273         if (rc < 0 && rc != -ENODATA)
2274                 RETURN(rc);
2275
2276         *type = LNIT_MUL_REF;
2277
2278         /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT has
2279          * ever tried to verify some remote MDT-object that resides on this
2280          * MDT, but this MDT failed to respond such request. So means there
2281          * may be some remote name entry on other MDT that references this
2282          * object with another name, so we cannot know whether this linkEA
2283          * is valid or not. So keep it there and maybe resolved when next
2284          * LFSCK run. */
2285         if (ns->ln_flags & LF_INCOMPLETE)
2286                 RETURN(0);
2287
2288         /* The unique linkEA is invalid, even if the ".." name entry may be
2289          * valid, we still cannot know via which name entry this directory
2290          * will be referenced. Then handle it as pure orphan. */
2291         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
2292                  "-"DFID, PFID(pfid));
2293         rc = lfsck_namespace_insert_orphan(env, com, obj,
2294                                            info->lti_tmpbuf, "D", NULL);
2295
2296         RETURN(rc);
2297 }
2298
/**
 * Double Scan Directory object for single linkEA entry case.
 *
 * The given @child has unique linkEA entry. If the linkEA entry is valid,
 * then check whether the name is in the namespace or not, if not, add the
 * missing name entry back to namespace. If the linkEA entry is invalid,
 * then remove it and insert the @child in the .lustre/lost+found/MDTxxxx/
 * as an orphan.
 *
 * Retry protocol: whenever an inconsistency is detected while @lh is not a
 * used lock handle and @retry is not NULL, nothing is repaired; *retry is
 * set to true and 0 is returned. When @retry is NULL (or @lh is in use) the
 * repair is attempted directly.
 *
 * \param[in] env       pointer to the thread context
 * \param[in] com       pointer to the lfsck component
 * \param[in] child     pointer to the directory to be double scanned
 * \param[in] pfid      the FID corresponding to the ".." entry
 * \param[in] ldata     pointer to the linkEA data for the given @child
 * \param[in,out] lh    ldlm lock handler for the given @child
 * \param[out] type     to tell the caller what the inconsistency is
 * \param[out] retry    if found inconsistency, but the caller does not hold
 *                      ldlm lock on the @child, then set @retry as true;
 *                      may be NULL
 *
 * \retval              positive number for repaired cases
 * \retval              0 if nothing to be repaired
 * \retval              negative error number on failure
 */
static int
lfsck_namespace_dsd_single(const struct lu_env *env,
                           struct lfsck_component *com,
                           struct dt_object *child,
                           const struct lu_fid *pfid,
                           struct linkea_data *ldata,
                           struct lustre_handle *lh,
                           enum lfsck_namespace_inconsistency_type *type,
                           bool *retry)
{
        struct lfsck_thread_info *info          = lfsck_env_info(env);
        struct lu_name           *cname         = &info->lti_name;
        const struct lu_fid      *cfid          = lfsck_dto2fid(child);
        struct lu_fid             tfid;
        struct lfsck_namespace   *ns            = com->lc_file_ram;
        struct lfsck_instance    *lfsck         = com->lc_lfsck;
        struct dt_object         *parent        = NULL;
        struct lmv_mds_md_v1     *lmv;
        int                       rc            = 0;
        ENTRY;

        /* Decode the current linkEA entry: the name goes into cname and
         * the parent FID into tfid. */
        lfsck_namespace_unpack_linkea_entry(ldata, cname, &tfid, info->lti_key);
        /* The unique linkEA entry with bad parent will be handled as orphan. */
        if (!fid_is_sane(&tfid)) {
                if (!lustre_handle_is_used(lh) && retry != NULL)
                        *retry = true;
                else
                        rc = lfsck_namespace_dsd_orphan(env, com, child,
                                                        pfid, lh, type);

                GOTO(out, rc);
        }

        parent = lfsck_object_find_bottom(env, lfsck, &tfid);
        if (IS_ERR(parent))
                GOTO(out, rc = PTR_ERR(parent));

        /* We trust the unique linkEA entry in spite of whether it matches the
         * ".." name entry or not. Because even if the linkEA entry is wrong
         * and the ".." name entry is right, we still cannot know via which
         * name entry the child will be referenced, since all known entries
         * have been verified during the first-stage scanning. */
        if (!dt_object_exists(parent)) {
                /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT
                 * has ever tried to verify some remote MDT-object that resides
                 * on this MDT, but this MDT failed to respond such request. So
                 * means there may be some remote name entry on other MDT that
                 * references this object with another name, so we cannot know
                 * whether this linkEA is valid or not. So keep it there and
                 * maybe resolved when next LFSCK run. */
                if (ns->ln_flags & LF_INCOMPLETE)
                        GOTO(out, rc = 0);

                if (!lustre_handle_is_used(lh) && retry != NULL) {
                        *retry = true;

                        GOTO(out, rc = 0);
                }

                lfsck_ibits_unlock(lh, LCK_EX);

                /* This label is also entered by goto from the "name entry
                 * missing" path further down: when
                 * lfsck_namespace_check_name() returns -ENOENT, or when
                 * lfsck_namespace_insert_normal() returns -ESTALE. */
lost_parent:
                lmv = &info->lti_lmv;
                rc = lfsck_read_stripe_lmv(env, child, lmv);
                if (rc != 0 && rc != -ENODATA)
                        GOTO(out, rc);

                if (rc == -ENODATA || lmv->lmv_magic != LMV_MAGIC_STRIPE) {
                        /* No LMV was read, or its magic is not
                         * LMV_MAGIC_STRIPE: pass no LMV when creating
                         * the lost parent below. */
                        lmv = NULL;
                } else if (lfsck_shard_name_to_index(env,
                                        cname->ln_name, cname->ln_namelen,
                                        S_IFDIR, cfid) < 0) {
                        /* It is an invalid name entry, we
                         * cannot trust the parent also. */
                        rc = lfsck_namespace_shrink_linkea(env, com, child,
                                                ldata, cname, &tfid, true);
                        if (rc < 0)
                                GOTO(out, rc);

                        /* The orphan is inserted with type "S" here; the
                         * other orphan insertions in this function
                         * use "D". */
                        snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
                                 "-"DFID, PFID(pfid));
                        rc = lfsck_namespace_insert_orphan(env, com, child,
                                                info->lti_tmpbuf, "S", NULL);

                        GOTO(out, rc);
                }

                /* Create the lost parent as an orphan. */
                rc = lfsck_namespace_create_orphan_dir(env, com, parent, lmv);
                if (rc >= 0) {
                        /* Add the missing name entry to the parent. */
                        rc = lfsck_namespace_insert_normal(env, com, parent,
                                                        child, cname->ln_name);
                        if (unlikely(rc == -EEXIST)) {
                                /* Unfortunately, someone reused the name
                                 * under the parent by race. So we have
                                 * to remove the linkEA entry from
                                 * current child object. It means that the
                                 * LFSCK cannot recover the system
                                 * totally back to its original status,
                                 * but it is necessary to make the
                                 * current system to be consistent. */
                                rc = lfsck_namespace_shrink_linkea(env,
                                                com, child, ldata,
                                                cname, &tfid, true);
                                if (rc >= 0) {
                                        snprintf(info->lti_tmpbuf,
                                                 sizeof(info->lti_tmpbuf),
                                                 "-"DFID, PFID(pfid));
                                        rc = lfsck_namespace_insert_orphan(env,
                                                com, child, info->lti_tmpbuf,
                                                "D", NULL);
                                }
                        }
                }

                GOTO(out, rc);
        }

        /* The unique linkEA entry with bad parent will be handled as orphan. */
        if (unlikely(!dt_try_as_dir(env, parent))) {
                if (!lustre_handle_is_used(lh) && retry != NULL)
                        *retry = true;
                else
                        rc = lfsck_namespace_dsd_orphan(env, com, child,
                                                        pfid, lh, type);

                GOTO(out, rc);
        }

        /* Look the linkEA name up under the parent. tfid is passed as the
         * record buffer of the lookup and is compared against cfid below
         * once the lookup has succeeded. */
        rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
                       (const struct dt_key *)cname->ln_name, BYPASS_CAPA);
        if (rc == -ENOENT) {
                /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT
                 * has ever tried to verify some remote MDT-object that resides
                 * on this MDT, but this MDT failed to respond such request. So
                 * means there may be some remote name entry on other MDT that
                 * references this object with another name, so we cannot know
                 * whether this linkEA is valid or not. So keep it there and
                 * maybe resolved when next LFSCK run. */
                if (ns->ln_flags & LF_INCOMPLETE)
                        GOTO(out, rc = 0);

                if (!lustre_handle_is_used(lh) && retry != NULL) {
                        *retry = true;

                        GOTO(out, rc = 0);
                }

                lfsck_ibits_unlock(lh, LCK_EX);
                /* -ENOENT from the name check is handled the same way as a
                 * non-existent parent; any other negative value is
                 * returned; a positive value marks the name as invalid. */
                rc = lfsck_namespace_check_name(env, parent, child, cname);
                if (rc == -ENOENT)
                        goto lost_parent;

                if (rc < 0)
                        GOTO(out, rc);

                /* It is an invalid name entry, drop it. */
                if (unlikely(rc > 0)) {
                        rc = lfsck_namespace_shrink_linkea(env, com, child,
                                                ldata, cname, &tfid, true);
                        if (rc >= 0) {
                                snprintf(info->lti_tmpbuf,
                                         sizeof(info->lti_tmpbuf),
                                         "-"DFID, PFID(pfid));
                                rc = lfsck_namespace_insert_orphan(env, com,
                                        child, info->lti_tmpbuf, "D", NULL);
                        }

                        GOTO(out, rc);
                }

                /* Add the missing name entry back to the namespace. */
                rc = lfsck_namespace_insert_normal(env, com, parent, child,
                                                   cname->ln_name);
                if (unlikely(rc == -ESTALE))
                        /* It may happen when the remote object has been
                         * removed, but the local MDT is not aware of that. */
                        goto lost_parent;

                if (unlikely(rc == -EEXIST)) {
                        /* Unfortunately, someone reused the name under the
                         * parent by race. So we have to remove the linkEA
                         * entry from current child object. It means that the
                         * LFSCK cannot recover the system totally back to
                         * its original status, but it is necessary to make
                         * the current system to be consistent.
                         *
                         * It also may be because of the LFSCK found some
                         * internal status of create operation. Under such
                         * case, nothing to be done. */
                        rc = lfsck_namespace_shrink_linkea_cond(env, com,
                                        parent, child, ldata, cname, &tfid);
                        if (rc >= 0) {
                                snprintf(info->lti_tmpbuf,
                                         sizeof(info->lti_tmpbuf),
                                         "-"DFID, PFID(pfid));
                                rc = lfsck_namespace_insert_orphan(env, com,
                                        child, info->lti_tmpbuf, "D", NULL);
                        }
                }

                GOTO(out, rc);
        }

        /* Any lookup failure other than -ENOENT is returned unchanged. */
        if (rc != 0)
                GOTO(out, rc);

        /* The name exists under the parent but does not point at @child. */
        if (!lu_fid_eq(&tfid, cfid)) {
                if (!lustre_handle_is_used(lh) && retry != NULL) {
                        *retry = true;

                        GOTO(out, rc = 0);
                }

                lfsck_ibits_unlock(lh, LCK_EX);
                /* The name entry references another MDT-object that
                 * may be created by the LFSCK for repairing dangling
                 * name entry. Try to replace it. */
                rc = lfsck_namespace_replace_cond(env, com, parent, child,
                                                  &tfid, cname);
                /* A zero return is followed by treating @child as an
                 * orphan; any other value is returned as is. */
                if (rc == 0)
                        rc = lfsck_namespace_dsd_orphan(env, com, child,
                                                        pfid, lh, type);

                GOTO(out, rc);
        }

        /* The name entry matches @child. With a zero @pfid the ".."
         * comparison below is skipped. */
        if (fid_is_zero(pfid))
                GOTO(out, rc = 0);

        /* The ".." name entry is wrong, update it. */
        if (!lu_fid_eq(pfid, lfsck_dto2fid(parent))) {
                if (!lustre_handle_is_used(lh) && retry != NULL) {
                        *retry = true;

                        GOTO(out, rc = 0);
                }

                *type = LNIT_UNMATCHED_PAIRS;
                rc = lfsck_namespace_repair_unmatched_pairs(env, com, child,
                                                lfsck_dto2fid(parent), cname);
        }

        GOTO(out, rc);

out:
        /* parent is still NULL on the insane-FID path and holds an error
         * pointer when the object lookup failed; only a real object
         * reference is released. */
        if (parent != NULL && !IS_ERR(parent))
                lfsck_object_put(env, parent);

        return rc;
}
2574
/**
 * Double Scan Directory object for multiple linkEA entries case.
 *
 * The given @child has multiple linkEA entries. There is at most one linkEA
 * entry will be valid, all the others will be removed. Firstly, the function
 * will try to find out the linkEA entry for which the name entry exists under
 * the given parent (@pfid). If there is no linkEA entry that matches the given
 * ".." name entry, then tries to find out the first linkEA entry that both the
 * parent and the name entry exist to rebuild a new ".." name entry.
 *
 * The linkEA is walked in up to three passes (see the "again" label):
 * - if @lpf is true, the first pass considers every entry;
 * - the next pass (the first one when @lpf is false) only considers the
 *   entries whose parent FID equals @pfid;
 * - the last pass (once == false) considers every remaining entry.
 * Entries found to be unusable are deleted from @ldata while walking. When
 * exactly one entry is left after a pass, the work is delegated to
 * lfsck_namespace_dsd_single(); when none is left, the @child is handled by
 * lfsck_namespace_dsd_orphan().
 *
 * \param[in] env       pointer to the thread context
 * \param[in] com       pointer to the lfsck component
 * \param[in] child     pointer to the directory to be double scanned
 * \param[in] pfid      the FID corresponding to the ".." entry
 * \param[in] ldata     pointer to the linkEA data for the given @child
 * \param[in,out] lh    ldlm lock handler for the given @child
 * \param[out] type     to tell the caller what the inconsistency is
 * \param[in] lpf       true if the ".." entry is under lost+found/MDTxxxx/
 *
 * \retval              positive number for repaired cases
 * \retval              0 if nothing to be repaired
 * \retval              negative error number on failure
 */
static int
lfsck_namespace_dsd_multiple(const struct lu_env *env,
                             struct lfsck_component *com,
                             struct dt_object *child,
                             const struct lu_fid *pfid,
                             struct linkea_data *ldata,
                             struct lustre_handle *lh,
                             enum lfsck_namespace_inconsistency_type *type,
                             bool lpf)
{
        struct lfsck_thread_info *info          = lfsck_env_info(env);
        struct lu_name           *cname         = &info->lti_name;
        const struct lu_fid      *cfid          = lfsck_dto2fid(child);
        /* FID of the parent currently being examined; saved before the
         * parent object reference is dropped. */
        struct lu_fid            *pfid2         = &info->lti_fid3;
        struct lu_fid             tfid;
        struct lfsck_namespace   *ns            = com->lc_file_ram;
        struct lfsck_instance    *lfsck         = com->lc_lfsck;
        struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
        struct dt_object         *parent        = NULL;
        struct linkea_data        ldata_new     = { NULL };
        /* Sum of the non-negative results of lfsck_namespace_repair_dirent();
         * added to ns->ln_dirent_repaired. */
        int                       dirent_count  = 0;
        /* Bumped when a linkEA entry is deleted from @ldata in the outer
         * loop; only tested for "> 0" after the single-entry fallback.
         * NOTE(review): the linkea_del_buf() at the very bottom of the outer
         * loop does not bump it - confirm whether that is intended. */
        int                       linkea_count  = 0;
        int                       rc            = 0;
        /* While true, entries whose parent differs from @pfid are skipped
         * (unless @lpf is set). */
        bool                      once          = true;
        ENTRY;

again:
        while (ldata->ld_lee != NULL) {
                lfsck_namespace_unpack_linkea_entry(ldata, cname, &tfid,
                                                    info->lti_key);
                /* Drop repeated linkEA entries. */
                lfsck_namespace_filter_linkea_entry(ldata, cname, &tfid, true);
                /* Drop invalid linkEA entry. */
                if (!fid_is_sane(&tfid)) {
                        linkea_del_buf(ldata, cname);
                        linkea_count++;
                        continue;
                }

                /* If current dotdot is the .lustre/lost+found/MDTxxxx/,
                 * then it is possible that: the directory object has ever
                 * been lost, but its name entry was there. In the former
                 * LFSCK run, during the first-stage scanning, the LFSCK
                 * found the dangling name entry, but it did not recreate
                 * the lost object, and when moved to the second-stage
                 * scanning, some children objects of the lost directory
                 * object were found, then the LFSCK recreated such lost
                 * directory object as an orphan.
                 *
                 * When the LFSCK runs again, if the dangling name is still
                 * there, the LFSCK should move the orphan directory object
                 * back to the normal namespace. */
                if (!lpf && !lu_fid_eq(pfid, &tfid) && once) {
                        linkea_next_entry(ldata);
                        continue;
                }

                parent = lfsck_object_find_bottom(env, lfsck, &tfid);
                if (IS_ERR(parent))
                        RETURN(PTR_ERR(parent));

                if (!dt_object_exists(parent)) {
                        lfsck_object_put(env, parent);
                        if (ldata->ld_leh->leh_reccount > 1) {
                                /* If it is NOT the last linkEA entry, then
                                 * there is still other chance to make the
                                 * child to be visible via other parent, then
                                 * remove this linkEA entry. */
                                linkea_del_buf(ldata, cname);
                                linkea_count++;
                                continue;
                        }

                        /* It is the last entry left: leave the loop so the
                         * reccount == 1 handling below takes over. */
                        break;
                }

                /* The linkEA entry with bad parent will be removed. */
                if (unlikely(!dt_try_as_dir(env, parent))) {
                        lfsck_object_put(env, parent);
                        linkea_del_buf(ldata, cname);
                        linkea_count++;
                        continue;
                }

                /* tfid is passed as the record buffer of the lookup and is
                 * compared against cfid below once the lookup succeeded. */
                rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
                               (const struct dt_key *)cname->ln_name,
                               BYPASS_CAPA);
                /* Save the parent FID now: the parent reference is put on
                 * every path below before pfid2 is consumed. */
                *pfid2 = *lfsck_dto2fid(parent);
                if (rc == -ENOENT) {
                        /* No such name under this parent: keep the entry
                         * for now and move on to the next one. */
                        lfsck_object_put(env, parent);
                        linkea_next_entry(ldata);
                        continue;
                }

                if (rc != 0) {
                        lfsck_object_put(env, parent);

                        RETURN(rc);
                }

                if (lu_fid_eq(&tfid, cfid)) {
                        lfsck_object_put(env, parent);
                        /* The name entry points at @child but ".." points
                         * elsewhere: repair the pair and stop here. */
                        if (!lu_fid_eq(pfid, pfid2)) {
                                *type = LNIT_UNMATCHED_PAIRS;
                                rc = lfsck_namespace_repair_unmatched_pairs(env,
                                                com, child, pfid2, cname);

                                RETURN(rc);
                        }

                        /* Also reached by goto from the bottom of the outer
                         * loop when lfsck_namespace_replace_cond() returns a
                         * positive value. */
rebuild:
                        /* It is the most common case that we find the
                         * name entry corresponding to the linkEA entry
                         * that matches the ".." name entry. */
                        rc = linkea_data_new(&ldata_new, &info->lti_big_buf);
                        if (rc != 0)
                                RETURN(rc);

                        rc = linkea_add_buf(&ldata_new, cname, pfid2);
                        if (rc != 0)
                                RETURN(rc);

                        rc = lfsck_namespace_rebuild_linkea(env, com, child,
                                                            &ldata_new);
                        if (rc < 0)
                                RETURN(rc);

                        /* The chosen entry now lives in the rebuilt linkEA;
                         * remove it from @ldata and restart from the first
                         * of the remaining entries. */
                        linkea_del_buf(ldata, cname);
                        linkea_count++;
                        linkea_first_entry(ldata);
                        /* There may be some invalid dangling name entries under
                         * other parent directories, remove all of them. */
                        while (ldata->ld_lee != NULL) {
                                lfsck_namespace_unpack_linkea_entry(ldata,
                                                cname, &tfid, info->lti_key);
                                if (!fid_is_sane(&tfid))
                                        goto next;

                                parent = lfsck_object_find_bottom(env, lfsck,
                                                                  &tfid);
                                if (IS_ERR(parent)) {
                                        /* Only abort when LPF_FAILOUT is set
                                         * and the error is not -ENOENT. */
                                        rc = PTR_ERR(parent);
                                        if (rc != -ENOENT &&
                                            bk->lb_param & LPF_FAILOUT)
                                                RETURN(rc);

                                        goto next;
                                }

                                if (!dt_object_exists(parent)) {
                                        lfsck_object_put(env, parent);
                                        goto next;
                                }

                                rc = lfsck_namespace_repair_dirent(env, com,
                                        parent, child, cname->ln_name,
                                        cname->ln_name, S_IFDIR, false, true);
                                lfsck_object_put(env, parent);
                                if (rc < 0) {
                                        if (bk->lb_param & LPF_FAILOUT)
                                                RETURN(rc);

                                        goto next;
                                }

                                dirent_count += rc;

                                /* Every entry visited by this inner loop is
                                 * deleted from @ldata, whether or not a name
                                 * entry was repaired for it. */
next:
                                linkea_del_buf(ldata, cname);
                        }

                        ns->ln_dirent_repaired += dirent_count;

                        /* rc holds the result of the last step executed
                         * above (the linkEA rebuild if the inner loop never
                         * ran). */
                        RETURN(rc);
                }

                lfsck_ibits_unlock(lh, LCK_EX);
                /* The name entry references another MDT-object that may be
                 * created by the LFSCK for repairing dangling name entry.
                 * Try to replace it. */
                rc = lfsck_namespace_replace_cond(env, com, parent, child,
                                                  &tfid, cname);
                lfsck_object_put(env, parent);
                if (rc < 0)
                        RETURN(rc);

                if (rc > 0)
                        goto rebuild;

                /* rc == 0: this linkEA entry is dropped. */
                linkea_del_buf(ldata, cname);
        }

        linkea_first_entry(ldata);
        /* Exactly one entry left: fall back to the single-entry logic.
         * @retry is passed as NULL, so the repair is attempted directly. */
        if (ldata->ld_leh->leh_reccount == 1) {
                rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata,
                                                lh, type, NULL);

                /* Nothing was repaired there, @pfid is zero and entries
                 * were dropped above: write the shrunk linkEA back. */
                if (rc == 0 && fid_is_zero(pfid) && linkea_count > 0)
                        rc = lfsck_namespace_rebuild_linkea(env, com, child,
                                                            ldata);

                RETURN(rc);
        }

        /* All linkEA entries are invalid and removed, then handle the @child
         * as an orphan.*/
        if (ldata->ld_leh->leh_reccount == 0) {
                rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, lh,
                                                type);

                RETURN(rc);
        }

        /* If the dangling name entry for the orphan directory object has
         * been removed, then just check whether the directory object is
         * still under the .lustre/lost+found/MDTxxxx/ or not. */
        if (lpf) {
                lpf = false;
                goto again;
        }

        /* There is no linkEA entry that matches the ".." name entry. Find
         * the first linkEA entry that both parent and name entry exist to
         * rebuild a new ".." name entry. */
        if (once) {
                once = false;
                goto again;
        }

        RETURN(rc);
}
2829
2830 /**
2831  * Repair the object's nlink attribute.
2832  *
2833  * If all the known name entries have been verified, then the object's hard
2834  * link attribute should match the object's linkEA entries count unless the
2835  * object's has too much hard link to be recorded in the linkEA. Such cases
2836  * should have been marked in the LFSCK trace file. Otherwise, trust the
2837  * linkEA to update the object's nlink attribute.
2838  *
2839  * \param[in] env       pointer to the thread context
2840  * \param[in] com       pointer to the lfsck component
2841  * \param[in] obj       pointer to the dt_object to be handled
2842  * \param[in,out] la    pointer to buffer to object's attribute before
2843  *                      and after the repairing
2844  *
2845  * \retval              positive number for repaired cases
2846  * \retval              0 if nothing to be repaired
2847  * \retval              negative error number on failure
2848  */
2849 static int lfsck_namespace_repair_nlink(const struct lu_env *env,
2850                                         struct lfsck_component *com,
2851                                         struct dt_object *obj,
2852                                         struct lu_attr *la)
2853 {
2854         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2855         struct lu_fid                   *tfid   = &info->lti_fid3;
2856         struct lfsck_namespace          *ns     = com->lc_file_ram;
2857         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2858         struct dt_device                *dev    = lfsck->li_bottom;
2859         const struct lu_fid             *cfid   = lfsck_dto2fid(obj);
2860         struct dt_object                *child  = NULL;
2861         struct thandle                  *th     = NULL;
2862         struct linkea_data               ldata  = { NULL };
2863         struct lustre_handle             lh     = { 0 };
2864         __u32                            old    = la->la_nlink;
2865         int                              idx;
2866         int                              rc     = 0;
2867         __u8                             flags;
2868         ENTRY;
2869
2870         LASSERT(!dt_object_remote(obj));
2871         LASSERT(S_ISREG(lfsck_object_type(obj)));
2872
2873         child = lfsck_object_find_by_dev(env, dev, cfid);
2874         if (IS_ERR(child))
2875                 GOTO(log, rc = PTR_ERR(child));
2876
2877         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
2878                               MDS_INODELOCK_UPDATE |
2879                               MDS_INODELOCK_XATTR, LCK_EX);
2880         if (rc != 0)
2881                 GOTO(log, rc);
2882
2883         th = dt_trans_create(env, dev);
2884         if (IS_ERR(th))
2885                 GOTO(log, rc = PTR_ERR(th));
2886
2887         la->la_valid = LA_NLINK;
2888         rc = dt_declare_attr_set(env, child, la, th);
2889         if (rc != 0)
2890                 GOTO(stop, rc);
2891
2892         rc = dt_trans_start_local(env, dev, th);
2893         if (rc != 0)
2894                 GOTO(stop, rc);
2895
2896         dt_write_lock(env, child, 0);
2897         /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT has
2898          * ever tried to verify some remote MDT-object that resides on this
2899          * MDT, but this MDT failed to respond such request. So means there
2900          * may be some remote name entry on other MDT that references this
2901          * object with another name, so we cannot know whether this linkEA
2902          * is valid or not. So keep it there and maybe resolved when next
2903          * LFSCK run. */
2904         if (ns->ln_flags & LF_INCOMPLETE)
2905                 GOTO(unlock, rc = 0);
2906
2907         fid_cpu_to_be(tfid, cfid);
2908         idx = lfsck_sub_trace_file_fid2idx(cfid);
2909         rc = dt_lookup(env, com->lc_sub_trace_objs[idx].lsto_obj,
2910                        (struct dt_rec *)&flags, (const struct dt_key *)tfid,
2911                        BYPASS_CAPA);
2912         if (rc != 0)
2913                 GOTO(unlock, rc);
2914
2915         if (flags & LNTF_SKIP_NLINK)
2916                 GOTO(unlock, rc = 0);
2917
2918         rc = dt_attr_get(env, child, la, BYPASS_CAPA);
2919         if (rc != 0)
2920                 GOTO(unlock, rc = (rc == -ENOENT ? 0 : rc));
2921
2922         rc = lfsck_links_read2(env, child, &ldata);
2923         if (rc != 0)
2924                 GOTO(unlock, rc = (rc == -ENODATA ? 0 : rc));
2925
2926         if (la->la_nlink == ldata.ld_leh->leh_reccount ||
2927             unlikely(la->la_nlink == 0))
2928                 GOTO(unlock, rc = 0);
2929
2930         la->la_nlink = ldata.ld_leh->leh_reccount;
2931         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
2932                 GOTO(unlock, rc = 1);
2933
2934         rc = dt_attr_set(env, child, la, th, BYPASS_CAPA);
2935
2936         GOTO(unlock, rc = (rc == 0 ? 1 : rc));
2937
2938 unlock:
2939         dt_write_unlock(env, child);
2940
2941 stop:
2942         dt_trans_stop(env, dev, th);
2943
2944 log:
2945         lfsck_ibits_unlock(&lh, LCK_EX);
2946         if (child != NULL && !IS_ERR(child))
2947                 lfsck_object_put(env, child);
2948
2949         CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired the object "DFID"'s "
2950                "nlink count from %u to %u: rc = %d\n",
2951                lfsck_lfsck2name(lfsck), PFID(cfid), old, la->la_nlink, rc);
2952
2953         if (rc != 0)
2954                 ns->ln_flags |= LF_INCONSISTENT;
2955
2956         return rc;
2957 }
2958
2959 /**
2960  * Double scan the directory object for namespace LFSCK.
2961  *
2962  * This function will verify the <parent, child> pairs in the namespace tree:
2963  * the parent references the child via some name entry that should be in the
2964  * child's linkEA entry, the child should back references the parent via its
2965  * ".." name entry.
2966  *
2967  * The LFSCK will scan every linkEA entry in turn until find out the first
2968  * matched pairs. If found, then all other linkEA entries will be dropped.
2969  * If all the linkEA entries cannot match the ".." name entry, then there
2970  * are serveral possible cases:
2971  *
2972  * 1) If there is only one linkEA entry, then trust it as long as the PFID
2973  *    in the linkEA entry is valid.
2974  *
2975  * 2) If there are multiple linkEA entries, then try to find the linkEA
2976  *    that matches the ".." name entry. If found, then all other entries
2977  *    are invalid; otherwise, it is quite possible that the ".." name entry
2978  *    is corrupted. Under such case, the LFSCK will rebuild the ".." name
2979  *    entry according to the first valid linkEA entry (both the parent and
2980  *    the name entry should exist).
2981  *
2982  * 3) If the directory object has no (valid) linkEA entry, then the
2983  *    directory object will be handled as pure orphan and inserted
2984  *    in the .lustre/lost+found/MDTxxxx/ with the name:
2985  *    ${self_FID}-${PFID}-D-${conflict_version}
2986  *
2987  * \param[in] env       pointer to the thread context
2988  * \param[in] com       pointer to the lfsck component
2989  * \param[in] child     pointer to the directory object to be handled
2990  * \param[in] flags     to indicate the specical checking on the @child
2991  *
2992  * \retval              positive number for repaired cases
2993  * \retval              0 if nothing to be repaired
2994  * \retval              negative error number on failure
2995  */
2996 static int lfsck_namespace_double_scan_dir(const struct lu_env *env,
2997                                            struct lfsck_component *com,
2998                                            struct dt_object *child, __u8 flags)
2999 {
3000         struct lfsck_thread_info *info          = lfsck_env_info(env);
3001         const struct lu_fid      *cfid          = lfsck_dto2fid(child);
3002         struct lu_fid            *pfid          = &info->lti_fid2;
3003         struct lfsck_namespace   *ns            = com->lc_file_ram;
3004         struct lfsck_instance    *lfsck         = com->lc_lfsck;
3005         struct lustre_handle      lh            = { 0 };
3006         struct linkea_data        ldata         = { NULL };
3007         bool                      unknown       = false;
3008         bool                      lpf           = false;
3009         bool                      retry         = false;
3010         enum lfsck_namespace_inconsistency_type type = LNIT_BAD_LINKEA;
3011         int                       rc            = 0;
3012         ENTRY;
3013
3014         LASSERT(!dt_object_remote(child));
3015
3016         if (flags & LNTF_UNCERTAIN_LMV) {
3017                 if (flags & LNTF_RECHECK_NAME_HASH) {
3018                         rc = lfsck_namespace_scan_shard(env, com, child);
3019                         if (rc < 0)
3020                                 RETURN(rc);
3021
3022                         ns->ln_striped_shards_scanned++;
3023                 } else {
3024                         ns->ln_striped_shards_skipped++;
3025                 }
3026         }
3027
3028         flags &= ~(LNTF_RECHECK_NAME_HASH | LNTF_UNCERTAIN_LMV);
3029         if (flags == 0)
3030                 RETURN(0);
3031
3032         if (flags & (LNTF_CHECK_LINKEA | LNTF_CHECK_PARENT) &&
3033             !(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) {
3034                 CDEBUG(D_LFSCK, "%s: some MDT(s) maybe NOT take part in the"
3035                        "the namespace LFSCK, then the LFSCK cannot guarantee"
3036                        "all the name entries have been verified in first-stage"
3037                        "scanning. So have to skip orphan related handling for"
3038                        "the directory object "DFID" with remote name entry\n",
3039                        lfsck_lfsck2name(lfsck), PFID(cfid));
3040
3041                 RETURN(0);
3042         }
3043
3044         if (unlikely(!dt_try_as_dir(env, child)))
3045                 GOTO(out, rc = -ENOTDIR);
3046
3047         /* We only take ldlm lock on the @child when required. When the
3048          * logic comes here for the first time, it is always false. */
3049         if (0) {
3050
3051 lock:
3052                 rc = lfsck_ibits_lock(env, lfsck, child, &lh,
3053                                       MDS_INODELOCK_UPDATE |
3054                                       MDS_INODELOCK_XATTR, LCK_EX);
3055                 if (rc != 0)
3056                         GOTO(out, rc);
3057         }
3058
3059         dt_read_lock(env, child, 0);
3060         if (unlikely(lfsck_is_dead_obj(child))) {
3061                 dt_read_unlock(env, child);
3062
3063                 GOTO(out, rc = 0);
3064         }
3065
3066         rc = dt_lookup(env, child, (struct dt_rec *)pfid,
3067                        (const struct dt_key *)dotdot, BYPASS_CAPA);
3068         if (rc != 0) {
3069                 if (rc != -ENOENT && rc != -ENODATA && rc != -EINVAL) {
3070                         dt_read_unlock(env, child);
3071
3072                         GOTO(out, rc);
3073                 }
3074
3075                 if (!lustre_handle_is_used(&lh)) {
3076                         dt_read_unlock(env, child);
3077                         goto lock;
3078                 }
3079
3080                 fid_zero(pfid);
3081         } else if (lfsck->li_lpf_obj != NULL &&
3082                    lu_fid_eq(pfid, lfsck_dto2fid(lfsck->li_lpf_obj))) {
3083                 lpf = true;
3084         } else if (unlikely(!fid_is_sane(pfid))) {
3085                 fid_zero(pfid);
3086         }
3087
3088         rc = lfsck_links_read(env, child, &ldata);
3089         dt_read_unlock(env, child);
3090         if (rc != 0) {
3091                 if (rc != -ENODATA && rc != -EINVAL)
3092                         GOTO(out, rc);
3093
3094                 if (!lustre_handle_is_used(&lh))
3095                         goto lock;
3096
3097                 if (rc == -EINVAL && !fid_is_zero(pfid)) {
3098                         /* Remove the corrupted linkEA. */
3099                         rc = lfsck_namespace_links_remove(env, com, child);
3100                         if (rc == 0)
3101                                 /* Here, because of the crashed linkEA, we
3102                                  * cannot know whether there is some parent
3103                                  * that references the child directory via
3104                                  * some name entry or not. So keep it there,
3105                                  * when the LFSCK run next time, if there is
3106                                  * some parent that references this object,
3107                                  * then the LFSCK can rebuild the linkEA;
3108                                  * otherwise, this object will be handled
3109                                  * as orphan as above. */
3110                                 unknown = true;
3111                 } else {
3112                         /* 1. If we have neither ".." nor linkEA,
3113                          *    then it is an orphan.
3114                          *
3115                          * 2. If we only have the ".." name entry,
3116                          *    but no parent references this child
3117                          *    directory, then handle it as orphan. */
3118                         lfsck_ibits_unlock(&lh, LCK_EX);
3119                         type = LNIT_MUL_REF;
3120
3121                         /* If the LFSCK is marked as LF_INCOMPLETE,
3122                          * then means some MDT has ever tried to
3123                          * verify some remote MDT-object that resides
3124                          * on this MDT, but this MDT failed to respond
3125                          * such request. So means there may be some
3126                          * remote name entry on other MDT that
3127                          * references this object with another name,
3128                          * so we cannot know whether this linkEA is
3129                          * valid or not. So keep it there and maybe
3130                          * resolved when next LFSCK run. */
3131                         if (ns->ln_flags & LF_INCOMPLETE)
3132                                 GOTO(out, rc = 0);
3133
3134                         snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
3135                                  "-"DFID, PFID(pfid));
3136                         rc = lfsck_namespace_insert_orphan(env, com, child,
3137                                                 info->lti_tmpbuf, "D", NULL);
3138                 }
3139
3140                 GOTO(out, rc);
3141         }
3142
3143         linkea_first_entry(&ldata);
3144         /* This is the most common case: the object has unique linkEA entry. */
3145         if (ldata.ld_leh->leh_reccount == 1) {
3146                 rc = lfsck_namespace_dsd_single(env, com, child, pfid, &ldata,
3147                                                 &lh, &type, &retry);
3148                 if (retry) {
3149                         LASSERT(!lustre_handle_is_used(&lh));
3150
3151                         retry = false;
3152                         goto lock;
3153                 }
3154
3155                 GOTO(out, rc);
3156         }
3157
3158         if (!lustre_handle_is_used(&lh))
3159                 goto lock;
3160
3161         if (unlikely(ldata.ld_leh->leh_reccount == 0)) {
3162                 rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, &lh,
3163                                                 &type);
3164
3165                 GOTO(out, rc);
3166         }
3167
3168         /* When we come here, the cases usually like that:
3169          * 1) The directory object has a corrupted linkEA entry. During the
3170          *    first-stage scanning, the LFSCK cannot know such corruption,
3171          *    then it appends the right linkEA entry according to the found
3172          *    name entry after the bad one.
3173          *
3174          * 2) The directory object has a right linkEA entry. During the
3175          *    first-stage scanning, the LFSCK finds some bad name entry,
3176          *    but the LFSCK cannot aware that at that time, then it adds
3177          *    the bad linkEA entry for further processing. */
3178         rc = lfsck_namespace_dsd_multiple(env, com, child, pfid, &ldata,
3179                                           &lh, &type, lpf);
3180
3181         GOTO(out, rc);
3182
3183 out:
3184         lfsck_ibits_unlock(&lh, LCK_EX);
3185         if (rc > 0) {
3186                 switch (type) {
3187                 case LNIT_BAD_LINKEA:
3188                         ns->ln_linkea_repaired++;
3189                         break;
3190                 case LNIT_UNMATCHED_PAIRS:
3191                         ns->ln_unmatched_pairs_repaired++;
3192                         break;
3193                 case LNIT_MUL_REF:
3194                         ns->ln_mul_ref_repaired++;
3195                         break;
3196                 default:
3197                         break;
3198                 }
3199         }
3200
3201         if (unknown)
3202                 ns->ln_unknown_inconsistency++;
3203
3204         return rc;
3205 }
3206
3207 /**
3208  * Double scan the MDT-object for namespace LFSCK.
3209  *
3210  * If the MDT-object contains invalid or repeated linkEA entries, then drop
3211  * those entries from the linkEA; if the linkEA becomes empty or the object
3212  * has no linkEA, then it is an orphan and will be added into the directory
3213  * .lustre/lost+found/MDTxxxx/; if the remote parent is lost, then recreate
3214  * the remote parent; if the name entry corresponding to some linkEA entry
3215  * is lost, then add the name entry back to the namespace.
3216  *
3217  * \param[in] env       pointer to the thread context
3218  * \param[in] com       pointer to the lfsck component
3219  * \param[in] child     pointer to the dt_object to be handled
3220  * \param[in] flags     some hints to indicate how the @child should be handled
3221  *
3222  * \retval              positive number for repaired cases
3223  * \retval              0 if nothing to be repaired
3224  * \retval              negative error number on failure
3225  */
3226 static int lfsck_namespace_double_scan_one(const struct lu_env *env,
3227                                            struct lfsck_component *com,
3228                                            struct dt_object *child, __u8 flags)
3229 {
3230         struct lfsck_thread_info *info     = lfsck_env_info(env);
3231         struct lu_attr           *la       = &info->lti_la;
3232         struct lu_name           *cname    = &info->lti_name;
3233         struct lu_fid            *pfid     = &info->lti_fid;
3234         struct lu_fid            *cfid     = &info->lti_fid2;
3235         struct lfsck_instance    *lfsck    = com->lc_lfsck;
3236         struct lfsck_namespace   *ns       = com->lc_file_ram;
3237         struct dt_object         *parent   = NULL;
3238         struct linkea_data        ldata    = { NULL };
3239         bool                      repaired = false;
3240         int                       count    = 0;
3241         int                       rc;
3242         ENTRY;
3243
3244         dt_read_lock(env, child, 0);
3245         if (unlikely(lfsck_is_dead_obj(child))) {
3246                 dt_read_unlock(env, child);
3247
3248                 RETURN(0);
3249         }
3250
3251         if (S_ISDIR(lfsck_object_type(child))) {
3252                 dt_read_unlock(env, child);
3253                 rc = lfsck_namespace_double_scan_dir(env, com, child, flags);
3254
3255                 RETURN(rc);
3256         }
3257
3258         rc = lfsck_links_read(env, child, &ldata);
3259         dt_read_unlock(env, child);
3260         if (rc != 0)
3261                 GOTO(out, rc);
3262
3263         linkea_first_entry(&ldata);
3264         while (ldata.ld_lee != NULL) {
3265                 lfsck_namespace_unpack_linkea_entry(&ldata, cname, pfid,
3266                                                     info->lti_key);
3267                 rc = lfsck_namespace_filter_linkea_entry(&ldata, cname, pfid,
3268                                                          false);
3269                 /* Found repeated linkEA entries */
3270                 if (rc > 0) {
3271                         rc = lfsck_namespace_shrink_linkea(env, com, child,
3272                                                 &ldata, cname, pfid, false);
3273                         if (rc < 0)
3274                                 GOTO(out, rc);
3275
3276                         if (rc == 0)
3277                                 continue;
3278
3279                         repaired = true;
3280
3281                         /* fall through */
3282                 }
3283
3284                 /* Invalid PFID in the linkEA entry. */
3285                 if (!fid_is_sane(pfid)) {
3286                         rc = lfsck_namespace_shrink_linkea(env, com, child,
3287                                                 &ldata, cname, pfid, true);
3288                         if (rc < 0)
3289                                 GOTO(out, rc);
3290
3291                         if (rc > 0)
3292                                 repaired = true;
3293
3294                         continue;
3295                 }
3296
3297                 parent = lfsck_object_find_bottom(env, lfsck, pfid);
3298                 if (IS_ERR(parent))
3299                         GOTO(out, rc = PTR_ERR(parent));
3300
3301                 if (!dt_object_exists(parent)) {
3302
3303 lost_parent:
3304                         if (ldata.ld_leh->leh_reccount > 1) {
3305                                 /* If it is NOT the last linkEA entry, then
3306                                  * there is still other chance to make the
3307                                  * child to be visible via other parent, then
3308                                  * remove this linkEA entry. */
3309                                 rc = lfsck_namespace_shrink_linkea(env, com,
3310                                         child, &ldata, cname, pfid, true);
3311                         } else {
3312                                 /* If the LFSCK is marked as LF_INCOMPLETE,
3313                                  * then means some MDT has ever tried to
3314                                  * verify some remote MDT-object that resides
3315                                  * on this MDT, but this MDT failed to respond
3316                                  * such request. So means there may be some
3317                                  * remote name entry on other MDT that
3318                                  * references this object with another name,
3319                                  * so we cannot know whether this linkEA is
3320                                  * valid or not. So keep it there and maybe
3321                                  * resolved when next LFSCK run. */
3322                                 if (ns->ln_flags & LF_INCOMPLETE) {
3323                                         lfsck_object_put(env, parent);
3324
3325                                         GOTO(out, rc = 0);
3326                                 }
3327
3328                                 /* Create the lost parent as an orphan. */
3329                                 rc = lfsck_namespace_create_orphan_dir(env, com,
3330                                                                 parent, NULL);
3331                                 if (rc < 0) {
3332                                         lfsck_object_put(env, parent);
3333
3334                                         GOTO(out, rc);
3335                                 }
3336
3337                                 if (rc > 0)
3338                                         repaired = true;
3339
3340                                 /* Add the missing name entry to the parent. */
3341                                 rc = lfsck_namespace_insert_normal(env, com,
3342                                                 parent, child, cname->ln_name);
3343                                 if (unlikely(rc == -EEXIST))
3344                                         /* Unfortunately, someone reused the
3345                                          * name under the parent by race. So we
3346                                          * have to remove the linkEA entry from
3347                                          * current child object. It means that
3348                                          * the LFSCK cannot recover the system
3349                                          * totally back to its original status,
3350                                          * but it is necessary to make the
3351                                          * current system to be consistent. */
3352                                         rc = lfsck_namespace_shrink_linkea(env,
3353                                                         com, child, &ldata,
3354                                                         cname, pfid, true);
3355                                 else
3356                                         linkea_next_entry(&ldata);
3357                         }
3358
3359                         lfsck_object_put(env, parent);
3360                         if (rc < 0)
3361                                 GOTO(out, rc);
3362
3363                         if (rc > 0)
3364                                 repaired = true;
3365
3366                         continue;
3367                 }
3368
3369                 /* The linkEA entry with bad parent will be removed. */
3370                 if (unlikely(!dt_try_as_dir(env, parent))) {
3371                         lfsck_object_put(env, parent);
3372                         rc = lfsck_namespace_shrink_linkea(env, com, child,
3373                                                 &ldata, cname, pfid, true);
3374                         if (rc < 0)
3375                                 GOTO(out, rc);
3376
3377                         if (rc > 0)
3378                                 repaired = true;
3379
3380                         continue;
3381                 }
3382
3383                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
3384                                (const struct dt_key *)cname->ln_name,
3385                                BYPASS_CAPA);
3386                 if (rc != 0 && rc != -ENOENT) {
3387                         lfsck_object_put(env, parent);
3388
3389                         GOTO(out, rc);
3390                 }
3391
3392                 if (rc == 0) {
3393                         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
3394                                 /* It is the most common case that we
3395                                  * find the name entry corresponding
3396                                  * to the linkEA entry. */
3397                                 lfsck_object_put(env, parent);
3398                                 linkea_next_entry(&ldata);
3399                         } else {
3400                                 /* The name entry references another
3401                                  * MDT-object that may be created by
3402                                  * the LFSCK for repairing dangling
3403                                  * name entry. Try to replace it. */
3404                                 rc = lfsck_namespace_replace_cond(env, com,
3405                                                 parent, child, cfid, cname);
3406                                 lfsck_object_put(env, parent);
3407                                 if (rc < 0)
3408                                         GOTO(out, rc);
3409
3410                                 if (rc > 0) {
3411                                         repaired = true;
3412                                         linkea_next_entry(&ldata);
3413                                 } else {
3414                                         rc = lfsck_namespace_shrink_linkea(env,
3415                                                         com, child, &ldata,
3416                                                         cname, pfid, true);
3417                                         if (rc < 0)
3418                                                 GOTO(out, rc);
3419
3420                                         if (rc > 0)
3421                                                 repaired = true;
3422                                 }
3423                         }