4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright (c) 2013, 2015, Intel Corporation.
26 * lustre/lfsck/lfsck_namespace.c
28 * Author: Fan, Yong <fan.yong@intel.com>
31 #define DEBUG_SUBSYSTEM S_LFSCK
33 #include <lustre/lustre_idl.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fid.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre/lustre_user.h>
42 #include "lfsck_internal.h"
44 #define LFSCK_NAMESPACE_MAGIC_V1 0xA0629D03
45 #define LFSCK_NAMESPACE_MAGIC_V2 0xA0621A0B
47 /* For Lustre-2.x (x <= 6), the namespace LFSCK used LFSCK_NAMESPACE_MAGIC_V1
48 * as the trace file magic. When downgrading to such an old release, the old
49 * LFSCK will not recognize the new LFSCK_NAMESPACE_MAGIC_V2 in the new trace
50 * file; it will then reset the whole LFSCK rather than fail to start. A
51 * similar case happens when upgrading from such an old release. */
52 #define LFSCK_NAMESPACE_MAGIC LFSCK_NAMESPACE_MAGIC_V2
/* Reasons why a name entry no longer matches the object it was read for.
 * These are the positive values returned by lfsck_namespace_check_exist(). */
54 enum lfsck_nameentry_check {
55 LFSCK_NAMEENTRY_DEAD = 1, /* The object has been unlinked. */
56 LFSCK_NAMEENTRY_REMOVED = 2, /* The entry has been removed. */
57 LFSCK_NAMEENTRY_RECREATED = 3, /* The entry has been recreated. */
/*
 * Build a namespace assistant request that describes the directory entry
 * @ent found under the parent @lso.
 *
 * The request stores the results of lfsck_assistant_object_get(@lso) and
 * lfsck_lmv_get(lfsck->li_lmv); lfsck_namespace_assistant_req_fini() calls
 * the matching *_put() helpers on them.
 *
 * NOTE(review): this listing omits some lines of the function. The statement
 * that allocates lnr and the test guarding the -ENOMEM return are not
 * visible, and no visible statement uses @type; presumably the allocation is
 * 'size' bytes and the error is returned when it fails.
 *
 * \retval	the new request
 * \retval	ERR_PTR(-ENOMEM)
 */
60 static struct lfsck_namespace_req *
61 lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
62 struct lfsck_assistant_object *lso,
63 struct lu_dirent *ent, __u16 type)
65 struct lfsck_namespace_req *lnr;
/* Header plus the name length rounded down to a multiple of 4, plus 4:
 * always at least one byte more than lde_namelen for the trailing name. */
68 size = sizeof(*lnr) + (ent->lde_namelen & ~3) + 4;
71 return ERR_PTR(-ENOMEM);
73 INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
74 lnr->lnr_lar.lar_parent = lfsck_assistant_object_get(lso);
75 lnr->lnr_lmv = lfsck_lmv_get(lfsck->li_lmv);
/* Copy the entry's FID, hash cookie and attributes into the request. */
76 lnr->lnr_fid = ent->lde_fid;
77 lnr->lnr_dir_cookie = ent->lde_hash;
78 lnr->lnr_attr = ent->lde_attrs;
/* Only lde_namelen bytes are copied; no terminator is written here. */
81 lnr->lnr_namelen = ent->lde_namelen;
82 memcpy(lnr->lnr_name, ent->lde_name, ent->lde_namelen);
/*
 * Release a namespace assistant request.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] lar	the lfsck_assistant_req embedded (as lnr_lar) in the
 *			lfsck_namespace_req to be released
 */
87 static void lfsck_namespace_assistant_req_fini(const struct lu_env *env,
88 struct lfsck_assistant_req *lar)
/* Recover the enclosing namespace request from its embedded member. */
90 struct lfsck_namespace_req *lnr =
91 container_of0(lar, struct lfsck_namespace_req, lnr_lar);
/* lnr_lmv may be NULL, so it is only put when set. */
93 if (lnr->lnr_lmv != NULL)
94 lfsck_lmv_put(env, lnr->lnr_lmv);
96 lfsck_assistant_object_put(env, lar->lar_parent);
/* Free with the size recorded in the request itself. */
97 OBD_FREE(lnr, lnr->lnr_size);
/*
 * Convert a namespace LFSCK trace record from little-endian to CPU byte
 * order, one field at a time.
 *
 * Scalar fields go through le32_to_cpu()/le64_to_cpu(), the three positions
 * through lfsck_position_le_to_cpu() and the FID through fid_le_to_cpu().
 * lfsck_namespace_load() uses this to fill lc_file_ram from lc_file_disk.
 *
 * \param[out] dst	record to be filled in CPU byte order
 * \param[in] src	little-endian record to be converted
 */
100 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
101 struct lfsck_namespace *src)
103 dst->ln_magic = le32_to_cpu(src->ln_magic);
104 dst->ln_status = le32_to_cpu(src->ln_status);
105 dst->ln_flags = le32_to_cpu(src->ln_flags);
106 dst->ln_success_count = le32_to_cpu(src->ln_success_count);
107 dst->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
108 dst->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
109 dst->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
110 dst->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
111 dst->ln_time_last_checkpoint =
112 le64_to_cpu(src->ln_time_last_checkpoint);
113 lfsck_position_le_to_cpu(&dst->ln_pos_latest_start,
114 &src->ln_pos_latest_start);
115 lfsck_position_le_to_cpu(&dst->ln_pos_last_checkpoint,
116 &src->ln_pos_last_checkpoint);
117 lfsck_position_le_to_cpu(&dst->ln_pos_first_inconsistent,
118 &src->ln_pos_first_inconsistent);
119 dst->ln_items_checked = le64_to_cpu(src->ln_items_checked);
120 dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
121 dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
122 dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
123 dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
124 dst->ln_objs_repaired_phase2 =
125 le64_to_cpu(src->ln_objs_repaired_phase2);
126 dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
127 dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
128 fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2,
129 &src->ln_fid_latest_scanned_phase2);
130 dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
131 dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
132 dst->ln_mul_linked_checked = le64_to_cpu(src->ln_mul_linked_checked);
133 dst->ln_mul_linked_repaired = le64_to_cpu(src->ln_mul_linked_repaired);
134 dst->ln_unknown_inconsistency =
135 le64_to_cpu(src->ln_unknown_inconsistency);
136 dst->ln_unmatched_pairs_repaired =
137 le64_to_cpu(src->ln_unmatched_pairs_repaired);
138 dst->ln_dangling_repaired = le64_to_cpu(src->ln_dangling_repaired);
139 dst->ln_mul_ref_repaired = le64_to_cpu(src->ln_mul_ref_repaired);
140 dst->ln_bad_type_repaired = le64_to_cpu(src->ln_bad_type_repaired);
141 dst->ln_lost_dirent_repaired =
142 le64_to_cpu(src->ln_lost_dirent_repaired);
143 dst->ln_striped_dirs_scanned =
144 le64_to_cpu(src->ln_striped_dirs_scanned);
145 dst->ln_striped_dirs_repaired =
146 le64_to_cpu(src->ln_striped_dirs_repaired);
147 dst->ln_striped_dirs_failed =
148 le64_to_cpu(src->ln_striped_dirs_failed);
149 dst->ln_striped_dirs_disabled =
150 le64_to_cpu(src->ln_striped_dirs_disabled);
151 dst->ln_striped_dirs_skipped =
152 le64_to_cpu(src->ln_striped_dirs_skipped);
153 dst->ln_striped_shards_scanned =
154 le64_to_cpu(src->ln_striped_shards_scanned);
155 dst->ln_striped_shards_repaired =
156 le64_to_cpu(src->ln_striped_shards_repaired);
157 dst->ln_striped_shards_failed =
158 le64_to_cpu(src->ln_striped_shards_failed);
159 dst->ln_striped_shards_skipped =
160 le64_to_cpu(src->ln_striped_shards_skipped);
161 dst->ln_name_hash_repaired = le64_to_cpu(src->ln_name_hash_repaired);
162 dst->ln_local_lpf_scanned = le64_to_cpu(src->ln_local_lpf_scanned);
163 dst->ln_local_lpf_moved = le64_to_cpu(src->ln_local_lpf_moved);
164 dst->ln_local_lpf_skipped = le64_to_cpu(src->ln_local_lpf_skipped);
165 dst->ln_local_lpf_failed = le64_to_cpu(src->ln_local_lpf_failed);
166 dst->ln_bitmap_size = le32_to_cpu(src->ln_bitmap_size);
/*
 * Convert a namespace LFSCK trace record from CPU byte order to
 * little-endian, one field at a time.
 *
 * This is the inverse of lfsck_namespace_le_to_cpu() and handles the same
 * fields in the same order. lfsck_namespace_store() uses it to fill
 * lc_file_disk before the record is written.
 *
 * \param[out] dst	record to be filled in little-endian byte order
 * \param[in] src	CPU byte order record to be converted
 */
169 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
170 struct lfsck_namespace *src)
172 dst->ln_magic = cpu_to_le32(src->ln_magic);
173 dst->ln_status = cpu_to_le32(src->ln_status);
174 dst->ln_flags = cpu_to_le32(src->ln_flags);
175 dst->ln_success_count = cpu_to_le32(src->ln_success_count);
176 dst->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
177 dst->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
178 dst->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
179 dst->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
180 dst->ln_time_last_checkpoint =
181 cpu_to_le64(src->ln_time_last_checkpoint);
182 lfsck_position_cpu_to_le(&dst->ln_pos_latest_start,
183 &src->ln_pos_latest_start);
184 lfsck_position_cpu_to_le(&dst->ln_pos_last_checkpoint,
185 &src->ln_pos_last_checkpoint);
186 lfsck_position_cpu_to_le(&dst->ln_pos_first_inconsistent,
187 &src->ln_pos_first_inconsistent);
188 dst->ln_items_checked = cpu_to_le64(src->ln_items_checked);
189 dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
190 dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
191 dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
192 dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
193 dst->ln_objs_repaired_phase2 =
194 cpu_to_le64(src->ln_objs_repaired_phase2);
195 dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
196 dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
197 fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2,
198 &src->ln_fid_latest_scanned_phase2);
199 dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
200 dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
201 dst->ln_mul_linked_checked = cpu_to_le64(src->ln_mul_linked_checked);
202 dst->ln_mul_linked_repaired = cpu_to_le64(src->ln_mul_linked_repaired);
203 dst->ln_unknown_inconsistency =
204 cpu_to_le64(src->ln_unknown_inconsistency);
205 dst->ln_unmatched_pairs_repaired =
206 cpu_to_le64(src->ln_unmatched_pairs_repaired);
207 dst->ln_dangling_repaired = cpu_to_le64(src->ln_dangling_repaired);
208 dst->ln_mul_ref_repaired = cpu_to_le64(src->ln_mul_ref_repaired);
209 dst->ln_bad_type_repaired = cpu_to_le64(src->ln_bad_type_repaired);
210 dst->ln_lost_dirent_repaired =
211 cpu_to_le64(src->ln_lost_dirent_repaired);
212 dst->ln_striped_dirs_scanned =
213 cpu_to_le64(src->ln_striped_dirs_scanned);
214 dst->ln_striped_dirs_repaired =
215 cpu_to_le64(src->ln_striped_dirs_repaired);
216 dst->ln_striped_dirs_failed =
217 cpu_to_le64(src->ln_striped_dirs_failed);
218 dst->ln_striped_dirs_disabled =
219 cpu_to_le64(src->ln_striped_dirs_disabled);
220 dst->ln_striped_dirs_skipped =
221 cpu_to_le64(src->ln_striped_dirs_skipped);
222 dst->ln_striped_shards_scanned =
223 cpu_to_le64(src->ln_striped_shards_scanned);
224 dst->ln_striped_shards_repaired =
225 cpu_to_le64(src->ln_striped_shards_repaired);
226 dst->ln_striped_shards_failed =
227 cpu_to_le64(src->ln_striped_shards_failed);
228 dst->ln_striped_shards_skipped =
229 cpu_to_le64(src->ln_striped_shards_skipped);
230 dst->ln_name_hash_repaired = cpu_to_le64(src->ln_name_hash_repaired);
231 dst->ln_local_lpf_scanned = cpu_to_le64(src->ln_local_lpf_scanned);
232 dst->ln_local_lpf_moved = cpu_to_le64(src->ln_local_lpf_moved);
233 dst->ln_local_lpf_skipped = cpu_to_le64(src->ln_local_lpf_skipped);
234 dst->ln_local_lpf_failed = cpu_to_le64(src->ln_local_lpf_failed);
235 dst->ln_bitmap_size = cpu_to_le32(src->ln_bitmap_size);
/*
 * Account for one item that could not be handled and track the position of
 * the first non-repaired inconsistency.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] lfsck	pointer to the lfsck instance
 * \param[in] ns	the in-RAM namespace LFSCK record to be updated
 */
238 static void lfsck_namespace_record_failure(const struct lu_env *env,
239 struct lfsck_instance *lfsck,
240 struct lfsck_namespace *ns)
242 struct lfsck_position pos;
244 ns->ln_items_failed++;
/* Capture the current scan position. */
245 lfsck_pos_fill(env, lfsck, &pos, false);
/* Record it when no position has been recorded yet, or when
 * lfsck_pos_is_eq() returns a negative value for (pos, recorded) --
 * presumably meaning that pos comes before the recorded one. */
246 if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent) ||
247 lfsck_pos_is_eq(&pos, &ns->ln_pos_first_inconsistent) < 0) {
248 ns->ln_pos_first_inconsistent = pos;
250 CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
251 "inconsistency at the pos [%llu, "DFID", %#llx]\n",
252 lfsck_lfsck2name(lfsck),
253 ns->ln_pos_first_inconsistent.lp_oit_cookie,
254 PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
255 ns->ln_pos_first_inconsistent.lp_dir_cookie);
260 * Load the MDT bitmap from the lfsck_namespace trace file.
262 * \param[in] env pointer to the thread context
263 * \param[in] com pointer to the lfsck component
265 * \retval 0 for success
266 * \retval negative error number on failure or data corruption
/* NOTE(review): this listing omits several lines of the function (local
 * declarations, parts of conditions, some statements and returns). The
 * comments below describe only the statements that are visible. */
268 static int lfsck_namespace_load_bitmap(const struct lu_env *env,
269 struct lfsck_component *com)
271 struct dt_object *obj = com->lc_obj;
272 struct lfsck_assistant_data *lad = com->lc_data;
273 struct lfsck_namespace *ns = com->lc_file_ram;
274 struct cfs_bitmap *bitmap = lad->lad_bitmap;
/* nbits comes either from the size of the MDT targets bitmap or from the
 * size recorded in the trace record (the right-hand side of the comparison
 * is not visible here). */
280 if (com->lc_lfsck->li_mdt_descs.ltd_tgts_bitmap->size >
282 nbits = com->lc_lfsck->li_mdt_descs.ltd_tgts_bitmap->size;
284 nbits = ns->ln_bitmap_size;
/* Never use fewer than BITS_PER_LONG bits. */
286 if (unlikely(nbits < BITS_PER_LONG))
287 nbits = BITS_PER_LONG;
/* The in-RAM bitmap is smaller than required: allocate one of new_bits
 * bits, install it in lad and free the old one. The loop body that grows
 * new_bits is not visible in this listing. */
289 if (nbits > bitmap->size) {
290 __u32 new_bits = bitmap->size;
291 struct cfs_bitmap *new_bitmap;
293 while (new_bits < nbits)
296 new_bitmap = CFS_ALLOCATE_BITMAP(new_bits);
297 if (new_bitmap == NULL)
300 lad->lad_bitmap = new_bitmap;
301 CFS_FREE_BITMAP(bitmap);
/* The trace record holds no bitmap: clear lad_incomplete and reset the
 * in-RAM bitmap. */
305 if (ns->ln_bitmap_size == 0) {
306 lad->lad_incomplete = 0;
307 CFS_RESET_BITMAP(bitmap);
/* Number of bytes needed for ln_bitmap_size bits, rounded up. */
312 size = (ns->ln_bitmap_size + 7) >> 3;
313 rc = dt_xattr_get(env, obj,
314 lfsck_buf_get(env, bitmap->data, size),
315 XATTR_NAME_LFSCK_BITMAP);
/* On this path a non-negative rc is reported as -EINVAL and a negative rc
 * is returned unchanged. NOTE(review): the guarding condition is not
 * visible; presumably it tests rc against the expected size. */
317 RETURN(rc >= 0 ? -EINVAL : rc);
/* lad_incomplete is cleared when the loaded bitmap is empty and set
 * otherwise. */
319 if (cfs_bitmap_check_empty(bitmap))
320 lad->lad_incomplete = 0;
322 lad->lad_incomplete = 1;
328 * Load namespace LFSCK statistics information from the trace file.
330 * \param[in] env pointer to the thread context
331 * \param[in] com pointer to the lfsck component
333 * \retval 0 for success
334 * \retval negative error number on failure
/* NOTE(review): some lines of this function (declarations, conditions and
 * the return paths) are not visible in this listing. */
336 static int lfsck_namespace_load(const struct lu_env *env,
337 struct lfsck_component *com)
339 int len = com->lc_file_size;
/* Read the XATTR_NAME_LFSCK_NAMESPACE xattr of the trace object into the
 * lc_file_disk buffer, asking for lc_file_size bytes. */
342 rc = dt_xattr_get(env, com->lc_obj,
343 lfsck_buf_get(env, com->lc_file_disk, len),
344 XATTR_NAME_LFSCK_NAMESPACE);
346 struct lfsck_namespace *ns = com->lc_file_ram;
/* Convert the little-endian copy into lc_file_ram, then check the magic. */
348 lfsck_namespace_le_to_cpu(ns,
349 (struct lfsck_namespace *)com->lc_file_disk);
350 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
351 CDEBUG(D_LFSCK, "%s: invalid lfsck_namespace magic "
352 "%#x != %#x\n", lfsck_lfsck2name(com->lc_lfsck),
353 ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
/* On this branch every result except -ENODATA is logged as a failure. */
358 } else if (rc != -ENODATA) {
359 CDEBUG(D_LFSCK, "%s: fail to load lfsck_namespace, "
360 "expected = %d: rc = %d\n",
361 lfsck_lfsck2name(com->lc_lfsck), len, rc);
/*
 * Write the in-RAM namespace LFSCK record, and the bitmap when one is used,
 * to the xattrs of the trace object inside one local transaction.
 *
 * NOTE(review): some lines of this function (the condition that selects the
 * bitmap, the error checks after each call, labels and the return) are not
 * visible in this listing.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 */
369 static int lfsck_namespace_store(const struct lu_env *env,
370 struct lfsck_component *com)
372 struct dt_object *obj = com->lc_obj;
373 struct lfsck_instance *lfsck = com->lc_lfsck;
374 struct lfsck_namespace *ns = com->lc_file_ram;
375 struct lfsck_assistant_data *lad = com->lc_data;
376 struct dt_device *dev = lfsck_obj2dev(obj);
377 struct cfs_bitmap *bitmap = NULL;
378 struct thandle *handle;
380 int len = com->lc_file_size;
385 bitmap = lad->lad_bitmap;
386 nbits = bitmap->size;
/* The bitmap is written as nbits >> 3 bytes below, so nbits must be a
 * multiple of 8. */
389 LASSERTF((nbits & 7) == 0, "Invalid nbits %u\n", nbits);
/* Record the bitmap size in the record, then convert the record to
 * little-endian in lc_file_disk. */
392 ns->ln_bitmap_size = nbits;
393 lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
395 handle = dt_trans_create(env, dev);
397 GOTO(log, rc = PTR_ERR(handle));
/* Declare both xattr updates before the transaction is started. */
399 rc = dt_declare_xattr_set(env, obj,
400 lfsck_buf_get(env, com->lc_file_disk, len),
401 XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
405 if (bitmap != NULL) {
406 rc = dt_declare_xattr_set(env, obj,
407 lfsck_buf_get(env, bitmap->data, nbits >> 3),
408 XATTR_NAME_LFSCK_BITMAP, 0, handle);
413 rc = dt_trans_start_local(env, dev, handle);
/* Write the record first; the bitmap is written only if that succeeded. */
417 rc = dt_xattr_set(env, obj,
418 lfsck_buf_get(env, com->lc_file_disk, len),
419 XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
420 if (rc == 0 && bitmap != NULL)
421 rc = dt_xattr_set(env, obj,
422 lfsck_buf_get(env, bitmap->data, nbits >> 3),
423 XATTR_NAME_LFSCK_BITMAP, 0, handle);
428 dt_trans_stop(env, dev, handle);
432 CDEBUG(D_LFSCK, "%s: fail to store lfsck_namespace: rc = %d\n",
433 lfsck_lfsck2name(lfsck), rc);
/*
 * Find or create the index object @name under @parent, to be used as a
 * trace file.
 *
 * NOTE(review): the condition that guards the unlink is not visible in this
 * listing; presumably it is the @reset parameter. The tail of the function
 * (remaining arguments and the return) is not visible either.
 */
437 static struct dt_object *
438 lfsck_namespace_load_one_trace_file(const struct lu_env *env,
439 struct lfsck_component *com,
440 struct dt_object *parent,
441 const char *name, bool reset)
443 struct lfsck_instance *lfsck = com->lc_lfsck;
444 struct dt_object *obj;
/* Remove an existing file of that name; -ENOENT (nothing to remove) is
 * not treated as a failure. */
448 rc = local_object_unlink(env, lfsck->li_bottom, parent, name);
449 if (rc != 0 && rc != -ENOENT)
453 obj = local_index_find_or_create(env, lfsck->li_los, parent, name,
454 S_IFREG | S_IRUGO | S_IWUSR,
/*
 * Load (or re-create) each of the LFSCK_STF_COUNT sub trace files of the
 * namespace LFSCK component and attach them to com->lc_sub_trace_objs[].
 *
 * NOTE(review): part of the parameter list and several statements of this
 * function are not visible in this listing; the body uses a 'reset' value
 * that is passed on to lfsck_namespace_load_one_trace_file().
 */
460 static int lfsck_namespace_load_sub_trace_files(const struct lu_env *env,
461 struct lfsck_component *com,
464 char *name = lfsck_env_info(env)->lti_key;
465 struct lfsck_sub_trace_obj *lsto;
466 struct dt_object *obj;
470 for (i = 0, lsto = &com->lc_sub_trace_objs[0];
471 i < LFSCK_STF_COUNT; i++, lsto++) {
/* File names are "<LFSCK_NAMESPACE>_NN" with a two-digit index. */
472 snprintf(name, NAME_MAX, "%s_%02d", LFSCK_NAMESPACE, i);
/* Each slot is updated under its own mutex. */
473 mutex_lock(&lsto->lsto_mutex);
474 if (lsto->lsto_obj != NULL) {
476 mutex_unlock(&lsto->lsto_mutex);
/* Drop the object currently attached to the slot. */
480 lfsck_object_put(env, lsto->lsto_obj);
481 lsto->lsto_obj = NULL;
484 obj = lfsck_namespace_load_one_trace_file(env, com,
485 com->lc_lfsck->li_lfsck_dir, name, reset);
486 LASSERT(obj != NULL);
490 lsto->lsto_obj = obj;
491 rc = obj->do_ops->do_index_try(env, obj,
494 mutex_unlock(&lsto->lsto_mutex);
/*
 * Reset the in-RAM namespace LFSCK record to its initial state, store it,
 * and reload the sub trace files with reset requested.
 *
 * NOTE(review): the condition between the store and the reload, and the
 * return, are not visible in this listing.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 */
502 static int lfsck_namespace_init(const struct lu_env *env,
503 struct lfsck_component *com)
505 struct lfsck_namespace *ns = com->lc_file_ram;
/* Start from an all-zero record carrying only the magic and LS_INIT. */
508 memset(ns, 0, sizeof(*ns));
509 ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
510 ns->ln_status = LS_INIT;
/* The record is stored while holding lc_sem for write. */
511 down_write(&com->lc_sem);
512 rc = lfsck_namespace_store(env, com);
513 up_write(&com->lc_sem);
515 rc = lfsck_namespace_load_sub_trace_files(env, com, true);
521 * Update the namespace LFSCK trace file for the given @fid
523 * \param[in] env pointer to the thread context
524 * \param[in] com pointer to the lfsck component
525 * \param[in] fid the fid which flags to be updated in the lfsck
527 * \param[in] add true if add new flags, otherwise remove flags
529 * \retval 0 for success or nothing to be done
530 * \retval negative error number on failure
/* NOTE(review): several lines of this function (declarations of old, new,
 * idx and rc, some conditions, the computation of 'new', and the labels)
 * are not visible in this listing. */
532 int lfsck_namespace_trace_update(const struct lu_env *env,
533 struct lfsck_component *com,
534 const struct lu_fid *fid,
535 const __u8 flags, bool add)
537 struct lfsck_instance *lfsck = com->lc_lfsck;
538 struct dt_object *obj;
539 struct lu_fid *key = &lfsck_env_info(env)->lti_fid3;
540 struct dt_device *dev;
541 struct thandle *th = NULL;
550 if (unlikely(!fid_is_sane(fid)))
/* The FID selects one sub trace file; that file's mutex is held for the
 * whole lookup and update. */
553 idx = lfsck_sub_trace_file_fid2idx(fid);
554 mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
555 obj = com->lc_sub_trace_objs[idx].lsto_obj;
556 if (unlikely(obj == NULL)) {
557 mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
561 lfsck_object_get(obj);
562 dev = lfsck_obj2dev(obj);
/* The index key is the FID converted to big-endian. */
563 fid_cpu_to_be(key, fid);
564 rc = dt_lookup(env, obj, (struct dt_rec *)&old,
565 (const struct dt_key *)key);
568 GOTO(unlock, rc = 0);
572 } else if (rc == 0) {
/* Every requested flag is already recorded: nothing to do (presumably the
 * 'add' case). */
574 if ((old & flags) == flags)
575 GOTO(unlock, rc = 0);
/* None of the requested flags is recorded: nothing to do (presumably the
 * 'remove' case). */
579 if ((old & flags) == 0)
580 GOTO(unlock, rc = 0);
588 th = dt_trans_create(env, dev);
590 GOTO(log, rc = PTR_ERR(th));
/* Declare the delete and the insert before starting the transaction. */
593 rc = dt_declare_delete(env, obj,
594 (const struct dt_key *)key, th);
600 rc = dt_declare_insert(env, obj,
601 (const struct dt_rec *)&new,
602 (const struct dt_key *)key, th);
607 rc = dt_trans_start_local(env, dev, th);
612 rc = dt_delete(env, obj, (const struct dt_key *)key, th);
618 rc = dt_insert(env, obj, (const struct dt_rec *)&new,
619 (const struct dt_key *)key, th, 1);
/* th is NULL or an error pointer when no transaction was created. */
627 if (th != NULL && !IS_ERR(th))
628 dt_trans_stop(env, dev, th);
630 CDEBUG(D_LFSCK, "%s: namespace LFSCK %s flags for "DFID" in the "
631 "trace file, flags %x, old %x, new %x: rc = %d\n",
632 lfsck_lfsck2name(lfsck), add ? "add" : "del", PFID(fid),
633 (__u32)flags, (__u32)old, (__u32)new, rc);
636 mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
637 lfsck_object_put(env, obj);
/*
 * Check whether the name entry @name under @dir still refers to @obj.
 *
 * NOTE(review): the conditions on the lookup result and the final return
 * are not visible in this listing.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] dir	the directory in which @name is looked up
 * \param[in] obj	the object the name entry is expected to reference
 * \param[in] name	the name to be looked up
 *
 * \retval LFSCK_NAMEENTRY_DEAD		@obj is a dead object
 * \retval LFSCK_NAMEENTRY_REMOVED	returned on one lookup outcome --
 *					presumably when the name is not found
 * \retval LFSCK_NAMEENTRY_RECREATED	the name maps to a FID other than
 *					that of @obj
 */
642 int lfsck_namespace_check_exist(const struct lu_env *env,
643 struct dt_object *dir,
644 struct dt_object *obj, const char *name)
646 struct lu_fid *fid = &lfsck_env_info(env)->lti_fid;
650 if (unlikely(lfsck_is_dead_obj(obj)))
651 RETURN(LFSCK_NAMEENTRY_DEAD);
/* Look the name up; the FID found is stored in the per-thread lti_fid. */
653 rc = dt_lookup(env, dir, (struct dt_rec *)fid,
654 (const struct dt_key *)name);
656 RETURN(LFSCK_NAMEENTRY_REMOVED);
661 if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
662 RETURN(LFSCK_NAMEENTRY_RECREATED);
/*
 * Declare, on the transaction @handle, the linkEA updates that may be made
 * on @obj: removing the XATTR_NAME_LINK xattr and setting a new one of
 * DEFAULT_LINKEA_SIZE bytes.
 *
 * NOTE(review): the error check between the two declarations and the return
 * are not visible in this listing.
 */
667 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
668 struct dt_object *obj,
669 struct thandle *handle)
673 /* For destroying all invalid linkEA entries. */
674 rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
676 /* For insert new linkEA entry. */
/* Only the size matters for the declaration, hence the NULL data pointer. */
677 rc = dt_declare_xattr_set(env, obj,
678 lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
679 XATTR_NAME_LINK, 0, handle);
/*
 * Read the XATTR_NAME_LINK xattr (linkEA) of @obj into ldata->ld_buf and
 * initialize @ldata from it with linkea_init().
 *
 * NOTE(review): the statements taken when each visible condition is true,
 * the test that selects the "buffer too small" path, and the return are not
 * visible in this listing.
 */
683 int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
684 struct linkea_data *ldata)
688 if (ldata->ld_buf->lb_buf == NULL)
691 if (!dt_object_exists(obj))
694 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK);
696 /* Buf was too small, figure out what we need. */
/* Querying with LU_BUF_NULL yields the size that is then passed to
 * lu_buf_realloc(). */
697 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LINK);
698 if (unlikely(rc == 0))
704 lu_buf_realloc(ldata->ld_buf, rc);
/* The buffer pointer is re-checked after the reallocation. */
705 if (ldata->ld_buf->lb_buf == NULL)
708 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK);
711 if (unlikely(rc == 0))
715 rc = linkea_init(ldata);
721 * Remove linkEA for the given object.
723 * The caller should take the ldlm lock before the calling.
725 * \param[in] env pointer to the thread context
726 * \param[in] com pointer to the lfsck component
727 * \param[in] obj pointer to the dt_object to be handled
729 * \retval 0 for repaired cases
730 * \retval negative error number on failure
/* NOTE(review): the error checks after each call, the labels and the return
 * of this function are not visible in this listing. */
732 static int lfsck_namespace_links_remove(const struct lu_env *env,
733 struct lfsck_component *com,
734 struct dt_object *obj)
736 struct lfsck_instance *lfsck = com->lc_lfsck;
737 struct dt_device *dev = lfsck_obj2dev(obj);
738 struct thandle *th = NULL;
/* Only local objects are handled here. */
742 LASSERT(dt_object_remote(obj) == 0);
744 th = dt_trans_create(env, dev);
746 GOTO(log, rc = PTR_ERR(th));
748 rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, th);
752 rc = dt_trans_start_local(env, dev, th);
/* The dead-object check, the dry-run check and the removal are all done
 * under the object's write lock. */
756 dt_write_lock(env, obj, 0);
757 if (unlikely(lfsck_is_dead_obj(obj)))
758 GOTO(unlock, rc = -ENOENT);
/* In dry-run mode nothing is modified and 0 is reported. */
760 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
761 GOTO(unlock, rc = 0);
763 rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th);
768 dt_write_unlock(env, obj);
771 dt_trans_stop(env, dev, th);
774 CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
775 "for the object "DFID": rc = %d\n",
776 lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
/* The condition guarding this flag update is not visible in this listing. */
779 struct lfsck_namespace *ns = com->lc_file_ram;
781 ns->ln_flags |= LF_INCONSISTENT;
/*
 * Write the linkEA held in @ldata to @obj as the XATTR_NAME_LINK xattr.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] obj	the object whose linkEA is to be set
 * \param[in] ldata	the linkEA data to be written
 * \param[in] handle	the transaction handle
 *
 * \retval		the result of dt_xattr_set()
 */
787 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
788 struct linkea_data *ldata, struct thandle *handle)
/* Write leh_len bytes as recorded in the linkEA header, not the whole
 * backing buffer. */
790 const struct lu_buf *buf = lfsck_buf_get_const(env,
791 ldata->ld_buf->lb_buf,
792 ldata->ld_leh->leh_len);
794 return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle);
/*
 * Unpack the current linkEA entry of @ldata into @cname and the parent FID,
 * validate it, and copy the name into @buf.
 *
 * After the copy @cname->ln_name points at @buf. The visible checks reject a
 * non-positive record length, a record that would extend past leh_len, and
 * a name length that is non-positive, above NAME_MAX, or not smaller than
 * @buflen.
 *
 * NOTE(review): one parameter (the 'pfid' used in the body), the last part
 * of the validity condition and the return statements are not visible in
 * this listing.
 */
797 static int lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata,
798 struct lu_name *cname,
800 char *buf, const int buflen)
802 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
803 if (unlikely(ldata->ld_reclen <= 0 ||
804 ldata->ld_reclen + sizeof(struct link_ea_header) >
805 ldata->ld_leh->leh_len ||
806 cname->ln_namelen <= 0 ||
807 cname->ln_namelen > NAME_MAX ||
808 cname->ln_namelen >= buflen ||
812 /* Copy the name into 'buf' so that it is guaranteed to end with NUL. */
813 memcpy(buf, cname->ln_name, cname->ln_namelen);
814 buf[cname->ln_namelen] = 0;
815 cname->ln_name = buf;
/*
 * Delete the current linkEA entry of @ldata.
 *
 * When the current record length is valid, the work is delegated to
 * linkea_del_buf(). When it is not, the header is rebuilt by walking the
 * entries from the first one up to the current one and counting only those,
 * so the current entry and everything after it are dropped.
 *
 * NOTE(review): the closing lines of the branches are not visible in this
 * listing; presumably linkea_del_buf() is the else-branch.
 *
 * \param[in] ldata	the linkEA data whose current entry is to be removed
 * \param[in] lname	the name passed on to linkea_del_buf()
 */
820 static void lfsck_linkea_del_buf(struct linkea_data *ldata,
821 const struct lu_name *lname)
823 LASSERT(ldata->ld_leh != NULL && ldata->ld_lee != NULL);
825 /* If current record is corrupted, all the subsequent
826 * records will be dropped. */
827 if (unlikely(ldata->ld_reclen <= 0 ||
828 ldata->ld_reclen + sizeof(struct link_ea_header) >
829 ldata->ld_leh->leh_len)) {
/* Remember where the corrupted entry starts. */
830 void *ptr = ldata->ld_lee;
/* Restart the accounting with an empty linkEA (header only). */
832 ldata->ld_leh->leh_len = sizeof(struct link_ea_header);
833 ldata->ld_leh->leh_reccount = 0;
834 linkea_first_entry(ldata);
835 while (ldata->ld_lee != NULL &&
836 (char *)ldata->ld_lee < (char *)ptr) {
/* The record length is stored as two bytes, high byte first. */
837 int reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
838 ldata->ld_lee->lee_reclen[1];
840 ldata->ld_leh->leh_len += reclen;
841 ldata->ld_leh->leh_reccount++;
842 ldata->ld_lee = (struct link_ea_entry *)
843 ((char *)ldata->ld_lee + reclen);
/* No current entry remains after the truncation. */
846 ldata->ld_lee = NULL;
848 linkea_del_buf(ldata, lname);
/*
 * Scan the linkEA entries that follow the current one for byte-identical
 * copies of the current entry.
 *
 * The current entry and its record length are saved on entry and restored
 * after the scan.
 *
 * NOTE(review): the remaining parameters, some declarations, the statements
 * between the duplicate test and lfsck_linkea_del_buf(), and the return are
 * not visible in this listing, so when a duplicate is deleted rather than
 * merely reported cannot be told from here.
 */
852 static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata,
853 struct lu_name *cname,
857 struct link_ea_entry *oldlee;
861 oldlee = ldata->ld_lee;
862 oldlen = ldata->ld_reclen;
863 linkea_next_entry(ldata);
864 while (ldata->ld_lee != NULL) {
/* The record length is stored as two bytes, high byte first. */
865 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
866 ldata->ld_lee->lee_reclen[1];
/* A duplicate has the same length and the same bytes as the saved entry. */
867 if (unlikely(ldata->ld_reclen == oldlen &&
868 memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
873 lfsck_linkea_del_buf(ldata, cname);
875 linkea_next_entry(ldata);
/* Restore the caller's position in the linkEA. */
878 ldata->ld_lee = oldlee;
879 ldata->ld_reclen = oldlen;
885 * Insert orphan into .lustre/lost+found/MDTxxxx/ locally.
887 * Add the specified orphan MDT-object to the .lustre/lost+found/MDTxxxx/
888 * with the given type to generate the name, the detailed rules for name
889 * have been described as following.
891 * The function also generates the linkEA corresponding to the name entry
892 * under the .lustre/lost+found/MDTxxxx/ for the orphan MDT-object.
894 * \param[in] env pointer to the thread context
895 * \param[in] com pointer to the lfsck component
896 * \param[in] orphan pointer to the orphan MDT-object
897 * \param[in] infix additional information for the orphan name, such as
898 * the FID for original
899 * \param[in] type the type for describing why the orphan MDT-object is
900 * created. The rules are as following:
902 * type "D": The MDT-object is a directory; it may know its parent,
903 * but because there is no valid linkEA, the LFSCK cannot
904 * know where to put it back into the namespace.
905 * type "O": The MDT-object has no linkEA, and there is no name
906 * entry that references the MDT-object.
908 * type "S": The orphan MDT-object is a shard of a striped directory
910 * \see lfsck_layout_recreate_parent() for more types.
912 * The orphan name will be like:
913 * ${FID}-${infix}-${type}-${conflict_version}
915 * \param[out] count if some others inserted some linkEA entries by race,
916 * then return the linkEA entries count.
918 * \retval positive number for repaired cases
919 * \retval 0 if needs to repair nothing
920 * \retval negative error number on failure
/* NOTE(review): many lines of this function (part of the parameter list,
 * several declarations, error checks, labels and the return) are not
 * visible in this listing. Comments are added only to the first part of the
 * body and describe only the visible statements. */
922 static int lfsck_namespace_insert_orphan(const struct lu_env *env,
923 struct lfsck_component *com,
924 struct dt_object *orphan,
925 const char *infix, const char *type,
928 struct lfsck_thread_info *info = lfsck_env_info(env);
929 struct lu_name *cname = &info->lti_name;
930 struct dt_insert_rec *rec = &info->lti_dt_rec;
931 struct lu_attr *la = &info->lti_la2;
932 const struct lu_fid *cfid = lfsck_dto2fid(orphan);
933 const struct lu_fid *pfid;
935 struct lfsck_instance *lfsck = com->lc_lfsck;
936 struct dt_device *dev = lfsck_obj2dev(orphan);
937 struct dt_object *parent;
938 struct thandle *th = NULL;
939 struct lfsck_lock_handle *pllh = &info->lti_llh;
940 struct lustre_handle clh = { 0 };
941 struct linkea_data ldata2 = { NULL };
942 struct lu_buf linkea_buf;
/* cname->ln_name stays NULL until a name has been chosen; the final log
 * message prints "<NULL>" in that case. */
949 cname->ln_name = NULL;
/* Without a lost+found object (li_lpf_obj) nothing can be inserted. */
950 if (unlikely(lfsck->li_lpf_obj == NULL))
951 GOTO(log, rc = -ENXIO);
953 parent = lfsck->li_lpf_obj;
954 pfid = lfsck_dto2fid(parent);
/* Build candidate names from the orphan FID, @infix, @type and an
 * increasing index until a lookup shows either that the name is unused or
 * that it already maps to this orphan.
 * NOTE(review): snprintf() returns the length the full string would have
 * had, so namelen can exceed the NAME_MAX bound if the name is truncated
 * -- confirm that this cannot happen for the inputs used. */
958 namelen = snprintf(info->lti_key, NAME_MAX, DFID"%s-%s-%d",
959 PFID(cfid), infix, type, idx++);
960 rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
961 (const struct dt_key *)info->lti_key);
962 if (rc != 0 && rc != -ENOENT)
965 if (unlikely(rc == 0 && lu_fid_eq(cfid, &tfid)))
967 } while (rc == 0 && !exist);
/* Lock the chosen name under the parent before re-checking it. */
969 rc = lfsck_lock(env, lfsck, parent, info->lti_key, pllh,
970 MDS_INODELOCK_UPDATE, LCK_PW);
974 /* Re-check whether the name conflict with othrs after taken
976 rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
977 (const struct dt_key *)info->lti_key);
979 if (!lu_fid_eq(cfid, &tfid)) {
986 } else if (rc != -ENOENT) {
992 cname->ln_name = info->lti_key;
993 cname->ln_namelen = namelen;
994 rc = linkea_data_new(&ldata2, &info->lti_linkea_buf2);
998 rc = linkea_add_buf(&ldata2, cname, pfid);
1002 rc = lfsck_ibits_lock(env, lfsck, orphan, &clh,
1003 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP |
1004 MDS_INODELOCK_XATTR, LCK_EX);
1008 lfsck_buf_init(&linkea_buf, ldata2.ld_buf->lb_buf,
1009 ldata2.ld_leh->leh_len);
1010 th = dt_trans_create(env, dev);
1012 GOTO(log, rc = PTR_ERR(th));
1014 if (S_ISDIR(lfsck_object_type(orphan))) {
1015 rc = dt_declare_delete(env, orphan,
1016 (const struct dt_key *)dotdot, th);
1020 rec->rec_type = S_IFDIR;
1021 rec->rec_fid = pfid;
1022 rc = dt_declare_insert(env, orphan, (const struct dt_rec *)rec,
1023 (const struct dt_key *)dotdot, th);
1028 rc = dt_declare_xattr_set(env, orphan, &linkea_buf,
1029 XATTR_NAME_LINK, 0, th);
1034 rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
1035 rec->rec_fid = cfid;
1036 rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1037 (const struct dt_key *)cname->ln_name,
1042 if (S_ISDIR(rec->rec_type)) {
1043 rc = dt_declare_ref_add(env, parent, th);
1049 memset(la, 0, sizeof(*la));
1050 la->la_ctime = cfs_time_current_sec();
1051 la->la_valid = LA_CTIME;
1052 rc = dt_declare_attr_set(env, orphan, la, th);
1056 rc = dt_trans_start_local(env, dev, th);
1060 dt_write_lock(env, orphan, 0);
1061 rc = lfsck_links_read2(env, orphan, &ldata2);
1062 if (likely((rc == -ENODATA) || (rc == -EINVAL) ||
1063 (rc == 0 && ldata2.ld_leh != NULL &&
1064 ldata2.ld_leh->leh_reccount == 0))) {
1065 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1066 GOTO(unlock, rc = 1);
1068 if (S_ISDIR(lfsck_object_type(orphan))) {
1069 rc = dt_delete(env, orphan,
1070 (const struct dt_key *)dotdot, th);
1074 rec->rec_type = S_IFDIR;
1075 rec->rec_fid = pfid;
1076 rc = dt_insert(env, orphan, (const struct dt_rec *)rec,
1077 (const struct dt_key *)dotdot, th, 1);
1082 rc = dt_xattr_set(env, orphan, &linkea_buf, XATTR_NAME_LINK, 0,
1085 if (rc == 0 && count != NULL)
1086 *count = ldata2.ld_leh->leh_reccount;
1090 dt_write_unlock(env, orphan);
1092 if (rc == 0 && !exist) {
1093 rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
1094 rec->rec_fid = cfid;
1095 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1096 (const struct dt_key *)cname->ln_name, th, 1);
1097 if (rc == 0 && S_ISDIR(rec->rec_type)) {
1098 dt_write_lock(env, parent, 0);
1099 rc = dt_ref_add(env, parent, th);
1100 dt_write_unlock(env, parent);
1105 rc = dt_attr_set(env, orphan, la, th);
1107 GOTO(stop, rc = (rc == 0 ? 1 : rc));
1110 dt_write_unlock(env, orphan);
1113 dt_trans_stop(env, dev, th);
1116 lfsck_ibits_unlock(&clh, LCK_EX);
1118 CDEBUG(D_LFSCK, "%s: namespace LFSCK insert orphan for the "
1119 "object "DFID", name = %s: rc = %d\n",
1120 lfsck_lfsck2name(lfsck), PFID(cfid),
1121 cname->ln_name != NULL ? cname->ln_name : "<NULL>", rc);
1124 struct lfsck_namespace *ns = com->lc_file_ram;
1126 ns->ln_flags |= LF_INCONSISTENT;
1133 * Add the specified name entry back to namespace.
1135 * If there is a linkEA entry that back references a name entry under
1136 * some parent directory, but such parent directory does not have the
1137 * claimed name entry. On the other hand, the linkEA entries count is
1138 * not larger than the MDT-object's hard link count. Under such case,
1139 * it is quite possible that the name entry is lost. Then the LFSCK
1140 * should add the name entry back to the namespace.
1142 * \param[in] env pointer to the thread context
1143 * \param[in] com pointer to the lfsck component
1144 * \param[in] parent pointer to the directory under which the name entry
1145 * will be inserted into
1146 * \param[in] child pointer to the object referenced by the name entry
1147 * that to be inserted into the parent
1148 * \param[in] name the name for the child in the parent directory
1150 * \retval positive number for repaired cases
1151 * \retval 0 if nothing to be repaired
1152 * \retval negative error number on failure
1154 static int lfsck_namespace_insert_normal(const struct lu_env *env,
1155 struct lfsck_component *com,
1156 struct dt_object *parent,
1157 struct dt_object *child,
1160 struct lfsck_thread_info *info = lfsck_env_info(env);
1161 struct lu_attr *la = &info->lti_la;
1162 struct dt_insert_rec *rec = &info->lti_dt_rec;
1163 struct lfsck_instance *lfsck = com->lc_lfsck;
1164 /* The child and its name may be on different MDTs. */
/* NOTE(review): many lines of this function (the last parameter, error
 * checks, labels and the return) are not visible in this listing; the
 * comments below describe only the visible statements. pfid and cfid are
 * taken from the objects as passed in, before they are re-located on
 * lfsck->li_next below. */
1165 const struct lu_fid *pfid = lfsck_dto2fid(parent);
1166 const struct lu_fid *cfid = lfsck_dto2fid(child);
1167 struct dt_device *dev = lfsck->li_next;
1168 struct thandle *th = NULL;
1169 struct lfsck_lock_handle *llh = &info->lti_llh;
1173 /* @parent/@child may be based on lfsck->li_bottom,
1174 * but here we need the object based on the lfsck->li_next. */
1176 parent = lfsck_object_locate(dev, parent);
1178 GOTO(log, rc = PTR_ERR(parent));
1180 if (unlikely(!dt_try_as_dir(env, parent)))
1181 GOTO(log, rc = -ENOTDIR);
1183 child = lfsck_object_locate(dev, child);
1185 GOTO(log, rc = PTR_ERR(child));
1187 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
/* Lock the name under the parent before the transaction is created. */
1190 rc = lfsck_lock(env, lfsck, parent, name, llh,
1191 MDS_INODELOCK_UPDATE, LCK_PW);
1195 th = dt_trans_create(env, dev);
1197 GOTO(unlock, rc = PTR_ERR(th));
/* Declare every update first: the name insertion, the parent reference
 * for directories, and the ctime change on both parent and child. */
1199 rec->rec_type = lfsck_object_type(child) & S_IFMT;
1200 rec->rec_fid = cfid;
1201 rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1202 (const struct dt_key *)name, th);
1206 if (S_ISDIR(rec->rec_type)) {
1207 rc = dt_declare_ref_add(env, parent, th);
1212 memset(la, 0, sizeof(*la));
1213 la->la_ctime = cfs_time_current_sec();
1214 la->la_valid = LA_CTIME;
1215 rc = dt_declare_attr_set(env, parent, la, th);
1219 rc = dt_declare_attr_set(env, child, la, th);
1223 rc = dt_trans_start_local(env, dev, th);
1227 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1228 (const struct dt_key *)name, th, 1);
/* A directory child adds a reference to the parent; dt_ref_add() is called
 * under the parent's write lock. */
1232 if (S_ISDIR(rec->rec_type)) {
1233 dt_write_lock(env, parent, 0);
1234 rc = dt_ref_add(env, parent, th);
1235 dt_write_unlock(env, parent);
/* The ctime is sampled again just before it is applied. */
1240 la->la_ctime = cfs_time_current_sec();
1241 rc = dt_attr_set(env, parent, la, th);
1245 rc = dt_attr_set(env, child, la, th);
/* Success is reported as 1; errors are passed through unchanged. */
1247 GOTO(stop, rc = (rc == 0 ? 1 : rc));
1250 dt_trans_stop(env, dev, th);
/* NOTE(review): if this message is the target of the GOTO(log, ...) taken
 * when lfsck_object_locate() fails for the child, then 'child' would hold
 * an error pointer when lfsck_object_type(child) is evaluated below --
 * confirm where the 'log' label sits. */
1256 CDEBUG(D_LFSCK, "%s: namespace LFSCK insert object "DFID" with "
1257 "the name %s and type %o to the parent "DFID": rc = %d\n",
1258 lfsck_lfsck2name(lfsck), PFID(cfid), name,
1259 lfsck_object_type(child) & S_IFMT, PFID(pfid), rc);
/* The conditions guarding these statistics updates are not visible here. */
1262 struct lfsck_namespace *ns = com->lc_file_ram;
1264 ns->ln_flags |= LF_INCONSISTENT;
1266 ns->ln_lost_dirent_repaired++;
1273 * Create the specified orphan directory.
1275 * For the case that the parent MDT-object stored in some MDT-object's
1276 * linkEA entry is lost, the LFSCK will re-create the parent object as
1277 * an orphan and insert it into .lustre/lost+found/MDTxxxx/ directory
1278 * with the name ${FID}-P-${conflict_version}.
1280 * \param[in] env pointer to the thread context
1281 * \param[in] com pointer to the lfsck component
1282 * \param[in] orphan pointer to the orphan MDT-object to be created
1283 * \param[in] lmv pointer to master LMV EA that will be set to the orphan
1285 * \retval positive number for repaired cases
1286 * \retval negative error number on failure
/* Create @orphan as a directory (S_IFDIR | 0700) holding "." and ".."
 * entries plus LMV and linkEA xattrs, then insert it by a generated name
 * into the parent selected below and add a reference to that parent. */
1288 static int lfsck_namespace_create_orphan_dir(const struct lu_env *env,
1289 struct lfsck_component *com,
1290 struct dt_object *orphan,
1291 struct lmv_mds_md_v1 *lmv)
1293 struct lfsck_thread_info *info = lfsck_env_info(env);
1294 struct lu_attr *la = &info->lti_la;
1295 struct dt_allocation_hint *hint = &info->lti_hint;
1296 struct dt_object_format *dof = &info->lti_dof;
1297 struct lu_name *cname = &info->lti_name2;
1298 struct dt_insert_rec *rec = &info->lti_dt_rec;
1299 struct lmv_mds_md_v1 *lmv2 = &info->lti_lmv2;
1300 const struct lu_fid *cfid = lfsck_dto2fid(orphan);
1302 struct lfsck_instance *lfsck = com->lc_lfsck;
1303 struct lfsck_namespace *ns = com->lc_file_ram;
1304 struct dt_device *dev = lfsck_obj2dev(orphan);
1305 struct dt_object *parent = NULL;
1306 struct thandle *th = NULL;
1307 struct lfsck_lock_handle *llh = &info->lti_llh;
1308 struct linkea_data ldata = { NULL };
1309 struct lu_buf linkea_buf;
1310 struct lu_buf lmv_buf;
/* The caller must hand in an object that does not exist yet. */
1318 LASSERT(!dt_object_exists(orphan));
1320 cname->ln_name = NULL;
1321 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
/* Remote orphan: the parent is found by looking up the "MDT%04x" entry
 * (built from the MDT index of @cfid) under lfsck->li_lpf_root_obj. */
1324 if (dt_object_remote(orphan)) {
1325 LASSERT(lfsck->li_lpf_root_obj != NULL);
1327 idx = lfsck_find_mdt_idx_by_fid(env, lfsck, cfid);
1329 GOTO(log, rc = idx);
1331 snprintf(name, 8, "MDT%04x", idx);
1332 rc = dt_lookup(env, lfsck->li_lpf_root_obj,
1333 (struct dt_rec *)&tfid,
1334 (const struct dt_key *)name);
1336 GOTO(log, rc = (rc == -ENOENT ? -ENXIO : rc));
1338 parent = lfsck_object_find_bottom(env, lfsck, &tfid);
1340 GOTO(log, rc = PTR_ERR(parent));
1342 if (unlikely(!dt_try_as_dir(env, parent)))
1343 GOTO(log, rc = -ENOTDIR);
/* The other path uses lfsck->li_lpf_obj as the parent; -ENXIO if unset. */
1345 if (unlikely(lfsck->li_lpf_obj == NULL))
1346 GOTO(log, rc = -ENXIO);
1348 parent = lfsck->li_lpf_obj;
1351 dev = lfsck_find_dev_by_fid(env, lfsck, cfid);
1353 GOTO(log, rc = PTR_ERR(dev));
/* Candidate name in the DFID"-P-%d" form, then probe @parent for it. */
1359 namelen = snprintf(name, 31, DFID"-P-%d",
1361 rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
1362 (const struct dt_key *)name);
1363 if (rc != 0 && rc != -ENOENT)
1367 rc = lfsck_lock(env, lfsck, parent, name, llh,
1368 MDS_INODELOCK_UPDATE, LCK_PW);
1372 /* Re-check whether the name conflicts with others after taking
1374 rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
1375 (const struct dt_key *)name);
1376 if (unlikely(rc == 0)) {
1384 cname->ln_name = name;
1385 cname->ln_namelen = namelen;
1387 memset(la, 0, sizeof(*la));
1388 la->la_mode = S_IFDIR | 0700;
1389 la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
1390 LA_ATIME | LA_MTIME | LA_CTIME;
1392 orphan->do_ops->do_ah_init(env, hint, parent, orphan,
1393 la->la_mode & S_IFMT);
1395 memset(dof, 0, sizeof(*dof));
1396 dof->dof_type = dt_mode_to_dft(S_IFDIR);
1398 rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
1402 rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
1406 th = dt_trans_create(env, dev);
1408 GOTO(unlock1, rc = PTR_ERR(th));
1410 /* Sync the remote transaction to guarantee that the subsequent
1411 * lock against the @orphan can find the @orphan in time. */
1412 if (dt_object_remote(orphan))
/* Declare phase for the orphan: create, ref_add, ".", "..", then the LMV
 * and linkEA xattrs. */
1415 rc = dt_declare_create(env, orphan, la, hint, dof, th);
1419 if (unlikely(!dt_try_as_dir(env, orphan)))
1420 GOTO(stop, rc = -ENOTDIR);
1422 rc = dt_declare_ref_add(env, orphan, th);
1426 rec->rec_type = S_IFDIR;
1427 rec->rec_fid = cfid;
1428 rc = dt_declare_insert(env, orphan, (const struct dt_rec *)rec,
1429 (const struct dt_key *)dot, th);
1433 rec->rec_fid = lfsck_dto2fid(parent);
1434 rc = dt_declare_insert(env, orphan, (const struct dt_rec *)rec,
1435 (const struct dt_key *)dotdot, th);
1440 lmv->lmv_magic = LMV_MAGIC;
1441 lmv->lmv_master_mdt_index = lfsck_dev_idx(lfsck);
1442 lfsck_lmv_header_cpu_to_le(lmv2, lmv);
1443 lfsck_buf_init(&lmv_buf, lmv2, sizeof(*lmv2));
1444 rc = dt_declare_xattr_set(env, orphan, &lmv_buf,
1445 XATTR_NAME_LMV, 0, th);
1450 lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
1451 ldata.ld_leh->leh_len);
1452 rc = dt_declare_xattr_set(env, orphan, &linkea_buf,
1453 XATTR_NAME_LINK, 0, th);
/* Declare phase for the parent: the new name entry and one reference. */
1457 rec->rec_fid = cfid;
1458 rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1459 (const struct dt_key *)name, th);
1461 rc = dt_declare_ref_add(env, parent, th);
1466 rc = dt_trans_start_local(env, dev, th);
/* Execute phase: the orphan is created and populated while its write
 * lock is held. */
1470 dt_write_lock(env, orphan, 0);
1471 rc = dt_create(env, orphan, la, hint, dof, th);
1475 rc = dt_ref_add(env, orphan, th);
1479 rec->rec_fid = cfid;
1480 rc = dt_insert(env, orphan, (const struct dt_rec *)rec,
1481 (const struct dt_key *)dot, th, 1);
1485 rec->rec_fid = lfsck_dto2fid(parent);
1486 rc = dt_insert(env, orphan, (const struct dt_rec *)rec,
1487 (const struct dt_key *)dotdot, th, 1);
1492 rc = dt_xattr_set(env, orphan, &lmv_buf, XATTR_NAME_LMV, 0, th);
1497 rc = dt_xattr_set(env, orphan, &linkea_buf,
1498 XATTR_NAME_LINK, 0, th);
1499 dt_write_unlock(env, orphan);
1503 rec->rec_fid = cfid;
1504 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1505 (const struct dt_key *)name, th, 1);
1507 dt_write_lock(env, parent, 0);
1508 rc = dt_ref_add(env, parent, th);
1509 dt_write_unlock(env, parent);
/* A zero result from the last step is reported as 1 (repaired). */
1512 GOTO(stop, rc = (rc == 0 ? 1 : rc));
1515 dt_write_unlock(env, orphan);
/* The result of dt_trans_stop() is only examined when rc is positive. */
1518 rc1 = dt_trans_stop(env, dev, th);
1519 if (rc1 != 0 && rc > 0)
1526 CDEBUG(D_LFSCK, "%s: namespace LFSCK create orphan dir for "
1527 "the object "DFID", name = %s: rc = %d\n",
1528 lfsck_lfsck2name(lfsck), PFID(cfid),
1529 cname->ln_name != NULL ? cname->ln_name : "<NULL>", rc);
/* Release @parent unless it is lfsck->li_lpf_obj itself. */
1531 if (parent != NULL && !IS_ERR(parent) && parent != lfsck->li_lpf_obj)
1532 lfsck_object_put(env, parent);
1535 ns->ln_flags |= LF_INCONSISTENT;
1541 * Remove the specified entry from the linkEA.
1543 * Locate the linkEA entry with the given @cname and @pfid, then
1544 * remove this entry or the other entries those are repeated with
1547 * \param[in] env pointer to the thread context
1548 * \param[in] com pointer to the lfsck component
1549 * \param[in] obj pointer to the dt_object to be handled
1550 * \param[in,out]ldata pointer to the buffer that holds the linkEA
1551 * \param[in] cname the name for the child in the parent directory
1552 * \param[in] pfid the parent directory's FID for the linkEA
1553 * \param[in] next if true, then remove the first found linkEA
1554 * entry, and move the ldata->ld_lee to next entry
1556 * \retval positive number for repaired cases
1557 * \retval 0 if nothing to be repaired
1558 * \retval negative error number on failure
/* Drop the (@cname, @pfid) entry from @obj's linkEA and write the result
 * back; the code sets XATTR_NAME_LINK when records remain and also has a
 * path that deletes the xattr. */
1560 static int lfsck_namespace_shrink_linkea(const struct lu_env *env,
1561 struct lfsck_component *com,
1562 struct dt_object *obj,
1563 struct linkea_data *ldata,
1564 struct lu_name *cname,
1565 struct lu_fid *pfid,
1568 struct lfsck_instance *lfsck = com->lc_lfsck;
1569 struct dt_device *dev = lfsck_obj2dev(obj);
1570 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
1571 struct thandle *th = NULL;
1572 struct lustre_handle lh = { 0 };
1573 struct linkea_data ldata_new = { NULL };
1574 struct lu_buf linkea_buf;
/* After the ibits lock is requested, the entry is first removed from the
 * caller's @ldata; the resulting length (buflen) sizes the xattr
 * declaration made before the transaction starts. */
1579 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
1580 MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1586 lfsck_linkea_del_buf(ldata, cname);
1588 lfsck_namespace_filter_linkea_entry(ldata, cname, pfid,
1590 if (ldata->ld_leh->leh_reccount > 0) {
1591 lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
1592 ldata->ld_leh->leh_len);
1593 buflen = linkea_buf.lb_len;
1597 th = dt_trans_create(env, dev);
1599 GOTO(unlock1, rc = PTR_ERR(th));
1602 rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1603 XATTR_NAME_LINK, 0, th);
1605 rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, th);
1609 rc = dt_trans_start_local(env, dev, th);
/* Under the object write lock the linkEA is read again into @ldata_new
 * and the removal is redone on that fresh copy. */
1613 dt_write_lock(env, obj, 0);
1614 if (unlikely(lfsck_is_dead_obj(obj)))
1615 GOTO(unlock2, rc = -ENOENT);
1617 rc = lfsck_links_read2(env, obj, &ldata_new);
1620 rc = (rc == -ENODATA ? 0 : rc));
1622 /* The specified linkEA entry has been removed by race. */
1623 rc = linkea_links_find(&ldata_new, cname, pfid);
1625 GOTO(unlock2, rc = 0);
/* Dry-run: report 1 (repaired) without modifying the xattr. */
1627 if (bk->lb_param & LPF_DRYRUN)
1628 GOTO(unlock2, rc = 1);
1631 lfsck_linkea_del_buf(&ldata_new, cname);
1633 lfsck_namespace_filter_linkea_entry(&ldata_new, cname, pfid,
/* The fresh linkEA is longer than what was declared: release the write
 * lock, stop the transaction and re-point linkea_buf at the fresh data. */
1636 if (buflen < ldata_new.ld_leh->leh_len) {
1637 dt_write_unlock(env, obj);
1638 dt_trans_stop(env, dev, th);
1639 lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
1640 ldata_new.ld_leh->leh_len);
1644 if (ldata_new.ld_leh->leh_reccount > 0) {
1645 lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
1646 ldata_new.ld_leh->leh_len);
1647 rc = dt_xattr_set(env, obj, &linkea_buf,
1648 XATTR_NAME_LINK, 0, th);
1650 rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th);
/* A zero result from the xattr update is reported as 1 (repaired). */
1653 GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
1656 dt_write_unlock(env, obj);
1659 dt_trans_stop(env, dev, th);
1662 lfsck_ibits_unlock(&lh, LCK_EX);
1665 CDEBUG(D_LFSCK, "%s: namespace LFSCK remove %s linkEA entry "
1666 "for the object: "DFID", parent "DFID", name %.*s\n",
1667 lfsck_lfsck2name(lfsck), next ? "invalid" : "redundant",
1668 PFID(lfsck_dto2fid(obj)), PFID(pfid), cname->ln_namelen,
1672 struct lfsck_namespace *ns = com->lc_file_ram;
1674 ns->ln_flags |= LF_INCONSISTENT;
1681 * Conditionally remove the specified entry from the linkEA.
1683 * Take the parent lock firstly, then check whether the specified
1684 * name entry exists or not: if yes, do nothing; otherwise, call
1685 * lfsck_namespace_shrink_linkea() to remove the linkea entry.
1687 * \param[in] env pointer to the thread context
1688 * \param[in] com pointer to the lfsck component
1689 * \param[in] parent pointer to the parent directory
1690 * \param[in] child pointer to the child object that holds the linkEA
1691 * \param[in,out]ldata pointer to the buffer that holds the linkEA
1692 * \param[in] cname the name for the child in the parent directory
1693 * \param[in] pfid the parent directory's FID for the linkEA
1695 * \retval positive number for repaired cases
1696 * \retval 0 if nothing to be repaired
1697 * \retval negative error number on failure
/* Lock @parent in LCK_PR mode, look @cname up in it, and hand over to
 * lfsck_namespace_shrink_linkea() on the paths below. */
1699 static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env,
1700 struct lfsck_component *com,
1701 struct dt_object *parent,
1702 struct dt_object *child,
1703 struct linkea_data *ldata,
1704 struct lu_name *cname,
1705 struct lu_fid *pfid)
1707 struct lfsck_thread_info *info = lfsck_env_info(env);
1708 struct lu_fid *cfid = &info->lti_fid3;
1709 struct lfsck_lock_handle *llh = &info->lti_llh;
1713 rc = lfsck_lock(env, com->lc_lfsck, parent, cname->ln_name, llh,
1714 MDS_INODELOCK_UPDATE, LCK_PR);
/* The dead-object test and the lookup both run under the parent's
 * read lock; the FID found by the lookup is stored in @cfid. */
1718 dt_read_lock(env, parent, 0);
1719 if (unlikely(lfsck_is_dead_obj(parent))) {
1720 dt_read_unlock(env, parent);
1722 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
1728 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1729 (const struct dt_key *)cname->ln_name);
1730 dt_read_unlock(env, parent);
1732 /* It is safe to release the ldlm lock, because when the logic come
1733 * here, we have got all the needed information above whether the
1734 * linkEA entry is valid or not. It is not important that others
1735 * may add new linkEA entry after the ldlm lock released. If other
1736 * has removed the specified linkEA entry by race, then it is OK,
1737 * because the subsequent lfsck_namespace_shrink_linkea() can handle
1740 if (rc == -ENOENT) {
1741 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
1750 /* The LFSCK just found some internal status of cross-MDTs
1751 * create operation. That is normal. */
/* The name entry found in @parent carries @child's own FID: advance
 * @ldata to its next entry. */
1752 if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
1753 linkea_next_entry(ldata);
1758 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata, cname,
1765 * Conditionally replace name entry in the parent.
1767 * As required, the LFSCK may re-create the lost MDT-object for dangling
1768 * name entry, but such repairing may be wrong because of bad FID in the
1769 * name entry. As the LFSCK proceeds, the real MDT-object may be found;
1770 * the LFSCK should then check whether the formerly re-created MDT-object
1771 * has been modified. If not, it destroys that object and updates the name
1772 * entry in the parent to reference the real MDT-object.
1774 * \param[in] env pointer to the thread context
1775 * \param[in] com pointer to the lfsck component
1776 * \param[in] parent pointer to the parent directory
1777 * \param[in] child pointer to the MDT-object that may be the real
1778 * MDT-object corresponding to the name entry in parent
1779 * \param[in] cfid the current FID in the name entry
1780 * \param[in] cname contains the name of the child in the parent directory
1782 * \retval positive number for repaired cases
1783 * \retval 0 if nothing to be repaired
1784 * \retval negative error number on failure
/* Once the checks below pass, destroy the object named by @cfid and
 * re-point the name entry @cname in @parent at @child. */
1786 static int lfsck_namespace_replace_cond(const struct lu_env *env,
1787 struct lfsck_component *com,
1788 struct dt_object *parent,
1789 struct dt_object *child,
1790 const struct lu_fid *cfid,
1791 const struct lu_name *cname)
1793 struct lfsck_thread_info *info = lfsck_env_info(env);
1794 struct lu_attr *la = &info->lti_la;
1795 struct dt_insert_rec *rec = &info->lti_dt_rec;
1797 struct lfsck_instance *lfsck = com->lc_lfsck;
1798 /* The child and its name may be on different MDTs. */
1799 struct dt_device *dev = lfsck->li_next;
1800 const char *name = cname->ln_name;
1801 const struct lu_fid *pfid = lfsck_dto2fid(parent);
1802 struct dt_object *cobj = NULL;
1803 struct lfsck_lock_handle *pllh = &info->lti_llh;
1804 struct lustre_handle clh = { 0 };
1805 struct linkea_data ldata = { NULL };
1806 struct thandle *th = NULL;
1811 /* @parent/@child may be based on lfsck->li_bottom,
1812 * but here we need the object based on the lfsck->li_next. */
1814 parent = lfsck_object_locate(dev, parent);
1816 GOTO(log, rc = PTR_ERR(parent));
1818 if (unlikely(!dt_try_as_dir(env, parent)))
1819 GOTO(log, rc = -ENOTDIR);
/* The name in @parent is locked in LCK_PW mode for the whole operation. */
1821 rc = lfsck_lock(env, lfsck, parent, name, pllh,
1822 MDS_INODELOCK_UPDATE, LCK_PW);
1826 if (!fid_is_sane(cfid)) {
1831 cobj = lfsck_object_find_by_dev(env, dev, cfid);
1834 if (rc == -ENOENT) {
1842 if (!dt_object_exists(cobj)) {
1847 rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
1848 (const struct dt_key *)name);
1849 if (rc == -ENOENT) {
1857 /* Someone changed the name entry, cannot replace it. */
1858 if (!lu_fid_eq(cfid, &tfid))
1861 /* lock the object to be destroyed. */
/* NOTE(review): MDS_INODELOCK_UPDATE is OR-ed twice in the bit mask below;
 * the duplicate has no effect — confirm whether another bit was meant. */
1862 rc = lfsck_ibits_lock(env, lfsck, cobj, &clh,
1863 MDS_INODELOCK_UPDATE |
1864 MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1869 if (unlikely(lfsck_is_dead_obj(cobj))) {
1874 rc = dt_attr_get(env, cobj, la);
1878 /* The object has been modified by other(s), or it is not created by
1879 * LFSCK, the two cases are indistinguishable. So cannot replace it. */
1880 if (la->la_ctime != 0)
/* For a regular file, probe for a LOV EA using a NULL buffer. */
1883 if (S_ISREG(la->la_mode)) {
1884 rc = dt_xattr_get(env, cobj, &LU_BUF_NULL, XATTR_NAME_LOV);
1885 /* If someone has created related OST-object(s),
1887 if ((rc > 0) || (rc < 0 && rc != -ENODATA))
1888 GOTO(log, rc = (rc > 0 ? 0 : rc));
1892 dt_read_lock(env, child, 0);
1893 rc = lfsck_links_read2(env, child, &ldata);
1894 dt_read_unlock(env, child);
1896 /* Someone changed the child, no need to replace. */
1903 rc = linkea_links_find(&ldata, cname, pfid);
1904 /* Someone moved the child, no need to replace. */
1908 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
/* Declare phase: destroy of @cobj, delete of the old entry, insert of the
 * new one. */
1911 th = dt_trans_create(env, dev);
1913 GOTO(log, rc = PTR_ERR(th));
1916 rc = dt_declare_destroy(env, cobj, th);
1921 rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
/* NOTE(review): rec_type is fixed to S_IFDIR here although the S_ISREG()
 * test above shows @cobj may be a regular file — confirm that @child is
 * always a directory on this path. */
1925 rec->rec_type = S_IFDIR;
1926 rec->rec_fid = lfsck_dto2fid(child);
1927 rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1928 (const struct dt_key *)name, th);
1932 rc = dt_trans_start_local(env, dev, th);
1937 rc = dt_destroy(env, cobj, th);
1942 /* The old name entry maybe not exist. */
1943 rc = dt_delete(env, parent, (const struct dt_key *)name, th);
1944 if (rc != 0 && rc != -ENOENT)
1947 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1948 (const struct dt_key *)name, th, 1);
/* A zero result from the insert is reported as 1 (repaired). */
1950 GOTO(stop, rc = (rc == 0 ? 1 : rc));
1953 dt_trans_stop(env, dev, th);
1956 lfsck_ibits_unlock(&clh, LCK_EX);
1959 if (cobj != NULL && !IS_ERR(cobj))
1960 lfsck_object_put(env, cobj);
1962 CDEBUG(D_LFSCK, "%s: namespace LFSCK conditionally destroy the "
1963 "object "DFID" because of conflict with the object "DFID
1964 " under the parent "DFID" with name %s: rc = %d\n",
1965 lfsck_lfsck2name(lfsck), PFID(cfid),
1966 PFID(lfsck_dto2fid(child)), PFID(pfid), name, rc);
1972 * Overwrite the linkEA for the object with the given ldata.
1974 * The caller should take the ldlm lock before the calling.
1976 * \param[in] env pointer to the thread context
1977 * \param[in] com pointer to the lfsck component
1978 * \param[in] obj pointer to the dt_object to be handled
1979 * \param[in] ldata pointer to the new linkEA data
1981 * \retval positive number for repaired cases
1982 * \retval 0 if nothing to be repaired
1983 * \retval negative error number on failure
/* Overwrite @obj's XATTR_NAME_LINK with the buffer described by @ldata,
 * using one local transaction on the object's own device. */
1985 int lfsck_namespace_rebuild_linkea(const struct lu_env *env,
1986 struct lfsck_component *com,
1987 struct dt_object *obj,
1988 struct linkea_data *ldata)
1990 struct lfsck_instance *lfsck = com->lc_lfsck;
1991 struct dt_device *dev = lfsck_obj2dev(obj);
1992 struct thandle *th = NULL;
1993 struct lu_buf linkea_buf;
1997 th = dt_trans_create(env, dev);
1999 GOTO(log, rc = PTR_ERR(th));
2001 lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
2002 ldata->ld_leh->leh_len);
2003 rc = dt_declare_xattr_set(env, obj, &linkea_buf,
2004 XATTR_NAME_LINK, 0, th);
2008 rc = dt_trans_start_local(env, dev, th);
2012 dt_write_lock(env, obj, 0);
/* A dead object is reported as 0 (nothing to repair). */
2013 if (unlikely(lfsck_is_dead_obj(obj)))
2014 GOTO(unlock, rc = 0);
/* Dry-run is tested only here, after the transaction has started; it
 * reports 1 (repaired) and skips the xattr write. */
2016 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
2017 GOTO(unlock, rc = 1);
2019 rc = dt_xattr_set(env, obj, &linkea_buf,
2020 XATTR_NAME_LINK, 0, th);
/* A zero result from the xattr write is reported as 1 (repaired). */
2022 GOTO(unlock, rc = (rc == 0 ? 1 : rc));
2025 dt_write_unlock(env, obj);
2028 dt_trans_stop(env, dev, th);
2031 CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild linkEA for the "
2032 "object "DFID": rc = %d\n",
2033 lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
2036 struct lfsck_namespace *ns = com->lc_file_ram;
2038 ns->ln_flags |= LF_INCONSISTENT;
2045 * Repair invalid name entry.
2047 * If the name entry contains invalid information, such as bad file type
2048 * or (and) corrupted object FID, then either remove the name entry or
2049 * update the name entry with the given (right) information.
2051 * \param[in] env pointer to the thread context
2052 * \param[in] com pointer to the lfsck component
2053 * \param[in] parent pointer to the parent directory
2054 * \param[in] child pointer to the object referenced by the name entry
2055 * \param[in] name the old name of the child under the parent directory
2056 * \param[in] name2 the new name of the child under the parent directory
2057 * \param[in] type the type claimed by the name entry
2058 * \param[in] update update the name entry if true; otherwise, remove it
2059 * \param[in] dec decrease the parent nlink count if true
2061 * \retval positive number for repaired successfully
2062 * \retval 0 if nothing to be repaired
2063 * \retval negative error number on failure
/* Delete the name entry @name from @parent; the code below can also insert
 * @name2 for @child and drop a reference on @parent (see @update, @dec). */
2065 int lfsck_namespace_repair_dirent(const struct lu_env *env,
2066 struct lfsck_component *com,
2067 struct dt_object *parent,
2068 struct dt_object *child,
2069 const char *name, const char *name2,
2070 __u16 type, bool update, bool dec)
2072 struct lfsck_thread_info *info = lfsck_env_info(env);
2073 struct dt_insert_rec *rec = &info->lti_dt_rec;
2074 const struct lu_fid *pfid = lfsck_dto2fid(parent);
2075 const struct lu_fid *cfid = lfsck_dto2fid(child);
2077 struct lfsck_instance *lfsck = com->lc_lfsck;
2078 struct dt_device *dev = lfsck->li_next;
2079 struct thandle *th = NULL;
2080 struct lfsck_lock_handle *llh = &info->lti_llh;
2081 struct lustre_handle lh = { 0 };
2085 parent = lfsck_object_locate(dev, parent);
2087 GOTO(log, rc = PTR_ERR(parent));
2089 if (unlikely(!dt_try_as_dir(env, parent)))
2090 GOTO(log, rc = -ENOTDIR);
/* When only one name is involved (removal, or @name equals @name2) the
 * per-name lfsck_lock() is used; the other call is an ibits lock on
 * @parent itself. */
2092 if (!update || strcmp(name, name2) == 0)
2093 rc = lfsck_lock(env, lfsck, parent, name, llh,
2094 MDS_INODELOCK_UPDATE, LCK_PW);
2096 rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
2097 MDS_INODELOCK_UPDATE, LCK_PW);
2101 th = dt_trans_create(env, dev);
2103 GOTO(unlock1, rc = PTR_ERR(th));
2105 rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
2110 rec->rec_type = lfsck_object_type(child) & S_IFMT;
2111 rec->rec_fid = cfid;
2112 rc = dt_declare_insert(env, parent,
2113 (const struct dt_rec *)rec,
2114 (const struct dt_key *)name2, th);
2119 if (dec && S_ISDIR(type)) {
2120 rc = dt_declare_ref_del(env, parent, th);
2125 rc = dt_trans_start_local(env, dev, th);
/* Re-validate under the parent's write lock: the entry is looked up again
 * and its FID compared with @cfid before anything is changed. */
2130 dt_write_lock(env, parent, 0);
2131 rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
2132 (const struct dt_key *)name);
2133 /* Someone has removed the bad name entry by race. */
2135 GOTO(unlock2, rc = 0);
2140 /* Someone has removed the bad name entry and reused it for other
2141 * object by race. */
2142 if (!lu_fid_eq(&tfid, cfid))
2143 GOTO(unlock2, rc = 0);
/* Dry-run: report 1 (repaired) without touching the directory. */
2145 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
2146 GOTO(unlock2, rc = 1);
2148 rc = dt_delete(env, parent, (const struct dt_key *)name, th);
2153 rc = dt_insert(env, parent,
2154 (const struct dt_rec *)rec,
2155 (const struct dt_key *)name2, th, 1);
2160 if (dec && S_ISDIR(type)) {
2161 rc = dt_ref_del(env, parent, th);
/* A zero result from the last step is reported as 1 (repaired). */
2166 GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
2169 dt_write_unlock(env, parent);
2172 dt_trans_stop(env, dev, th);
2174 /* We are not sure whether the child will become orphan or not.
2175 * Record it in the LFSCK trace file for further checking in
2176 * the second-stage scanning. */
2177 if (!update && !dec && rc == 0)
2178 lfsck_namespace_trace_update(env, com, cfid,
2179 LNTF_CHECK_LINKEA, true);
2182 /* It is harmless even if unlock the unused lock_handle */
2183 lfsck_ibits_unlock(&lh, LCK_PW);
2187 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found bad name "
2188 "entry for: parent "DFID", child "DFID", name %s, type "
2189 "in name entry %o, type claimed by child %o. repair it "
2190 "by %s with new name2 %s: rc = %d\n",
2191 lfsck_lfsck2name(lfsck), PFID(pfid), PFID(cfid),
2192 name, type, update ? lfsck_object_type(child) : 0,
2193 update ? "updating" : "removing", name2, rc);
2196 struct lfsck_namespace *ns = com->lc_file_ram;
2198 ns->ln_flags |= LF_INCONSISTENT;
2205 * Update the ".." name entry for the given object.
2207 * The object's ".." is corrupted, this function will update the ".." name
2208 * entry with the given pfid, and the linkEA with the given ldata.
2210 * The caller should take the ldlm lock before the calling.
2212 * \param[in] env pointer to the thread context
2213 * \param[in] com pointer to the lfsck component
2214 * \param[in] obj pointer to the dt_object to be handled
2215 * \param[in] pfid the new fid for the object's ".." name entry
2216 * \param[in] cname the name for the @obj in the parent directory
2218 * \retval positive number for repaired cases
2219 * \retval 0 if nothing to be repaired
2220 * \retval negative error number on failure
/* Replace @obj's ".." entry with one pointing at @pfid and overwrite its
 * linkEA with a new one built from (@cname, @pfid). */
2222 static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env,
2223 struct lfsck_component *com,
2224 struct dt_object *obj,
2225 const struct lu_fid *pfid,
2226 struct lu_name *cname)
2228 struct lfsck_thread_info *info = lfsck_env_info(env);
2229 struct dt_insert_rec *rec = &info->lti_dt_rec;
2230 struct lfsck_instance *lfsck = com->lc_lfsck;
2231 struct dt_device *dev = lfsck_obj2dev(obj);
2232 struct thandle *th = NULL;
2233 struct linkea_data ldata = { NULL };
2234 struct lu_buf linkea_buf;
/* Only local directory objects are accepted. */
2238 LASSERT(!dt_object_remote(obj));
2239 LASSERT(S_ISDIR(lfsck_object_type(obj)));
/* Build a new linkEA in lti_big_buf and add the (@cname, @pfid) entry. */
2241 rc = linkea_data_new(&ldata, &info->lti_big_buf);
2245 rc = linkea_add_buf(&ldata, cname, pfid);
2249 lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
2250 ldata.ld_leh->leh_len);
2252 th = dt_trans_create(env, dev);
2254 GOTO(log, rc = PTR_ERR(th));
/* Declare phase: delete and re-insert of "..", then the linkEA xattr. */
2256 rc = dt_declare_delete(env, obj, (const struct dt_key *)dotdot, th);
2260 rec->rec_type = S_IFDIR;
2261 rec->rec_fid = pfid;
2262 rc = dt_declare_insert(env, obj, (const struct dt_rec *)rec,
2263 (const struct dt_key *)dotdot, th);
2267 rc = dt_declare_xattr_set(env, obj, &linkea_buf,
2268 XATTR_NAME_LINK, 0, th);
2272 rc = dt_trans_start_local(env, dev, th);
2276 dt_write_lock(env, obj, 0);
/* A dead object yields 0 (nothing to repair); dry-run yields 1 without
 * modifying anything. */
2277 if (unlikely(lfsck_is_dead_obj(obj)))
2278 GOTO(unlock, rc = 0);
2280 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
2281 GOTO(unlock, rc = 1);
/* The result of the dt_delete() call below is not checked. */
2283 /* The old ".." name entry maybe not exist. */
2284 dt_delete(env, obj, (const struct dt_key *)dotdot, th);
2286 rc = dt_insert(env, obj, (const struct dt_rec *)rec,
2287 (const struct dt_key *)dotdot, th, 1);
2291 rc = dt_xattr_set(env, obj, &linkea_buf,
2292 XATTR_NAME_LINK, 0, th);
/* A zero result from the xattr write is reported as 1 (repaired). */
2294 GOTO(unlock, rc = (rc == 0 ? 1 : rc));
2297 dt_write_unlock(env, obj);
2300 dt_trans_stop(env, dev, th);
2303 CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild dotdot name entry for "
2304 "the object "DFID", new parent "DFID": rc = %d\n",
2305 lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)),
2309 struct lfsck_namespace *ns = com->lc_file_ram;
2311 ns->ln_flags |= LF_INCONSISTENT;
2318 * Handle orphan @obj during Double Scan Directory.
2320 * Remove the @obj's current (invalid) linkEA entries, and insert
2321 * it in the directory .lustre/lost+found/MDTxxxx/ with the name:
2322 * ${FID}-${PFID}-D-${conflict_version}
2324 * The caller should take the ldlm lock before the calling.
2326 * \param[in] env pointer to the thread context
2327 * \param[in] com pointer to the lfsck component
2328 * \param[in] obj pointer to the orphan object to be handled
2329 * \param[in] pfid the new fid for the object's ".." name entry
2330 * \param[in,out] lh ldlm lock handler for the given @obj
2331 * \param[out] type to tell the caller what the inconsistency is
2333 * \retval positive number for repaired cases
2334 * \retval 0 if nothing to be repaired
2335 * \retval negative error number on failure
2338 lfsck_namespace_dsd_orphan(const struct lu_env *env,
2339 struct lfsck_component *com,
2340 struct dt_object *obj,
2341 const struct lu_fid *pfid,
2342 struct lustre_handle *lh,
2343 enum lfsck_namespace_inconsistency_type *type)
2345 struct lfsck_thread_info *info = lfsck_env_info(env);
2346 struct lfsck_namespace *ns = com->lc_file_ram;
2350 /* Remove the unrecognized linkEA. */
2351 rc = lfsck_namespace_links_remove(env, com, obj);
/* The ldlm lock @lh is released right after the removal attempt, before
 * its result is examined; -ENODATA is excluded from the error test. */
2352 lfsck_ibits_unlock(lh, LCK_EX);
2353 if (rc < 0 && rc != -ENODATA)
2356 *type = LNIT_MUL_REF;
2358 /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT has
2359 * ever tried to verify some remote MDT-object that resides on this
2360 * MDT, but this MDT failed to respond such request. So means there
2361 * may be some remote name entry on other MDT that references this
2362 * object with another name, so we cannot know whether this linkEA
2363 * is valid or not. So keep it there and maybe resolved when next
2365 if (ns->ln_flags & LF_INCOMPLETE)
2368 /* The unique linkEA is invalid, even if the ".." name entry may be
2369 * valid, we still cannot know via which name entry this directory
2370 * will be referenced. Then handle it as pure orphan. */
/* The string passed on is "-" followed by the FID @pfid, together with
 * the tag "D". */
2371 snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
2372 "-"DFID, PFID(pfid));
2373 rc = lfsck_namespace_insert_orphan(env, com, obj,
2374 info->lti_tmpbuf, "D", NULL);
2380 * Double Scan Directory object for single linkEA entry case.
2382 * The given @child has unique linkEA entry. If the linkEA entry is valid,
2383 * then check whether the name is in the namespace or not, if not, add the
2384 * missing name entry back to namespace. If the linkEA entry is invalid,
2385 * then remove it and insert the @child in the .lustre/lost+found/MDTxxxx/
2388 * \param[in] env pointer to the thread context
2389 * \param[in] com pointer to the lfsck component
2390 * \param[in] child pointer to the directory to be double scanned
2391 * \param[in] pfid the FID corresponding to the ".." entry
2392 * \param[in] ldata pointer to the linkEA data for the given @child
2393 * \param[in,out] lh ldlm lock handler for the given @child
2394 * \param[out] type to tell the caller what the inconsistency is
2395 * \param[in] retry if found inconsistency, but the caller does not hold
2396 * ldlm lock on the @child, then set @retry as true
2397 * \param[in] unknown set if does not know how to repair the inconsistency
2399 * \retval positive number for repaired cases
2400 * \retval 0 if nothing to be repaired
2401 * \retval negative error number on failure
/* Check the only linkEA entry of @child: unpack it into a name and a parent
 * FID, look that parent up, and then verify in turn that the parent exists,
 * that it can be used as a directory, that the name entry under it refers to
 * @child, and that the dotdot FID (@pfid) matches the parent. Each mismatch
 * is routed to the corresponding repair helper called below. */
2404 lfsck_namespace_dsd_single(const struct lu_env *env,
2405 struct lfsck_component *com,
2406 struct dt_object *child,
2407 const struct lu_fid *pfid,
2408 struct linkea_data *ldata,
2409 struct lustre_handle *lh,
2410 enum lfsck_namespace_inconsistency_type *type,
2411 bool *retry, bool *unknown)
2413 struct lfsck_thread_info *info = lfsck_env_info(env);
2414 struct lu_name *cname = &info->lti_name;
2415 const struct lu_fid *cfid = lfsck_dto2fid(child);
2417 struct lfsck_namespace *ns = com->lc_file_ram;
2418 struct lfsck_instance *lfsck = com->lc_lfsck;
2419 struct dt_object *parent = NULL;
2420 struct lmv_mds_md_v1 *lmv;
2424 rc = lfsck_namespace_unpack_linkea_entry(ldata, cname, &tfid,
2426 sizeof(info->lti_key));
2427 /* The unique linkEA entry with bad parent will be handled as orphan. */
2429 if (!lustre_handle_is_used(lh) && retry != NULL)
2432 rc = lfsck_namespace_dsd_orphan(env, com, child,
2438 parent = lfsck_object_find_bottom(env, lfsck, &tfid);
2440 GOTO(out, rc = PTR_ERR(parent));
2442 /* We trust the unique linkEA entry regardless of whether it matches
2443 * the ".." name entry or not, because even if the linkEA entry is wrong
2444 * and the ".." name entry is right, we still cannot know via which
2445 * name entry the child will be referenced, since all known entries
2446 * have been verified during the first-stage scanning. */
2447 if (!dt_object_exists(parent)) {
2448 /* If the LFSCK is marked as LF_INCOMPLETE, it means that some
2449 * MDT tried to verify a remote MDT-object that resides on this
2450 * MDT, but this MDT failed to respond to the request. So there
2451 * may be a remote name entry on another MDT that references
2452 * this object with another name, and we cannot know whether
2453 * this linkEA is valid or not. Keep it as is; it may be
2454 * resolved by the next LFSCK run. */
2455 if (ns->ln_flags & LF_INCOMPLETE)
2458 if (!lustre_handle_is_used(lh) && retry != NULL) {
2464 lfsck_ibits_unlock(lh, LCK_EX);
2467 lmv = &info->lti_lmv;
2468 rc = lfsck_read_stripe_lmv(env, child, lmv);
2469 if (rc != 0 && rc != -ENODATA)
2472 if (rc == -ENODATA || lmv->lmv_magic != LMV_MAGIC_STRIPE) {
2474 } else if (lfsck_shard_name_to_index(env,
2475 cname->ln_name, cname->ln_namelen,
2476 S_IFDIR, cfid) < 0) {
2477 /* It is an invalid name entry, so we
2478 * cannot trust the parent either. */
2479 rc = lfsck_namespace_shrink_linkea(env, com, child,
2480 ldata, cname, &tfid, true);
/* Format -<PFID> from the dotdot FID; the buffer is handed to
 * lfsck_namespace_insert_orphan() right below. */
2484 snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
2485 "-"DFID, PFID(pfid));
2486 rc = lfsck_namespace_insert_orphan(env, com, child,
2487 info->lti_tmpbuf, "S", NULL);
2492 /* Create the lost parent as an orphan. */
2493 rc = lfsck_namespace_create_orphan_dir(env, com, parent, lmv);
2495 /* Add the missing name entry to the parent. */
2496 rc = lfsck_namespace_insert_normal(env, com, parent,
2497 child, cname->ln_name);
2498 if (unlikely(rc == -EEXIST)) {
2499 /* Unfortunately, someone reused the name
2500 * under the parent because of a race. So we
2501 * have to remove the linkEA entry from the
2502 * current child object. It means that the
2503 * LFSCK cannot recover the system
2504 * totally back to its original status,
2505 * but it is necessary to make the
2506 * current system consistent. */
2507 rc = lfsck_namespace_shrink_linkea(env,
2509 cname, &tfid, true);
2511 snprintf(info->lti_tmpbuf,
2512 sizeof(info->lti_tmpbuf),
2513 "-"DFID, PFID(pfid));
2514 rc = lfsck_namespace_insert_orphan(env,
2515 com, child, info->lti_tmpbuf,
2522 } /* !dt_object_exists(parent) */
2524 /* The unique linkEA entry with bad parent will be handled as orphan. */
2525 if (unlikely(!dt_try_as_dir(env, parent))) {
2526 if (!lustre_handle_is_used(lh) && retry != NULL)
2529 rc = lfsck_namespace_dsd_orphan(env, com, child,
2535 rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
2536 (const struct dt_key *)cname->ln_name);
2537 if (rc == -ENOENT) {
2538 /* If the LFSCK is marked as LF_INCOMPLETE, it means that some
2539 * MDT tried to verify a remote MDT-object that resides on this
2540 * MDT, but this MDT failed to respond to the request. So there
2541 * may be a remote name entry on another MDT that references
2542 * this object with another name, and we cannot know whether
2543 * this linkEA is valid or not. Keep it as is; it may be
2544 * resolved by the next LFSCK run. */
2545 if (ns->ln_flags & LF_INCOMPLETE)
2548 if (!lustre_handle_is_used(lh) && retry != NULL) {
2554 lfsck_ibits_unlock(lh, LCK_EX);
2555 rc = lfsck_namespace_check_name(env, parent, child, cname);
2562 /* It is an invalid name entry, drop it. */
2563 if (unlikely(rc > 0)) {
2564 rc = lfsck_namespace_shrink_linkea(env, com, child,
2565 ldata, cname, &tfid, true);
2567 snprintf(info->lti_tmpbuf,
2568 sizeof(info->lti_tmpbuf),
2569 "-"DFID, PFID(pfid));
2570 rc = lfsck_namespace_insert_orphan(env, com,
2571 child, info->lti_tmpbuf, "D", NULL);
2577 /* Add the missing name entry back to the namespace. */
2578 rc = lfsck_namespace_insert_normal(env, com, parent, child,
2580 if (unlikely(rc == -ESTALE))
2581 /* It may happen when the remote object has been
2582 * removed, but the local MDT is not aware of that. */
2585 if (unlikely(rc == -EEXIST)) {
2586 /* Unfortunately, someone reused the name under the
2587 * parent because of a race. So we have to remove the
2588 * linkEA entry from the current child object. It means
2589 * that the LFSCK cannot recover the system totally back
2590 * to its original status, but it is necessary to make
2591 * the current system consistent.
2593 * It may also be because the LFSCK found some
2594 * internal status of a create operation. In such a
2595 * case, nothing is to be done. */
2596 rc = lfsck_namespace_shrink_linkea_cond(env, com,
2597 parent, child, ldata, cname, &tfid);
2599 snprintf(info->lti_tmpbuf,
2600 sizeof(info->lti_tmpbuf),
2601 "-"DFID, PFID(pfid));
2602 rc = lfsck_namespace_insert_orphan(env, com,
2603 child, info->lti_tmpbuf, "D", NULL);
2608 } /* rc == -ENOENT */
2613 if (!lu_fid_eq(&tfid, cfid)) {
2614 if (!lustre_handle_is_used(lh) && retry != NULL) {
2620 lfsck_ibits_unlock(lh, LCK_EX);
2621 /* The name entry references another MDT-object that
2622 * may be created by the LFSCK for repairing dangling
2623 * name entry. Try to replace it. */
2624 rc = lfsck_namespace_replace_cond(env, com, parent, child,
2627 rc = lfsck_namespace_dsd_orphan(env, com, child,
2633 /* A zero FID may be because the remote directory object has an invalid
2634 * linkEA or has lost its linkEA. In such a case, the LFSCK on this MDT
2635 * does not know how to repair the inconsistency, but the namespace LFSCK
2636 * on the MDT where its name entry resides may have more information
2637 * (name, FID) to repair it. So keep the inconsistency here to avoid
2638 * some improper repairing. */
2639 if (fid_is_zero(pfid)) {
2646 /* The ".." name entry is wrong, update it. */
2647 if (!lu_fid_eq(pfid, lfsck_dto2fid(parent))) {
2648 if (!lustre_handle_is_used(lh) && retry != NULL) {
2654 *type = LNIT_UNMATCHED_PAIRS;
2655 rc = lfsck_namespace_repair_unmatched_pairs(env, com, child,
2656 lfsck_dto2fid(parent), cname);
/* Release the reference on the parent looked up above, if any. */
2662 if (parent != NULL && !IS_ERR(parent))
2663 lfsck_object_put(env, parent);
2669 * Double Scan Directory object for multiple linkEA entries case.
2671 * The given @child has multiple linkEA entries. At most one linkEA entry
2672 * will be valid; all the others will be removed. Firstly, the function
2673 * will try to find the linkEA entry for which the name entry exists under
2674 * the given parent (@pfid). If there is no linkEA entry that matches the given
2675 * ".." name entry, then it tries to find the first linkEA entry for which both
2676 * the parent and the name entry exist, to rebuild a new ".." name entry.
2678 * \param[in] env pointer to the thread context
2679 * \param[in] com pointer to the lfsck component
2680 * \param[in] child pointer to the directory to be double scanned
2681 * \param[in] pfid the FID corresponding to the ".." entry
2682 * \param[in] ldata pointer to the linkEA data for the given @child
2683 * \param[in,out] lh ldlm lock handler for the given @child
2684 * \param[out] type to tell the caller what the inconsistency is
2685 * \param[in] lpf true if the ".." entry is under lost+found/MDTxxxx/
2686 * \param[out] unknown set if does not know how to repair the inconsistency
2688 * \retval positive number for repaired cases
2689 * \retval 0 if nothing to be repaired
2690 * \retval negative error number on failure
2693 lfsck_namespace_dsd_multiple(const struct lu_env *env,
2694 struct lfsck_component *com,
2695 struct dt_object *child,
2696 const struct lu_fid *pfid,
2697 struct linkea_data *ldata,
2698 struct lustre_handle *lh,
2699 enum lfsck_namespace_inconsistency_type *type,
2700 bool lpf, bool *unknown)
2702 struct lfsck_thread_info *info = lfsck_env_info(env);
2703 struct lu_name *cname = &info->lti_name;
2704 const struct lu_fid *cfid = lfsck_dto2fid(child);
2705 struct lu_fid *pfid2 = &info->lti_fid3;
2707 struct lfsck_namespace *ns = com->lc_file_ram;
2708 struct lfsck_instance *lfsck = com->lc_lfsck;
2709 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
2710 struct dt_object *parent = NULL;
2711 struct linkea_data ldata_new = { NULL };
2712 int dirent_count = 0;
/* Walk the linkEA entries one by one. Entries that cannot be trusted are
 * dropped with lfsck_linkea_del_buf() as the loop goes. */
2718 while (ldata->ld_lee != NULL) {
2719 rc = lfsck_namespace_unpack_linkea_entry(ldata, cname, &tfid,
2721 sizeof(info->lti_key));
2722 /* Drop invalid linkEA entry. */
2724 lfsck_linkea_del_buf(ldata, cname);
2728 /* Drop repeated linkEA entries. */
2729 lfsck_namespace_filter_linkea_entry(ldata, cname, &tfid, true);
2731 /* If current dotdot is the .lustre/lost+found/MDTxxxx/,
2732 * then it is possible that: the directory object was once
2733 * lost, but its name entry was there. In the former
2734 * LFSCK run, during the first-stage scanning, the LFSCK
2735 * found the dangling name entry, but it did not recreate
2736 * the lost object, and when moved to the second-stage
2737 * scanning, some children objects of the lost directory
2738 * object were found, then the LFSCK recreated such lost
2739 * directory object as an orphan.
2741 * When the LFSCK runs again, if the dangling name is still
2742 * there, the LFSCK should move the orphan directory object
2743 * back to the normal namespace. */
2744 if (!lpf && !lu_fid_eq(pfid, &tfid) && once) {
2745 linkea_next_entry(ldata);
2749 parent = lfsck_object_find_bottom(env, lfsck, &tfid);
2751 RETURN(PTR_ERR(parent));
2753 if (!dt_object_exists(parent)) {
2754 lfsck_object_put(env, parent);
2755 if (ldata->ld_leh->leh_reccount > 1) {
2756 /* If it is NOT the last linkEA entry, then
2757 * there is still another chance to make the
2758 * child visible via another parent, so
2759 * remove this linkEA entry. */
2760 lfsck_linkea_del_buf(ldata, cname);
2767 /* The linkEA entry with bad parent will be removed. */
2768 if (unlikely(!dt_try_as_dir(env, parent))) {
2769 lfsck_object_put(env, parent);
2770 lfsck_linkea_del_buf(ldata, cname);
2774 rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
2775 (const struct dt_key *)cname->ln_name);
2776 *pfid2 = *lfsck_dto2fid(parent);
2777 if (rc == -ENOENT) {
2778 lfsck_object_put(env, parent);
2779 linkea_next_entry(ldata);
2784 lfsck_object_put(env, parent);
2789 if (lu_fid_eq(&tfid, cfid)) {
2790 lfsck_object_put(env, parent);
2791 /* If the parent (that is declared via linkEA entry)
2792 * directory contains the specified child, but such
2793 * parent does not match the dotdot name entry, then
2794 * trust the linkEA. */
2795 if (!lu_fid_eq(pfid, pfid2)) {
2796 *type = LNIT_UNMATCHED_PAIRS;
2797 rc = lfsck_namespace_repair_unmatched_pairs(env,
2798 com, child, pfid2, cname);
2804 /* It is the most common case that we find the
2805 * name entry corresponding to the linkEA entry
2806 * that matches the ".." name entry. */
2807 rc = linkea_data_new(&ldata_new, &info->lti_big_buf);
2811 rc = linkea_add_buf(&ldata_new, cname, pfid2);
2815 rc = lfsck_namespace_rebuild_linkea(env, com, child,
2820 lfsck_linkea_del_buf(ldata, cname);
2821 linkea_first_entry(ldata);
2822 /* There may be some invalid dangling name entries under
2823 * other parent directories, remove all of them. */
2824 while (ldata->ld_lee != NULL) {
2825 rc = lfsck_namespace_unpack_linkea_entry(ldata,
2826 cname, &tfid, info->lti_key,
2827 sizeof(info->lti_key));
2831 parent = lfsck_object_find_bottom(env, lfsck,
2833 if (IS_ERR(parent)) {
2834 rc = PTR_ERR(parent);
2835 if (rc != -ENOENT &&
2836 bk->lb_param & LPF_FAILOUT)
2842 if (!dt_object_exists(parent)) {
2843 lfsck_object_put(env, parent);
2847 rc = lfsck_namespace_repair_dirent(env, com,
2848 parent, child, cname->ln_name,
2849 cname->ln_name, S_IFDIR, false, true);
2850 lfsck_object_put(env, parent);
2852 if (bk->lb_param & LPF_FAILOUT)
2861 lfsck_linkea_del_buf(ldata, cname);
2864 ns->ln_dirent_repaired += dirent_count;
2867 } /* lu_fid_eq(&tfid, lfsck_dto2fid(child)) */
2869 lfsck_ibits_unlock(lh, LCK_EX);
2870 /* The name entry references another MDT-object that may be
2871 * created by the LFSCK for repairing dangling name entry.
2872 * Try to replace it. */
2873 rc = lfsck_namespace_replace_cond(env, com, parent, child,
2875 lfsck_object_put(env, parent);
2882 lfsck_linkea_del_buf(ldata, cname);
2883 } /* while (ldata->ld_lee != NULL) */
2885 linkea_first_entry(ldata);
/* Exactly one entry is left: hand over to the single-entry handler.
 * The retry argument is NULL here, so none of the checks on
 * retry != NULL in lfsck_namespace_dsd_single() can be true. */
2886 if (ldata->ld_leh->leh_reccount == 1) {
2887 rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata,
2888 lh, type, NULL, unknown);
2893 /* All linkEA entries are invalid and removed, then handle the @child
2895 if (ldata->ld_leh->leh_reccount == 0) {
2896 rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, lh,
2902 /* If the dangling name entry for the orphan directory object has
2903 * been removed, then just check whether the directory object is
2904 * still under the .lustre/lost+found/MDTxxxx/ or not. */
2910 /* There is no linkEA entry that matches the ".." name entry. Find
2911 * the first linkEA entry that both parent and name entry exist to
2912 * rebuild a new ".." name entry. */
2922 * Repair the object's nlink attribute.
2924 * If all the known name entries have been verified, then the object's hard
2925 * link attribute should match the object's linkEA entries count unless the
2926 * object has too many hard links to be recorded in the linkEA. Such cases
2927 * should have been marked in the LFSCK trace file. Otherwise, trust the
2928 * linkEA to update the object's nlink attribute.
2930 * \param[in] env pointer to the thread context
2931 * \param[in] com pointer to the lfsck component
2932 * \param[in] obj pointer to the dt_object to be handled
2933 * \param[in,out] la pointer to buffer to object's attribute before
2934 * and after the repairing
2936 * \retval positive number for repaired cases
2937 * \retval 0 if nothing to be repaired
2938 * \retval negative error number on failure
/* Sets @obj's nlink to the linkEA record count when the two differ and the
 * current nlink is non-zero. Nothing is changed when LF_INCOMPLETE is set or
 * when the trace-file flags looked up for @obj carry LNTF_SKIP_NLINK. With
 * LPF_DRYRUN the new value is stored in @la only, dt_attr_set() is skipped
 * and 1 is returned. */
2940 static int lfsck_namespace_repair_nlink(const struct lu_env *env,
2941 struct lfsck_component *com,
2942 struct dt_object *obj,
2945 struct lfsck_thread_info *info = lfsck_env_info(env);
2946 struct lu_fid *tfid = &info->lti_fid3;
2947 struct lfsck_namespace *ns = com->lc_file_ram;
2948 struct lfsck_instance *lfsck = com->lc_lfsck;
2949 struct dt_device *dev = lfsck_obj2dev(obj);
2950 const struct lu_fid *cfid = lfsck_dto2fid(obj);
2951 struct thandle *th = NULL;
2952 struct linkea_data ldata = { NULL };
2953 struct lustre_handle lh = { 0 };
/* Keep the nlink value passed in by the caller; it is only used as the
 * old value in the log message at the end. */
2954 __u32 old = la->la_nlink;
2960 LASSERT(!dt_object_remote(obj));
/* Take the UPDATE ibits lock on @obj in PW mode before touching it. */
2962 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
2963 MDS_INODELOCK_UPDATE, LCK_PW);
2967 th = dt_trans_create(env, dev);
2969 GOTO(log, rc = PTR_ERR(th));
/* Only the nlink attribute is declared for update in this transaction. */
2971 la->la_valid = LA_NLINK;
2972 rc = dt_declare_attr_set(env, obj, la, th);
2976 rc = dt_trans_start_local(env, dev, th);
2980 dt_write_lock(env, obj, 0);
2981 /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT has
2982 * ever tried to verify some remote MDT-object that resides on this
2983 * MDT, but this MDT failed to respond such request. So means there
2984 * may be some remote name entry on other MDT that references this
2985 * object with another name, so we cannot know whether this linkEA
2986 * is valid or not. So keep it there and maybe resolved when next
2988 if (ns->ln_flags & LF_INCOMPLETE)
2989 GOTO(unlock, rc = 0);
2991 fid_cpu_to_be(tfid, cfid);
2992 idx = lfsck_sub_trace_file_fid2idx(cfid);
2993 rc = dt_lookup(env, com->lc_sub_trace_objs[idx].lsto_obj,
2994 (struct dt_rec *)&flags, (const struct dt_key *)tfid);
2998 if (flags & LNTF_SKIP_NLINK)
2999 GOTO(unlock, rc = 0);
3001 rc = dt_attr_get(env, obj, la);
3003 GOTO(unlock, rc = (rc == -ENOENT ? 0 : rc));
3005 rc = lfsck_links_read2(env, obj, &ldata);
3007 GOTO(unlock, rc = (rc == -ENODATA ? 0 : rc));
3009 if (la->la_nlink == ldata.ld_leh->leh_reccount ||
3010 unlikely(la->la_nlink == 0))
3011 GOTO(unlock, rc = 0);
3013 la->la_nlink = ldata.ld_leh->leh_reccount;
3014 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
3015 GOTO(unlock, rc = 1);
3017 rc = dt_attr_set(env, obj, la, th);
3019 GOTO(unlock, rc = (rc == 0 ? 1 : rc));
3022 dt_write_unlock(env, obj);
3025 dt_trans_stop(env, dev, th);
3028 lfsck_ibits_unlock(&lh, LCK_PW);
3029 CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired the object "DFID"'s "
3030 "nlink count from %u to %u: rc = %d\n",
3031 lfsck_lfsck2name(lfsck), PFID(cfid), old, la->la_nlink, rc);
3034 ns->ln_flags |= LF_INCONSISTENT;
3040 * Double scan the directory object for namespace LFSCK.
3042 * This function will verify the <parent, child> pairs in the namespace tree:
3043 * the parent references the child via some name entry that should be in the
3044 * child's linkEA entry, and the child should reference the parent back via its
3047 * The LFSCK will scan every linkEA entry in turn until find out the first
3048 * matched pairs. If found, then all other linkEA entries will be dropped.
3049 * If all the linkEA entries cannot match the ".." name entry, then there
3050 * are several possible cases:
3052 * 1) If there is only one linkEA entry, then trust it as long as the PFID
3053 * in the linkEA entry is valid.
3055 * 2) If there are multiple linkEA entries, then try to find the linkEA
3056 * that matches the ".." name entry. If found, then all other entries
3057 * are invalid; otherwise, it is quite possible that the ".." name entry
3058 * is corrupted. Under such case, the LFSCK will rebuild the ".." name
3059 * entry according to the first valid linkEA entry (both the parent and
3060 * the name entry should exist).
3062 * 3) If the directory object has no (valid) linkEA entry, then the
3063 * directory object will be handled as pure orphan and inserted
3064 * in the .lustre/lost+found/MDTxxxx/ with the name:
3065 * ${self_FID}-${PFID}-D-${conflict_version}
3067 * \param[in] env pointer to the thread context
3068 * \param[in] com pointer to the lfsck component
3069 * \param[in] child pointer to the directory object to be handled
3070 * \param[in] flags to indicate the special checking on the @child
3072 * \retval positive number for repaired cases
3073 * \retval 0 if nothing to be repaired
3074 * \retval negative error number on failure
3076 static int lfsck_namespace_double_scan_dir(const struct lu_env *env,
3077 struct lfsck_component *com,
3078 struct dt_object *child, __u8 flags)
3080 struct lfsck_thread_info *info = lfsck_env_info(env);
3081 const struct lu_fid *cfid = lfsck_dto2fid(child);
3082 struct lu_fid *pfid = &info->lti_fid2;
3083 struct lfsck_namespace *ns = com->lc_file_ram;
3084 struct lfsck_instance *lfsck = com->lc_lfsck;
3085 struct lustre_handle lh = { 0 };
3086 struct linkea_data ldata = { NULL };
3087 bool unknown = false;
3090 enum lfsck_namespace_inconsistency_type type = LNIT_BAD_LINKEA;
3094 LASSERT(!dt_object_remote(child));
3096 if (flags & LNTF_UNCERTAIN_LMV) {
3097 if (flags & LNTF_RECHECK_NAME_HASH) {
3098 rc = lfsck_namespace_scan_shard(env, com, child);
3102 ns->ln_striped_shards_scanned++;
3104 ns->ln_striped_shards_skipped++;
3108 flags &= ~(LNTF_RECHECK_NAME_HASH | LNTF_UNCERTAIN_LMV);
3112 if (flags & (LNTF_CHECK_LINKEA | LNTF_CHECK_PARENT) &&
3113 !(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) {
/* Adjacent string literals are concatenated as-is, so every fragment
 * below except the last must end with a space. */
3114 CDEBUG(D_LFSCK, "%s: some MDT(s) maybe NOT take part in "
3115 "the namespace LFSCK, then the LFSCK cannot guarantee "
3116 "all the name entries have been verified in first-stage "
3117 "scanning. So have to skip orphan related handling for "
3118 "the directory object "DFID" with remote name entry\n",
3119 lfsck_lfsck2name(lfsck), PFID(cfid));
3124 if (unlikely(!dt_try_as_dir(env, child)))
3125 GOTO(out, rc = -ENOTDIR);
3127 /* We only take ldlm lock on the @child when required. When the
3128 * logic comes here for the first time, it is always false. */
3132 rc = lfsck_ibits_lock(env, lfsck, child, &lh,
3133 MDS_INODELOCK_UPDATE |
3134 MDS_INODELOCK_XATTR, LCK_EX);
3139 dt_read_lock(env, child, 0);
3140 if (unlikely(lfsck_is_dead_obj(child))) {
3141 dt_read_unlock(env, child);
3146 rc = dt_lookup(env, child, (struct dt_rec *)pfid,
3147 (const struct dt_key *)dotdot);
3149 if (rc != -ENOENT && rc != -ENODATA && rc != -EINVAL) {
3150 dt_read_unlock(env, child);
3155 if (!lustre_handle_is_used(&lh)) {
3156 dt_read_unlock(env, child);
3161 } else if (lfsck->li_lpf_obj != NULL &&
3162 lu_fid_eq(pfid, lfsck_dto2fid(lfsck->li_lpf_obj))) {
3164 } else if (unlikely(!fid_is_sane(pfid))) {
3168 rc = lfsck_links_read(env, child, &ldata);
3169 dt_read_unlock(env, child);
3171 if (rc != -ENODATA && rc != -EINVAL)
3174 if (!lustre_handle_is_used(&lh))
3177 if (rc == -EINVAL && !fid_is_zero(pfid)) {
3178 /* Remove the corrupted linkEA. */
3179 rc = lfsck_namespace_links_remove(env, com, child);
3181 /* Here, because of the crashed linkEA, we
3182 * cannot know whether there is some parent
3183 * that references the child directory via
3184 * some name entry or not. So keep it there,
3185 * when the LFSCK run next time, if there is
3186 * some parent that references this object,
3187 * then the LFSCK can rebuild the linkEA;
3188 * otherwise, this object will be handled
3189 * as orphan as above. */
3192 /* 1. If we have neither ".." nor linkEA,
3193 * then it is an orphan.
3195 * 2. If we only have the ".." name entry,
3196 * but no parent references this child
3197 * directory, then handle it as orphan. */
3198 lfsck_ibits_unlock(&lh, LCK_EX);
3199 type = LNIT_MUL_REF;
3201 /* If the LFSCK is marked as LF_INCOMPLETE,
3202 * it means that some MDT tried to verify
3203 * a remote MDT-object that resides on this
3204 * MDT, but this MDT failed to respond to
3205 * the request. So there may be a remote
3206 * name entry on another MDT that
3207 * references this object with another name,
3208 * and we cannot know whether this linkEA is
3209 * valid or not. Keep it as is; it may be
3210 * resolved by the next LFSCK run. */
3211 if (ns->ln_flags & LF_INCOMPLETE)
3214 snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
3215 "-"DFID, PFID(pfid));
3216 rc = lfsck_namespace_insert_orphan(env, com, child,
3217 info->lti_tmpbuf, "D", NULL);
3223 linkea_first_entry(&ldata);
3224 /* This is the most common case: the object has unique linkEA entry. */
3225 if (ldata.ld_leh->leh_reccount == 1) {
3226 rc = lfsck_namespace_dsd_single(env, com, child, pfid, &ldata,
3227 &lh, &type, &retry, &unknown);
3229 LASSERT(!lustre_handle_is_used(&lh));
3238 if (!lustre_handle_is_used(&lh))
3241 if (unlikely(ldata.ld_leh->leh_reccount == 0)) {
3242 rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, &lh,
3248 /* When we come here, the cases are usually like this:
3249 * 1) The directory object has a corrupted linkEA entry. During the
3250 * first-stage scanning, the LFSCK cannot know such corruption,
3251 * then it appends the right linkEA entry according to the found
3252 * name entry after the bad one.
3254 * 2) The directory object has a right linkEA entry. During the
3255 * first-stage scanning, the LFSCK finds some bad name entry,
3256 * but the LFSCK cannot be aware of that at that time, then it adds
3257 * the bad linkEA entry for further processing. */
3258 rc = lfsck_namespace_dsd_multiple(env, com, child, pfid, &ldata,
3259 &lh, &type, lpf, &unknown);
3264 lfsck_ibits_unlock(&lh, LCK_EX);
/* Account the repair under the statistics counter matching @type. */
3267 case LNIT_BAD_LINKEA:
3268 ns->ln_linkea_repaired++;
3270 case LNIT_UNMATCHED_PAIRS:
3271 ns->ln_unmatched_pairs_repaired++;
3274 ns->ln_mul_ref_repaired++;
3282 ns->ln_unknown_inconsistency++;
3288 * Double scan the MDT-object for namespace LFSCK.
3290 * If the MDT-object contains invalid or repeated linkEA entries, then drop
3291 * those entries from the linkEA; if the linkEA becomes empty or the object
3292 * has no linkEA, then it is an orphan and will be added into the directory
3293 * .lustre/lost+found/MDTxxxx/; if the remote parent is lost, then recreate
3294 * the remote parent; if the name entry corresponding to some linkEA entry
3295 * is lost, then add the name entry back to the namespace.
3297 * \param[in] env pointer to the thread context
3298 * \param[in] com pointer to the lfsck component
3299 * \param[in] child pointer to the dt_object to be handled
3300 * \param[in] flags some hints to indicate how the @child should be handled
3302 * \retval positive number for repaired cases
3303 * \retval 0 if nothing to be repaired
3304 * \retval negative error number on failure
3306 static int lfsck_namespace_double_scan_one(const struct lu_env *env,
3307 struct lfsck_component *com,
3308 struct dt_object *child, __u8 flags)
3310 struct lfsck_thread_info *info = lfsck_env_info(env);
3311 struct lu_attr *la = &info->lti_la;
3312 struct lu_name *cname = &info->lti_name;
3313 struct lu_fid *pfid = &info->lti_fid;
3314 struct lu_fid *cfid = &info->lti_fid2;
3315 struct lfsck_instance *lfsck = com->lc_lfsck;
3316 struct lfsck_namespace *ns = com->lc_file_ram;
3317 struct dt_object *parent = NULL;
3318 struct linkea_data ldata = { NULL };
3319 bool repaired = false;
3324 dt_read_lock(env, child, 0);
3325 if (unlikely(lfsck_is_dead_obj(child))) {
3326 dt_read_unlock(env, child);
/* Directories are handed over to lfsck_namespace_double_scan_dir(). */
3331 if (S_ISDIR(lfsck_object_type(child))) {
3332 dt_read_unlock(env, child);
3333 rc = lfsck_namespace_double_scan_dir(env, com, child, flags);
3338 rc = lfsck_links_read(env, child, &ldata);
3339 dt_read_unlock(env, child);
/* -EINVAL from lfsck_links_read(): the linkEA is removed while holding
 * an EX ibits lock (UPDATE | XATTR) on the child. */
3341 if (rc == -EINVAL) {
3342 struct lustre_handle lh = { 0 };
3344 rc = lfsck_ibits_lock(env, com->lc_lfsck, child, &lh,
3345 MDS_INODELOCK_UPDATE |
3346 MDS_INODELOCK_XATTR, LCK_EX);
3348 rc = lfsck_namespace_links_remove(env, com, child);
3349 lfsck_ibits_unlock(&lh, LCK_EX);
3352 GOTO(out, rc = (rc == -ENOENT ? 0 : rc));
3358 linkea_first_entry(&ldata);
3359 while (ldata.ld_lee != NULL) {
3360 rc = lfsck_namespace_unpack_linkea_entry(&ldata, cname, pfid,
3362 sizeof(info->lti_key));
3363 /* Invalid PFID in the linkEA entry. */
3365 rc = lfsck_namespace_shrink_linkea(env, com, child,
3366 &ldata, cname, pfid, true);
3376 rc = lfsck_namespace_filter_linkea_entry(&ldata, cname, pfid,
3378 /* Found repeated linkEA entries */
3380 rc = lfsck_namespace_shrink_linkea(env, com, child,
3381 &ldata, cname, pfid, false);
3393 parent = lfsck_object_find_bottom(env, lfsck, pfid);
3395 GOTO(out, rc = PTR_ERR(parent));
3397 if (!dt_object_exists(parent)) {
3400 if (ldata.ld_leh->leh_reccount > 1) {
3401 /* If it is NOT the last linkEA entry, then
3402 * there is still another chance to make the
3403 * child visible via another parent, so
3404 * remove this linkEA entry. */
3405 rc = lfsck_namespace_shrink_linkea(env, com,
3406 child, &ldata, cname, pfid, true);
3408 /* If the LFSCK is marked as LF_INCOMPLETE,
3409 * it means that some MDT tried to verify
3410 * a remote MDT-object that resides on this
3411 * MDT, but this MDT failed to respond to
3412 * the request. So there may be a remote
3413 * name entry on another MDT that
3414 * references this object with another name,
3415 * and we cannot know whether this linkEA is
3416 * valid or not. Keep it as is; it may be
3417 * resolved by the next LFSCK run. */
3418 if (ns->ln_flags & LF_INCOMPLETE) {
3419 lfsck_object_put(env, parent);
3424 /* Create the lost parent as an orphan. */
3425 rc = lfsck_namespace_create_orphan_dir(env, com,
3428 lfsck_object_put(env, parent);
3436 /* Add the missing name entry to the parent. */
3437 rc = lfsck_namespace_insert_normal(env, com,
3438 parent, child, cname->ln_name);
3439 if (unlikely(rc == -EEXIST))
3440 /* Unfortunately, someone reused the
3441 * name under the parent because of a race.
3442 * So we have to remove the linkEA entry from
3443 * the current child object. It means that
3444 * the LFSCK cannot recover the system
3445 * totally back to its original status,
3446 * but it is necessary to make the
3447 * current system consistent. */
3448 rc = lfsck_namespace_shrink_linkea(env,
3452 linkea_next_entry(&ldata);
3455 lfsck_object_put(env, parent);
3463 } /* !dt_object_exists(parent) */
3465 /* The linkEA entry with bad parent will be removed. */
3466 if (unlikely(!dt_try_as_dir(env, parent))) {
3467 lfsck_object_put(env, parent);
3468 rc = lfsck_namespace_shrink_linkea(env, com, child,
3469 &ldata, cname, pfid, true);
3479 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
3480 (const struct dt_key *)cname->ln_name);
3481 if (rc != 0 && rc != -ENOENT) {
3482 lfsck_object_put(env, parent);
3488 if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
3489 /* It is the most common case that we
3490 * find the name entry corresponding
3491 * to the linkEA entry. */
3492 lfsck_object_put(env, parent);
3493 linkea_next_entry(&ldata);
3495 /* The name entry references another
3496 * MDT-object that may be created by
3497 * the LFSCK for repairing dangling
3498 * name entry. Try to replace it. */
3499 rc = lfsck_namespace_replace_cond(env, com,
3500 parent, child, cfid, cname);
3501 lfsck_object_put(env, parent);
3507 linkea_next_entry(&ldata);
3509 rc = lfsck_namespace_shrink_linkea(env,
3523 /* The following handles -ENOENT case */
3525 rc = dt_attr_get(env, child, la);
3529 /* If there is no name entry in the parent dir and the object
3530 * link count is less than the linkea entries count, then the
3531 * linkea entry should be removed. */
3532 if (ldata.ld_leh->leh_reccount > la->la_nlink) {
3533 rc = lfsck_namespace_shrink_linkea_cond(env, com,
3534 parent, child, &ldata, cname, pfid);
3535 lfsck_object_put(env, parent);
3545 /* If the LFSCK is marked as LF_INCOMPLETE, it means that some
3546 * MDT tried to verify a remote MDT-object that resides on
3547 * this MDT, but this MDT failed to respond to the request.
3548 * So there may be a remote name entry on another MDT that
3549 * references this object with another name, and we cannot
3550 * know whether this linkEA is valid or not. Keep it as is;
3551 * it may be resolved by the next LFSCK run. */
3552 if (ns->ln_flags & LF_INCOMPLETE) {
3553 lfsck_object_put(env, parent);
3558 rc = lfsck_namespace_check_name(env, parent, child, cname);
3563 lfsck_object_put(env, parent);
3568 /* It is an invalid name entry, drop it. */
3569 if (unlikely(rc > 0)) {
3570 lfsck_object_put(env, parent);
3571 rc = lfsck_namespace_shrink_linkea(env, com, child,
3572 &ldata, cname, pfid, true);
3582 /* Add the missing name entry back to the namespace. */
3583 rc = lfsck_namespace_insert_normal(env, com, parent, child,
3585 if (unlikely(rc == -ESTALE))
3586 /* It may happen when the remote object has been
3587 * removed, but the local MDT is not aware of that. */
3590 if (unlikely(rc == -EEXIST))
3591 /* Unfortunately, someone reused the name under the
3592 * parent because of a race. So we have to remove the
3593 * linkEA entry from the current child object. It means
3594 * that the LFSCK cannot recover the system totally back
3595 * to its original status, but it is necessary to make
3596 * the current system consistent.
3598 * It may also be because the LFSCK found some
3599 * internal status of a create operation. In such a
3600 * case, nothing is to be done. */
3601 rc = lfsck_namespace_shrink_linkea_cond(env, com,
3602 parent, child, &ldata, cname, pfid);
3604 linkea_next_entry(&ldata);
3606 lfsck_object_put(env, parent);
3617 if (rc < 0 && rc != -ENODATA)
3620 if (rc == 0 && ldata.ld_leh != NULL)
3621 count = ldata.ld_leh->leh_reccount;
3624 /* If the LFSCK is marked as LF_INCOMPLETE, it means that some
3625 * MDT tried to verify a remote MDT-object that resides on
3626 * this MDT, but this MDT failed to respond to the request.
3627 * So there may be a remote name entry on another MDT that
3628 * references this object with another name, and we cannot
3629 * know whether this linkEA is valid or not. Keep it as is;
3630 * it may be resolved by the next LFSCK run. */
3631 if (!(ns->ln_flags & LF_INCOMPLETE)) {
3632 /* If the child becomes orphan, then insert it into
3633 * the global .lustre/lost+found/MDTxxxx directory. */
3634 rc = lfsck_namespace_insert_orphan(env, com, child,
3640 ns->ln_mul_ref_repaired++;
3645 rc = dt_attr_get(env, child, la);
/* A non-zero nlink that differs from the linkEA record count is a
 * candidate for repair. Objects that are neither regular files nor
 * symlinks are only logged, as the message below says. */
3649 if (la->la_nlink != 0 && la->la_nlink != count) {
3650 if (unlikely(!S_ISREG(lfsck_object_type(child)) &&
3651 !S_ISLNK(lfsck_object_type(child)))) {
3652 CDEBUG(D_LFSCK, "%s: namespace LFSCK finds "
3653 "the object "DFID"'s nlink count %d "
3654 "does not match linkEA count %d, "
3655 "type %o, skip it.\n",
3656 lfsck_lfsck2name(lfsck),
3657 PFID(lfsck_dto2fid(child)),
3658 la->la_nlink, count,
3659 lfsck_object_type(child));
3661 rc = lfsck_namespace_repair_nlink(env, com,
3664 ns->ln_objs_nlink_repaired++;
3672 if (la->la_nlink > 1)
3673 ns->ln_mul_linked_repaired++;
/*
 * Print the namespace LFSCK statistics into the seq_file as a list of
 * "name: value" lines, using one seq_printf() call.
 *
 * \param[in] m			the seq_file that receives the text
 * \param[in] ns		the namespace LFSCK record that the repair,
 *				failure and striped-directory counters are
 *				read from
 * \param[in] checked_phase1	caller-computed count; presumably printed as
 *				"checked_phase1" -- TODO confirm
 * \param[in] checked_phase2	caller-computed count; presumably printed as
 *				"checked_phase2" -- TODO confirm
 *
 * NOTE(review): the callers in this file pass six arguments (the last two
 * being the phase1/phase2 run times), but the end of the parameter list is
 * not visible in this listing.
 */
3682 static void lfsck_namespace_dump_statistics(struct seq_file *m,
3683 struct lfsck_namespace *ns,
3684 __u64 checked_phase1,
3685 __u64 checked_phase2,
3689 seq_printf(m, "checked_phase1: %llu\n"
3690 "checked_phase2: %llu\n"
3691 "updated_phase1: %llu\n"
3692 "updated_phase2: %llu\n"
3693 "failed_phase1: %llu\n"
3694 "failed_phase2: %llu\n"
3695 "directories: %llu\n"
3696 "dirent_repaired: %llu\n"
3697 "linkea_repaired: %llu\n"
3698 "nlinks_repaired: %llu\n"
3699 "multiple_linked_checked: %llu\n"
3700 "multiple_linked_repaired: %llu\n"
3701 "unknown_inconsistency: %llu\n"
3702 "unmatched_pairs_repaired: %llu\n"
3703 "dangling_repaired: %llu\n"
3704 "multiple_referenced_repaired: %llu\n"
3705 "bad_file_type_repaired: %llu\n"
3706 "lost_dirent_repaired: %llu\n"
3707 "local_lost_found_scanned: %llu\n"
3708 "local_lost_found_moved: %llu\n"
3709 "local_lost_found_skipped: %llu\n"
3710 "local_lost_found_failed: %llu\n"
3711 "striped_dirs_scanned: %llu\n"
3712 "striped_dirs_repaired: %llu\n"
3713 "striped_dirs_failed: %llu\n"
3714 "striped_dirs_disabled: %llu\n"
3715 "striped_dirs_skipped: %llu\n"
3716 "striped_shards_scanned: %llu\n"
3717 "striped_shards_repaired: %llu\n"
3718 "striped_shards_failed: %llu\n"
3719 "striped_shards_skipped: %llu\n"
3720 "name_hash_repaired: %llu\n"
3721 "success_count: %u\n"
3722 "run_time_phase1: %u seconds\n"
3723 "run_time_phase2: %u seconds\n",
/* NOTE(review): the arguments matching the two "checked_*" fields are not
 * visible in this listing; the list below starts at "updated_phase1". */
3726 ns->ln_items_repaired,
3727 ns->ln_objs_repaired_phase2,
3728 ns->ln_items_failed,
3729 ns->ln_objs_failed_phase2,
3730 ns->ln_dirs_checked,
3731 ns->ln_dirent_repaired,
3732 ns->ln_linkea_repaired,
3733 ns->ln_objs_nlink_repaired,
3734 ns->ln_mul_linked_checked,
3735 ns->ln_mul_linked_repaired,
3736 ns->ln_unknown_inconsistency,
3737 ns->ln_unmatched_pairs_repaired,
3738 ns->ln_dangling_repaired,
3739 ns->ln_mul_ref_repaired,
3740 ns->ln_bad_type_repaired,
3741 ns->ln_lost_dirent_repaired,
3742 ns->ln_local_lpf_scanned,
3743 ns->ln_local_lpf_moved,
3744 ns->ln_local_lpf_skipped,
3745 ns->ln_local_lpf_failed,
3746 ns->ln_striped_dirs_scanned,
3747 ns->ln_striped_dirs_repaired,
3748 ns->ln_striped_dirs_failed,
3749 ns->ln_striped_dirs_disabled,
3750 ns->ln_striped_dirs_skipped,
3751 ns->ln_striped_shards_scanned,
3752 ns->ln_striped_shards_repaired,
3753 ns->ln_striped_shards_failed,
3754 ns->ln_striped_shards_skipped,
3755 ns->ln_name_hash_repaired,
3756 ns->ln_success_count,
/* NOTE(review): the two run-time arguments that close the call are not
 * visible in this listing. */
/*
 * Drain the LFSCK instance's li_list_lmv list.
 *
 * For every lfsck_lmv_unit still linked on the list, assert that its
 * embedded lfsck_lmv holds exactly one reference, count the striped
 * directory as skipped in the namespace record, and drop that reference
 * with lfsck_lmv_put().
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 */
3761 static void lfsck_namespace_release_lmv(const struct lu_env *env,
3762 struct lfsck_component *com)
3764 struct lfsck_instance *lfsck = com->lc_lfsck;
3765 struct lfsck_namespace *ns = com->lc_file_ram;
3767 while (!list_empty(&lfsck->li_list_lmv)) {
3768 struct lfsck_lmv_unit *llu;
3769 struct lfsck_lmv *llmv;
/* Always look at the current head of the list. */
3771 llu = list_entry(lfsck->li_list_lmv.next,
3772 struct lfsck_lmv_unit, llu_link);
3773 llmv = &llu->llu_lmv;
/* Nobody else may still be using this LMV at release time. */
3775 LASSERTF(atomic_read(&llmv->ll_ref) == 1,
3776 "still in using: %u\n",
3777 atomic_read(&llmv->ll_ref));
3779 ns->ln_striped_dirs_skipped++;
/* presumably the final put also unlinks the unit from li_list_lmv,
 * otherwise this loop would not terminate -- TODO confirm */
3780 lfsck_lmv_put(env, llmv);
/*
 * Decide whether the given object must be recorded in the namespace LFSCK
 * trace file so that its linkEA is checked in the second-stage scan.
 *
 * The object's attributes are fetched with dt_attr_get(). The object is
 * recorded with the LNTF_CHECK_LINKEA flag if it is a regular file with
 * more than one link, or if its ctime is 1.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 * \param[in] obj	pointer to the object to be examined
 *
 * NOTE(review): the handling of a dt_attr_get() failure and the return
 * statement are not visible in this listing.
 */
3784 static int lfsck_namespace_check_for_double_scan(const struct lu_env *env,
3785 struct lfsck_component *com,
3786 struct dt_object *obj)
3788 struct lu_attr *la = &lfsck_env_info(env)->lti_la;
3791 rc = dt_attr_get(env, obj, la);
3795 /* zero-linkEA object may be orphan, but it also maybe because
3796 * of upgrading. Currently, we cannot record it for double scan.
3797 * Because it may cause the LFSCK trace file to be too large. */
3799 /* "la_ctime" == 1 means that it has ever been removed from
3800 * backend /lost+found directory but not been added back to
3801 * the normal namespace yet. */
3803 if ((S_ISREG(lfsck_object_type(obj)) && la->la_nlink > 1) ||
3804 unlikely(la->la_ctime == 1))
3805 rc = lfsck_namespace_trace_update(env, com, lfsck_dto2fid(obj),
3806 LNTF_CHECK_LINKEA, true);
3811 /* namespace APIs */
/*
 * Reset the namespace LFSCK component to its initial state.
 *
 * Locates the local root object, and under the write-held com->lc_sem:
 * clears the in-RAM namespace record, re-stamps it with
 * LFSCK_NAMESPACE_MAGIC and LS_INIT, drops the reference on the current
 * trace object, reloads the main and sub trace files, clears the
 * assistant's "incomplete" state and bitmap, and stores the record.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 * \param[in] init	presumably selects the full wipe instead of the wipe
 *			that preserves the success count and last-complete
 *			time -- the selecting condition is not visible in
 *			this listing, TODO confirm
 *
 * NOTE(review): several guards (e.g. the error checks before the GOTO()
 * calls), the jump labels and the return statement are not visible here.
 */
3813 static int lfsck_namespace_reset(const struct lu_env *env,
3814 struct lfsck_component *com, bool init)
3816 struct lfsck_instance *lfsck = com->lc_lfsck;
3817 struct lfsck_namespace *ns = com->lc_file_ram;
3818 struct lfsck_assistant_data *lad = com->lc_data;
3819 struct dt_object *root;
3820 struct dt_object *dto;
3824 root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
3826 GOTO(log, rc = PTR_ERR(root));
/* The trace files are looked up under the local root, so it must be
 * usable as a directory. */
3828 if (unlikely(!dt_try_as_dir(env, root)))
3829 GOTO(put, rc = -ENOTDIR);
3831 down_write(&com->lc_sem);
/* Variant 1: wipe the whole record. */
3833 memset(ns, 0, sizeof(*ns));
/* Variant 2: wipe the record but carry the success count and the time of
 * the last completed run across the reset. */
3835 __u32 count = ns->ln_success_count;
3836 __u64 last_time = ns->ln_time_last_complete;
3838 memset(ns, 0, sizeof(*ns));
3839 ns->ln_success_count = count;
3840 ns->ln_time_last_complete = last_time;
3842 ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
3843 ns->ln_status = LS_INIT;
/* Release the old trace object before loading a new one. */
3845 lfsck_object_put(env, com->lc_obj);
3847 dto = lfsck_namespace_load_one_trace_file(env, com, root,
3848 LFSCK_NAMESPACE, true);
3850 GOTO(out, rc = PTR_ERR(dto));
3853 rc = lfsck_namespace_load_sub_trace_files(env, com, true);
3857 lad->lad_incomplete = 0;
3858 CFS_RESET_BITMAP(lad->lad_bitmap);
3860 rc = lfsck_namespace_store(env, com);
3865 up_write(&com->lc_sem);
3868 lfsck_object_put(env, root);
3870 CDEBUG(D_LFSCK, "%s: namespace LFSCK reset: rc = %d\n",
3871 lfsck_lfsck2name(lfsck), rc);
/*
 * Account a failed item for the namespace LFSCK.
 *
 * Under the write-held com->lc_sem, the failure is recorded in the
 * namespace record via lfsck_namespace_record_failure().
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 *
 * NOTE(review): the remaining parameter(s) and any condition guarding the
 * lc_new_checked increment are not visible in this listing.
 */
3876 lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
3879 struct lfsck_namespace *ns = com->lc_file_ram;
3881 down_write(&com->lc_sem);
3883 com->lc_new_checked++;
3884 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
3885 up_write(&com->lc_sem);
/*
 * Notify the assistant thread that the scan of the current directory
 * (lfsck->li_obj_dir) has reached its end.
 *
 * A dummy namespace request whose directory cookie is MDS_DIR_END_OFF is
 * allocated, bound to the current LMV and appended to the assistant's
 * request list. On each failure path visible here (allocation, assistant
 * object initialisation, assistant already failed) the striped directory
 * is counted as skipped instead.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 *
 * NOTE(review): the early-exit checks, the declaration of "size" and the
 * conditions around the failure paths are not fully visible in this
 * listing.
 */
3888 static void lfsck_namespace_close_dir(const struct lu_env *env,
3889 struct lfsck_component *com)
3891 struct lfsck_namespace *ns = com->lc_file_ram;
3892 struct lfsck_assistant_data *lad = com->lc_data;
3893 struct lfsck_assistant_object *lso = NULL;
3894 struct lfsck_instance *lfsck = com->lc_lfsck;
3895 struct lfsck_lmv *llmv = lfsck->li_lmv;
3896 struct lfsck_namespace_req *lnr;
3898 sizeof(*lnr) + LFSCK_TMPBUF_LEN;
3899 bool wakeup = false;
3905 OBD_ALLOC(lnr, size);
3907 ns->ln_striped_dirs_skipped++;
3912 lso = lfsck_assistant_object_init(env, lfsck_dto2fid(lfsck->li_obj_dir),
3913 NULL, lfsck->li_pos_current.lp_oit_cookie, true);
/* Without the assistant object the request cannot be built: give the
 * request memory back and count the directory as skipped. */
3915 OBD_FREE(lnr, size);
3916 ns->ln_striped_dirs_skipped++;
3921 /* Generate a dummy request to indicate that all shards' name entry
3922 * in this striped directory has been scanned for the first time. */
3923 INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
3924 lnr->lnr_lar.lar_parent = lso;
3925 lnr->lnr_lmv = lfsck_lmv_get(llmv);
3926 lnr->lnr_fid = *lfsck_dto2fid(lfsck->li_obj_dir);
3927 lnr->lnr_dir_cookie = MDS_DIR_END_OFF;
3928 lnr->lnr_size = size;
3930 spin_lock(&lad->lad_lock);
/* A negative assistant status means the request must not be queued;
 * release it and count the directory as skipped. */
3931 if (lad->lad_assistant_status < 0) {
3932 spin_unlock(&lad->lad_lock);
3933 lfsck_namespace_assistant_req_fini(env, &lnr->lnr_lar);
3934 ns->ln_striped_dirs_skipped++;
3939 list_add_tail(&lnr->lnr_lar.lar_list, &lad->lad_req_list);
/* presumably sets "wakeup" when the queue was empty -- the statement is
 * not visible in this listing */
3940 if (lad->lad_prefetched == 0)
3943 lad->lad_prefetched++;
3944 spin_unlock(&lad->lad_lock);
3946 wake_up_all(&lad->lad_thread.t_ctl_waitq);
/*
 * Handle the start of the scan of the directory lfsck->li_obj_dir with
 * respect to its LMV (lfsck->li_lmv).
 *
 * For a master LMV: if the recorded master MDT index differs from the
 * local device index, it is overwritten with the local index, the
 * namespace record is flagged LF_INCONSISTENT and the LMV is marked as
 * updated. The slave case is handed to
 * lfsck_namespace_verify_stripe_slave().
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 *
 * \retval		zero if the result code is positive or zero
 * \retval		negative error number on failure
 *
 * NOTE(review): the early-exit checks and the "else" that presumably
 * separates the master and slave handling are not visible in this listing.
 */
3951 static int lfsck_namespace_open_dir(const struct lu_env *env,
3952 struct lfsck_component *com)
3954 struct lfsck_instance *lfsck = com->lc_lfsck;
3955 struct lfsck_namespace *ns = com->lc_file_ram;
3956 struct lfsck_lmv *llmv = lfsck->li_lmv;
3963 if (llmv->ll_lmv_master) {
3964 struct lmv_mds_md_v1 *lmv = &llmv->ll_lmv;
3966 if (lmv->lmv_master_mdt_index != lfsck_dev_idx(lfsck)) {
3967 lmv->lmv_master_mdt_index =
3968 lfsck_dev_idx(lfsck);
3969 ns->ln_flags |= LF_INCONSISTENT;
3970 llmv->ll_lmv_updated = 1;
3973 rc = lfsck_namespace_verify_stripe_slave(env, com,
3974 lfsck->li_obj_dir, llmv);
/* Positive result codes are not reported to the caller as errors. */
3977 RETURN(rc > 0 ? 0 : rc);
/*
 * Record a checkpoint for the namespace LFSCK.
 *
 * After lfsck_checkpoint_generic(), and under the write-held com->lc_sem,
 * the namespace record is updated from lfsck->li_pos_checkpoint: either
 * the latest start position is set, or the last checkpoint position,
 * phase-1 run time, checkpoint time and checked-items counter are
 * refreshed (folding in com->lc_new_checked, which is then cleared). The
 * record is then written with lfsck_namespace_store().
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 * \param[in] init	presumably selects the "latest start position"
 *			variant -- the condition is not visible in this
 *			listing, TODO confirm
 *
 * \retval		zero if the result code is positive or zero
 * \retval		negative error number on failure
 */
3980 static int lfsck_namespace_checkpoint(const struct lu_env *env,
3981 struct lfsck_component *com, bool init)
3983 struct lfsck_instance *lfsck = com->lc_lfsck;
3984 struct lfsck_namespace *ns = com->lc_file_ram;
3988 rc = lfsck_checkpoint_generic(env, com);
3993 down_write(&com->lc_sem);
3995 ns->ln_pos_latest_start = lfsck->li_pos_checkpoint;
3997 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
/* HALF_SEC is added before the conversion to seconds, which looks like
 * rounding to the nearest second -- confirm against cfs_duration_sec(). */
3998 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
3999 HALF_SEC - lfsck->li_time_last_checkpoint);
4000 ns->ln_time_last_checkpoint = cfs_time_current_sec();
4001 ns->ln_items_checked += com->lc_new_checked;
4002 com->lc_new_checked = 0;
4005 rc = lfsck_namespace_store(env, com);
4006 up_write(&com->lc_sem);
4009 CDEBUG(D_LFSCK, "%s: namespace LFSCK checkpoint at the pos [%llu"
4010 ", "DFID", %#llx], status = %d: rc = %d\n",
4011 lfsck_lfsck2name(lfsck), lfsck->li_pos_current.lp_oit_cookie,
4012 PFID(&lfsck->li_pos_current.lp_dir_parent),
4013 lfsck->li_pos_current.lp_dir_cookie, ns->ln_status, rc);
4015 return rc > 0 ? 0 : rc;
/*
 * Prepare the namespace LFSCK before a scan starts.
 *
 * Loads the bitmap; if that fails or the previous run is LS_COMPLETED the
 * component is reset. Then, under com->lc_sem (write) and lfsck->li_lock,
 * the status, the list membership of the component and the start position
 * (com->lc_pos_start) are chosen:
 *  - already scanned once (LF_SCANNED_ONCE): either continue with phase 2
 *    (start position zeroed), or restart phase 1 from the first
 *    inconsistent position with all statistics cleared;
 *  - otherwise: phase 1, starting just after the last checkpoint or at the
 *    first inconsistent position.
 * Finally the assistant thread is started with lfsck_start_assistant().
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 * \param[in] lsp	pointer to the LFSCK start parameters
 *
 * NOTE(review): the "else" lines, the error checks after the reset and the
 * return statement are not visible in this listing.
 */
4018 static int lfsck_namespace_prep(const struct lu_env *env,
4019 struct lfsck_component *com,
4020 struct lfsck_start_param *lsp)
4022 struct lfsck_instance *lfsck = com->lc_lfsck;
4023 struct lfsck_namespace *ns = com->lc_file_ram;
4024 struct lfsck_position *pos = &com->lc_pos_start;
4027 rc = lfsck_namespace_load_bitmap(env, com);
4028 if (rc != 0 || ns->ln_status == LS_COMPLETED) {
4029 rc = lfsck_namespace_reset(env, com, false);
4031 rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
4034 CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: "
4035 "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
4041 down_write(&com->lc_sem);
4042 ns->ln_time_latest_start = cfs_time_current_sec();
4043 spin_lock(&lfsck->li_lock);
4045 if (ns->ln_flags & LF_SCANNED_ONCE) {
4046 if (!lfsck->li_drop_dryrun ||
4047 lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
/* Phase 1 already finished: queue the component for the double scan and
 * take it off the directory-scan list. */
4048 ns->ln_status = LS_SCANNING_PHASE2;
4049 list_move_tail(&com->lc_link,
4050 &lfsck->li_list_double_scan);
4051 if (!list_empty(&com->lc_link_dir))
4052 list_del_init(&com->lc_link_dir);
4053 lfsck_pos_set_zero(pos);
/* Otherwise: redo phase 1 from the first inconsistent position with
 * every counter cleared. */
4055 ns->ln_status = LS_SCANNING_PHASE1;
4056 ns->ln_run_time_phase1 = 0;
4057 ns->ln_run_time_phase2 = 0;
4058 ns->ln_items_checked = 0;
4059 ns->ln_items_repaired = 0;
4060 ns->ln_items_failed = 0;
4061 ns->ln_dirs_checked = 0;
4062 ns->ln_objs_checked_phase2 = 0;
4063 ns->ln_objs_repaired_phase2 = 0;
4064 ns->ln_objs_failed_phase2 = 0;
4065 ns->ln_objs_nlink_repaired = 0;
4066 ns->ln_dirent_repaired = 0;
4067 ns->ln_linkea_repaired = 0;
4068 ns->ln_mul_linked_checked = 0;
4069 ns->ln_mul_linked_repaired = 0;
4070 ns->ln_unknown_inconsistency = 0;
4071 ns->ln_unmatched_pairs_repaired = 0;
4072 ns->ln_dangling_repaired = 0;
4073 ns->ln_mul_ref_repaired = 0;
4074 ns->ln_bad_type_repaired = 0;
4075 ns->ln_lost_dirent_repaired = 0;
4076 ns->ln_striped_dirs_scanned = 0;
4077 ns->ln_striped_dirs_repaired = 0;
4078 ns->ln_striped_dirs_failed = 0;
4079 ns->ln_striped_dirs_disabled = 0;
4080 ns->ln_striped_dirs_skipped = 0;
4081 ns->ln_striped_shards_scanned = 0;
4082 ns->ln_striped_shards_repaired = 0;
4083 ns->ln_striped_shards_failed = 0;
4084 ns->ln_striped_shards_skipped = 0;
4085 ns->ln_name_hash_repaired = 0;
4086 fid_zero(&ns->ln_fid_latest_scanned_phase2);
4087 if (list_empty(&com->lc_link_dir))
4088 list_add_tail(&com->lc_link_dir,
4089 &lfsck->li_list_dir);
4090 *pos = ns->ln_pos_first_inconsistent;
/* Not scanned once yet: (re)start phase 1. */
4093 ns->ln_status = LS_SCANNING_PHASE1;
4094 if (list_empty(&com->lc_link_dir))
4095 list_add_tail(&com->lc_link_dir,
4096 &lfsck->li_list_dir);
4097 if (!lfsck->li_drop_dryrun ||
4098 lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
/* Resume with the object following the last checkpoint. */
4099 *pos = ns->ln_pos_last_checkpoint;
4100 pos->lp_oit_cookie++;
4102 *pos = ns->ln_pos_first_inconsistent;
4106 spin_unlock(&lfsck->li_lock);
4107 up_write(&com->lc_sem);
4109 rc = lfsck_start_assistant(env, com, lsp);
4111 CDEBUG(D_LFSCK, "%s: namespace LFSCK prep done, start pos [%llu, "
4112 DFID", %#llx]: rc = %d\n",
4113 lfsck_lfsck2name(lfsck), pos->lp_oit_cookie,
4114 PFID(&pos->lp_dir_parent), pos->lp_dir_cookie, rc);
/*
 * Process one object met by the object-table based iteration.
 *
 * The object's linkEA is read with lfsck_links_read() and, depending on
 * the outcome, the object is recorded in the trace file for later
 * verification:
 *  - -EINVAL (crashed linkEA): recorded with LNTF_CHECK_LINKEA, and the
 *    linkEA is removed under an ibits lock;
 *  - -ENODATA: handled by lfsck_namespace_check_for_double_scan();
 *  - more than one linkEA record: recorded with LNTF_CHECK_LINKEA;
 *  - first linkEA entry has an insane parent FID: recorded with
 *    LNTF_CHECK_PARENT;
 *  - parent FID's sequence is unknown to the local FLD or belongs to
 *    another MDT index: recorded with LNTF_CHECK_LINKEA;
 *  - otherwise: handled by lfsck_namespace_check_for_double_scan().
 * At the end, under the write-held com->lc_sem, directories are counted
 * in ns->ln_dirs_checked.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 * \param[in] obj	pointer to the object to be processed
 *
 * NOTE(review): several conditions, "else" lines, jump labels and the
 * return statement are not visible in this listing; in particular the
 * condition guarding lfsck_namespace_record_failure() is missing.
 */
4119 static int lfsck_namespace_exec_oit(const struct lu_env *env,
4120 struct lfsck_component *com,
4121 struct dt_object *obj)
4123 struct lfsck_thread_info *info = lfsck_env_info(env);
4124 struct lfsck_namespace *ns = com->lc_file_ram;
4125 struct lfsck_instance *lfsck = com->lc_lfsck;
4126 const struct lu_fid *fid = lfsck_dto2fid(obj);
4127 struct lu_fid *pfid = &info->lti_fid2;
4128 struct lu_name *cname = &info->lti_name;
4129 struct lu_seq_range *range = &info->lti_range;
4130 struct seq_server_site *ss = lfsck_dev_site(lfsck);
4131 struct linkea_data ldata = { NULL };
4132 __u32 idx = lfsck_dev_idx(lfsck);
4136 rc = lfsck_links_read(env, obj, &ldata);
4140 /* -EINVAL means crashed linkEA, should be verified. */
4141 if (rc == -EINVAL) {
4142 rc = lfsck_namespace_trace_update(env, com, fid,
4143 LNTF_CHECK_LINKEA, true);
4145 struct lustre_handle lh = { 0 };
/* Remove the crashed linkEA while holding the UPDATE and XATTR ibits
 * locks in exclusive mode. */
4147 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
4148 MDS_INODELOCK_UPDATE |
4149 MDS_INODELOCK_XATTR, LCK_EX);
4151 rc = lfsck_namespace_links_remove(env, com,
4153 lfsck_ibits_unlock(&lh, LCK_EX);
/* -ENOENT is mapped to success here. */
4157 GOTO(out, rc = (rc == -ENOENT ? 0 : rc));
4160 if (rc == -ENODATA) {
4161 rc = lfsck_namespace_check_for_double_scan(env, com, obj);
4169 /* Record multiple-linked object. */
4170 if (ldata.ld_leh->leh_reccount > 1) {
4171 rc = lfsck_namespace_trace_update(env, com, fid,
4172 LNTF_CHECK_LINKEA, true);
/* Single linkEA record: inspect the parent FID it names. */
4177 linkea_first_entry(&ldata);
4178 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
4179 if (!fid_is_sane(pfid)) {
4180 rc = lfsck_namespace_trace_update(env, com, fid,
4181 LNTF_CHECK_PARENT, true);
4183 fld_range_set_mdt(range);
4184 rc = fld_local_lookup(env, ss->ss_server_fld,
4185 fid_seq(pfid), range);
4186 if ((rc == -ENOENT) ||
4187 (rc == 0 && range->lsr_index != idx))
4188 rc = lfsck_namespace_trace_update(env, com, fid,
4189 LNTF_CHECK_LINKEA, true);
4191 rc = lfsck_namespace_check_for_double_scan(env, com,
4198 down_write(&com->lc_sem);
4199 if (S_ISDIR(lfsck_object_type(obj)))
4200 ns->ln_dirs_checked++;
4202 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
4203 up_write(&com->lc_sem);
/*
 * Hand one directory entry over to the namespace LFSCK assistant thread.
 *
 * The caller is first blocked until the number of prefetched requests is
 * below the bookmark's async window, or the main thread is no longer
 * running, or the assistant thread has stopped. A request is then built
 * with lfsck_namespace_assistant_req_init() and appended to the
 * assistant's request list under lad->lad_lock; if the assistant status is
 * negative the request is released and that status is returned instead.
 * On success the component's lc_new_checked counter is incremented under
 * the write-held com->lc_sem.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 * \param[in] lso	pointer to the assistant object for the parent
 * \param[in] ent	pointer to the directory entry
 * \param[in] type	the type passed through to the request
 *
 * NOTE(review): the statements executed by the early-exit checks, the
 * IS_ERR-style check on "lnr" and the final return are not visible in
 * this listing.
 */
4208 static int lfsck_namespace_exec_dir(const struct lu_env *env,
4209 struct lfsck_component *com,
4210 struct lfsck_assistant_object *lso,
4211 struct lu_dirent *ent, __u16 type)
4213 struct lfsck_assistant_data *lad = com->lc_data;
4214 struct lfsck_instance *lfsck = com->lc_lfsck;
4215 struct lfsck_namespace_req *lnr;
4216 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
4217 struct ptlrpc_thread *mthread = &lfsck->li_thread;
4218 struct ptlrpc_thread *athread = &lad->lad_thread;
4219 struct l_wait_info lwi = { 0 };
4220 bool wakeup = false;
/* Throttle: do not run further ahead of the assistant than the
 * configured async window allows. */
4222 l_wait_event(mthread->t_ctl_waitq,
4223 lad->lad_prefetched < bk->lb_async_windows ||
4224 !thread_is_running(mthread) ||
4225 thread_is_stopped(athread),
4228 if (unlikely(!thread_is_running(mthread)) ||
4229 thread_is_stopped(athread))
4232 if (unlikely(lfsck_is_dead_obj(lfsck->li_obj_dir)))
4235 lnr = lfsck_namespace_assistant_req_init(com->lc_lfsck, lso, ent, type);
4237 struct lfsck_namespace *ns = com->lc_file_ram;
4239 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
4240 return PTR_ERR(lnr);
4243 spin_lock(&lad->lad_lock);
4244 if (lad->lad_assistant_status < 0) {
4245 spin_unlock(&lad->lad_lock);
4246 lfsck_namespace_assistant_req_fini(env, &lnr->lnr_lar);
/* NOTE(review): lad_assistant_status is re-read here after lad_lock has
 * been dropped. */
4247 return lad->lad_assistant_status;
4250 list_add_tail(&lnr->lnr_lar.lar_list, &lad->lad_req_list);
/* presumably sets "wakeup" when the queue was empty -- the statement is
 * not visible in this listing */
4251 if (lad->lad_prefetched == 0)
4254 lad->lad_prefetched++;
4255 spin_unlock(&lad->lad_lock);
4257 wake_up_all(&lad->lad_thread.t_ctl_waitq);
4259 down_write(&com->lc_sem);
4260 com->lc_new_checked++;
4261 up_write(&com->lc_sem);
/*
 * Post-process the namespace LFSCK after the first-stage scan ends.
 *
 * After lfsck_post_generic(), and under the write-held com->lc_sem, the
 * pending LMV units are released and, under lfsck->li_lock, the status
 * and list membership of the component are updated according to "result":
 *  - first branch (condition not visible; presumably result > 0): move to
 *    LS_SCANNING_PHASE2, set LF_SCANNED_ONCE, clear LF_UPGRADE and queue
 *    the component on the double-scan list;
 *  - result == 0: take the status from lfsck->li_status or LS_STOPPED,
 *    and move the component to the idle list unless it is LS_PAUSED;
 *  - otherwise: LS_FAILED, and the component goes to the idle list.
 * The phase-1 run time, checkpoint time and checked counter are then
 * refreshed and the record is stored.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 * \param[in] result	the result of the first-stage scan
 * \param[in] init	presumably skips the position/time bookkeeping when
 *			set -- the conditions are not visible in this
 *			listing, TODO confirm
 */
4266 static int lfsck_namespace_post(const struct lu_env *env,
4267 struct lfsck_component *com,
4268 int result, bool init)
4270 struct lfsck_instance *lfsck = com->lc_lfsck;
4271 struct lfsck_namespace *ns = com->lc_file_ram;
4275 lfsck_post_generic(env, com, &result);
4277 down_write(&com->lc_sem);
4278 lfsck_namespace_release_lmv(env, com);
4280 spin_lock(&lfsck->li_lock);
4282 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
4284 ns->ln_status = LS_SCANNING_PHASE2;
4285 ns->ln_flags |= LF_SCANNED_ONCE;
4286 ns->ln_flags &= ~LF_UPGRADE;
4287 list_del_init(&com->lc_link_dir);
4288 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
4289 } else if (result == 0) {
4290 if (lfsck->li_status != 0)
4291 ns->ln_status = lfsck->li_status;
4293 ns->ln_status = LS_STOPPED;
/* A paused component stays on its current lists. */
4294 if (ns->ln_status != LS_PAUSED) {
4295 list_del_init(&com->lc_link_dir);
4296 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
4299 ns->ln_status = LS_FAILED;
4300 list_del_init(&com->lc_link_dir);
4301 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
4303 spin_unlock(&lfsck->li_lock);
4306 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4307 HALF_SEC - lfsck->li_time_last_checkpoint);
4308 ns->ln_time_last_checkpoint = cfs_time_current_sec();
4309 ns->ln_items_checked += com->lc_new_checked;
4310 com->lc_new_checked = 0;
4313 rc = lfsck_namespace_store(env, com);
4314 up_write(&com->lc_sem);
4316 CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n",
4317 lfsck_lfsck2name(lfsck), rc);
/*
 * Dump the state of the namespace LFSCK into a seq_file.
 *
 * Under the read-held com->lc_sem this prints the component name and
 * status, the flags and parameters, the time stamps and positions stored
 * in the namespace record, the statistics (via
 * lfsck_namespace_dump_statistics()) and the scan speeds. Three cases are
 * distinguished by ns->ln_status:
 *  - LS_SCANNING_PHASE1: live phase-1 numbers plus the current position,
 *    built from the object-table iterator and the directory cookie;
 *  - LS_SCANNING_PHASE2: live phase-2 numbers plus the latest scanned FID;
 *  - any other status: the stored totals only.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 *
 * NOTE(review): the seq_file parameter "m", the zero-divisor guards around
 * the do_div() calls and several seq_printf() argument lines are not
 * visible in this listing.
 */
4323 lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
4326 struct lfsck_instance *lfsck = com->lc_lfsck;
4327 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
4328 struct lfsck_namespace *ns = com->lc_file_ram;
4330 down_read(&com->lc_sem);
4331 seq_printf(m, "name: lfsck_namespace\n"
4337 lfsck_status2name(ns->ln_status));
4339 lfsck_bits_dump(m, ns->ln_flags, lfsck_flags_names, "flags");
4341 lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
4343 lfsck_time_dump(m, ns->ln_time_last_complete, "last_completed");
4345 lfsck_time_dump(m, ns->ln_time_latest_start, "latest_start");
4347 lfsck_time_dump(m, ns->ln_time_last_checkpoint, "last_checkpoint");
4349 lfsck_pos_dump(m, &ns->ln_pos_latest_start, "latest_start_position");
4351 lfsck_pos_dump(m, &ns->ln_pos_last_checkpoint,
4352 "last_checkpoint_position");
4354 lfsck_pos_dump(m, &ns->ln_pos_first_inconsistent,
4355 "first_failure_position");
4357 if (ns->ln_status == LS_SCANNING_PHASE1) {
4358 struct lfsck_position pos;
4359 const struct dt_it_ops *iops;
/* Time elapsed since the last checkpoint; the not-yet-checkpointed work
 * (lc_new_checked) is added on top of the stored totals. */
4360 cfs_duration_t duration = cfs_time_current() -
4361 lfsck->li_time_last_checkpoint;
4362 __u64 checked = ns->ln_items_checked + com->lc_new_checked;
4363 __u64 speed = checked;
4364 __u64 new_checked = com->lc_new_checked *
4365 msecs_to_jiffies(MSEC_PER_SEC);
4366 __u32 rtime = ns->ln_run_time_phase1 +
4367 cfs_duration_sec(duration + HALF_SEC);
4370 do_div(new_checked, duration);
4373 do_div(speed, rtime);
4375 lfsck_namespace_dump_statistics(m, ns, checked, 0, rtime, 0);
4376 seq_printf(m, "average_speed_phase1: %llu items/sec\n"
4377 "average_speed_phase2: N/A\n"
4378 "average_speed_total: %llu items/sec\n"
4379 "real_time_speed_phase1: %llu items/sec\n"
4380 "real_time_speed_phase2: N/A\n",
4385 LASSERT(lfsck->li_di_oit != NULL);
4387 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
4389 /* The low layer otable-based iteration position may NOT
4390 * exactly match the namespace-based directory traversal
4391 * cookie. Generally, it is not a serious issue. But the
4392 * caller should NOT make assumption on that. */
4393 pos.lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
4394 if (!lfsck->li_current_oit_processed)
4395 pos.lp_oit_cookie--;
4397 spin_lock(&lfsck->li_lock);
4398 if (lfsck->li_di_dir != NULL) {
4399 pos.lp_dir_cookie = lfsck->li_cookie_dir;
/* A cookie at or beyond the end-of-directory marker is reported as "no
 * directory position". */
4400 if (pos.lp_dir_cookie >= MDS_DIR_END_OFF) {
4401 fid_zero(&pos.lp_dir_parent);
4402 pos.lp_dir_cookie = 0;
4405 *lfsck_dto2fid(lfsck->li_obj_dir);
4408 fid_zero(&pos.lp_dir_parent);
4409 pos.lp_dir_cookie = 0;
4411 spin_unlock(&lfsck->li_lock);
4412 lfsck_pos_dump(m, &pos, "current_position");
4413 } else if (ns->ln_status == LS_SCANNING_PHASE2) {
4414 cfs_duration_t duration = cfs_time_current() -
4415 com->lc_time_last_checkpoint;
4416 __u64 checked = ns->ln_objs_checked_phase2 +
4417 com->lc_new_checked;
4418 __u64 speed1 = ns->ln_items_checked;
4419 __u64 speed2 = checked;
4420 __u64 speed0 = speed1 + speed2;
4421 __u64 new_checked = com->lc_new_checked *
4422 msecs_to_jiffies(MSEC_PER_SEC);
4423 __u32 rtime = ns->ln_run_time_phase2 +
4424 cfs_duration_sec(duration + HALF_SEC);
4425 __u32 time0 = ns->ln_run_time_phase1 + rtime;
4428 do_div(new_checked, duration);
4430 if (ns->ln_run_time_phase1 != 0)
4431 do_div(speed1, ns->ln_run_time_phase1);
4432 else if (ns->ln_items_checked != 0)
4436 do_div(speed2, rtime);
4437 else if (checked != 0)
4441 do_div(speed0, time0);
4443 lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked,
4445 ns->ln_run_time_phase1, rtime);
4446 seq_printf(m, "average_speed_phase1: %llu items/sec\n"
4447 "average_speed_phase2: %llu objs/sec\n"
4448 "average_speed_total: %llu items/sec\n"
4449 "real_time_speed_phase1: N/A\n"
4450 "real_time_speed_phase2: %llu objs/sec\n"
4451 "current_position: "DFID"\n",
4456 PFID(&ns->ln_fid_latest_scanned_phase2));
/* Not scanning: report averages computed from the stored totals only. */
4458 __u64 speed1 = ns->ln_items_checked;
4459 __u64 speed2 = ns->ln_objs_checked_phase2;
4460 __u64 speed0 = speed1 + speed2;
4461 __u32 time0 = ns->ln_run_time_phase1 + ns->ln_run_time_phase2;
4463 if (ns->ln_run_time_phase1 != 0)
4464 do_div(speed1, ns->ln_run_time_phase1);
4465 else if (ns->ln_items_checked != 0)
4468 if (ns->ln_run_time_phase2 != 0)
4469 do_div(speed2, ns->ln_run_time_phase2);
4470 else if (ns->ln_objs_checked_phase2 != 0)
4474 do_div(speed0, time0);
4476 lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked,
4477 ns->ln_objs_checked_phase2,
4478 ns->ln_run_time_phase1,
4479 ns->ln_run_time_phase2);
4480 seq_printf(m, "average_speed_phase1: %llu items/sec\n"
4481 "average_speed_phase2: %llu objs/sec\n"
4482 "average_speed_total: %llu items/sec\n"
4483 "real_time_speed_phase1: N/A\n"
4484 "real_time_speed_phase2: N/A\n"
4485 "current_position: N/A\n",
4491 up_read(&com->lc_sem);
/*
 * Run the second-stage (double) scan for the namespace LFSCK.
 *
 * The scan itself is driven by lfsck_double_scan_generic(). If the
 * assistant thread has stopped afterwards, the request list and the
 * phase-1 MDT list must already be empty, and every target still linked
 * on the phase-2 MDT list is unlinked under ltds->ltd_lock.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 *
 * NOTE(review): the declaration of "rc" and the return statement are not
 * visible in this listing; presumably the generic scan's result is
 * returned.
 */
4494 static int lfsck_namespace_double_scan(const struct lu_env *env,
4495 struct lfsck_component *com)
4497 struct lfsck_namespace *ns = com->lc_file_ram;
4498 struct lfsck_assistant_data *lad = com->lc_data;
4499 struct lfsck_tgt_descs *ltds = &com->lc_lfsck->li_mdt_descs;
4500 struct lfsck_tgt_desc *ltd;
4501 struct lfsck_tgt_desc *next;
4504 rc = lfsck_double_scan_generic(env, com, ns->ln_status);
4505 if (thread_is_stopped(&lad->lad_thread)) {
4506 LASSERT(list_empty(&lad->lad_req_list));
4507 LASSERT(list_empty(&lad->lad_mdt_phase1_list));
/* The "&ltds" / "&ltd" operands below had been corrupted into "<ds" /
 * "<d" (an HTML-entity decoding artefact); they take the address of the
 * lock and of the list node. */
4509 spin_lock(&ltds->ltd_lock);
4510 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
4511 ltd_namespace_phase_list) {
4512 list_del_init(&ltd->ltd_namespace_phase_list);
4514 spin_unlock(&ltds->ltd_lock);
/*
 * Release the assistant data attached to the namespace LFSCK component.
 *
 * The assistant thread must be in its initial or stopped state and its
 * request list must be empty. com->lc_data is detached, the pending LMV
 * units are released, every target is unlinked from the phase-1, phase-2
 * and overall MDT lists under ltds->ltd_lock, and the bitmap is freed.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 *
 * NOTE(review): the release of "lad" itself is not visible in this
 * listing.
 */
4520 static void lfsck_namespace_data_release(const struct lu_env *env,
4521 struct lfsck_component *com)
4523 struct lfsck_assistant_data *lad = com->lc_data;
4524 struct lfsck_tgt_descs *ltds = &com->lc_lfsck->li_mdt_descs;
4525 struct lfsck_tgt_desc *ltd;
4526 struct lfsck_tgt_desc *next;
4528 LASSERT(lad != NULL);
4529 LASSERT(thread_is_init(&lad->lad_thread) ||
4530 thread_is_stopped(&lad->lad_thread));
4531 LASSERT(list_empty(&lad->lad_req_list));
4533 com->lc_data = NULL;
4534 lfsck_namespace_release_lmv(env, com);
/* The "&ltds" / "&ltd" operands below had been corrupted into "<ds" /
 * "<d" (an HTML-entity decoding artefact). */
4536 spin_lock(&ltds->ltd_lock);
4537 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
4538 ltd_namespace_phase_list) {
4539 list_del_init(&ltd->ltd_namespace_phase_list);
4541 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
4542 ltd_namespace_phase_list) {
4543 list_del_init(&ltd->ltd_namespace_phase_list);
4545 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_list,
4546 ltd_namespace_list) {
4547 list_del_init(&ltd->ltd_namespace_list);
4549 spin_unlock(&ltds->ltd_lock);
4551 if (likely(lad->lad_bitmap != NULL))
4552 CFS_FREE_BITMAP(lad->lad_bitmap);
/*
 * Quit the namespace LFSCK.
 *
 * After lfsck_quit_generic() the assistant thread must be in its initial
 * or stopped state with an empty request list. The pending LMV units are
 * then released and every target is unlinked from the phase-1 and phase-2
 * MDT lists under ltds->ltd_lock.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 */
4557 static void lfsck_namespace_quit(const struct lu_env *env,
4558 struct lfsck_component *com)
4560 struct lfsck_assistant_data *lad = com->lc_data;
4561 struct lfsck_tgt_descs *ltds = &com->lc_lfsck->li_mdt_descs;
4562 struct lfsck_tgt_desc *ltd;
4563 struct lfsck_tgt_desc *next;
4565 LASSERT(lad != NULL);
4567 lfsck_quit_generic(env, com);
4569 LASSERT(thread_is_init(&lad->lad_thread) ||
4570 thread_is_stopped(&lad->lad_thread));
4571 LASSERT(list_empty(&lad->lad_req_list));
4573 lfsck_namespace_release_lmv(env, com);
/* The "&ltds" / "&ltd" operands below had been corrupted into "<ds" /
 * "<d" (an HTML-entity decoding artefact). */
4575 spin_lock(&ltds->ltd_lock);
4576 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
4577 ltd_namespace_phase_list) {
4578 list_del_init(&ltd->ltd_namespace_phase_list);
4580 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
4581 ltd_namespace_phase_list) {
4582 list_del_init(&ltd->ltd_namespace_phase_list);
4584 spin_unlock(&ltds->ltd_lock);
/*
 * Handle a notification (lr->lr_event) sent to the namespace LFSCK.
 *
 * Events handled in the visible code:
 *  - LE_SKIP_NLINK_DECLARE: declare, in the transaction "th", the delete
 *    and insert on the sub trace file selected by lr->lr_fid;
 *  - LE_SKIP_NLINK: look up the trace record for lr->lr_fid, add the
 *    LNTF_SKIP_NLINK flag and write the record back; the namespace record
 *    is flagged LF_INCOMPLETE on the path described by the in-line
 *    comment below;
 *  - LE_SET_LMV_MASTER: locate the object and, if it exists, call
 *    lfsck_namespace_notify_lmv_master_local();
 *  - LE_SET_LMV_SLAVE: count a repaired shard unless
 *    LEF_RECHECK_NAME_HASH is set, and record the FID with
 *    LNTF_RECHECK_NAME_HASH;
 *  - LE_PHASE1_DONE / LE_PHASE2_DONE and the remaining case(s): update
 *    the per-target state and list membership under ltds->ltd_lock; then
 *    either stop the LFSCK (failure with LPF_FAILOUT set) or wake the
 *    assistant thread when phase 2 can proceed.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 * \param[in] lr	pointer to the notification request
 *
 * NOTE(review): the remaining parameter(s) (a transaction handle "th" is
 * used), the declarations of "rc", "idx", "flags" and "fail", several
 * conditions, "break"/"default" lines and the return statements are not
 * visible in this listing.
 *
 * Fixes applied to this block: the "&ltds" / "&ltd" operands that had been
 * corrupted into "<ds" / "<d" are restored, and the comment in the
 * LE_SKIP_NLINK case is terminated so that it no longer swallows the code
 * that follows it.
 */
4587 static int lfsck_namespace_in_notify(const struct lu_env *env,
4588 struct lfsck_component *com,
4589 struct lfsck_request *lr,
4592 struct lfsck_instance *lfsck = com->lc_lfsck;
4593 struct lfsck_namespace *ns = com->lc_file_ram;
4594 struct lfsck_assistant_data *lad = com->lc_data;
4595 struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs;
4596 struct lfsck_tgt_desc *ltd;
4601 switch (lr->lr_event) {
4602 case LE_SKIP_NLINK_DECLARE: {
4603 struct dt_object *obj;
4604 struct lu_fid *key = &lfsck_env_info(env)->lti_fid3;
4608 LASSERT(th != NULL);
4610 idx = lfsck_sub_trace_file_fid2idx(&lr->lr_fid);
4611 mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
4612 obj = com->lc_sub_trace_objs[idx].lsto_obj;
4613 if (unlikely(obj == NULL)) {
4614 mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
/* Hold our own reference on the trace object while it is used. */
4618 lfsck_object_get(obj);
/* The trace file is keyed by the big-endian form of the FID. */
4619 fid_cpu_to_be(key, &lr->lr_fid);
4620 rc = dt_declare_delete(env, obj,
4621 (const struct dt_key *)key, th);
4623 rc = dt_declare_insert(env, obj,
4624 (const struct dt_rec *)&flags,
4625 (const struct dt_key *)key, th);
4626 mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
4627 lfsck_object_put(env, obj);
4631 case LE_SKIP_NLINK: {
4632 struct dt_object *obj;
4633 struct lu_fid *key = &lfsck_env_info(env)->lti_fid3;
4639 LASSERT(th != NULL);
4641 idx = lfsck_sub_trace_file_fid2idx(&lr->lr_fid);
4642 mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
4643 obj = com->lc_sub_trace_objs[idx].lsto_obj;
4644 if (unlikely(obj == NULL)) {
4645 mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
4649 lfsck_object_get(obj);
4650 fid_cpu_to_be(key, &lr->lr_fid);
4651 rc = dt_lookup(env, obj, (struct dt_rec *)&flags,
4652 (const struct dt_key *)key);
/* Nothing to do if the record already carries the flag. */
4654 if (flags & LNTF_SKIP_NLINK) {
4656 &com->lc_sub_trace_objs[idx].lsto_mutex);
4657 lfsck_object_put(env, obj);
4663 } else if (rc != -ENOENT) {
4667 flags |= LNTF_SKIP_NLINK;
4669 rc = dt_delete(env, obj, (const struct dt_key *)key,
4675 rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
4676 (const struct dt_key *)key, th, 1);
4681 mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
4682 lfsck_object_put(env, obj);
4683 CDEBUG(D_LFSCK, "%s: RPC service thread mark the "DFID
4684 " to be skipped for namespace double scan: rc = %d\n",
4685 lfsck_lfsck2name(com->lc_lfsck), PFID(&lr->lr_fid), rc);
4688 /* If we cannot record this object in the LFSCK tracing,
4689 * we have to mark the LFSCK as LF_INCOMPLETE, then the
4690 * LFSCK will skip nlink attribute verification for all objects. */
4692 ns->ln_flags |= LF_INCOMPLETE;
4696 case LE_SET_LMV_MASTER: {
4697 struct dt_object *obj;
4699 obj = lfsck_object_find_bottom(env, lfsck, &lr->lr_fid);
4701 RETURN(PTR_ERR(obj));
4703 if (likely(dt_object_exists(obj)))
4704 rc = lfsck_namespace_notify_lmv_master_local(env, com,
4707 lfsck_object_put(env, obj);
4709 RETURN(rc > 0 ? 0 : rc);
4711 case LE_SET_LMV_SLAVE: {
4712 if (!(lr->lr_flags & LEF_RECHECK_NAME_HASH))
4713 ns->ln_striped_shards_repaired++;
4715 rc = lfsck_namespace_trace_update(env, com, &lr->lr_fid,
4716 LNTF_RECHECK_NAME_HASH, true);
4718 RETURN(rc > 0 ? 0 : rc);
4720 case LE_PHASE1_DONE:
4721 case LE_PHASE2_DONE:
4728 CDEBUG(D_LFSCK, "%s: namespace LFSCK handles notify %u from MDT %x, "
4729 "status %d, flags %x\n", lfsck_lfsck2name(lfsck), lr->lr_event,
4730 lr->lr_index, lr->lr_status, lr->lr_flags2);
4732 spin_lock(&ltds->ltd_lock);
4733 ltd = lfsck_ltd2tgt(ltds, lr->lr_index);
4735 spin_unlock(&ltds->ltd_lock);
/* The target leaves whichever phase list it was on; it is re-added below
 * when appropriate. */
4740 list_del_init(&ltd->ltd_namespace_phase_list);
4741 switch (lr->lr_event) {
4742 case LE_PHASE1_DONE:
4743 if (lr->lr_status <= 0) {
4744 ltd->ltd_namespace_done = 1;
4745 list_del_init(&ltd->ltd_namespace_list);
4746 CDEBUG(D_LFSCK, "%s: MDT %x failed/stopped at "
4747 "phase1 for namespace LFSCK: rc = %d.\n",
4748 lfsck_lfsck2name(lfsck),
4749 ltd->ltd_index, lr->lr_status);
4750 ns->ln_flags |= LF_INCOMPLETE;
4755 if (lr->lr_flags2 & LF_INCOMPLETE)
4756 ns->ln_flags |= LF_INCOMPLETE;
4758 if (list_empty(&ltd->ltd_namespace_list))
4759 list_add_tail(&ltd->ltd_namespace_list,
4760 &lad->lad_mdt_list);
4761 list_add_tail(&ltd->ltd_namespace_phase_list,
4762 &lad->lad_mdt_phase2_list);
4764 case LE_PHASE2_DONE:
4765 ltd->ltd_namespace_done = 1;
4766 list_del_init(&ltd->ltd_namespace_list);
4770 ltd->ltd_namespace_done = 1;
4771 list_del_init(&ltd->ltd_namespace_list);
4772 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT)) {
4774 "%s: the peer MDT %x exit namespace LFSCK\n",
4775 lfsck_lfsck2name(lfsck), ltd->ltd_index);
4776 ns->ln_flags |= LF_INCOMPLETE;
4782 spin_unlock(&ltds->ltd_lock);
4784 if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
4785 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
4787 memset(stop, 0, sizeof(*stop));
4788 stop->ls_status = lr->lr_status;
4789 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
4790 lfsck_stop(env, lfsck->li_bottom, stop);
4791 } else if (lfsck_phase2_next_ready(lad)) {
4792 wake_up_all(&lad->lad_thread.t_ctl_waitq);
/*
 * Add up all "repaired"-style counters of the namespace record.
 *
 * The sum is accumulated into *count; the caller's existing value is kept
 * and added to, not overwritten.
 *
 * \param[in] ns	pointer to the namespace LFSCK record
 * \param[in,out] count	accumulator that the counters are added to
 */
4798 static void lfsck_namespace_repaired(struct lfsck_namespace *ns, __u64 *count)
4800 *count += ns->ln_objs_nlink_repaired;
4801 *count += ns->ln_dirent_repaired;
4802 *count += ns->ln_linkea_repaired;
4803 *count += ns->ln_mul_linked_repaired;
4804 *count += ns->ln_unmatched_pairs_repaired;
4805 *count += ns->ln_dangling_repaired;
4806 *count += ns->ln_mul_ref_repaired;
4807 *count += ns->ln_bad_type_repaired;
4808 *count += ns->ln_lost_dirent_repaired;
/* Disabled striped directories are counted as repairs as well. */
4809 *count += ns->ln_striped_dirs_disabled;
4810 *count += ns->ln_striped_dirs_repaired;
4811 *count += ns->ln_striped_shards_repaired;
4812 *count += ns->ln_name_hash_repaired;
4813 *count += ns->ln_local_lpf_moved;
/*
 * Collect the namespace LFSCK status and repair totals of all MDTs.
 *
 * After lfsck_query_all(), every target set in ltds->ltd_tgts_bitmap is
 * visited under the read-held ltds->ltd_rw_sem: the slot of mdts_count[]
 * indexed by the target's namespace status is incremented and its repaired
 * count is added to *repaired. The local component is then accounted the
 * same way under the read-held com->lc_sem.
 *
 * \param[in] env		pointer to the thread context
 * \param[in] com		pointer to the lfsck component
 * \param[in,out] mdts_count	array indexed by LFSCK status; presumably
 *				sized to cover every status value -- TODO
 *				confirm against the caller
 * \param[in,out] repaired	accumulator for the repaired totals
 *
 * NOTE(review): the declarations of "rc" and "idx", the check of the
 * lfsck_query_all() result and the return statement are not visible in
 * this listing.
 */
4816 static int lfsck_namespace_query_all(const struct lu_env *env,
4817 struct lfsck_component *com,
4818 __u32 *mdts_count, __u64 *repaired)
4820 struct lfsck_namespace *ns = com->lc_file_ram;
4821 struct lfsck_tgt_descs *ltds = &com->lc_lfsck->li_mdt_descs;
4822 struct lfsck_tgt_desc *ltd;
4827 rc = lfsck_query_all(env, com);
/* The "&ltds" operands below had been corrupted into "<ds" (an
 * HTML-entity decoding artefact). */
4831 down_read(&ltds->ltd_rw_sem);
4832 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
4833 ltd = lfsck_ltd2tgt(ltds, idx);
4834 LASSERT(ltd != NULL);
4836 mdts_count[ltd->ltd_namespace_status]++;
4837 *repaired += ltd->ltd_namespace_repaired;
4839 up_read(&ltds->ltd_rw_sem);
/* Account the local MDT as well. */
4841 down_read(&com->lc_sem);
4842 mdts_count[ns->ln_status]++;
4843 lfsck_namespace_repaired(ns, repaired);
4844 up_read(&com->lc_sem);
/*
 * Answer a query about the namespace LFSCK.
 *
 * Two paths are visible: a global one that must run on the master
 * instance and fills que->lu_mdts_count[idx] / que->lu_repaired[idx] via
 * lfsck_namespace_query_all(), and a local one that, under the read-held
 * com->lc_sem, copies the local status into rep->lr_status and, when
 * LEF_QUERY_ALL is requested, adds the local repaired totals to
 * rep->lr_repaired.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 * \param[in] req	pointer to the query request
 * \param[out] rep	pointer to the reply filled on the local path
 * \param[out] que	pointer to the query result filled on the global path
 * \param[in] idx	index into the per-component arrays of "que"
 *
 * NOTE(review): the condition that selects between the two paths and the
 * return statement are not visible in this listing.
 */
4849 static int lfsck_namespace_query(const struct lu_env *env,
4850 struct lfsck_component *com,
4851 struct lfsck_request *req,
4852 struct lfsck_reply *rep,
4853 struct lfsck_query *que, int idx)
4855 struct lfsck_namespace *ns = com->lc_file_ram;
4859 LASSERT(com->lc_lfsck->li_master);
4861 rc = lfsck_namespace_query_all(env, com,
4862 que->lu_mdts_count[idx],
4863 &que->lu_repaired[idx]);
4865 down_read(&com->lc_sem);
4866 rep->lr_status = ns->ln_status;
4867 if (req->lr_flags & LEF_QUERY_ALL)
4868 lfsck_namespace_repaired(ns, &rep->lr_repaired);
4869 up_read(&com->lc_sem);
/*
 * Operation table of the namespace LFSCK component: binds each generic
 * lfsck_operations hook to its namespace-specific implementation defined
 * in this file.
 */
4875 static struct lfsck_operations lfsck_namespace_ops = {
4876 .lfsck_reset = lfsck_namespace_reset,
4877 .lfsck_fail = lfsck_namespace_fail,
4878 .lfsck_close_dir = lfsck_namespace_close_dir,
4879 .lfsck_open_dir = lfsck_namespace_open_dir,
4880 .lfsck_checkpoint = lfsck_namespace_checkpoint,
4881 .lfsck_prep = lfsck_namespace_prep,
4882 .lfsck_exec_oit = lfsck_namespace_exec_oit,
4883 .lfsck_exec_dir = lfsck_namespace_exec_dir,
4884 .lfsck_post = lfsck_namespace_post,
4885 .lfsck_dump = lfsck_namespace_dump,
4886 .lfsck_double_scan = lfsck_namespace_double_scan,
4887 .lfsck_data_release = lfsck_namespace_data_release,
4888 .lfsck_quit = lfsck_namespace_quit,
4889 .lfsck_in_notify = lfsck_namespace_in_notify,
4890 .lfsck_query = lfsck_namespace_query,
4894 * Repair dangling name entry.
4896 * For the name entry with dangling reference, we need to repair the
4897 * inconsistency according to the LFSCK sponsor's requirement:
4899 * 1) Keep the inconsistency there and report the inconsistency case,
4900 * then give the chance to the application to find related issues,
4901 * and the users can make the decision about how to handle it with
4902 * more human knowledge. (by default)
4904 * 2) Re-create the missing MDT-object with the FID information.
4906 * \param[in] env pointer to the thread context
4907 * \param[in] com pointer to the lfsck component
4908 * \param[in] parent pointer to the dir object that contains the dangling
4910 * \param[in] child pointer to the object corresponding to the dangling
4912 * \param[in] lnr pointer to the namespace request that contains the
4913 * entry's name, parent object, parent's LMV, etc.
4915 * \retval positive number if no need to repair
4916 * \retval zero for repaired successfully
4917 * \retval negative error number on failure
4919 int lfsck_namespace_repair_dangling(const struct lu_env *env,
4920 struct lfsck_component *com,
4921 struct dt_object *parent,
4922 struct dt_object *child,
4923 struct lfsck_namespace_req *lnr)
4925 struct lfsck_thread_info *info = lfsck_env_info(env);
4926 struct lu_attr *la = &info->lti_la;
4927 struct dt_allocation_hint *hint = &info->lti_hint;
4928 struct dt_object_format *dof = &info->lti_dof;
4929 struct dt_insert_rec *rec = &info->lti_dt_rec;
4930 struct lmv_mds_md_v1 *lmv2 = &info->lti_lmv2;
4931 const struct lu_name *cname;
4932 const struct lu_fid *pfid = lfsck_dto2fid(parent);
4933 const struct lu_fid *cfid = lfsck_dto2fid(child);
4934 struct linkea_data ldata = { NULL };
4935 struct lfsck_lock_handle *llh = &info->lti_llh;
4936 struct lu_buf linkea_buf;
4937 struct lu_buf lmv_buf;
4938 struct lfsck_instance *lfsck = com->lc_lfsck;
4939 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
4940 struct dt_device *dev = lfsck->li_next;
4941 struct thandle *th = NULL;
4943 __u16 type = lnr->lnr_type;
4947 cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen);
/* The bookmark parameters decide the policy: LPF_CREATE_MDTOBJ is tested
 * first, and the "create" flag is then checked together with LPF_DRYRUN.
 * The same flag picks the wording of the final log message ("Create the
 * lost MDT-object" vs "Keep the MDT-object there").
 * NOTE(review): the assignments to "create" and the statement guarded by
 * the second test are not visible in this listing; presumably nothing is
 * modified when creation was not requested or in dry-run mode - confirm. */
4948 if (bk->lb_param & LPF_CREATE_MDTOBJ)
4953 if (!create || bk->lb_param & LPF_DRYRUN)
4956 /* We may need to create the sub-objects of the @child via LOD,
4957 * so make the modification based on lfsck->li_next. */
4959 parent = lfsck_object_locate(dev, parent);
4961 GOTO(log, rc = PTR_ERR(parent));
4963 if (unlikely(!dt_try_as_dir(env, parent)))
4964 GOTO(log, rc = -ENOTDIR);
4966 child = lfsck_object_locate(dev, child);
4968 GOTO(log, rc = PTR_ERR(child));
/* Prepare a new linkEA holding the (name, parent FID) record; it is the
 * buffer attached to the child in steps 6a/6b. */
4970 rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
4974 rc = linkea_add_buf(&ldata, cname, pfid);
/* lfsck_lock() is called on the parent for this entry name with
 * MDS_INODELOCK_UPDATE in LCK_PR mode, and the entry is checked again
 * with lfsck_namespace_check_exist() afterwards. */
4978 rc = lfsck_lock(env, lfsck, parent, lnr->lnr_name, llh,
4979 MDS_INODELOCK_UPDATE, LCK_PR);
4983 rc = lfsck_namespace_check_exist(env, parent, child, lnr->lnr_name);
4987 /* Set the ctime as zero, then others can know it is created for
4988 * repairing dangling name entry by LFSCK. And if the LFSCK made
4989 * wrong decision and the real MDT-object has been found later,
4990 * then the LFSCK has chance to fix the inconsistency properly. */
4991 memset(la, 0, sizeof(*la));
4992 la->la_mode = (type & S_IFMT) | 0600;
4993 la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
4994 LA_ATIME | LA_MTIME | LA_CTIME;
/* Only la_mode is assigned after the memset: uid, gid and the three
 * timestamps flagged in la_valid keep their zero value. */
4996 child->do_ops->do_ah_init(env, hint, parent, child,
4997 la->la_mode & S_IFMT);
4999 memset(dof, 0, sizeof(*dof));
5000 dof->dof_type = dt_mode_to_dft(type);
5001 /* If the target is a regular file, then the LFSCK will only create
5002 * the MDT-object without stripes (dof->dof_reg.striped = 0). related
5003 * OST-objects will be created when write open. */
/* Declaration phase: each step "Na" below is declared on the transaction
 * and has a matching execution step "Nb" after dt_trans_start_local(). */
5005 th = dt_trans_create(env, dev);
5007 GOTO(log, rc = PTR_ERR(th));
5009 /* 1a. create child. */
5010 rc = dt_declare_create(env, child, la, hint, dof, th);
5014 if (S_ISDIR(type)) {
5015 if (unlikely(!dt_try_as_dir(env, child)))
5016 GOTO(stop, rc = -ENOTDIR);
5018 /* 2a. increase child nlink */
5019 rc = dt_declare_ref_add(env, child, th);
5023 /* 3a. insert dot into child dir */
5024 rec->rec_type = S_IFDIR;
5025 rec->rec_fid = cfid;
5026 rc = dt_declare_insert(env, child,
5027 (const struct dt_rec *)rec,
5028 (const struct dt_key *)dot, th);
5032 /* 4a. insert dotdot into child dir */
5033 rec->rec_fid = pfid;
5034 rc = dt_declare_insert(env, child,
5035 (const struct dt_rec *)rec,
5036 (const struct dt_key *)dotdot, th);
5040 /* 5a. generate slave LMV EA. */
5041 if (lnr->lnr_lmv != NULL && lnr->lnr_lmv->ll_lmv_master) {
5044 idx = lfsck_shard_name_to_index(env,
5045 lnr->lnr_name, lnr->lnr_namelen,
5047 if (unlikely(idx < 0))
5048 GOTO(stop, rc = idx);
/* The slave LMV starts as a copy of the parent LMV, with the magic
 * switched to LMV_MAGIC_STRIPE and the shard index derived from the
 * entry name stored in lmv_master_mdt_index; it is converted to
 * little-endian before being used as the xattr buffer. */
5050 *lmv2 = lnr->lnr_lmv->ll_lmv;
5051 lmv2->lmv_magic = LMV_MAGIC_STRIPE;
5052 lmv2->lmv_master_mdt_index = idx;
5054 lfsck_lmv_header_cpu_to_le(lmv2, lmv2);
5055 lfsck_buf_init(&lmv_buf, lmv2, sizeof(*lmv2));
5056 rc = dt_declare_xattr_set(env, child, &lmv_buf,
5057 XATTR_NAME_LMV, 0, th);
5063 /* 6a. insert linkEA for child */
5064 lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
5065 ldata.ld_leh->leh_len);
5066 rc = dt_declare_xattr_set(env, child, &linkea_buf,
5067 XATTR_NAME_LINK, 0, th);
/* Execution phase. Here and at step 1b, -EEXIST is turned into 1, the
 * positive "no need to repair" result; any other value is passed on
 * unchanged. */
5071 rc = dt_trans_start_local(env, dev, th);
5073 GOTO(stop, rc = (rc == -EEXIST ? 1 : rc));
5075 dt_write_lock(env, child, 0);
5076 /* 1b. create child */
5077 rc = dt_create(env, child, la, hint, dof, th);
5079 GOTO(unlock, rc = (rc == -EEXIST ? 1 : rc));
5081 if (S_ISDIR(type)) {
5082 /* 2b. increase child nlink */
5083 rc = dt_ref_add(env, child, th);
5087 /* 3b. insert dot into child dir */
5088 rec->rec_type = S_IFDIR;
5089 rec->rec_fid = cfid;
5090 rc = dt_insert(env, child, (const struct dt_rec *)rec,
5091 (const struct dt_key *)dot, th, 1);
5095 /* 4b. insert dotdot into child dir */
5096 rec->rec_fid = pfid;
5097 rc = dt_insert(env, child, (const struct dt_rec *)rec,
5098 (const struct dt_key *)dotdot, th, 1);
5102 /* 5b. generate slave LMV EA. */
5103 if (lnr->lnr_lmv != NULL && lnr->lnr_lmv->ll_lmv_master) {
5104 rc = dt_xattr_set(env, child, &lmv_buf, XATTR_NAME_LMV,
5111 /* 6b. insert linkEA for child. */
5112 rc = dt_xattr_set(env, child, &linkea_buf,
5113 XATTR_NAME_LINK, 0, th);
5118 dt_write_unlock(env, child);
5121 dt_trans_stop(env, dev, th);
5125 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found dangling "
5126 "reference for: parent "DFID", child "DFID", type %u, "
5127 "name %s. %s: rc = %d\n", lfsck_lfsck2name(lfsck),
5128 PFID(pfid), PFID(cfid), type, cname->ln_name,
5129 create ? "Create the lost MDT-object as required" :
5130 "Keep the MDT-object there by default", rc);
5133 struct lfsck_namespace *ns = com->lc_file_ram;
5135 ns->ln_flags |= LF_INCONSISTENT;
/*
 * Handle one namespace LFSCK assistant request (the "p1" handler).
 *
 * @lar is embedded in a struct lfsck_namespace_req (recovered with
 * container_of0()) that describes one name entry - lnr_name, lnr_fid,
 * lnr_type - found under the parent directory described by
 * lar->lar_parent. The visible code checks the entry's FID, locates the
 * referenced object on the local or a remote device, compares the
 * object's linkEA and type against the name entry, classifies any
 * inconsistency (enum lfsck_namespace_inconsistency_type), calls the
 * matching repair helper and finally updates the statistics in
 * com->lc_file_ram under com->lc_sem.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 * \param[in] lar	pointer to the assistant request to be handled
 *
 * NOTE(review): this listing omits many short lines (braces, labels,
 * several local declarations such as rc/idx/count/remove/newdata, "else"
 * lines and simple GOTO/return statements), so the comments below only
 * describe statements that are visible - confirm the exact control flow
 * and the return value against the complete file.
 */
5141 static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
5142 struct lfsck_component *com,
5143 struct lfsck_assistant_req *lar)
5145 struct lfsck_thread_info *info = lfsck_env_info(env);
5146 struct lu_attr *la = &info->lti_la;
5147 struct lfsck_instance *lfsck = com->lc_lfsck;
5148 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
5149 struct lfsck_namespace *ns = com->lc_file_ram;
5150 struct lfsck_assistant_data *lad = com->lc_data;
5151 struct linkea_data ldata = { NULL };
5152 const struct lu_name *cname;
5153 struct thandle *handle = NULL;
5154 struct lfsck_namespace_req *lnr =
5155 container_of0(lar, struct lfsck_namespace_req, lnr_lar);
5156 struct dt_object *dir = NULL;
5157 struct dt_object *obj = NULL;
5158 struct lfsck_assistant_object *lso = lar->lar_parent;
5159 const struct lu_fid *pfid = &lso->lso_fid;
5160 struct dt_device *dev = NULL;
5161 struct lustre_handle lh = { 0 };
5162 bool repaired = false;
5163 bool dtlocked = false;
5167 bool bad_hash = false;
5168 bool bad_linkea = false;
5172 enum lfsck_namespace_inconsistency_type type = LNIT_NONE;
/* Entries that arrive flagged LUDA_UPGRADE or LUDA_REPAIR in lnr_attr are
 * counted in ln_dirent_repaired and leave LF_UPGRADE or LF_INCONSISTENT
 * respectively in ns->ln_flags. */
5179 if (lnr->lnr_attr & LUDA_UPGRADE) {
5180 ns->ln_flags |= LF_UPGRADE;
5181 ns->ln_dirent_repaired++;
5183 } else if (lnr->lnr_attr & LUDA_REPAIR) {
5184 ns->ln_flags |= LF_INCONSISTENT;
5185 ns->ln_dirent_repaired++;
/* A zero FID: the visible code compares the name with ".." and records
 * LNTF_CHECK_PARENT for the parent FID in the trace file.
 * NOTE(review): the statement taken for names other than ".." is not
 * visible here. */
5189 if (unlikely(fid_is_zero(&lnr->lnr_fid))) {
5190 if (strcmp(lnr->lnr_name, dotdot) != 0)
5193 rc = lfsck_namespace_trace_update(env, com, pfid,
5194 LNTF_CHECK_PARENT, true);
5199 if (unlikely(!fid_is_sane(&lnr->lnr_fid))) {
5200 CDEBUG(D_LFSCK, "%s: dir scan find invalid FID "DFID
5201 " for the name entry %.*s under "DFID"\n",
5202 lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid),
5203 lnr->lnr_namelen, lnr->lnr_name, PFID(pfid));
5205 if (strcmp(lnr->lnr_name, dotdot) != 0)
5206 /* invalid FID means bad name entry, remove it. */
5207 type = LNIT_BAD_DIRENT;
5209 /* If the parent FID is invalid, we cannot remove
5210 * the ".." entry directly. */
5211 rc = lfsck_namespace_trace_update(env, com, pfid,
5212 LNTF_CHECK_PARENT, true);
5217 if (unlikely(lnr->lnr_dir_cookie == MDS_DIR_END_OFF)) {
5218 rc = lfsck_namespace_striped_dir_rescan(env, com, lnr);
5223 if (fid_seq_is_dot(fid_seq(&lnr->lnr_fid)))
5226 if (lnr->lnr_lmv != NULL && lnr->lnr_lmv->ll_lmv_master) {
5227 rc = lfsck_namespace_handle_striped_master(env, com, lnr);
/* Work out which MDT the FID belongs to. An index equal to the local
 * device index selects lfsck->li_bottom; otherwise the target descriptor
 * of that MDT is looked up in li_mdt_descs. */
5232 idx = lfsck_find_mdt_idx_by_fid(env, lfsck, &lnr->lnr_fid);
5234 GOTO(out, rc = idx);
5236 if (idx == lfsck_dev_idx(lfsck)) {
5237 if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0))
5240 dev = lfsck->li_bottom;
5242 struct lfsck_tgt_desc *ltd;
5244 /* Usually, some local filesystem consistency verification
5245 * tools can guarantee the local namespace tree consistency.
5246 * So the LFSCK will only verify the remote directory. */
5247 if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0)) {
5248 rc = lfsck_namespace_trace_update(env, com, pfid,
5249 LNTF_CHECK_PARENT, true);
5254 ltd = lfsck_ltd2tgt(&lfsck->li_mdt_descs, idx);
5255 if (unlikely(ltd == NULL)) {
5256 CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which "
5257 "did not join the namespace LFSCK\n",
5258 lfsck_lfsck2name(lfsck), idx);
5259 lfsck_lad_set_bitmap(env, com, idx);
5261 GOTO(out, rc = -ENODEV);
5267 obj = lfsck_object_find_by_dev(env, dev, &lnr->lnr_fid);
5269 GOTO(out, rc = PTR_ERR(obj));
5271 cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen);
/* The referenced object does not exist: the parent directory is loaded,
 * the entry is re-checked with lfsck_namespace_check_exist(), and it is
 * then classified either as a bad dirent (when the name fails
 * lfsck_is_valid_slave_name_entry()) or as a dangling reference handed
 * to lfsck_namespace_repair_dangling(). */
5272 if (dt_object_exists(obj) == 0) {
5276 dir = lfsck_assistant_object_load(env, lfsck, lso);
5280 GOTO(trace, rc == -ENOENT ? 0 : rc);
5284 rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
5286 if (!lfsck_is_valid_slave_name_entry(env, lnr->lnr_lmv,
5287 lnr->lnr_name, lnr->lnr_namelen)) {
5288 type = LNIT_BAD_DIRENT;
5293 type = LNIT_DANGLING;
5294 rc = lfsck_namespace_repair_dangling(env, com, dir,
/* Outside dry-run mode, and only when lad->lad_advance_lock is set, the
 * visible code takes an ibits lock (UPDATE | XATTR, LCK_EX), creates and
 * starts a transaction and write-locks the object ahead of reading the
 * linkEA. lad_advance_lock itself is written at the very end of this
 * function from bad_linkea.
 * NOTE(review): where this conditional block ends is not visible. */
5303 if (!(bk->lb_param & LPF_DRYRUN) && lad->lad_advance_lock) {
5306 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
5307 MDS_INODELOCK_UPDATE |
5308 MDS_INODELOCK_XATTR, LCK_EX);
5312 handle = dt_trans_create(env, dev);
5314 GOTO(out, rc = PTR_ERR(handle));
5316 rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
5320 rc = dt_trans_start_local(env, dev, handle);
5324 dt_write_lock(env, obj, 0);
/* The result of reading the linkEA selects one of the branches below:
 * -ENOENT, 0, -EINVAL and -ENODATA are each tested explicitly. */
5328 rc = lfsck_links_read(env, obj, &ldata);
5329 if (unlikely(rc == -ENOENT)) {
5330 if (handle != NULL) {
5331 dt_write_unlock(env, obj);
5334 dt_trans_stop(env, dev, handle);
5337 lfsck_ibits_unlock(&lh, LCK_EX);
5340 /* It may happen when the remote object has been removed,
5341 * but the local MDT is not aware of that. */
5343 } else if (rc == 0) {
5344 count = ldata.ld_leh->leh_reccount;
5345 rc = linkea_links_find(&ldata, cname, pfid);
5347 (count == 1 || !S_ISDIR(lfsck_object_type(obj)))) {
5348 if ((lfsck_object_type(obj) & S_IFMT) !=
5350 ns->ln_flags |= LF_INCONSISTENT;
5351 type = LNIT_BAD_TYPE;
5357 /* If the name entry hash does not match the slave striped
5358 * directory, and the name entry does not match also, then
5359 * it is quite possible that name entry is corrupted. */
5360 if (!lfsck_is_valid_slave_name_entry(env, lnr->lnr_lmv,
5361 lnr->lnr_name, lnr->lnr_namelen)) {
5362 ns->ln_flags |= LF_INCONSISTENT;
5363 type = LNIT_BAD_DIRENT;
5368 /* If the file type stored in the name entry does not match
5369 * the file type claimed by the object, and the object does
5370 * not recognize the name entry, then it is quite possible
5371 * that the name entry is corrupted. */
5372 if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type) {
5373 ns->ln_flags |= LF_INCONSISTENT;
5374 type = LNIT_BAD_DIRENT;
5379 /* For sub-dir object, we cannot make sure whether the sub-dir
5380 * back references the parent via ".." name entry correctly or
5381 * not in the LFSCK first-stage scanning. It may be that the
5382 * (remote) sub-dir ".." name entry has no parent FID after
5383 * file-level backup/restore and its linkEA may be wrong.
5384 * So under such case, we should replace the linkEA according
5385 * to current name entry. But this needs to be done during the
5386 * LFSCK second-stage scanning. The LFSCK will record the name
5387 * entry for further possible using. */
5391 } else if (unlikely(rc == -EINVAL)) {
5392 if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type)
5393 type = LNIT_BAD_TYPE;
5396 /* The magic crashed, we are not sure whether there are more
5397 * corrupt data in the linkea, so remove all linkea entries. */
5401 } else if (rc == -ENODATA) {
5402 if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type)
5403 type = LNIT_BAD_TYPE;
5410 if (bk->lb_param & LPF_DRYRUN) {
5412 ns->ln_flags |= LF_UPGRADE;
5414 ns->ln_flags |= LF_INCONSISTENT;
5415 ns->ln_linkea_repaired++;
5421 if (!lustre_handle_is_used(&lh)) {
5430 dir = lfsck_assistant_object_load(env, lfsck, lso);
5434 GOTO(stop, rc == -ENOENT ? 0 : rc);
5438 rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
5443 if (!remove && newdata)
5444 ns->ln_flags |= LF_UPGRADE;
5445 else if (remove || !(ns->ln_flags & LF_UPGRADE))
5446 ns->ln_flags |= LF_INCONSISTENT;
/* -ENOENT and -ENODATA from removing the old linkEA are tolerated by the
 * test that follows the call. */
5451 rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle);
5452 if (rc != 0 && rc != -ENOENT && rc != -ENODATA)
5457 rc = linkea_data_new(&ldata,
5458 &lfsck_env_info(env)->lti_linkea_buf);
5463 rc = linkea_add_buf(&ldata, cname, pfid);
/* -ENOSPC while writing the linkEA of a local regular file gets its own
 * handling: locks and transaction are released and the FID is recorded
 * in the trace file with LNTF_SKIP_NLINK. */
5467 rc = lfsck_links_write(env, obj, &ldata, handle);
5468 if (unlikely(rc == -ENOSPC) &&
5469 S_ISREG(lfsck_object_type(obj)) && !dt_object_remote(obj)) {
5470 if (handle != NULL) {
5471 LASSERT(dt_write_locked(env, obj));
5473 dt_write_unlock(env, obj);
5476 dt_trans_stop(env, dev, handle);
5479 lfsck_ibits_unlock(&lh, LCK_EX);
5482 rc = lfsck_namespace_trace_update(env, com,
5483 &lnr->lnr_fid, LNTF_SKIP_NLINK, true);
5485 /* If we cannot record this object in the
5486 * LFSCK tracing, we have to mark the LFSCK
5487 * as LF_INCOMPLETE, then the LFSCK will
5488 * skip nlink attribute verification for
5490 ns->ln_flags |= LF_INCOMPLETE;
5498 count = ldata.ld_leh->leh_reccount;
5499 if (!S_ISDIR(lfsck_object_type(obj)) ||
5500 !dt_object_remote(obj)) {
5501 ns->ln_linkea_repaired++;
5511 dt_write_unlock(env, obj);
5513 if (handle != NULL && !IS_ERR(handle))
5514 dt_trans_stop(env, dev, handle);
5517 lfsck_ibits_unlock(&lh, LCK_EX);
5519 if (!name_is_dot_or_dotdot(lnr->lnr_name, lnr->lnr_namelen) &&
5520 !lfsck_is_valid_slave_name_entry(env, lnr->lnr_lmv,
5521 lnr->lnr_name, lnr->lnr_namelen) &&
5522 type != LNIT_BAD_DIRENT) {
5523 ns->ln_flags |= LF_INCONSISTENT;
5527 dir = lfsck_assistant_object_load(env, lfsck, lso);
5531 GOTO(trace, rc == -ENOENT ? 0 : rc);
5535 rc = lfsck_namespace_repair_bad_name_hash(env, com, dir,
5536 lnr->lnr_lmv, lnr->lnr_name);
5542 if (type != LNIT_NONE && dir == NULL) {
5543 dir = lfsck_assistant_object_load(env, lfsck, lso);
5547 GOTO(trace, rc == -ENOENT ? 0 : rc);
5554 rc = lfsck_namespace_repair_dirent(env, com, dir,
5555 obj, lnr->lnr_name, lnr->lnr_name,
5556 lnr->lnr_type, true, false);
5560 case LNIT_BAD_DIRENT:
5562 /* XXX: This is a bad dirent, we do not know whether
5563 * the original name entry reference a regular
5564 * file or a directory, then keep the parent's
5565 * nlink count unchanged here. */
5566 rc = lfsck_namespace_repair_dirent(env, com, dir,
5567 obj, lnr->lnr_name, lnr->lnr_name,
5568 lnr->lnr_type, false, false);
5576 if (obj != NULL && count == 1 &&
5577 S_ISREG(lfsck_object_type(obj)))
5578 dt_attr_get(env, obj, la);
/* The result accounting from here to the matching up_write() runs under
 * the write side of com->lc_sem. */
5582 down_write(&com->lc_sem);
5584 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle "
5585 "the entry: "DFID", parent "DFID", name %.*s: rc = %d\n",
5586 lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid), PFID(pfid),
5587 lnr->lnr_namelen, lnr->lnr_name, rc);
5589 lfsck_namespace_record_failure(env, lfsck, ns);
/* For the connectivity-style errors listed below, raised against a device
 * other than the local bottom device, the MDT index is passed to
 * lfsck_lad_set_bitmap(). */
5590 if ((rc == -ENOTCONN || rc == -ESHUTDOWN || rc == -EREMCHG ||
5591 rc == -ETIMEDOUT || rc == -EHOSTDOWN ||
5592 rc == -EHOSTUNREACH || rc == -EINPROGRESS) &&
5593 dev != NULL && dev != lfsck->li_bottom)
5594 lfsck_lad_set_bitmap(env, com, idx);
5596 if (!(bk->lb_param & LPF_FAILOUT))
5600 ns->ln_items_repaired++;
5602 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant "
5603 "repaired the entry: "DFID", parent "DFID
5604 ", name %.*s\n", lfsck_lfsck2name(lfsck),
5605 PFID(&lnr->lnr_fid), PFID(pfid),
5606 lnr->lnr_namelen, lnr->lnr_name);
5610 ns->ln_dangling_repaired++;
5613 ns->ln_bad_type_repaired++;
5615 case LNIT_BAD_DIRENT:
5616 ns->ln_dirent_repaired++;
5622 if (bk->lb_param & LPF_DRYRUN &&
5623 lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
5624 lfsck_pos_fill(env, lfsck,
5625 &ns->ln_pos_first_inconsistent,
5630 ns->ln_name_hash_repaired++;
5632 /* Not count repeatedly. */
5634 ns->ln_items_repaired++;
5636 CDEBUG(D_LFSCK, "%s: namespace LFSCK "
5637 "assistant repaired the entry: "
5640 lfsck_lfsck2name(lfsck),
5641 PFID(&lnr->lnr_fid), PFID(pfid),
5642 lnr->lnr_namelen, lnr->lnr_name);
5645 if (bk->lb_param & LPF_DRYRUN &&
5646 lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
5647 lfsck_pos_fill(env, lfsck,
5648 &ns->ln_pos_first_inconsistent,
5655 if (count > 1 || la->la_nlink > 1)
5656 ns->ln_mul_linked_checked++;
5658 up_write(&com->lc_sem);
/* Release the object and directory handles; the IS_ERR() guards cover
 * pointers that hold an error value instead of an object. */
5660 if (obj != NULL && !IS_ERR(obj))
5661 lfsck_object_put(env, obj);
5663 if (dir != NULL && !IS_ERR(dir))
5664 lfsck_object_put(env, dir);
/* Store whether this request saw a bad linkEA; the flag is read by the
 * lad_advance_lock test near the top of this function. */
5666 lad->lad_advance_lock = bad_linkea;
5672 * Handle one orphan under the backend /lost+found directory
5674 * Insert the orphan FID into the namespace LFSCK trace file for further
5675 * processing (via the subsequent namespace LFSCK second-stage scanning).
5676 * At the same time, remove the orphan name entry from backend /lost+found
5677 * directory. There is an interval between the orphan name entry removed
5678 * from the backend /lost+found directory and the orphan FID in the LFSCK
5679 * trace file handled. In such interval, the LFSCK can be reset, then
5680 * all the FIDs recorded in the namespace LFSCK trace file will be dropped.
5681 * To guarantee that the orphans can be found when the LFSCK runs next time
5682 * without running e2fsck again, when removing the orphan name entry the LFSCK
5683 * sets the orphan's ctime attribute to 1. Normal applications cannot set an
5684 * object's ctime attribute to 1, so when the LFSCK runs the next time, it
5685 * can record the object (whose ctime is 1) in the namespace
5686 * LFSCK trace file during the first-stage scanning.
5688 * \param[in] env pointer to the thread context
5689 * \param[in] com pointer to the lfsck component
5690 * \param[in] parent pointer to the object for the backend /lost+found
5691 * \param[in] ent pointer to the name entry for the target under the
5692 * backend /lost+found
5694 * \retval positive for repaired
5695 * \retval 0 if needs to repair nothing
5696 * \retval negative error number on failure
5698 static int lfsck_namespace_scan_local_lpf_one(const struct lu_env *env,
5699 struct lfsck_component *com,
5700 struct dt_object *parent,
5701 struct lu_dirent *ent)
5703 struct lfsck_thread_info *info = lfsck_env_info(env);
5704 struct lu_fid *key = &info->lti_fid;
5705 struct lu_attr *la = &info->lti_la;
5706 struct lfsck_instance *lfsck = com->lc_lfsck;
5707 struct dt_object *obj;
5708 struct dt_device *dev = lfsck->li_bottom;
5709 struct dt_object *child = NULL;
5710 struct thandle *th = NULL;
/* Locate the orphan itself on the bottom device; it is asserted to exist
 * and to be a local object. */
5717 child = lfsck_object_find_by_dev(env, dev, &ent->lde_fid);
5719 RETURN(PTR_ERR(child));
5721 LASSERT(dt_object_exists(child));
5722 LASSERT(!dt_object_remote(child));
/* Pick the sub trace file that covers this FID and look up the flags that
 * may already be recorded for it; the key is the FID in big-endian
 * form. */
5724 idx = lfsck_sub_trace_file_fid2idx(&ent->lde_fid);
5725 obj = com->lc_sub_trace_objs[idx].lsto_obj;
5726 fid_cpu_to_be(key, &ent->lde_fid);
5727 rc = dt_lookup(env, obj, (struct dt_rec *)&flags,
5728 (const struct dt_key *)key);
/* One branch keeps the looked-up flags and adds LNTF_CHECK_ORPHAN; the
 * -ENOENT branch starts from LNTF_CHECK_ORPHAN alone.
 * NOTE(review): the condition of the first branch is not visible in this
 * listing; presumably it is the successful lookup. */
5731 flags |= LNTF_CHECK_ORPHAN;
5732 } else if (rc == -ENOENT) {
5733 flags = LNTF_CHECK_ORPHAN;
/* Steps a1-a5 are declared on one transaction; the matching steps b1-b5
 * are executed after dt_trans_start_local(). */
5738 th = dt_trans_create(env, dev);
5740 GOTO(out, rc = PTR_ERR(th));
5742 /* a1. remove name entry from backend /lost+found */
5743 rc = dt_declare_delete(env, parent,
5744 (const struct dt_key *)ent->lde_name, th);
5748 if (S_ISDIR(lfsck_object_type(child))) {
5749 /* a2. decrease parent's nlink */
5750 rc = dt_declare_ref_del(env, parent, th);
5756 /* a3. remove child's FID from the LFSCK trace file. */
5757 rc = dt_declare_delete(env, obj,
5758 (const struct dt_key *)key, th);
5762 /* a4. set child's ctime as 1 */
5763 memset(la, 0, sizeof(*la));
5765 la->la_valid = LA_CTIME;
5766 rc = dt_declare_attr_set(env, child, la, th);
5771 /* a5. insert child's FID into the LFSCK trace file. */
5772 rc = dt_declare_insert(env, obj, (const struct dt_rec *)&flags,
5773 (const struct dt_key *)key, th);
5777 rc = dt_trans_start_local(env, dev, th);
5781 /* b1. remove name entry from backend /lost+found */
5782 rc = dt_delete(env, parent, (const struct dt_key *)ent->lde_name, th);
5786 if (S_ISDIR(lfsck_object_type(child))) {
5787 /* b2. decrease parent's nlink */
5788 dt_write_lock(env, parent, 0);
5789 rc = dt_ref_del(env, parent, th);
5790 dt_write_unlock(env, parent);
5796 /* b3. remove child's FID from the LFSCK trace file. */
5797 rc = dt_delete(env, obj, (const struct dt_key *)key, th);
5801 /* b4. set child's ctime as 1 */
5802 rc = dt_attr_set(env, child, la, th);
5807 /* b5. insert child's FID into the LFSCK trace file. */
5808 rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
5809 (const struct dt_key *)key, th, 1);
/* Success of the last step is reported as 1 (the positive "repaired"
 * result); an error code is passed on unchanged. */
5811 GOTO(stop, rc = (rc == 0 ? 1 : rc));
5814 dt_trans_stop(env, dev, th);
5817 lfsck_object_put(env, child);
5823 * Handle orphans under the backend /lost+found directory
5825 * Some backend checker, such as e2fsck for ldiskfs may find some orphans
5826 * and put them under the backend /lost+found directory that is invisible
5827 * to client. The LFSCK will scan such directory, for the original client
5828 * visible orphans, add their fids into the namespace LFSCK trace file,
5829 * then the subsequent namespace LFSCK second-stage scanning can handle
5830 * them as other objects to be double scanned: either move back to normal
5831 * namespace, or to the global visible orphan directory:
5832 * /ROOT/.lustre/lost+found/MDTxxxx/
5834 * \param[in] env pointer to the thread context
5835 * \param[in] com pointer to the lfsck component
5837 static void lfsck_namespace_scan_local_lpf(const struct lu_env *env,
5838 struct lfsck_component *com)
5840 struct lfsck_thread_info *info = lfsck_env_info(env);
5841 struct lu_dirent *ent =
5842 (struct lu_dirent *)info->lti_key;
5843 struct lu_seq_range *range = &info->lti_range;
5844 struct lfsck_instance *lfsck = com->lc_lfsck;
5845 struct ptlrpc_thread *thread = &lfsck->li_thread;
5846 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
5847 struct lfsck_namespace *ns = com->lc_file_ram;
5848 struct dt_object *parent;
5849 const struct dt_it_ops *iops;
5851 struct seq_server_site *ss = lfsck_dev_site(lfsck);
5853 __u32 idx = lfsck_dev_idx(lfsck);
/* The backend /lost+found is looked up on the bottom device through
 * LU_BACKEND_LPF_FID. */
5858 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
5859 &LU_BACKEND_LPF_FID);
5860 if (IS_ERR(parent)) {
5861 CERROR("%s: fail to find backend /lost+found: rc = %ld\n",
5862 lfsck_lfsck2name(lfsck), PTR_ERR(parent));
5866 /* It is normal that the /lost+found does not exist for ZFS backend. */
5867 if (!dt_object_exists(parent))
5870 if (unlikely(!dt_try_as_dir(env, parent)))
5871 GOTO(out, rc = -ENOTDIR);
5873 CDEBUG(D_LFSCK, "%s: start to scan backend /lost+found\n",
5874 lfsck_lfsck2name(lfsck));
/* Restart the per-scan counter and walk the directory with its index
 * iterator, initialised with LUDA_64BITHASH | LUDA_TYPE. */
5876 com->lc_new_scanned = 0;
5877 iops = &parent->do_index_ops->dio_it;
5878 di = iops->init(env, parent, LUDA_64BITHASH | LUDA_TYPE);
5880 GOTO(out, rc = PTR_ERR(di));
5882 rc = iops->load(env, di, 0);
5884 rc = iops->next(env, di);
5889 if (CFS_FAIL_TIMEOUT(OBD_FAIL_LFSCK_DELAY3, cfs_fail_val) &&
5890 unlikely(!thread_is_running(thread)))
5893 rc = iops->rec(env, di, (struct dt_rec *)ent,
5894 LUDA_64BITHASH | LUDA_TYPE);
5896 rc = lfsck_unpack_ent(ent, &cookie, &type);
5898 if (unlikely(rc != 0)) {
5899 CDEBUG(D_LFSCK, "%s: fail to iterate backend "
5900 "/lost+found: rc = %d\n",
5901 lfsck_lfsck2name(lfsck), rc);
5906 /* skip dot and dotdot entries */
5907 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
/* Entries are filtered by FID: first on whether the FID sequence is in
 * the FLDB, then, for normal FIDs, through a local FLD lookup with an
 * MDT-type range.
 * NOTE(review): the statements taken on each outcome are not visible in
 * this listing. */
5910 if (!fid_seq_in_fldb(fid_seq(&ent->lde_fid)))
5913 if (fid_is_norm(&ent->lde_fid)) {
5914 fld_range_set_mdt(range);
5915 rc = fld_local_lookup(env, ss->ss_server_fld,
5916 fid_seq(&ent->lde_fid), range);
5919 } else if (idx != 0) {
5920 /* If the returned FID is IGIF, then there are three
5923 * 1) The object is upgraded from old Lustre-1.8 with
5924 * IGIF assigned to such object.
5925 * 2) The object is a backend local object and is
5926 * invisible to client.
5927 * 3) The object lost its LMV EA, and since there is
5928 * no FID-in-dirent for the orphan in the backend
5929 * /lost+found directory, then the low layer will
5930 * return IGIF for such object.
5932 * For MDTx (x != 0), it is either case 2) or case 3),
5933 * but from the LFSCK view, they are indistinguishable.
5934 * To be safe, the LFSCK will keep it there and report
5935 * some message, then the administrator can handle that
5938 * For MDT0, it is more possible the case 1). The LFSCK
5939 * will handle the orphan as an upgraded object. */
5940 CDEBUG(D_LFSCK, "%s: the orphan %.*s with IGIF "DFID
5941 "in the backend /lost+found on the MDT %04x, "
5942 "to be safe, skip it.\n",
5943 lfsck_lfsck2name(lfsck), ent->lde_namelen,
5944 ent->lde_name, PFID(&ent->lde_fid), idx);
5948 rc = lfsck_namespace_scan_local_lpf_one(env, com, parent, ent);
/* Per-entry accounting under the write side of com->lc_sem. Which of the
 * moved / skipped / failed counters is incremented depends on tests of rc
 * that are not visible in this listing. */
5951 down_write(&com->lc_sem);
5952 com->lc_new_scanned++;
5953 ns->ln_local_lpf_scanned++;
5955 ns->ln_local_lpf_moved++;
5957 ns->ln_local_lpf_skipped++;
5959 ns->ln_local_lpf_failed++;
5960 up_write(&com->lc_sem);
5962 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
5966 lfsck_control_speed_by_self(com);
5967 if (unlikely(!thread_is_running(thread))) {
5972 rc = iops->next(env, di);
5976 iops->fini(env, di);
5981 CDEBUG(D_LFSCK, "%s: stop to scan backend /lost+found: rc = %d\n",
5982 lfsck_lfsck2name(lfsck), rc);
5984 lfsck_object_put(env, parent);
5988 * Rescan the striped directory after the master LMV EA reset.
5990 * Sometimes the master LMV EA of the striped directory may be lost, so when
5991 * the namespace LFSCK engine scans the striped directory for the first time,
5992 * it will be regarded as a normal directory. As the LFSCK proceeds, some
5993 * other LFSCK instance on other MDT will find the shard of this striped dir,
5994 * and find that the master MDT-object of the striped directory lost its LMV
5995 * EA, then such remote LFSCK instance will regenerate the master LMV EA and
5996 * notify the LFSCK instance on this MDT to rescan the striped directory.
5998 * \param[in] env pointer to the thread context
5999 * \param[in] com pointer to the lfsck component
6000 * \param[in] llu the lfsck_lmv_unit that contains the striped directory
6003 * \retval positive number for success
6004 * \retval 0 for LFSCK stopped/paused
6005 * \retval negative error number on failure
6007 static int lfsck_namespace_rescan_striped_dir(const struct lu_env *env,
6008 struct lfsck_component *com,
6009 struct lfsck_lmv_unit *llu)
6011 struct lfsck_thread_info *info = lfsck_env_info(env);
6012 struct lfsck_instance *lfsck = com->lc_lfsck;
6013 struct lfsck_assistant_data *lad = com->lc_data;
6014 struct dt_object *dir;
6015 const struct dt_it_ops *iops;
6017 struct lu_dirent *ent =
6018 (struct lu_dirent *)info->lti_key;
6019 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
6020 struct ptlrpc_thread *thread = &lfsck->li_thread;
6021 struct lfsck_assistant_object *lso = NULL;
6022 struct lfsck_namespace_req *lnr;
6023 struct lfsck_assistant_req *lar;
/* The rescan expects the assistant request list to be empty on entry. */
6028 LASSERT(list_empty(&lad->lad_req_list));
/* Point the LFSCK instance at the striped directory - its LMV and its
 * object, obtained through lfsck_object_get() - and open it for
 * iteration. */
6030 lfsck->li_lmv = &llu->llu_lmv;
6031 lfsck->li_obj_dir = lfsck_object_get(llu->llu_obj);
6032 rc = lfsck_open_dir(env, lfsck, 0);
6036 dir = lfsck->li_obj_dir;
6037 di = lfsck->li_di_dir;
6038 iops = &dir->do_index_ops->dio_it;
6040 rc = iops->rec(env, di, (struct dt_rec *)ent,
6041 lfsck->li_args_dir);
6043 rc = lfsck_unpack_ent(ent, &lfsck->li_cookie_dir,
6047 if (bk->lb_param & LPF_FAILOUT)
6053 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
/* The assistant object describes the directory (its FID and the current
 * OIT cookie) for the requests built below. With LPF_FAILOUT set, a
 * failure here or in the request allocation leaves through the "out"
 * label with the error code. */
6057 lso = lfsck_assistant_object_init(env,
6058 lfsck_dto2fid(dir), NULL,
6059 lfsck->li_pos_current.lp_oit_cookie, true);
6061 if (bk->lb_param & LPF_FAILOUT)
6062 GOTO(out, rc = PTR_ERR(lso));
6069 lnr = lfsck_namespace_assistant_req_init(lfsck, lso, ent, type);
6071 if (bk->lb_param & LPF_FAILOUT)
6072 GOTO(out, rc = PTR_ERR(lnr));
/* Each entry is wrapped in a request, passed directly to the p1 handler
 * and released right after the call. */
6077 lar = &lnr->lnr_lar;
6078 rc = lfsck_namespace_assistant_handler_p1(env, com, lar);
6079 lfsck_namespace_assistant_req_fini(env, lar);
6080 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
6083 if (unlikely(!thread_is_running(thread)))
6087 rc = iops->next(env, di);
6091 if (lso != NULL && !IS_ERR(lso))
6092 lfsck_assistant_object_put(env, lso);
6094 lfsck_close_dir(env, lfsck, rc);
6098 /* The close_dir() may insert a dummy lnr in the lad->lad_req_list. */
6099 if (list_empty(&lad->lad_req_list))
/* Take the first queued request off the list under lad_lock and handle it
 * the same way as the entries above. */
6102 spin_lock(&lad->lad_lock);
6103 lar = list_entry(lad->lad_req_list.next, struct lfsck_assistant_req,
6105 list_del_init(&lar->lar_list);
6106 spin_unlock(&lad->lad_lock);
6108 rc = lfsck_namespace_assistant_handler_p1(env, com, lar);
6109 lfsck_namespace_assistant_req_fini(env, lar);
/* A zero result from the last handler call is reported as 1, the positive
 * success value; other values are returned unchanged. */
6111 RETURN(rc == 0 ? 1 : rc);
/*
 * Double-scan the FIDs recorded in one namespace LFSCK sub trace file.
 *
 * The trace file @obj is walked with its index iterator. The visible code
 * can position the iterator at ns->ln_fid_latest_scanned_phase2 (converted
 * to big-endian for use as a key) before iterating. For every key, the
 * FID is converted back to CPU byte order and sanity-checked, the target
 * object is looked up with lfsck_object_find_bottom() and, when it exists,
 * the record is read as flags and lfsck_namespace_double_scan_one() is
 * called; the target is then released.
 *
 * After each key, under the write side of com->lc_sem, lc_new_checked and
 * lc_new_scanned are incremented, ln_fid_latest_scanned_phase2 is advanced
 * when rc >= 0 and the FID is sane, and the phase2 repaired/failed
 * counters are updated. Once lc_time_next_checkpoint has been reached and
 * lc_new_checked is non-zero, the phase2 run time and checked counters
 * are folded into @ns, the state is stored with lfsck_namespace_store()
 * and the next checkpoint is scheduled LFSCK_CHECKPOINT_INTERVAL seconds
 * later.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 * \param[in] obj	the sub trace file object to be iterated
 * \param[in] first	NOTE(review): how this flag gates the initial
 *			positioning is not visible in this listing
 *
 * The visible caller in this file moves on to the next sub trace file only
 * while the value returned here is positive. PTR_ERR(di) is returned on
 * the iterator-init error path.
 *
 * NOTE(review): this listing omits short lines (the return-type line,
 * braces, some declarations, labels and simple tests), so the conditions
 * guarding several of the statements above are not shown - confirm
 * against the complete file.
 */
6115 lfsck_namespace_double_scan_one_trace_file(const struct lu_env *env,
6116 struct lfsck_component *com,
6117 struct dt_object *obj, bool first)
6119 struct lfsck_instance *lfsck = com->lc_lfsck;
6120 struct ptlrpc_thread *thread = &lfsck->li_thread;
6121 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
6122 struct lfsck_namespace *ns = com->lc_file_ram;
6123 const struct dt_it_ops *iops = &obj->do_index_ops->dio_it;
6124 struct dt_object *target;
6132 di = iops->init(env, obj, 0);
6134 RETURN(PTR_ERR(di));
/* Keys are handled in big-endian form in this function (see the
 * fid_be_to_cpu() on each key below), so the resume FID is converted
 * before it is used to position the iterator. */
6137 fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
6140 rc = iops->get(env, di, (const struct dt_key *)&fid);
6145 /* The start one either has been processed or does not exist,
6147 rc = iops->next(env, di);
6153 if (CFS_FAIL_TIMEOUT(OBD_FAIL_LFSCK_DELAY3, cfs_fail_val) &&
6154 unlikely(!thread_is_running(thread)))
6157 key = iops->key(env, di);
6166 fid_be_to_cpu(&fid, (const struct lu_fid *)key);
6167 if (!fid_is_sane(&fid)) {
6172 target = lfsck_object_find_bottom(env, lfsck, &fid);
6173 if (IS_ERR(target)) {
6174 rc = PTR_ERR(target);
6178 if (dt_object_exists(target)) {
6179 rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
6181 rc = lfsck_namespace_double_scan_one(env, com,
6188 lfsck_object_put(env, target);
6191 down_write(&com->lc_sem);
6192 com->lc_new_checked++;
6193 com->lc_new_scanned++;
6194 if (rc >= 0 && fid_is_sane(&fid))
6195 ns->ln_fid_latest_scanned_phase2 = fid;
6197 ns->ln_objs_repaired_phase2++;
6199 ns->ln_objs_failed_phase2++;
6200 up_write(&com->lc_sem);
6202 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
6205 if (unlikely(cfs_time_beforeq(com->lc_time_next_checkpoint,
6206 cfs_time_current())) &&
6207 com->lc_new_checked != 0) {
6208 down_write(&com->lc_sem);
6209 ns->ln_run_time_phase2 +=
6210 cfs_duration_sec(cfs_time_current() +
6211 HALF_SEC - com->lc_time_last_checkpoint);
6212 ns->ln_time_last_checkpoint = cfs_time_current_sec();
6213 ns->ln_objs_checked_phase2 += com->lc_new_checked;
6214 com->lc_new_checked = 0;
6215 rc = lfsck_namespace_store(env, com);
6216 up_write(&com->lc_sem);
6220 com->lc_time_last_checkpoint = cfs_time_current();
6221 com->lc_time_next_checkpoint =
6222 com->lc_time_last_checkpoint +
6223 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
6226 lfsck_control_speed_by_self(com);
6227 if (unlikely(!thread_is_running(thread)))
6230 rc = iops->next(env, di);
6239 iops->fini(env, di);
/*
 * Second-stage (phase 2) handler of the namespace LFSCK assistant.
 *
 * First empties lfsck->li_list_lmv, rescanning the striped directory of
 * every queued LMV unit. It then scans the local lost+found via
 * lfsck_namespace_scan_local_lpf(), resets the per-run counters and
 * checkpoint timers, and walks the sub trace files starting from the one
 * that holds ns->ln_fid_latest_scanned_phase2.
 *
 * \param[in] env  pointer to the thread context
 * \param[in] com  pointer to the lfsck component
 */
6244 static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
6245 struct lfsck_component *com)
6247 struct lfsck_instance *lfsck = com->lc_lfsck;
6248 struct lfsck_namespace *ns = com->lc_file_ram;
/* Detach one LMV unit at a time under li_lock, then rescan its striped
 * directory after the spinlock has been released. */
6253 while (!list_empty(&lfsck->li_list_lmv)) {
6254 struct lfsck_lmv_unit *llu;
6256 spin_lock(&lfsck->li_lock);
6257 llu = list_entry(lfsck->li_list_lmv.next,
6258 struct lfsck_lmv_unit, llu_link);
6259 list_del_init(&llu->llu_link);
6260 spin_unlock(&lfsck->li_lock);
6262 rc = lfsck_namespace_rescan_striped_dir(env, com, llu);
6267 CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
6268 lfsck_lfsck2name(lfsck));
6270 lfsck_namespace_scan_local_lpf(env, com);
/* Start the phase-2 accounting from scratch and schedule the first
 * checkpoint LFSCK_CHECKPOINT_INTERVAL seconds from now. */
6272 com->lc_new_checked = 0;
6273 com->lc_new_scanned = 0;
6274 com->lc_time_last_checkpoint = cfs_time_current();
6275 com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
6276 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
/* Map the resume FID to its sub trace file index; only that file is
 * scanned with first == true. The following files are scanned with
 * first == false for as long as the previous scan returned a positive
 * value and the index stays below LFSCK_STF_COUNT. */
6278 i = lfsck_sub_trace_file_fid2idx(&ns->ln_fid_latest_scanned_phase2);
6279 rc = lfsck_namespace_double_scan_one_trace_file(env, com,
6280 com->lc_sub_trace_objs[i].lsto_obj, true);
6281 while (rc > 0 && ++i < LFSCK_STF_COUNT)
6282 rc = lfsck_namespace_double_scan_one_trace_file(env, com,
6283 com->lc_sub_trace_objs[i].lsto_obj, false);
6285 CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan stop at the No. %d "
6286 "trace file: rc = %d\n", lfsck_lfsck2name(lfsck), i, rc);
/*
 * Fill 'pos' from the first request on the assistant's request list.
 *
 * The namespace status and the emptiness of lad->lad_req_list are tested
 * first; the position is then taken from the request at the head of the
 * list: the object-table cookie and the FID come from the request's parent
 * object, the directory cookie from the request itself.
 *
 * \param[in]  env  pointer to the thread context
 * \param[in]  com  pointer to the lfsck component
 * \param[out] pos  the position to be filled
 */
6291 static void lfsck_namespace_assistant_fill_pos(const struct lu_env *env,
6292 struct lfsck_component *com,
6293 struct lfsck_position *pos)
6295 struct lfsck_assistant_data *lad = com->lc_data;
6296 struct lfsck_namespace_req *lnr;
6298 if (((struct lfsck_namespace *)(com->lc_file_ram))->ln_status !=
6302 if (list_empty(&lad->lad_req_list))
6305 lnr = list_entry(lad->lad_req_list.next,
6306 struct lfsck_namespace_req,
6308 pos->lp_oit_cookie = lnr->lnr_lar.lar_parent->lso_oit_cookie;
/* NOTE(review): the cookie is stepped back by one, presumably so that a
 * scan resumed from 'pos' visits this entry again -- TODO confirm. */
6309 pos->lp_dir_cookie = lnr->lnr_dir_cookie - 1;
6310 pos->lp_dir_parent = lnr->lnr_lar.lar_parent->lso_fid;
/*
 * Record the outcome of the phase-2 scan in the namespace trace file.
 *
 * Under com->lc_sem the elapsed run time and the newly checked objects are
 * folded into the phase-2 statistics, ns->ln_status is set according to
 * 'rc', and the result is written out with lfsck_namespace_store():
 *  - first branch (NOTE(review): presumably rc > 0 -- TODO confirm):
 *    LS_PARTIAL when LF_INCOMPLETE is set in ns->ln_flags, LS_COMPLETED
 *    otherwise; LF_SCANNED_ONCE and LF_INCONSISTENT are cleared unless the
 *    bookmark has LPF_DRYRUN set, and the completion time and success count
 *    are updated;
 *  - rc == 0: lfsck->li_status when it is non-zero, LS_STOPPED otherwise;
 *  - remaining case: LS_FAILED.
 *
 * \param[in] env  pointer to the thread context
 * \param[in] com  pointer to the lfsck component
 */
6313 static int lfsck_namespace_double_scan_result(const struct lu_env *env,
6314 struct lfsck_component *com,
6317 struct lfsck_instance *lfsck = com->lc_lfsck;
6318 struct lfsck_namespace *ns = com->lc_file_ram;
6320 down_write(&com->lc_sem);
/* Account the time since the last checkpoint (HALF_SEC is added before the
 * conversion to seconds) and flush the pending checked-object count. */
6321 ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
6322 HALF_SEC - com->lc_time_last_checkpoint);
6323 ns->ln_time_last_checkpoint = cfs_time_current_sec();
6324 ns->ln_objs_checked_phase2 += com->lc_new_checked;
6325 com->lc_new_checked = 0;
6328 if (ns->ln_flags & LF_INCOMPLETE)
6329 ns->ln_status = LS_PARTIAL;
6331 ns->ln_status = LS_COMPLETED;
/* A dry run repairs nothing, so the scanned-once and inconsistent flags
 * are only cleared when LPF_DRYRUN is not set. */
6332 if (!(lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN))
6333 ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
6334 ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
6335 ns->ln_success_count++;
6336 } else if (rc == 0) {
/* Prefer the status recorded on the LFSCK instance, if any. */
6337 if (lfsck->li_status != 0)
6338 ns->ln_status = lfsck->li_status;
6340 ns->ln_status = LS_STOPPED;
6342 ns->ln_status = LS_FAILED;
6345 rc = lfsck_namespace_store(env, com);
6346 up_write(&com->lc_sem);
/*
 * Reply interpreter registered by lfsck_namespace_assistant_sync_failures()
 * for its LFSCK_NOTIFY requests.
 *
 * Takes the target descriptor from the async interpret arguments and sets
 * its ltd_synced_failures flag.
 * NOTE(review): presumably the flag is only set when the request succeeded
 * -- TODO confirm against the full function body.
 */
6352 lfsck_namespace_assistant_sync_failures_interpret(const struct lu_env *env,
6353 struct ptlrpc_request *req,
6357 struct lfsck_async_interpret_args *laia = args;
6358 struct lfsck_tgt_desc *ltd = laia->laia_ltd;
6360 ltd->ltd_synced_failures = 1;
6367 * Notify remote LFSCK instances about former failures.
6369 * The local LFSCK instance has recorded which MDTs have ever failed to respond
6370 * to some LFSCK verification requests (maybe because of network issues or
6371 * trouble on the MDT itself). During the response gap the MDT may have missed
6372 * some name entry verifications, so it cannot know whether the related
6373 * MDT-objects are referenced by name entries or not. In the second-stage
6374 * scanning such MDT-objects would then be regarded as orphans, and if an
6375 * MDT-object contains a bad linkEA for back reference, it will misguide the
6376 * LFSCK into generating a wrong name entry when repairing the orphan.
6378 * To avoid the above trouble, when the namespace LFSCK finishes the first-stage
6379 * scanning, it scans the bitmap of the MDTs that have ever failed, and notifies
6380 * them that they have missed some name entry verifications and should skip the
6381 * handling of orphan MDT-objects.
6383 * \param[in] env pointer to the thread context
6384 * \param[in] com pointer to the lfsck component
6385 * \param[in] lr pointer to the lfsck request
6387 static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env,
6388 struct lfsck_component *com,
6389 struct lfsck_request *lr)
6391 struct lfsck_async_interpret_args *laia =
6392 &lfsck_env_info(env)->lti_laia2;
6393 struct lfsck_assistant_data *lad = com->lc_data;
6394 struct lfsck_namespace *ns = com->lc_file_ram;
6395 struct lfsck_instance *lfsck = com->lc_lfsck;
6396 struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs;
6397 struct lfsck_tgt_desc *ltd;
6398 struct ptlrpc_request_set *set;
/* Only relevant when the assistant has been marked incomplete. */
6403 if (!lad->lad_incomplete)
6406 set = ptlrpc_prep_set();
6408 GOTO(out, rc = -ENOMEM);
/* Tell the peers that the scan is incomplete, on top of the current
 * namespace flags. */
6410 lr->lr_flags2 = ns->ln_flags | LF_INCOMPLETE;
6411 memset(laia, 0, sizeof(*laia));
6412 lad->lad_touch_gen++;
/* Hold the MDT descriptor table semaphore for reading while the bitmap is
 * walked; the semaphore lives in the table that 'ltds' points to. */
6414 down_read(&ltds->ltd_rw_sem);
/* One asynchronous LFSCK_NOTIFY per MDT whose bit is set in lad_bitmap. */
6415 cfs_foreach_bit(lad->lad_bitmap, idx) {
6416 ltd = lfsck_ltd2tgt(ltds, idx);
6417 LASSERT(ltd != NULL);
6419 laia->laia_ltd = ltd;
6420 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
6421 lfsck_namespace_assistant_sync_failures_interpret,
6422 laia, LFSCK_NOTIFY);
6424 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail "
6425 "to sync failure with MDT %x: rc = %d\n",
6426 lfsck_lfsck2name(lfsck), ltd->ltd_index, rc);
6428 up_read(&ltds->ltd_rw_sem);
/* Wait for all queued notifications, then release the request set. */
6430 rc = ptlrpc_set_wait(set);
6431 ptlrpc_set_destroy(set);
6437 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail "
6438 "to sync failure with MDTs, and related MDTs "
6439 "may handle orphan improperly: rc = %d\n",
6440 lfsck_lfsck2name(lfsck), rc);
/*
 * Namespace-specific callbacks for the LFSCK assistant. This table is what
 * lfsck_namespace_setup() hands to lfsck_assistant_data_init().
 */
6445 struct lfsck_assistant_operations lfsck_namespace_assistant_ops = {
6446 .la_handler_p1 = lfsck_namespace_assistant_handler_p1,
6447 .la_handler_p2 = lfsck_namespace_assistant_handler_p2,
6448 .la_fill_pos = lfsck_namespace_assistant_fill_pos,
6449 .la_double_scan_result = lfsck_namespace_double_scan_result,
6450 .la_req_fini = lfsck_namespace_assistant_req_fini,
6451 .la_sync_failures = lfsck_namespace_assistant_sync_failures,
6455 * Verify the specified linkEA entry for the given directory object.
6456 * If the object has no such linkEA entry, or it also has other linkEA
6457 * entries, then regenerate the linkEA from the given information.
6459 * \param[in] env pointer to the thread context
6460 * \param[in] obj pointer to the dt_object to be handled
6461 * \param[in] cname the name for the child in the parent directory
6462 * \param[in] pfid the parent directory's FID for the linkEA
6464 * \retval 0 for success
6465 * \retval negative error number on failure
6467 int lfsck_verify_linkea(const struct lu_env *env, struct dt_object *obj,
6468 const struct lu_name *cname, const struct lu_fid *pfid)
6470 struct dt_device *dev = lfsck_obj2dev(obj);
6471 struct linkea_data ldata = { NULL };
6472 struct lu_buf linkea_buf;
/* Start out assuming the xattr has to be created; this is switched to
 * LU_XATTR_REPLACE below once an existing linkEA has been read. */
6475 int fl = LU_XATTR_CREATE;
/* Callers must only pass directories. */
6479 LASSERT(S_ISDIR(lfsck_object_type(obj)));
6481 rc = lfsck_links_read(env, obj, &ldata);
6482 if (rc == -ENODATA) {
6484 } else if (rc == 0) {
6485 fl = LU_XATTR_REPLACE;
/* The record count is compared against exactly one. */
6486 if (ldata.ld_leh->leh_reccount != 1) {
/* Look for the <pfid, cname> record in the linkEA that was read. */
6489 rc = linkea_links_find(&ldata, cname, pfid);
/* Build a fresh linkEA in the per-thread lti_linkea_buf and add the
 * <pfid, cname> record to it. */
6498 rc = linkea_data_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf);
6502 rc = linkea_add_buf(&ldata, cname, pfid);
/* Wrap the linkEA (leh_len bytes) for the xattr calls below. */
6506 lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
6507 ldata.ld_leh->leh_len);
/* Store it as XATTR_NAME_LINK in a local transaction: declare, start,
 * set under the object write lock, then stop the transaction. */
6508 th = dt_trans_create(env, dev);
6510 RETURN(PTR_ERR(th));
6512 rc = dt_declare_xattr_set(env, obj, &linkea_buf,
6513 XATTR_NAME_LINK, fl, th);
6517 rc = dt_trans_start_local(env, dev, th);
6521 dt_write_lock(env, obj, 0);
6522 rc = dt_xattr_set(env, obj, &linkea_buf,
6523 XATTR_NAME_LINK, fl, th);
6524 dt_write_unlock(env, obj);
6529 dt_trans_stop(env, dev, th);
6534 * Get the name and parent directory's FID from the first linkEA entry.
6536 * \param[in] env pointer to the thread context
6537 * \param[in] obj pointer to the object to get the linkEA from
6538 * \param[out] name pointer to the buffer to hold the name
6539 * in the first linkEA entry
6540 * \param[out] pfid pointer to the buffer to hold the parent
6541 * directory's FID in the first linkEA entry
6543 * \retval 0 for success
6544 * \retval negative error number on failure
6546 int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj,
6547 char *name, struct lu_fid *pfid)
/* 'cname' is scratch space taken from the per-thread LFSCK info. */
6549 struct lu_name *cname = &lfsck_env_info(env)->lti_name;
6550 struct linkea_data ldata = { NULL };
6553 rc = lfsck_links_read(env, obj, &ldata);
/* Position on the first linkEA entry; ld_lee is checked for NULL before
 * it is unpacked. */
6557 linkea_first_entry(&ldata);
6558 if (ldata.ld_lee == NULL)
/* Unpack the entry into 'cname' (name and length) and 'pfid'. */
6561 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
/* The name is copied by length and the terminating NUL is appended
 * explicitly. No buffer size is passed in, so 'name' must have room for
 * cname->ln_namelen + 1 bytes. */
6562 /* To guarantee the 'name' is terminated with '0'. */
6563 memcpy(name, cname->ln_name, cname->ln_namelen);
6564 name[cname->ln_namelen] = 0;
6570 * Update the object's name entry with the given FID.
6572 * \param[in] env pointer to the thread context
6573 * \param[in] lfsck pointer to the lfsck instance
6574 * \param[in] dir pointer to the directory that holds
6576 * \param[in] name the name for the entry to be updated
6577 * \param[in] fid the new FID for the name entry referenced
6578 * \param[in] type the type for the name entry to be updated
6580 * \retval 0 for success
6581 * \retval negative error number on failure
6583 int lfsck_update_name_entry(const struct lu_env *env,
6584 struct lfsck_instance *lfsck,
6585 struct dt_object *dir, const char *name,
6586 const struct lu_fid *fid, __u32 type)
/* The insert record and the lock handle are per-thread scratch objects. */
6588 struct lfsck_thread_info *info = lfsck_env_info(env);
6589 struct dt_insert_rec *rec = &info->lti_dt_rec;
6590 struct lfsck_lock_handle *llh = &info->lti_llh;
6591 struct dt_device *dev = lfsck_obj2dev(dir);
/* Take an LCK_PW lock with MDS_INODELOCK_UPDATE for 'name' under 'dir'. */
6597 rc = lfsck_lock(env, lfsck, dir, name, llh,
6598 MDS_INODELOCK_UPDATE, LCK_PW);
6602 th = dt_trans_create(env, dev);
6604 GOTO(unlock, rc = PTR_ERR(th));
/* Declare everything the transaction may do: delete the old entry, insert
 * the new one, and add a reference on the directory. */
6606 rc = dt_declare_delete(env, dir, (const struct dt_key *)name, th);
6610 rec->rec_type = type;
6612 rc = dt_declare_insert(env, dir, (const struct dt_rec *)rec,
6613 (const struct dt_key *)name, th);
6617 rc = dt_declare_ref_add(env, dir, th);
6621 rc = dt_trans_start_local(env, dev, th);
/* Remove the old entry; -ENOENT gets its own branch.
 * NOTE(review): presumably that branch records that the entry did not
 * exist (the 'exists' flag tested below) -- TODO confirm. */
6625 rc = dt_delete(env, dir, (const struct dt_key *)name, th);
6626 if (rc == -ENOENT) {
6634 rc = dt_insert(env, dir, (const struct dt_rec *)rec,
6635 (const struct dt_key *)name, th, 1);
/* The directory's reference count is only raised when a directory entry
 * was inserted and no entry existed under this name before. */
6636 if (rc == 0 && S_ISDIR(type) && !exists) {
6637 dt_write_lock(env, dir, 0);
6638 rc = dt_ref_add(env, dir, th);
6639 dt_write_unlock(env, dir);
6645 dt_trans_stop(env, dev, th);
6649 CDEBUG(D_LFSCK, "%s: update name entry "DFID"/%s with the FID "DFID
6650 " and the type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
6651 PFID(lfsck_dto2fid(dir)), name, PFID(fid), type, rc);
6656 int lfsck_namespace_setup(const struct lu_env *env,
6657 struct lfsck_instance *lfsck)
6659 struct lfsck_component *com;
6660 struct lfsck_namespace *ns;
6661 struct dt_object *root = NULL;
6662 struct dt_object *obj;
6667 LASSERT(lfsck->li_master);
6673 INIT_LIST_HEAD(&com->lc_link);
6674 INIT_LIST_HEAD(&com->lc_link_dir);
6675 init_rwsem(&com->lc_sem);
6676 atomic_set(&com->lc_ref, 1);
6677 com->lc_lfsck = lfsck;
6678 com->lc_type = LFSCK_TYPE_NAMESPACE;
6679 com->lc_ops = &lfsck_namespace_ops;
6680 com->lc_data = lfsck_assistant_data_init(
6681 &lfsck_namespace_assistant_ops,
6683 if (com->lc_data == NULL)
6684 GOTO(out, rc = -ENOMEM);
6686 com->lc_file_size = sizeof(struct lfsck_namespace);
6687 OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
6688 if (com->lc_file_ram == NULL)
6689 GOTO(out, rc = -ENOMEM);
6691 OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
6692 if (com->lc_file_disk == NULL)
6693 GOTO(out, rc = -ENOMEM);
6695 for (i = 0; i < LFSCK_STF_COUNT; i++)
6696 mutex_init(&com->lc_sub_trace_objs[i].lsto_mutex);
6698 root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
6700 GOTO(out, rc = PTR_ERR(root));
6702 if (unlikely(!dt_try_as_dir(env, root)))
6703 GOTO(out, rc = -ENOTDIR);
6705 obj = local_index_find_or_create(env, lfsck->li_los, root,
6707 S_IFREG | S_IRUGO | S_IWUSR,
6708 &dt_lfsck_features);
6710 GOTO(out, rc = PTR_ERR(obj));
6713 rc = lfsck_namespace_load(env, com);
6715 rc = lfsck_namespace_init(env, com);
6717 rc = lfsck_namespace_reset(env, com, true);
6719 rc = lfsck_namespace_load_sub_trace_files(env, com, false);
6723 ns = com->lc_file_ram;
6724 switch (ns->ln_status) {
6729 spin_lock(&lfsck->li_lock);
6730 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
6731 spin_unlock(&lfsck->li_lock);
6734 CERROR("%s: unknown lfsck_namespace status %d\n",
6735 lfsck_lfsck2name(lfsck), ns->ln_status);
6737 case LS_SCANNING_PHASE1:
6738 case LS_SCANNING_PHASE2:
6739 /* No need to store the status to disk right now.
6740 * If the system crashed before the status stored,
6741 * it will be loaded back when next time. */
6742 ns->ln_status = LS_CRASHED;
6746 spin_lock(&lfsck->li_lock);
6747 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
6748 list_add_tail(&com->lc_link_dir, &lfsck->li_list_dir);
6749 spin_unlock(&lfsck->li_lock);
6756 if (root != NULL && !IS_ERR(root))
6757 lfsck_object_put(env, root);
6759 lfsck_component_cleanup(env, com);
6760 CERROR("%s: fail to init namespace LFSCK component: rc = %d\n",
6761 lfsck_lfsck2name(lfsck), rc);