4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright (c) 2012, 2013, Intel Corporation.
26 * lustre/lfsck/lfsck_namespace.c
28 * Author: Fan, Yong <fan.yong@intel.com>
31 #define DEBUG_SUBSYSTEM S_LFSCK
33 #include <lustre/lustre_idl.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fid.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre/lustre_user.h>
42 #include "lfsck_internal.h"
/* Magic value of a valid namespace LFSCK record: stored by
 * lfsck_namespace_init() and compared against the loaded ln_magic in
 * lfsck_namespace_load(). */
44 #define LFSCK_NAMESPACE_MAGIC 0xA0629D03
/* Result codes returned by lfsck_namespace_check_exist() telling the caller
 * why a name entry no longer refers to the expected object. */
46 enum lfsck_nameentry_check {
47 LFSCK_NAMEENTRY_DEAD = 1, /* The object has been unlinked. */
48 LFSCK_NAMEENTRY_REMOVED = 2, /* The entry has been removed. */
49 LFSCK_NAMEENTRY_RECREATED = 3, /* The entry has been recreated. */
/* Name string of the namespace LFSCK component.
 * NOTE(review): its users are outside this excerpt. */
52 static const char lfsck_namespace_name[] = "lfsck_namespace";
/*
 * Build a namespace assistant request describing one directory entry.
 *
 * The request records the directory currently being scanned
 * (lfsck->li_obj_dir, on which a reference is taken), the entry's FID,
 * hash cookie and attributes, the current object-table cookie, and a copy
 * of the entry name. The reference is dropped again by
 * lfsck_namespace_assistant_req_fini().
 *
 * Returns the new request, or ERR_PTR(-ENOMEM).
 * NOTE(review): the allocation, its failure check, the use of @type and the
 * final return are not visible in this excerpt.
 */
54 static struct lfsck_namespace_req *
55 lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
56 struct lu_dirent *ent, __u16 type)
58 struct lfsck_namespace_req *lnr;
/* Name length rounded down to a multiple of 4, plus 4: this always leaves
 * at least one byte beyond the name itself. */
61 size = sizeof(*lnr) + (ent->lde_namelen & ~3) + 4;
64 return ERR_PTR(-ENOMEM);
66 INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
/* Pin the directory object for as long as the request refers to it. */
67 lu_object_get(&lfsck->li_obj_dir->do_lu);
68 lnr->lnr_obj = lfsck->li_obj_dir;
69 lnr->lnr_fid = ent->lde_fid;
70 lnr->lnr_oit_cookie = lfsck->li_pos_current.lp_oit_cookie;
71 lnr->lnr_dir_cookie = ent->lde_hash;
72 lnr->lnr_attr = ent->lde_attrs;
75 lnr->lnr_namelen = ent->lde_namelen;
76 memcpy(lnr->lnr_name, ent->lde_name, ent->lde_namelen);
/*
 * Destroy a namespace assistant request.
 *
 * Recovers the enclosing lfsck_namespace_req from the embedded generic
 * request @lar, drops the reference held on the request's object and frees
 * the request using the size recorded in lnr_size.
 */
81 static void lfsck_namespace_assistant_req_fini(const struct lu_env *env,
82 struct lfsck_assistant_req *lar)
84 struct lfsck_namespace_req *lnr =
85 container_of0(lar, struct lfsck_namespace_req, lnr_lar);
87 lu_object_put(env, &lnr->lnr_obj->do_lu);
88 OBD_FREE(lnr, lnr->lnr_size);
/*
 * Convert a little-endian lfsck_namespace record @src into CPU byte order,
 * storing the result field by field in @dst.
 *
 * Scalar fields go through le32_to_cpu()/le64_to_cpu(); the embedded
 * positions and the FID use lfsck_position_le_to_cpu() and fid_le_to_cpu().
 * This is the mirror image of lfsck_namespace_cpu_to_le().
 */
91 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
92 struct lfsck_namespace *src)
/* Fields converted as 32-bit values. */
94 dst->ln_magic = le32_to_cpu(src->ln_magic);
95 dst->ln_status = le32_to_cpu(src->ln_status);
96 dst->ln_flags = le32_to_cpu(src->ln_flags);
97 dst->ln_success_count = le32_to_cpu(src->ln_success_count);
98 dst->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
99 dst->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
/* Timestamps, converted as 64-bit values. */
100 dst->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
101 dst->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
102 dst->ln_time_last_checkpoint =
103 le64_to_cpu(src->ln_time_last_checkpoint);
/* Scan positions, converted by the position helper. */
104 lfsck_position_le_to_cpu(&dst->ln_pos_latest_start,
105 &src->ln_pos_latest_start);
106 lfsck_position_le_to_cpu(&dst->ln_pos_last_checkpoint,
107 &src->ln_pos_last_checkpoint);
108 lfsck_position_le_to_cpu(&dst->ln_pos_first_inconsistent,
109 &src->ln_pos_first_inconsistent);
/* Statistics counters, converted as 64-bit values. */
110 dst->ln_items_checked = le64_to_cpu(src->ln_items_checked);
111 dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
112 dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
113 dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
114 dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
115 dst->ln_objs_repaired_phase2 =
116 le64_to_cpu(src->ln_objs_repaired_phase2);
117 dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
118 dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
119 fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2,
120 &src->ln_fid_latest_scanned_phase2);
121 dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
122 dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
123 dst->ln_mul_linked_checked = le64_to_cpu(src->ln_mul_linked_checked);
124 dst->ln_mul_linked_repaired = le64_to_cpu(src->ln_mul_linked_repaired);
125 dst->ln_unknown_inconsistency =
126 le64_to_cpu(src->ln_unknown_inconsistency);
127 dst->ln_unmatched_pairs_repaired =
128 le64_to_cpu(src->ln_unmatched_pairs_repaired);
129 dst->ln_dangling_repaired = le64_to_cpu(src->ln_dangling_repaired);
130 dst->ln_mul_ref_repaired = le64_to_cpu(src->ln_mul_ref_repaired);
131 dst->ln_bad_type_repaired = le64_to_cpu(src->ln_bad_type_repaired);
132 dst->ln_lost_dirent_repaired =
133 le64_to_cpu(src->ln_lost_dirent_repaired);
/*
 * Convert a CPU byte order lfsck_namespace record @src into little-endian
 * form, storing the result field by field in @dst.
 *
 * Scalar fields go through cpu_to_le32()/cpu_to_le64(); the embedded
 * positions and the FID use lfsck_position_cpu_to_le() and fid_cpu_to_le().
 * This is the mirror image of lfsck_namespace_le_to_cpu() and covers the
 * same fields in the same order.
 */
136 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
137 struct lfsck_namespace *src)
/* Fields converted as 32-bit values. */
139 dst->ln_magic = cpu_to_le32(src->ln_magic);
140 dst->ln_status = cpu_to_le32(src->ln_status);
141 dst->ln_flags = cpu_to_le32(src->ln_flags);
142 dst->ln_success_count = cpu_to_le32(src->ln_success_count);
143 dst->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
144 dst->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
/* Timestamps, converted as 64-bit values. */
145 dst->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
146 dst->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
147 dst->ln_time_last_checkpoint =
148 cpu_to_le64(src->ln_time_last_checkpoint);
/* Scan positions, converted by the position helper. */
149 lfsck_position_cpu_to_le(&dst->ln_pos_latest_start,
150 &src->ln_pos_latest_start);
151 lfsck_position_cpu_to_le(&dst->ln_pos_last_checkpoint,
152 &src->ln_pos_last_checkpoint);
153 lfsck_position_cpu_to_le(&dst->ln_pos_first_inconsistent,
154 &src->ln_pos_first_inconsistent);
/* Statistics counters, converted as 64-bit values. */
155 dst->ln_items_checked = cpu_to_le64(src->ln_items_checked);
156 dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
157 dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
158 dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
159 dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
160 dst->ln_objs_repaired_phase2 =
161 cpu_to_le64(src->ln_objs_repaired_phase2);
162 dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
163 dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
164 fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2,
165 &src->ln_fid_latest_scanned_phase2);
166 dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
167 dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
168 dst->ln_mul_linked_checked = cpu_to_le64(src->ln_mul_linked_checked);
169 dst->ln_mul_linked_repaired = cpu_to_le64(src->ln_mul_linked_repaired);
170 dst->ln_unknown_inconsistency =
171 cpu_to_le64(src->ln_unknown_inconsistency);
172 dst->ln_unmatched_pairs_repaired =
173 cpu_to_le64(src->ln_unmatched_pairs_repaired);
174 dst->ln_dangling_repaired = cpu_to_le64(src->ln_dangling_repaired);
175 dst->ln_mul_ref_repaired = cpu_to_le64(src->ln_mul_ref_repaired);
176 dst->ln_bad_type_repaired = cpu_to_le64(src->ln_bad_type_repaired);
177 dst->ln_lost_dirent_repaired =
178 cpu_to_le64(src->ln_lost_dirent_repaired);
/*
 * Account for one item the namespace LFSCK failed to handle.
 *
 * Increments ns->ln_items_failed and fills @pos with the current scan
 * position. The recorded ln_pos_first_inconsistent is replaced by @pos when
 * it is still zero or when lfsck_pos_is_eq(&pos, recorded) is negative.
 * NOTE(review): a negative result presumably means @pos precedes the
 * recorded position - confirm against lfsck_pos_is_eq().
 */
181 static void lfsck_namespace_record_failure(const struct lu_env *env,
182 struct lfsck_instance *lfsck,
183 struct lfsck_namespace *ns)
185 struct lfsck_position pos;
187 ns->ln_items_failed++;
188 lfsck_pos_fill(env, lfsck, &pos, false);
189 if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent) ||
190 lfsck_pos_is_eq(&pos, &ns->ln_pos_first_inconsistent) < 0) {
191 ns->ln_pos_first_inconsistent = pos;
/* Log the position that is now recorded as the first failure. */
193 CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
194 "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n",
195 lfsck_lfsck2name(lfsck),
196 ns->ln_pos_first_inconsistent.lp_oit_cookie,
197 PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
198 ns->ln_pos_first_inconsistent.lp_dir_cookie);
203 * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
204 * \retval 0: succeed.
205 * \retval -ve: failed cases.
/*
 * Load the namespace LFSCK record of @com from disk.
 *
 * Reads the XATTR_NAME_LFSCK_NAMESPACE extended attribute of com->lc_obj
 * into com->lc_file_disk (com->lc_file_size bytes), converts it to CPU byte
 * order in com->lc_file_ram and checks the magic. A magic mismatch and any
 * read error other than -ENODATA are logged.
 * NOTE(review): the comparison of the returned length and the values
 * actually returned are not visible in this excerpt.
 */
207 static int lfsck_namespace_load(const struct lu_env *env,
208 struct lfsck_component *com)
210 int len = com->lc_file_size;
213 rc = dt_xattr_get(env, com->lc_obj,
214 lfsck_buf_get(env, com->lc_file_disk, len),
215 XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
217 struct lfsck_namespace *ns = com->lc_file_ram;
/* The on-disk copy is little-endian; work on the CPU-order copy. */
219 lfsck_namespace_le_to_cpu(ns,
220 (struct lfsck_namespace *)com->lc_file_disk);
221 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
222 CDEBUG(D_LFSCK, "%s: invalid lfsck_namespace magic "
223 "%#x != %#x\n", lfsck_lfsck2name(com->lc_lfsck),
224 ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
229 } else if (rc != -ENODATA) {
230 CDEBUG(D_LFSCK, "%s: fail to load lfsck_namespace, "
231 "expected = %d: rc = %d\n",
232 lfsck_lfsck2name(com->lc_lfsck), len, rc);
/*
 * Write the in-memory namespace LFSCK record of @com to disk.
 *
 * Converts com->lc_file_ram to little-endian in com->lc_file_disk and stores
 * it as the XATTR_NAME_LFSCK_NAMESPACE extended attribute of com->lc_obj
 * inside a local transaction on lfsck->li_bottom. With @init set the
 * attribute is created (LU_XATTR_CREATE), otherwise it is replaced
 * (LU_XATTR_REPLACE).
 * NOTE(review): the error checks between the steps and the return statement
 * are not visible in this excerpt.
 */
239 static int lfsck_namespace_store(const struct lu_env *env,
240 struct lfsck_component *com, bool init)
242 struct dt_object *obj = com->lc_obj;
243 struct lfsck_instance *lfsck = com->lc_lfsck;
244 struct thandle *handle;
245 int len = com->lc_file_size;
249 lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
250 (struct lfsck_namespace *)com->lc_file_ram);
251 handle = dt_trans_create(env, lfsck->li_bottom);
253 GOTO(log, rc = PTR_ERR(handle));
255 rc = dt_declare_xattr_set(env, obj,
256 lfsck_buf_get(env, com->lc_file_disk, len),
257 XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
261 rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
265 rc = dt_xattr_set(env, obj,
266 lfsck_buf_get(env, com->lc_file_disk, len),
267 XATTR_NAME_LFSCK_NAMESPACE,
268 init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
269 handle, BYPASS_CAPA);
274 dt_trans_stop(env, lfsck->li_bottom, handle);
278 CDEBUG(D_LFSCK, "%s: fail to store lfsck_namespace: rc = %d\n",
279 lfsck_lfsck2name(lfsck), rc);
/*
 * Reset the namespace LFSCK record of @com to its initial state.
 *
 * Zeroes the in-memory record, sets the magic and the LS_INIT status, then
 * stores it with create semantics (init = true) while holding com->lc_sem
 * for writing.
 */
283 static int lfsck_namespace_init(const struct lu_env *env,
284 struct lfsck_component *com)
286 struct lfsck_namespace *ns = com->lc_file_ram;
289 memset(ns, 0, sizeof(*ns));
290 ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
291 ns->ln_status = LS_INIT;
292 down_write(&com->lc_sem);
293 rc = lfsck_namespace_store(env, com, true);
294 up_write(&com->lc_sem);
299 * Update the namespace LFSCK tracing file for the given @fid
301 * \param[in] env pointer to the thread context
302 * \param[in] com pointer to the lfsck component
303 * \param[in] fid the fid which flags to be updated in the lfsck
305 * \param[in] add true if add new flags, otherwise remove flags
307 * \retval 0 on success or when there is nothing to be done
308 * \retval negative error number on failure
/*
 * Add flags to, or remove flags from, the tracing-file record of @fid.
 *
 * Runs entirely under com->lc_sem held for writing. The record key is @fid
 * converted to big-endian. The current flags are looked up first; the
 * function returns 0 without a transaction when all of @flags are already
 * set, or when none of @flags is set (presumably for @add true and false
 * respectively). Otherwise a local transaction declares and performs a
 * delete and/or an insert of the record, and the outcome is logged.
 * NOTE(review): the branch conditions selecting delete versus insert and the
 * computation of the new flag value are not visible in this excerpt.
 */
310 int lfsck_namespace_trace_update(const struct lu_env *env,
311 struct lfsck_component *com,
312 const struct lu_fid *fid,
313 const __u8 flags, bool add)
315 struct lfsck_instance *lfsck = com->lc_lfsck;
316 struct dt_object *obj = com->lc_obj;
317 struct lu_fid *key = &lfsck_env_info(env)->lti_fid3;
318 struct dt_device *dev = lfsck->li_bottom;
319 struct thandle *th = NULL;
327 down_write(&com->lc_sem);
/* The tracing file is keyed by the big-endian form of the FID. */
328 fid_cpu_to_be(key, fid);
329 rc = dt_lookup(env, obj, (struct dt_rec *)&old,
330 (const struct dt_key *)key, BYPASS_CAPA);
333 GOTO(unlock, rc = 0);
337 } else if (rc == 0) {
/* Every requested flag is already present: nothing to change. */
339 if ((old & flags) == flags)
340 GOTO(unlock, rc = 0);
/* None of the requested flags is present: nothing to change. */
344 if ((old & flags) == 0)
345 GOTO(unlock, rc = 0);
353 th = dt_trans_create(env, dev);
355 GOTO(log, rc = PTR_ERR(th));
358 rc = dt_declare_delete(env, obj,
359 (const struct dt_key *)key, th);
365 rc = dt_declare_insert(env, obj,
366 (const struct dt_rec *)&new,
367 (const struct dt_key *)key, th);
372 rc = dt_trans_start_local(env, dev, th);
377 rc = dt_delete(env, obj, (const struct dt_key *)key,
384 rc = dt_insert(env, obj, (const struct dt_rec *)&new,
385 (const struct dt_key *)key, th, BYPASS_CAPA, 1);
/* th may still be NULL or an error pointer if creation did not succeed. */
393 if (th != NULL && !IS_ERR(th))
394 dt_trans_stop(env, dev, th);
396 CDEBUG(D_LFSCK, "%s: namespace LFSCK %s flags for "DFID" in the "
397 "tracing file, flags %x, old %x, new %x: rc = %d\n",
398 lfsck_lfsck2name(lfsck), add ? "add" : "del", PFID(fid),
399 (__u32)flags, (__u32)old, (__u32)new, rc);
402 up_write(&com->lc_sem);
/*
 * Check whether @name in directory @dir still refers to @obj.
 *
 * Returns LFSCK_NAMEENTRY_DEAD when @obj is already dead, otherwise looks
 * @name up in @dir. LFSCK_NAMEENTRY_REMOVED is returned on the lookup result
 * tested just before that return (presumably -ENOENT - the test itself is
 * not visible), and LFSCK_NAMEENTRY_RECREATED when the entry exists but
 * carries a FID different from that of @obj.
 * NOTE(review): the handling of other lookup errors and the final return
 * are not visible in this excerpt.
 */
407 static int lfsck_namespace_check_exist(const struct lu_env *env,
408 struct dt_object *dir,
409 struct dt_object *obj, const char *name)
411 struct lu_fid *fid = &lfsck_env_info(env)->lti_fid;
415 if (unlikely(lfsck_is_dead_obj(obj)))
416 RETURN(LFSCK_NAMEENTRY_DEAD);
418 rc = dt_lookup(env, dir, (struct dt_rec *)fid,
419 (const struct dt_key *)name, BYPASS_CAPA);
421 RETURN(LFSCK_NAMEENTRY_REMOVED);
426 if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
427 RETURN(LFSCK_NAMEENTRY_RECREATED);
/*
 * Declare, on transaction @handle, the linkEA updates that may be applied to
 * @obj: deletion of the XATTR_NAME_LINK attribute and a subsequent set of
 * that attribute with a buffer of DEFAULT_LINKEA_SIZE bytes.
 * NOTE(review): the error check between the two declarations and the return
 * are not visible in this excerpt.
 */
432 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
433 struct dt_object *obj,
434 struct thandle *handle)
438 /* For destroying all invalid linkEA entries. */
439 rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
443 /* For insert new linkEA entry. */
444 rc = dt_declare_xattr_set(env, obj,
445 lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
446 XATTR_NAME_LINK, 0, handle);
/*
 * Read the linkEA (XATTR_NAME_LINK) of @obj into ldata->ld_buf and set up
 * @ldata for parsing via linkea_init().
 *
 * When the first read reports that the buffer is too small, the required
 * size is queried with an empty buffer (LU_BUF_NULL), ld_buf is reallocated
 * to that size and the attribute is read again.
 * NOTE(review): the values returned for a missing buffer, a non-existent
 * object and other failures are not visible in this excerpt.
 */
450 int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
451 struct linkea_data *ldata)
455 if (ldata->ld_buf->lb_buf == NULL)
458 if (!dt_object_exists(obj))
461 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
463 /* Buf was too small, figure out what we need. */
464 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LINK,
469 lu_buf_realloc(ldata->ld_buf, rc);
/* A NULL buffer after the realloc means the allocation failed. */
470 if (ldata->ld_buf->lb_buf == NULL)
473 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
478 rc = linkea_init(ldata);
484 * Remove linkEA for the given object.
486 * The caller should hold the ldlm lock before calling this function.
488 * \param[in] env pointer to the thread context
489 * \param[in] com pointer to the lfsck component
490 * \param[in] obj pointer to the dt_object to be handled
492 * \retval 0 for repaired cases
493 * \retval negative error number on failure
/*
 * Delete the whole linkEA (XATTR_NAME_LINK) of the local object @obj.
 *
 * Asserts that @obj is not remote, then runs a local transaction on
 * lfsck->li_bottom. Under the object's write lock the function gives up
 * with -ENOENT if the object is dead and skips the deletion (rc = 0) in
 * dry-run mode (LPF_DRYRUN). The result is logged, and LF_INCONSISTENT is
 * set in the component's namespace record.
 * NOTE(review): the condition guarding the LF_INCONSISTENT update and the
 * return statement are not visible in this excerpt.
 */
495 static int lfsck_namespace_links_remove(const struct lu_env *env,
496 struct lfsck_component *com,
497 struct dt_object *obj)
499 struct lfsck_instance *lfsck = com->lc_lfsck;
500 struct dt_device *dev = lfsck->li_bottom;
501 struct thandle *th = NULL;
505 LASSERT(dt_object_remote(obj) == 0);
507 th = dt_trans_create(env, dev);
509 GOTO(log, rc = PTR_ERR(th));
511 rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, th);
515 rc = dt_trans_start_local(env, dev, th);
519 dt_write_lock(env, obj, 0);
520 if (unlikely(lfsck_is_dead_obj(obj)))
521 GOTO(unlock, rc = -ENOENT);
/* Dry-run mode: report success without touching the object. */
523 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
524 GOTO(unlock, rc = 0);
526 rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th, BYPASS_CAPA);
531 dt_write_unlock(env, obj);
534 dt_trans_stop(env, dev, th);
537 CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
538 "for the object "DFID": rc = %d\n",
539 lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
542 struct lfsck_namespace *ns = com->lc_file_ram;
544 ns->ln_flags |= LF_INCONSISTENT;
/*
 * Store the linkEA held in @ldata as the XATTR_NAME_LINK attribute of @obj
 * within transaction @handle. Only the first ld_leh->leh_len bytes of the
 * buffer are written. Returns the result of dt_xattr_set().
 */
550 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
551 struct linkea_data *ldata, struct thandle *handle)
553 const struct lu_buf *buf = lfsck_buf_get_const(env,
554 ldata->ld_buf->lb_buf,
555 ldata->ld_leh->leh_len);
557 return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle,
/*
 * Unpack the current linkEA entry of @ldata into @cname and @pfid, and make
 * the name NUL-terminated.
 *
 * The name is copied into @buf, a terminating 0 is appended and
 * cname->ln_name is redirected to @buf, so @buf must provide at least
 * cname->ln_namelen + 1 bytes.
 * NOTE(review): the remaining parameter declarations are not visible in
 * this excerpt.
 */
561 static void lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata,
562 struct lu_name *cname,
566 linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
567 /* To guarantee the 'name' is terminated with '0'. */
568 memcpy(buf, cname->ln_name, cname->ln_namelen);
569 buf[cname->ln_namelen] = 0;
570 cname->ln_name = buf;
/*
 * Scan the linkEA entries that follow the current one for byte-identical
 * copies of it.
 *
 * The current entry and its record length are saved, the following entries
 * are walked (each record length decoded from two big-endian bytes), and
 * the saved cursor is restored before returning.
 * NOTE(review): what happens on a match (linkea_del_buf() is called on some
 * path), the remaining parameters and the return value are not fully visible
 * in this excerpt.
 */
573 static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata,
574 struct lu_name *cname,
578 struct link_ea_entry *oldlee;
582 oldlee = ldata->ld_lee;
583 oldlen = ldata->ld_reclen;
584 linkea_next_entry(ldata);
585 while (ldata->ld_lee != NULL) {
/* lee_reclen is stored as two bytes, most significant first. */
586 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
587 ldata->ld_lee->lee_reclen[1];
/* Same length and same bytes: a repeat of the saved entry. */
588 if (unlikely(ldata->ld_reclen == oldlen &&
589 memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
594 linkea_del_buf(ldata, cname);
596 linkea_next_entry(ldata);
/* Put the cursor back on the entry the caller was looking at. */
599 ldata->ld_lee = oldlee;
600 ldata->ld_reclen = oldlen;
606 * Insert orphan into .lustre/lost+found/MDTxxxx/ locally.
608 * Add the specified orphan MDT-object to the .lustre/lost+found/MDTxxxx/
609 * with the given type to generate the name, the detailed rules for name
610 * have been described as following.
612 * The function also generates the linkEA corresponding to the name entry
613 * under the .lustre/lost+found/MDTxxxx/ for the orphan MDT-object.
615 * \param[in] env pointer to the thread context
616 * \param[in] com pointer to the lfsck component
617 * \param[in] orphan pointer to the orphan MDT-object
618 * \param[in] infix additional information for the orphan name, such as
619 * the FID for original
620 * \param[in] type the type for describing why the orphan MDT-object is
621 * created. The rules are as following:
623 * type "D": The MDT-object is a directory, it may know its parent
624 * but because there is no valid linkEA, the LFSCK cannot
625 * know where to put it back to the namespace.
626 * type "O": The MDT-object has no linkEA, and there is no name
627 * entry that references the MDT-object.
629 * \see lfsck_layout_recreate_parent() for more types.
631 * The orphan name will be like:
632 * ${FID}-${infix}-${type}-${conflict_version}
634 * \param[out] count if some others inserted some linkEA entries by race,
635 * then return the linkEA entries count.
637 * \retval positive number for repaired cases
638 * \retval 0 if needs to repair nothing
639 * \retval negative error number on failure
/*
 * Link the orphan object @orphan under the lost+found directory
 * lfsck->li_lpf_obj, creating that directory first when it is still NULL.
 *
 * Visible steps: take an EX update lock on the parent; generate candidate
 * names from the orphan FID, @infix, @type and an increasing index until the
 * lookup no longer finds a conflicting entry; build a fresh linkEA holding
 * (name, parent FID); lock the orphan; declare and start a local
 * transaction; under the orphan's write lock re-read its linkEA and, only if
 * it is absent, invalid or empty, update ".." (directories only), set the
 * new linkEA and insert the name entry into the parent (adding a parent
 * reference for directories). In dry-run mode the function stops with rc = 1
 * before modifying anything.
 * NOTE(review): several error checks, labels and the return statement are
 * not visible in this excerpt.
 */
641 static int lfsck_namespace_insert_orphan(const struct lu_env *env,
642 struct lfsck_component *com,
643 struct dt_object *orphan,
644 const char *infix, const char *type,
647 struct lfsck_thread_info *info = lfsck_env_info(env);
648 struct lu_name *cname = &info->lti_name;
649 struct dt_insert_rec *rec = &info->lti_dt_rec;
650 struct lu_fid *tfid = &info->lti_fid5;
651 const struct lu_fid *cfid = lfsck_dto2fid(orphan);
652 const struct lu_fid *pfid;
653 struct lfsck_instance *lfsck = com->lc_lfsck;
654 struct dt_device *dev = lfsck->li_bottom;
655 struct dt_object *parent;
656 struct thandle *th = NULL;
657 struct lustre_handle plh = { 0 };
658 struct lustre_handle clh = { 0 };
659 struct linkea_data ldata = { 0 };
660 struct lu_buf linkea_buf;
/* Cleared so that the final log prints "<NULL>" if no name was chosen. */
667 cname->ln_name = NULL;
668 /* Create .lustre/lost+found/MDTxxxx when needed. */
669 if (unlikely(lfsck->li_lpf_obj == NULL)) {
670 rc = lfsck_create_lpf(env, lfsck);
675 parent = lfsck->li_lpf_obj;
676 pfid = lfsck_dto2fid(parent);
678 /* Hold update lock on the parent to prevent others to access. */
679 rc = lfsck_ibits_lock(env, lfsck, parent, &plh,
680 MDS_INODELOCK_UPDATE, LCK_EX);
/* Try successive index values until the name is free, or the loop is ended
 * by the FID-match case below (its action is not visible here). */
685 namelen = snprintf(info->lti_key, NAME_MAX, DFID"%s-%s-%d",
686 PFID(cfid), infix, type, idx++);
687 rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
688 (const struct dt_key *)info->lti_key,
690 if (rc != 0 && rc != -ENOENT)
693 if (unlikely(rc == 0 && lu_fid_eq(cfid, tfid)))
695 } while (rc == 0 && !exist);
697 cname->ln_name = info->lti_key;
698 cname->ln_namelen = namelen;
699 rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
703 rc = linkea_add_buf(&ldata, cname, pfid);
707 rc = lfsck_ibits_lock(env, lfsck, orphan, &clh,
708 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP,
713 lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
714 ldata.ld_leh->leh_len);
715 th = dt_trans_create(env, dev);
717 GOTO(log, rc = PTR_ERR(th));
/* For a directory orphan the ".." entry is replaced: declare the delete and
 * the re-insert. */
719 if (S_ISDIR(lfsck_object_type(orphan))) {
720 rc = dt_declare_delete(env, orphan,
721 (const struct dt_key *)dotdot, th);
725 rec->rec_type = S_IFDIR;
727 rc = dt_declare_insert(env, orphan, (const struct dt_rec *)rec,
728 (const struct dt_key *)dotdot, th);
733 rc = dt_declare_xattr_set(env, orphan, &linkea_buf,
734 XATTR_NAME_LINK, 0, th);
739 rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
741 rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
742 (const struct dt_key *)cname->ln_name,
747 if (S_ISDIR(rec->rec_type)) {
748 rc = dt_declare_ref_add(env, parent, th);
754 rc = dt_trans_start_local(env, dev, th);
758 dt_write_lock(env, orphan, 0);
/* Proceed only when the orphan has no usable linkEA: missing (-ENODATA),
 * invalid (-EINVAL) or present with zero records. */
759 rc = lfsck_links_read(env, orphan, &ldata);
760 if (likely((rc == -ENODATA) || (rc == -EINVAL) ||
761 (rc == 0 && ldata.ld_leh->leh_reccount == 0))) {
762 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
763 GOTO(unlock, rc = 1);
765 if (S_ISDIR(lfsck_object_type(orphan))) {
766 rc = dt_delete(env, orphan,
767 (const struct dt_key *)dotdot, th,
772 rec->rec_type = S_IFDIR;
774 rc = dt_insert(env, orphan, (const struct dt_rec *)rec,
775 (const struct dt_key *)dotdot, th,
781 rc = dt_xattr_set(env, orphan, &linkea_buf, XATTR_NAME_LINK, 0,
/* Report the linkEA record count through @count when the caller asked. */
784 if (rc == 0 && count != NULL)
785 *count = ldata.ld_leh->leh_reccount;
789 dt_write_unlock(env, orphan);
/* Insert the name entry unless it was already found to exist. */
791 if (rc == 0 && !exist) {
792 rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
794 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
795 (const struct dt_key *)cname->ln_name,
797 if (rc == 0 && S_ISDIR(rec->rec_type)) {
798 dt_write_lock(env, parent, 0);
799 rc = dt_ref_add(env, parent, th);
800 dt_write_unlock(env, parent);
/* Success is reported as a positive value (repaired). */
804 GOTO(stop, rc = (rc == 0 ? 1 : rc));
807 dt_write_unlock(env, orphan);
810 dt_trans_stop(env, dev, th);
813 lfsck_ibits_unlock(&clh, LCK_EX);
814 lfsck_ibits_unlock(&plh, LCK_EX);
815 CDEBUG(D_LFSCK, "%s: namespace LFSCK insert orphan for the "
816 "object "DFID", name = %s: rc = %d\n",
817 lfsck_lfsck2name(lfsck), PFID(cfid),
818 cname->ln_name != NULL ? cname->ln_name : "<NULL>", rc);
821 struct lfsck_namespace *ns = com->lc_file_ram;
823 ns->ln_flags |= LF_INCONSISTENT;
830 * Add the specified name entry back to namespace.
832 * If there is a linkEA entry that back references a name entry under
833 * some parent directory, but such parent directory does not have the
834 * claimed name entry. On the other hand, the linkEA entries count is
835 * not larger than the MDT-object's hard link count. Under such case,
836 * it is quite possible that the name entry is lost. Then the LFSCK
837 * should add the name entry back to the namespace.
839 * \param[in] env pointer to the thread context
840 * \param[in] com pointer to the lfsck component
841 * \param[in] parent pointer to the directory under which the name entry
842 * will be inserted into
843 * \param[in] child pointer to the object referenced by the name entry
844 * that to be inserted into the parent
845 * \param[in] name the name for the child in the parent directory
847 * \retval positive number for repaired cases
848 * \retval 0 if nothing to be repaired
849 * \retval negative error number on failure
/*
 * Insert the name entry @name for @child into the directory @parent.
 *
 * Fails with -ENOTDIR when @parent cannot be used as a directory. Otherwise
 * an EX update lock is taken on @parent and a local transaction on
 * lfsck->li_next declares and performs: the insert of (@name, child FID,
 * child type), a parent reference increment when the child is a directory,
 * and an update of the parent's ctime to the current time. Success is
 * reported as 1. The result is logged; LF_INCONSISTENT is set and
 * ln_lost_dirent_repaired is incremented in the namespace record.
 * NOTE(review): the dry-run action, the error checks, the condition guarding
 * the statistics update and the return are not visible in this excerpt.
 */
851 static int lfsck_namespace_insert_normal(const struct lu_env *env,
852 struct lfsck_component *com,
853 struct dt_object *parent,
854 struct dt_object *child,
857 struct lfsck_thread_info *info = lfsck_env_info(env);
858 struct lu_attr *la = &info->lti_la;
859 struct dt_insert_rec *rec = &info->lti_dt_rec;
860 struct lfsck_instance *lfsck = com->lc_lfsck;
861 struct dt_device *dev = lfsck->li_next;
862 struct thandle *th = NULL;
863 struct lustre_handle lh = { 0 };
867 if (unlikely(!dt_try_as_dir(env, parent)))
868 GOTO(log, rc = -ENOTDIR);
870 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
873 /* Hold update lock on the parent to prevent others to access. */
874 rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
875 MDS_INODELOCK_UPDATE, LCK_EX);
879 th = dt_trans_create(env, dev);
881 GOTO(unlock, rc = PTR_ERR(th));
883 rec->rec_type = lfsck_object_type(child) & S_IFMT;
884 rec->rec_fid = lfsck_dto2fid(child);
885 rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
886 (const struct dt_key *)name, th);
890 if (S_ISDIR(rec->rec_type)) {
891 rc = dt_declare_ref_add(env, parent, th);
/* Only the ctime of the parent is going to be updated. */
896 memset(la, 0, sizeof(*la));
897 la->la_ctime = cfs_time_current_sec();
898 la->la_valid = LA_CTIME;
899 rc = dt_declare_attr_set(env, parent, la, th);
903 rc = dt_trans_start_local(env, dev, th);
907 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
908 (const struct dt_key *)name, th, BYPASS_CAPA, 1);
912 if (S_ISDIR(rec->rec_type)) {
913 dt_write_lock(env, parent, 0);
914 rc = dt_ref_add(env, parent, th);
915 dt_write_unlock(env, parent);
/* Refresh the timestamp: time may have advanced since the declaration. */
920 la->la_ctime = cfs_time_current_sec();
921 rc = dt_attr_set(env, parent, la, th, BYPASS_CAPA);
923 GOTO(stop, rc = (rc == 0 ? 1 : rc));
926 dt_trans_stop(env, dev, th);
929 lfsck_ibits_unlock(&lh, LCK_EX);
932 CDEBUG(D_LFSCK, "%s: namespace LFSCK insert object "DFID" with "
933 "the name %s and type %o to the parent "DFID": rc = %d\n",
934 lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), name,
935 lfsck_object_type(child) & S_IFMT,
936 PFID(lfsck_dto2fid(parent)), rc);
939 struct lfsck_namespace *ns = com->lc_file_ram;
941 ns->ln_flags |= LF_INCONSISTENT;
943 ns->ln_lost_dirent_repaired++;
/*
 * Create handling for the orphan object @orphan.
 * NOTE(review): only the signature is visible in this excerpt; the body and
 * the return value cannot be described from here.
 */
949 static int lfsck_namespace_create_orphan(const struct lu_env *env,
950 struct lfsck_component *com,
951 struct dt_object *orphan)
958 * Remove the specified entry from the linkEA.
960 * Locate the linkEA entry with the given @cname and @pfid, then
961 * remove this entry or the other entries those are repeated with
964 * \param[in] env pointer to the thread context
965 * \param[in] com pointer to the lfsck component
966 * \param[in] obj pointer to the dt_object to be handled
967 * \param[in,out]ldata pointer to the buffer that holds the linkEA
968 * \param[in] cname the name for the child in the parent directory
969 * \param[in] pfid the parent directory's FID for the linkEA
970 * \param[in] next if true, then remove the first found linkEA
971 * entry, and move the ldata->ld_lee to next entry
973 * \retval positive number for repaired cases
974 * \retval 0 if nothing to be repaired
975 * \retval negative error number on failure
/*
 * Remove a linkEA entry (@cname, @pfid) from the linkEA of @obj.
 *
 * Visible steps: take an EX update+xattr lock on @obj; edit the caller's
 * @ldata (linkea_del_buf() and/or the duplicate filter); in a local
 * transaction, under the object's write lock, give up with -ENOENT if the
 * object is dead, re-read the linkEA into a private copy, return 0 if the
 * entry has already disappeared, return 1 without writing in dry-run mode,
 * otherwise apply the same edit to the fresh copy and write it back. If the
 * fresh linkEA is larger than the declared buffer, the lock and transaction
 * are dropped and the buffer is re-initialised from the fresh copy
 * (presumably to retry - the jump is not visible).
 * NOTE(review): the branch choosing between delete and filter, the error
 * checks, labels and the return are not visible in this excerpt.
 */
977 static int lfsck_namespace_shrink_linkea(const struct lu_env *env,
978 struct lfsck_component *com,
979 struct dt_object *obj,
980 struct linkea_data *ldata,
981 struct lu_name *cname,
985 struct lfsck_instance *lfsck = com->lc_lfsck;
986 struct dt_device *dev = lfsck->li_bottom;
987 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
988 struct thandle *th = NULL;
989 struct lustre_handle lh = { 0 };
990 struct linkea_data ldata_new = { 0 };
991 struct lu_buf linkea_buf;
995 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
996 MDS_INODELOCK_UPDATE |
997 MDS_INODELOCK_XATTR, LCK_EX);
1002 linkea_del_buf(ldata, cname);
1004 lfsck_namespace_filter_linkea_entry(ldata, cname, pfid,
1006 lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
1007 ldata->ld_leh->leh_len);
1010 th = dt_trans_create(env, dev);
1012 GOTO(unlock1, rc = PTR_ERR(th));
1014 rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1015 XATTR_NAME_LINK, 0, th);
1019 rc = dt_trans_start_local(env, dev, th);
1023 dt_write_lock(env, obj, 0);
1024 if (unlikely(lfsck_is_dead_obj(obj)))
1025 GOTO(unlock2, rc = -ENOENT);
/* Re-read the linkEA under the lock: it may have changed meanwhile. */
1027 rc = lfsck_links_read2(env, obj, &ldata_new);
1031 /* The specified linkEA entry has been removed by race. */
1032 rc = linkea_links_find(&ldata_new, cname, pfid);
1034 GOTO(unlock2, rc = 0);
1036 if (bk->lb_param & LPF_DRYRUN)
1037 GOTO(unlock2, rc = 1);
1040 linkea_del_buf(&ldata_new, cname);
1042 lfsck_namespace_filter_linkea_entry(&ldata_new, cname, pfid,
/* The declared buffer is smaller than the linkEA that must be written. */
1045 if (linkea_buf.lb_len < ldata_new.ld_leh->leh_len) {
1046 dt_write_unlock(env, obj);
1047 dt_trans_stop(env, dev, th);
1048 lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
1049 ldata_new.ld_leh->leh_len);
1053 lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
1054 ldata_new.ld_leh->leh_len);
1055 rc = dt_xattr_set(env, obj, &linkea_buf,
1056 XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1058 GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
1061 dt_write_unlock(env, obj);
1064 dt_trans_stop(env, dev, th);
1067 lfsck_ibits_unlock(&lh, LCK_EX);
1070 CDEBUG(D_LFSCK, "%s: namespace LFSCK remove %s linkEA entry "
1071 "for the object: "DFID", parent "DFID", name %.*s\n",
1072 lfsck_lfsck2name(lfsck), next ? "invalid" : "redundant",
1073 PFID(lfsck_dto2fid(obj)), PFID(pfid), cname->ln_namelen,
1077 struct lfsck_namespace *ns = com->lc_file_ram;
1079 ns->ln_flags |= LF_INCONSISTENT;
1086 * Conditionally remove the specified entry from the linkEA.
1088 * Take the parent lock first, then check whether the specified
1089 * name entry exists or not: if yes, do nothing; otherwise, call
1090 * lfsck_namespace_shrink_linkea() to remove the linkea entry.
1092 * \param[in] env pointer to the thread context
1093 * \param[in] com pointer to the lfsck component
1094 * \param[in] parent pointer to the parent directory
1095 * \param[in] child pointer to the child object that holds the linkEA
1096 * \param[in,out]ldata pointer to the buffer that holds the linkEA
1097 * \param[in] cname the name for the child in the parent directory
1098 * \param[in] pfid the parent directory's FID for the linkEA
1100 * \retval positive number for repaired cases
1101 * \retval 0 if nothing to be repaired
1102 * \retval negative error number on failure
/*
 * Remove the linkEA entry (@cname, @pfid) from @child only if the name
 * entry it describes is not valid in @parent.
 *
 * Takes an EX update lock on @parent and its read lock. A dead parent leads
 * straight to lfsck_namespace_shrink_linkea(). Otherwise @cname is looked up
 * in @parent and the locks are released: -ENOENT leads to the shrink call; a
 * found entry whose FID equals the child's FID moves @ldata to the next
 * entry instead; the last visible statement is a further shrink call.
 * NOTE(review): the trailing arguments of the shrink calls, the handling of
 * other lookup errors and the return statements are not visible in this
 * excerpt.
 */
1104 static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env,
1105 struct lfsck_component *com,
1106 struct dt_object *parent,
1107 struct dt_object *child,
1108 struct linkea_data *ldata,
1109 struct lu_name *cname,
1110 struct lu_fid *pfid)
1112 struct lu_fid *cfid = &lfsck_env_info(env)->lti_fid3;
1113 struct lustre_handle lh = { 0 };
1117 rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
1118 MDS_INODELOCK_UPDATE, LCK_EX);
1122 dt_read_lock(env, parent, 0);
/* Dead parent: release both locks and go straight to the shrink call. */
1123 if (unlikely(lfsck_is_dead_obj(parent))) {
1124 dt_read_unlock(env, parent);
1125 lfsck_ibits_unlock(&lh, LCK_EX);
1126 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
1132 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1133 (const struct dt_key *)cname->ln_name,
1135 dt_read_unlock(env, parent);
1137 /* It is safe to release the ldlm lock, because when the logic come
1138 * here, we have got all the needed information above whether the
1139 * linkEA entry is valid or not. It is not important that others
1140 * may add new linkEA entry after the ldlm lock released. If other
1141 * has removed the specified linkEA entry by race, then it is OK,
1142 * because the subsequent lfsck_namespace_shrink_linkea() can handle
1144 lfsck_ibits_unlock(&lh, LCK_EX);
1145 if (rc == -ENOENT) {
1146 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
1155 /* The LFSCK just found some internal status of cross-MDTs
1156 * create operation. That is normal. */
1157 if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
1158 linkea_next_entry(ldata);
1163 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata, cname,
1170 * Conditionally replace name entry in the parent.
1172 * As required, the LFSCK may re-create the lost MDT-object for dangling
1173 * name entry, but such repairing may be wrong because of bad FID in the
1174 * name entry. As the LFSCK processing, the real MDT-object may be found,
1175 * then the LFSCK should check whether the former re-created MDT-object
1176 * has been modified or not, if not, then destroy it and update the name
1177 * entry in the parent to reference the real MDT-object.
1179 * \param[in] env pointer to the thread context
1180 * \param[in] com pointer to the lfsck component
1181 * \param[in] parent pointer to the parent directory
1182 * \param[in] child pointer to the MDT-object that may be the real
1183 * MDT-object corresponding to the name entry in parent
1184 * \param[in] cfid the current FID in the name entry
1185 * \param[in] cname contains the name of the child in the parent directory
1187 * \retval positive number for repaired cases
1188 * \retval 0 if nothing to be repaired
1189 * \retval negative error number on failure
/*
 * Point the name entry @cname under @parent at @child instead of the
 * object @cfid, destroying the @cfid object in the same transaction.
 *
 * Visible flow: take an LCK_EX ibits lock (UPDATE) on @parent, locate the
 * @cfid object and re-read the entry with dt_lookup() to check that it
 * still carries @cfid. The @cfid object is then locked (UPDATE | XATTR)
 * and vetted: it is tested for being dead, its ctime is compared with
 * zero, and a regular file is probed for a LOV xattr (a present xattr
 * maps to rc = 0, i.e. nothing replaced). The linkEA of @child is read
 * under its read lock and searched for <@cname, @parent>. The transaction
 * declares and then performs destroy, delete and insert on li_next; the
 * dt_delete() result is not checked, and a successful dt_insert() is
 * reported as 1.
 *
 * NOTE(review): the embedded line numbers of this listing have gaps, so
 * several result checks, labels and braces are not shown here; the action
 * taken for LPF_DRYRUN is among the missing lines.
 */
1191 static int lfsck_namespace_replace_cond(const struct lu_env *env,
1192 struct lfsck_component *com,
1193 struct dt_object *parent,
1194 struct dt_object *child,
1195 const struct lu_fid *cfid,
1196 const struct lu_name *cname)
1198 struct lfsck_thread_info *info = lfsck_env_info(env);
1199 struct lu_fid *tfid = &info->lti_fid5;
1200 struct lu_attr *la = &info->lti_la;
1201 struct dt_insert_rec *rec = &info->lti_dt_rec;
1202 struct lfsck_instance *lfsck = com->lc_lfsck;
1203 struct dt_device *dev = lfsck->li_next;
1204 const char *name = cname->ln_name;
1205 struct dt_object *obj = NULL;
1206 struct lustre_handle plh = { 0 };
1207 struct lustre_handle clh = { 0 };
1208 struct linkea_data ldata = { 0 };
1209 struct thandle *th = NULL;
1214 rc = lfsck_ibits_lock(env, lfsck, parent, &plh,
1215 MDS_INODELOCK_UPDATE, LCK_EX);
1219 if (!fid_is_sane(cfid)) {
1224 obj = lfsck_object_find(env, lfsck, cfid);
1227 if (rc == -ENOENT) {
1235 if (!dt_object_exists(obj)) {
1240 rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
1241 (const struct dt_key *)name, BYPASS_CAPA);
1242 if (rc == -ENOENT) {
1250 /* Someone changed the name entry, cannot replace it. */
1251 if (!lu_fid_eq(cfid, tfid))
1254 /* lock the object to be destroyed. */
1255 rc = lfsck_ibits_lock(env, lfsck, obj, &clh,
1256 MDS_INODELOCK_UPDATE |
1257 MDS_INODELOCK_XATTR, LCK_EX);
1261 if (unlikely(lfsck_is_dead_obj(obj))) {
1266 rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
1270 /* The object has been modified by other(s), or it is not created by
1271 * LFSCK, the two cases are indistinguishable. So cannot replace it. */
1272 if (la->la_ctime != 0)
1275 if (S_ISREG(la->la_mode)) {
1276 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
1278 /* If someone has created related OST-object(s),
1280 if ((rc > 0) || (rc < 0 && rc != -ENODATA))
1281 GOTO(log, rc = (rc > 0 ? 0 : rc));
1285 dt_read_lock(env, child, 0);
1286 rc = lfsck_links_read2(env, child, &ldata);
1287 dt_read_unlock(env, child);
1289 /* Someone changed the child, no need to replace. */
1296 rc = linkea_links_find(&ldata, cname, lfsck_dto2fid(parent));
1297 /* Someone moved the child, no need to replace. */
1301 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1304 th = dt_trans_create(env, dev);
1306 GOTO(log, rc = PTR_ERR(th));
1309 rc = dt_declare_destroy(env, obj, th);
1314 rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
1318 rec->rec_type = S_IFDIR;
1319 rec->rec_fid = lfsck_dto2fid(child);
1320 rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
1321 (const struct dt_key *)name, th);
1325 rc = dt_trans_start(env, dev, th);
1330 rc = dt_destroy(env, obj, th);
1335 /* The old name entry maybe not exist. */
1336 dt_delete(env, parent, (const struct dt_key *)name, th,
1339 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
1340 (const struct dt_key *)name, th, BYPASS_CAPA, 1);
1342 GOTO(stop, rc = (rc == 0 ? 1 : rc));
1345 dt_trans_stop(env, dev, th);
1348 lfsck_ibits_unlock(&clh, LCK_EX);
1349 lfsck_ibits_unlock(&plh, LCK_EX);
1350 if (obj != NULL && !IS_ERR(obj))
1351 lfsck_object_put(env, obj);
1353 CDEBUG(D_LFSCK, "%s: namespace LFSCK conditionally destroy the "
1354 "object "DFID" because of conflict with the object "DFID
1355 " under the parent "DFID" with name %s: rc = %d\n",
1356 lfsck_lfsck2name(lfsck), PFID(cfid),
1357 PFID(lfsck_dto2fid(child)), PFID(lfsck_dto2fid(parent)),
1364 * Overwrite the linkEA for the object with the given ldata.
1366 * The caller should take the ldlm lock before the calling.
1368 * \param[in] env pointer to the thread context
1369 * \param[in] com pointer to the lfsck component
1370 * \param[in] obj pointer to the dt_object to be handled
1371 * \param[in] ldata pointer to the new linkEA data
1373 * \retval positive number for repaired cases
1374 * \retval 0 if nothing to be repaired
1375 * \retval negative error number on failure
/*
 * Overwrite the XATTR_NAME_LINK xattr of @obj with the buffer held in
 * @ldata; the length written is ld_leh->leh_len.
 *
 * @obj is asserted to be non-remote. The xattr set is declared and the
 * transaction is started with dt_trans_start_local() on li_bottom before
 * @obj is write-locked. Under the write lock a dead object yields 0,
 * LPF_DRYRUN yields 1 without writing, and a successful dt_xattr_set()
 * yields 1. The visible tail logs the result and sets LF_INCONSISTENT in
 * the namespace file flags; the condition guarding that assignment is on
 * a line this listing does not show.
 */
1377 int lfsck_namespace_rebuild_linkea(const struct lu_env *env,
1378 struct lfsck_component *com,
1379 struct dt_object *obj,
1380 struct linkea_data *ldata)
1382 struct lfsck_instance *lfsck = com->lc_lfsck;
1383 struct dt_device *dev = lfsck->li_bottom;
1384 struct thandle *th = NULL;
1385 struct lu_buf linkea_buf;
1389 LASSERT(!dt_object_remote(obj));
1391 th = dt_trans_create(env, dev);
1393 GOTO(log, rc = PTR_ERR(th));
1395 lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
1396 ldata->ld_leh->leh_len);
1397 rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1398 XATTR_NAME_LINK, 0, th);
1402 rc = dt_trans_start_local(env, dev, th);
1406 dt_write_lock(env, obj, 0);
1407 if (unlikely(lfsck_is_dead_obj(obj)))
1408 GOTO(unlock, rc = 0);
1410 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1411 GOTO(unlock, rc = 1);
1413 rc = dt_xattr_set(env, obj, &linkea_buf,
1414 XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1416 GOTO(unlock, rc = (rc == 0 ? 1 : rc));
1419 dt_write_unlock(env, obj);
1422 dt_trans_stop(env, dev, th);
1425 CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild linkEA for the "
1426 "object "DFID": rc = %d\n",
1427 lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
1430 struct lfsck_namespace *ns = com->lc_file_ram;
1432 ns->ln_flags |= LF_INCONSISTENT;
1439 * Repair invalid name entry.
1441 * If the name entry contains invalid information, such as bad file type
1442 * or (and) corrupted object FID, then either remove the name entry or
1443 * update the name entry with the given (right) information.
1445 * \param[in] env pointer to the thread context
1446 * \param[in] com pointer to the lfsck component
1447 * \param[in] parent pointer to the parent directory
1448 * \param[in] child pointer to the object referenced by the name entry
1449 * \param[in] name the old name of the child under the parent directory
1450 * \param[in] name2 the new name of the child under the parent directory
1451 * \param[in] type the type claimed by the name entry
1452 * \param[in] update update the name entry if true; otherwise, remove it
1453 * \param[in] dec decrease the parent nlink count if true
1455 * \retval positive number for repaired successfully
1456 * \retval 0 if nothing to be repaired
1457 * \retval negative error number on failure
/*
 * Delete the name entry @name from @parent; the same transaction also
 * carries an insert of @name2 referencing @child and a reference drop on
 * @parent.
 *
 * Visible flow: @parent must pass dt_try_as_dir() (otherwise -ENOTDIR)
 * and is locked with an LCK_EX UPDATE ibits lock. The transaction on
 * li_next declares the delete, an insert whose record uses the file type
 * of @child (masked with S_IFMT) and the FID of @child, and a ref_del.
 * Under the write lock of @parent the entry is looked up again: if the
 * FID found differs from the FID of @child the result is 0, and
 * LPF_DRYRUN gives 1 without modification. After the transaction stops,
 * when neither @update nor @dec is set and rc is 0, the FID of @child is
 * recorded through lfsck_namespace_trace_update() with LNTF_CHECK_LINKEA.
 *
 * NOTE(review): the tests that make the insert and the ref_del
 * conditional (presumably on @update and @dec, as the parameter
 * descriptions state) fall on lines missing from this listing - confirm
 * against the full source.
 */
1459 int lfsck_namespace_repair_dirent(const struct lu_env *env,
1460 struct lfsck_component *com,
1461 struct dt_object *parent,
1462 struct dt_object *child,
1463 const char *name, const char *name2,
1464 __u16 type, bool update, bool dec)
1466 struct lfsck_thread_info *info = lfsck_env_info(env);
1467 struct dt_insert_rec *rec = &info->lti_dt_rec;
1468 const struct lu_fid *cfid = lfsck_dto2fid(child);
1469 struct lu_fid *tfid = &info->lti_fid5;
1470 struct lfsck_instance *lfsck = com->lc_lfsck;
1471 struct dt_device *dev = lfsck->li_next;
1472 struct thandle *th = NULL;
1473 struct lustre_handle lh = { 0 };
1477 if (unlikely(!dt_try_as_dir(env, parent)))
1478 GOTO(log, rc = -ENOTDIR);
1480 rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
1481 MDS_INODELOCK_UPDATE, LCK_EX);
1485 th = dt_trans_create(env, dev);
1487 GOTO(unlock1, rc = PTR_ERR(th));
1489 rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
1494 rec->rec_type = lfsck_object_type(child) & S_IFMT;
1495 rec->rec_fid = cfid;
1496 rc = dt_declare_insert(env, parent,
1497 (const struct dt_rec *)rec,
1498 (const struct dt_key *)name2, th);
1504 rc = dt_declare_ref_del(env, parent, th);
1509 rc = dt_trans_start(env, dev, th);
1513 dt_write_lock(env, parent, 0);
1514 rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
1515 (const struct dt_key *)name, BYPASS_CAPA);
1516 /* Someone has removed the bad name entry by race. */
1518 GOTO(unlock2, rc = 0);
1523 /* Someone has removed the bad name entry and reused it for other
1524 * object by race. */
1525 if (!lu_fid_eq(tfid, cfid))
1526 GOTO(unlock2, rc = 0);
1528 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1529 GOTO(unlock2, rc = 1);
1531 rc = dt_delete(env, parent, (const struct dt_key *)name, th,
1537 rc = dt_insert(env, parent,
1538 (const struct dt_rec *)rec,
1539 (const struct dt_key *)name2, th,
1546 rc = dt_ref_del(env, parent, th);
1551 GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
1554 dt_write_unlock(env, parent);
1557 dt_trans_stop(env, dev, th);
1559 /* We are not sure whether the child will become orphan or not.
1560 * Record it in the LFSCK tracing file for further checking in
1561 * the second-stage scanning. */
1562 if (!update && !dec && rc == 0)
1563 lfsck_namespace_trace_update(env, com, cfid,
1564 LNTF_CHECK_LINKEA, true);
1567 lfsck_ibits_unlock(&lh, LCK_EX);
1570 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found bad name "
1571 "entry for: parent "DFID", child "DFID", name %s, type "
1572 "in name entry %o, type claimed by child %o. repair it "
1573 "by %s with new name2 %s: rc = %d\n", lfsck_lfsck2name(lfsck),
1574 PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
1575 name, type, update ? lfsck_object_type(child) : 0,
1576 update ? "updating" : "removing", name2, rc);
1579 struct lfsck_namespace *ns = com->lc_file_ram;
1581 ns->ln_flags |= LF_INCONSISTENT;
1588 * Update the ".." name entry for the given object.
1590 * The object's ".." is corrupted, this function will update the ".." name
1591 * entry with the given pfid, and the linkEA with the given ldata.
1593 * The caller should take the ldlm lock before the calling.
1595 * \param[in] env pointer to the thread context
1596 * \param[in] com pointer to the lfsck component
1597 * \param[in] obj pointer to the dt_object to be handled
1598 * \param[in] pfid the new fid for the object's ".." name entry
1599 * \param[in] cname the name for the @obj in the parent directory
1601 * \retval positive number for repaired cases
1602 * \retval 0 if nothing to be repaired
1603 * \retval negative error number on failure
/*
 * Rewrite both the dotdot entry and the linkEA of the directory @obj so
 * that they name @pfid as the parent.
 *
 * @obj is asserted to be non-remote and a directory. A new linkEA is
 * built in info->lti_big_buf and given the single entry <@cname, @pfid>.
 * One local transaction on li_bottom declares a delete of dotdot, an
 * insert of dotdot (record type S_IFDIR, FID @pfid) and the linkEA xattr
 * set. Under the write lock of @obj a dead object yields 0 and
 * LPF_DRYRUN yields 1; otherwise the old dotdot is deleted without
 * checking the result, the new one is inserted, the xattr is written,
 * and success is reported as 1. The visible tail sets LF_INCONSISTENT;
 * its guarding condition is not shown in this listing.
 */
1605 static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env,
1606 struct lfsck_component *com,
1607 struct dt_object *obj,
1608 const struct lu_fid *pfid,
1609 struct lu_name *cname)
1611 struct lfsck_thread_info *info = lfsck_env_info(env);
1612 struct dt_insert_rec *rec = &info->lti_dt_rec;
1613 struct lfsck_instance *lfsck = com->lc_lfsck;
1614 struct dt_device *dev = lfsck->li_bottom;
1615 struct thandle *th = NULL;
1616 struct linkea_data ldata = { 0 };
1617 struct lu_buf linkea_buf;
1621 LASSERT(!dt_object_remote(obj));
1622 LASSERT(S_ISDIR(lfsck_object_type(obj)));
1624 rc = linkea_data_new(&ldata, &info->lti_big_buf);
1628 rc = linkea_add_buf(&ldata, cname, pfid);
1632 lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
1633 ldata.ld_leh->leh_len);
1635 th = dt_trans_create(env, dev);
1637 GOTO(log, rc = PTR_ERR(th));
1639 rc = dt_declare_delete(env, obj, (const struct dt_key *)dotdot, th);
1643 rec->rec_type = S_IFDIR;
1644 rec->rec_fid = pfid;
1645 rc = dt_declare_insert(env, obj, (const struct dt_rec *)rec,
1646 (const struct dt_key *)dotdot, th);
1650 rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1651 XATTR_NAME_LINK, 0, th);
1655 rc = dt_trans_start_local(env, dev, th);
1659 dt_write_lock(env, obj, 0);
1660 if (unlikely(lfsck_is_dead_obj(obj)))
1661 GOTO(unlock, rc = 0);
1663 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
1664 GOTO(unlock, rc = 1);
1666 /* The old ".." name entry maybe not exist. */
1667 dt_delete(env, obj, (const struct dt_key *)dotdot, th,
1670 rc = dt_insert(env, obj, (const struct dt_rec *)rec,
1671 (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
1675 rc = dt_xattr_set(env, obj, &linkea_buf,
1676 XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1678 GOTO(unlock, rc = (rc == 0 ? 1 : rc));
1681 dt_write_unlock(env, obj);
1684 dt_trans_stop(env, dev, th);
1687 CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild dotdot name entry for "
1688 "the object "DFID", new parent "DFID": rc = %d\n",
1689 lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)),
1693 struct lfsck_namespace *ns = com->lc_file_ram;
1695 ns->ln_flags |= LF_INCONSISTENT;
1702 * Handle orphan @obj during Double Scan Directory.
1704 * Remove the @obj's current (invalid) linkEA entries, and insert
1705 * it in the directory .lustre/lost+found/MDTxxxx/ with the name:
1706 * ${FID}-${PFID}-D-${conflict_version}
1708 * The caller should take the ldlm lock before the calling.
1710 * \param[in] env pointer to the thread context
1711 * \param[in] com pointer to the lfsck component
1712 * \param[in] obj pointer to the orphan object to be handled
1713 * \param[in] pfid the new fid for the object's ".." name entry
1714 * \param[in,out] lh ldlm lock handler for the given @obj
1715 * \param[out] type to tell the caller what the inconsistency is
1717 * \retval positive number for repaired cases
1718 * \retval 0 if nothing to be repaired
1719 * \retval negative error number on failure
/*
 * Strip the linkEA from @obj and hand the object to
 * lfsck_namespace_insert_orphan().
 *
 * The ibits lock @lh is released right after the linkEA removal,
 * whatever its result; the removal result is then tested against
 * -ENODATA. *type is set to LNIT_MUL_REF, and the name component passed
 * to the orphan insertion is a dash followed by the FID of @pfid,
 * formatted into info->lti_tmpbuf, together with the type tag D.
 *
 * NOTE(review): the line carrying the return type and the statement
 * executed when the removal fails are not part of this listing.
 */
1722 lfsck_namespace_dsd_orphan(const struct lu_env *env,
1723 struct lfsck_component *com,
1724 struct dt_object *obj,
1725 const struct lu_fid *pfid,
1726 struct lustre_handle *lh,
1727 enum lfsck_namespace_inconsistency_type *type)
1729 struct lfsck_thread_info *info = lfsck_env_info(env);
1733 /* Remove the unrecognized linkEA. */
1734 rc = lfsck_namespace_links_remove(env, com, obj);
1735 lfsck_ibits_unlock(lh, LCK_EX);
1736 if (rc < 0 && rc != -ENODATA)
1739 *type = LNIT_MUL_REF;
1740 /* The unique linkEA is invalid, even if the ".." name entry may be
1741 * valid, we still cannot know via which name entry this directory
1742 * will be referenced. Then handle it as pure orphan. */
1743 snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
1744 "-"DFID, PFID(pfid));
1745 rc = lfsck_namespace_insert_orphan(env, com, obj,
1746 info->lti_tmpbuf, "D", NULL);
1752 * Double Scan Directory object for single linkEA entry case.
1754 * The given @child has unique linkEA entry. If the linkEA entry is valid,
1755 * then check whether the name is in the namespace or not, if not, add the
1756 * missing name entry back to namespace. If the linkEA entry is invalid,
1757 * then remove it and insert the @child in the .lustre/lost+found/MDTxxxx/
1760 * \param[in] env pointer to the thread context
1761 * \param[in] com pointer to the lfsck component
1762 * \param[in] child pointer to the directory to be double scanned
1763 * \param[in] pfid the FID corresponding to the ".." entry
1764 * \param[in] ldata pointer to the linkEA data for the given @child
1765 * \param[in,out] lh ldlm lock handler for the given @child
1766 * \param[out] type to tell the caller what the inconsistency is
1767 * \param[in] retry if found inconsistency, but the caller does not hold
1768 * ldlm lock on the @child, then set @retry as true
1770 * \retval positive number for repaired cases
1771 * \retval 0 if nothing to be repaired
1772 * \retval negative error number on failure
/*
 * Second-stage check of a directory @child that carries exactly one
 * linkEA entry.
 *
 * The entry is unpacked into a name and a parent FID, then the visible
 * code distinguishes these cases:
 *  - parent FID not sane, or parent not usable as a directory:
 *    lfsck_namespace_dsd_orphan();
 *  - parent object does not exist: release @lh, create the parent as an
 *    orphan and insert the name; on -EEXIST shrink the linkEA and insert
 *    @child as an orphan;
 *  - name missing under the parent (-ENOENT): release @lh and insert the
 *    name; on -EEXIST call lfsck_namespace_shrink_linkea_cond() and
 *    insert @child as an orphan;
 *  - name present but referencing another FID: release @lh, try
 *    lfsck_namespace_replace_cond(), with lfsck_namespace_dsd_orphan()
 *    also on this path;
 *  - dotdot FID (@pfid) differs from the parent FID: set *type to
 *    LNIT_UNMATCHED_PAIRS and repair the pair.
 * The parent reference is dropped at the end when it is a valid pointer.
 *
 * NOTE(review): every repair branch first tests whether @lh is unused
 * while @retry is non-NULL; what happens then is on lines missing from
 * this listing (presumably *retry is set, as the parameter description
 * says) - confirm against the full source.
 */
1775 lfsck_namespace_dsd_single(const struct lu_env *env,
1776 struct lfsck_component *com,
1777 struct dt_object *child,
1778 const struct lu_fid *pfid,
1779 struct linkea_data *ldata,
1780 struct lustre_handle *lh,
1781 enum lfsck_namespace_inconsistency_type *type,
1784 struct lfsck_thread_info *info = lfsck_env_info(env);
1785 struct lu_name *cname = &info->lti_name;
1786 const struct lu_fid *cfid = lfsck_dto2fid(child);
1787 struct lu_fid *tfid = &info->lti_fid3;
1788 struct lfsck_instance *lfsck = com->lc_lfsck;
1789 struct dt_object *parent = NULL;
1793 lfsck_namespace_unpack_linkea_entry(ldata, cname, tfid, info->lti_key);
1794 /* The unique linkEA entry with bad parent will be handled as orphan. */
1795 if (!fid_is_sane(tfid)) {
1796 if (!lustre_handle_is_used(lh) && retry != NULL)
1799 rc = lfsck_namespace_dsd_orphan(env, com, child,
1805 parent = lfsck_object_find_bottom(env, lfsck, tfid);
1807 GOTO(out, rc = PTR_ERR(parent));
1809 /* We trust the unique linkEA entry in spite of whether it matches the
1810 * ".." name entry or not. Because even if the linkEA entry is wrong
1811 * and the ".." name entry is right, we still cannot know via which
1812 * name entry the child will be referenced, since all known entries
1813 * have been verified during the first-stage scanning. */
1814 if (!dt_object_exists(parent)) {
1815 if (!lustre_handle_is_used(lh) && retry != NULL) {
1821 lfsck_ibits_unlock(lh, LCK_EX);
1822 /* Create the lost parent as an orphan. */
1823 rc = lfsck_namespace_create_orphan(env, com, parent);
1825 /* Add the missing name entry to the parent. */
1826 rc = lfsck_namespace_insert_normal(env, com, parent,
1827 child, cname->ln_name);
1828 if (unlikely(rc == -EEXIST)) {
1829 /* Unfortunately, someone reused the name
1830 * under the parent by race. So we have
1831 * to remove the linkEA entry from
1832 * current child object. It means that the
1833 * LFSCK cannot recover the system
1834 * totally back to its original status,
1835 * but it is necessary to make the
1836 * current system to be consistent. */
1837 rc = lfsck_namespace_shrink_linkea(env,
1841 snprintf(info->lti_tmpbuf,
1842 sizeof(info->lti_tmpbuf),
1843 "-"DFID, PFID(pfid));
1844 rc = lfsck_namespace_insert_orphan(env,
1845 com, child, info->lti_tmpbuf,
1854 /* The unique linkEA entry with bad parent will be handled as orphan. */
1855 if (unlikely(!dt_try_as_dir(env, parent))) {
1856 if (!lustre_handle_is_used(lh) && retry != NULL)
1859 rc = lfsck_namespace_dsd_orphan(env, com, child,
1865 rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
1866 (const struct dt_key *)cname->ln_name, BYPASS_CAPA);
1867 if (rc == -ENOENT) {
1868 if (!lustre_handle_is_used(lh) && retry != NULL) {
1874 lfsck_ibits_unlock(lh, LCK_EX);
1875 /* Add the missing name entry back to the namespace. */
1876 rc = lfsck_namespace_insert_normal(env, com, parent, child,
1878 if (unlikely(rc == -EEXIST)) {
1879 /* Unfortunately, someone reused the name under the
1880 * parent by race. So we have to remove the linkEA
1881 * entry from current child object. It means that the
1882 * LFSCK cannot recover the system totally back to
1883 * its original status, but it is necessary to make
1884 * the current system to be consistent.
1886 * It also may be because of the LFSCK found some
1887 * internal status of create operation. Under such
1888 * case, nothing to be done. */
1889 rc = lfsck_namespace_shrink_linkea_cond(env, com,
1890 parent, child, ldata, cname, tfid);
1892 snprintf(info->lti_tmpbuf,
1893 sizeof(info->lti_tmpbuf),
1894 "-"DFID, PFID(pfid));
1895 rc = lfsck_namespace_insert_orphan(env, com,
1896 child, info->lti_tmpbuf, "D", NULL);
1906 if (!lu_fid_eq(tfid, cfid)) {
1907 if (!lustre_handle_is_used(lh) && retry != NULL) {
1913 lfsck_ibits_unlock(lh, LCK_EX);
1914 /* The name entry references another MDT-object that
1915 * may be created by the LFSCK for repairing dangling
1916 * name entry. Try to replace it. */
1917 rc = lfsck_namespace_replace_cond(env, com, parent, child,
1920 rc = lfsck_namespace_dsd_orphan(env, com, child,
1926 /* The ".." name entry is wrong, update it. */
1927 if (!lu_fid_eq(pfid, lfsck_dto2fid(parent))) {
1928 if (!lustre_handle_is_used(lh) && retry != NULL) {
1934 *type = LNIT_UNMATCHED_PAIRS;
1935 rc = lfsck_namespace_repair_unmatched_pairs(env, com, child,
1936 lfsck_dto2fid(parent), cname);
1942 if (parent != NULL && !IS_ERR(parent))
1943 lfsck_object_put(env, parent);
1949 * Double Scan Directory object for multiple linkEA entries case.
1951 * The given @child has multiple linkEA entries. At most one linkEA
1952 * entry will be valid; all the others will be removed. First, the function
1953 * will try to find out the linkEA entry for which the name entry exists under
1954 * the given parent (@pfid). If there is no linkEA entry that matches the given
1955 * ".." name entry, then tries to find out the first linkEA entry that both the
1956 * parent and the name entry exist to rebuild a new ".." name entry.
1958 * \param[in] env pointer to the thread context
1959 * \param[in] com pointer to the lfsck component
1960 * \param[in] child pointer to the directory to be double scanned
1961 * \param[in] pfid the FID corresponding to the ".." entry
1962 * \param[in] ldata pointer to the linkEA data for the given @child
1963 * \param[in,out] lh ldlm lock handler for the given @child
1964 * \param[out] type to tell the caller what the inconsistency is
1965 * \param[in] lpf true if the ".." entry is under lost+found/MDTxxxx/
1967 * \retval positive number for repaired cases
1968 * \retval 0 if nothing to be repaired
1969 * \retval negative error number on failure
/*
 * Second-stage check of a directory @child whose linkEA holds several
 * entries.
 *
 * The visible loop walks the entries of @ldata. Repeated entries are
 * filtered and entries with an insane parent FID are deleted from the
 * buffer. When !lpf and the local flag once are both set, entries whose
 * parent FID differs from @pfid are skipped. For the remaining entries
 * the parent is looked up: a non-existent parent drops the entry while
 * more than one record remains, a parent that is not a directory drops
 * the entry, and a name that is missing under the parent (-ENOENT)
 * advances to the next entry.
 *
 * When the name entry found references @child, the dotdot entry is
 * repaired if @pfid differs from that parent, the linkEA is rebuilt from
 * a new buffer holding only this entry, and the remaining entries are
 * passed to lfsck_namespace_repair_dirent() (remove, decrease nlink);
 * the local counter count is added to ns->ln_dirent_repaired. When the
 * name entry references another object, @lh is released and
 * lfsck_namespace_replace_cond() is tried.
 *
 * After the loop, one remaining record is delegated to
 * lfsck_namespace_dsd_single() and zero records to
 * lfsck_namespace_dsd_orphan().
 *
 * NOTE(review): declarations of once and count, the loop-control
 * statements and several result checks are on lines this listing omits.
 */
1972 lfsck_namespace_dsd_multiple(const struct lu_env *env,
1973 struct lfsck_component *com,
1974 struct dt_object *child,
1975 const struct lu_fid *pfid,
1976 struct linkea_data *ldata,
1977 struct lustre_handle *lh,
1978 enum lfsck_namespace_inconsistency_type *type,
1981 struct lfsck_thread_info *info = lfsck_env_info(env);
1982 struct lu_name *cname = &info->lti_name;
1983 const struct lu_fid *cfid = lfsck_dto2fid(child);
1984 struct lu_fid *tfid = &info->lti_fid3;
1985 struct lu_fid *pfid2 = &info->lti_fid4;
1986 struct lfsck_namespace *ns = com->lc_file_ram;
1987 struct lfsck_instance *lfsck = com->lc_lfsck;
1988 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
1989 struct dt_object *parent = NULL;
1990 struct linkea_data ldata_new = { 0 };
1997 while (ldata->ld_lee != NULL) {
1998 lfsck_namespace_unpack_linkea_entry(ldata, cname, tfid,
2000 /* Drop repeated linkEA entries. */
2001 lfsck_namespace_filter_linkea_entry(ldata, cname, tfid, true);
2002 /* Drop invalid linkEA entry. */
2003 if (!fid_is_sane(tfid)) {
2004 linkea_del_buf(ldata, cname);
2008 /* If current dotdot is the .lustre/lost+found/MDTxxxx/,
2009 * then it is possible that: the directry object has ever
2010 * been lost, but its name entry was there. In the former
2011 * LFSCK run, during the first-stage scanning, the LFSCK
2012 * found the dangling name entry, but it did not recreate
2013 * the lost object, and when moved to the second-stage
2014 * scanning, some children objects of the lost directory
2015 * object were found, then the LFSCK recreated such lost
2016 * directory object as an orphan.
2018 * When the LFSCK runs again, if the dangling name is still
2019 * there, the LFSCK should move the orphan directory object
2020 * back to the normal namespace. */
2021 if (!lpf && !lu_fid_eq(pfid, tfid) && once) {
2022 linkea_next_entry(ldata);
2026 parent = lfsck_object_find_bottom(env, lfsck, tfid);
2028 RETURN(PTR_ERR(parent));
2030 if (!dt_object_exists(parent)) {
2031 lfsck_object_put(env, parent);
2032 if (ldata->ld_leh->leh_reccount > 1) {
2033 /* If it is NOT the last linkEA entry, then
2034 * there is still other chance to make the
2035 * child to be visible via other parent, then
2036 * remove this linkEA entry. */
2037 linkea_del_buf(ldata, cname);
2044 /* The linkEA entry with bad parent will be removed. */
2045 if (unlikely(!dt_try_as_dir(env, parent))) {
2046 lfsck_object_put(env, parent);
2047 linkea_del_buf(ldata, cname);
2051 rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
2052 (const struct dt_key *)cname->ln_name,
2054 *pfid2 = *lfsck_dto2fid(parent);
2055 if (rc == -ENOENT) {
2056 lfsck_object_put(env, parent);
2057 linkea_next_entry(ldata);
2062 lfsck_object_put(env, parent);
2067 if (lu_fid_eq(tfid, cfid)) {
2068 lfsck_object_put(env, parent);
2069 if (!lu_fid_eq(pfid, pfid2)) {
2070 *type = LNIT_UNMATCHED_PAIRS;
2071 rc = lfsck_namespace_repair_unmatched_pairs(env,
2072 com, child, pfid2, cname);
2078 /* It is the most common case that we find the
2079 * name entry corresponding to the linkEA entry
2080 * that matches the ".." name entry. */
2081 rc = linkea_data_new(&ldata_new, &info->lti_big_buf);
2085 rc = linkea_add_buf(&ldata_new, cname, pfid2);
2089 rc = lfsck_namespace_rebuild_linkea(env, com, child,
2094 linkea_del_buf(ldata, cname);
2095 linkea_first_entry(ldata);
2096 /* There may be some invalid dangling name entries under
2097 * other parent directories, remove all of them. */
2098 while (ldata->ld_lee != NULL) {
2099 lfsck_namespace_unpack_linkea_entry(ldata,
2100 cname, tfid, info->lti_key);
2101 if (!fid_is_sane(tfid))
2104 parent = lfsck_object_find_bottom(env, lfsck,
2106 if (IS_ERR(parent)) {
2107 rc = PTR_ERR(parent);
2108 if (rc != -ENOENT &&
2109 bk->lb_param & LPF_FAILOUT)
2115 if (!dt_object_exists(parent)) {
2116 lfsck_object_put(env, parent);
2120 rc = lfsck_namespace_repair_dirent(env, com,
2121 parent, child, cname->ln_name,
2122 cname->ln_name, S_IFDIR, false, true);
2123 lfsck_object_put(env, parent);
2125 if (bk->lb_param & LPF_FAILOUT)
2134 linkea_del_buf(ldata, cname);
2137 ns->ln_dirent_repaired += count;
2142 lfsck_ibits_unlock(lh, LCK_EX);
2143 /* The name entry references another MDT-object that may be
2144 * created by the LFSCK for repairing dangling name entry.
2145 * Try to replace it. */
2146 rc = lfsck_namespace_replace_cond(env, com, parent, child,
2148 lfsck_object_put(env, parent);
2155 linkea_del_buf(ldata, cname);
2158 if (ldata->ld_leh->leh_reccount == 1) {
2159 rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata,
2165 /* All linkEA entries are invalid and removed, then handle the @child
2167 if (ldata->ld_leh->leh_reccount == 0) {
2168 rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, lh,
2174 linkea_first_entry(ldata);
2175 /* If the dangling name entry for the orphan directory object has
2176 * been remvoed, then just check whether the directory object is
2177 * still under the .lustre/lost+found/MDTxxxx/ or not. */
2183 /* There is no linkEA entry that matches the ".." name entry. Find
2184 * the first linkEA entry that both parent and name entry exist to
2185 * rebuild a new ".." name entry. */
2195 * Double scan the directory object for namespace LFSCK.
2197 * This function will verify the <parent, child> pairs in the namespace tree:
2198 * the parent references the child via some name entry that should be in the
2199 * child's linkEA entry, the child should back-reference the parent via its
2202 * The LFSCK will scan every linkEA entry in turn until find out the first
2203 * matched pairs. If found, then all other linkEA entries will be dropped.
2204 * If all the linkEA entries cannot match the ".." name entry, then there
2205 * are several possible cases:
2207 * 1) If there is only one linkEA entry, then trust it as long as the PFID
2208 * in the linkEA entry is valid.
2210 * 2) If there are multiple linkEA entries, then try to find the linkEA
2211 * that matches the ".." name entry. If found, then all other entries
2212 * are invalid; otherwise, it is quite possible that the ".." name entry
2213 * is corrupted. Under such case, the LFSCK will rebuild the ".." name
2214 * entry according to the first valid linkEA entry (both the parent and
2215 * the name entry should exist).
2217 * 3) If the directory object has no (valid) linkEA entry, then the
2218 * directory object will be handled as pure orphan and inserted
2219 * in the .lustre/lost+found/MDTxxxx/ with the name:
2220 * ${self_FID}-${PFID}-D-${conflict_version}
2222 * \param[in] env pointer to the thread context
2223 * \param[in] com pointer to the lfsck component
2224 * \param[in] child pointer to the directory object to be handled
2225 * \param[in] flags to indicate the special checking on the @child
2227 * \retval positive number for repaired cases
2228 * \retval 0 if nothing to be repaired
2229 * \retval negative error number on failure
/*
 * Second-stage verification of the directory @child: compare its dotdot
 * entry with its linkEA and dispatch to the matching repair helper.
 *
 * Visible flow: @child is asserted to be non-remote. When LPF_ALL_TGT is
 * not set in the bookmark parameters a debug message is logged saying
 * that orphan handling is skipped. @child must pass dt_try_as_dir()
 * (otherwise -ENOTDIR). The LCK_EX ibits lock (UPDATE | XATTR) on @child
 * is only taken when required. Under the read lock of @child the dotdot
 * entry is looked up into pfid and the linkEA is read. With no usable
 * linkEA, a corrupted one (-EINVAL with a non-zero pfid) is removed;
 * otherwise @child is inserted as an orphan with type LNIT_MUL_REF. With
 * a linkEA, one record goes to lfsck_namespace_dsd_single(), zero records
 * to lfsck_namespace_dsd_orphan() and several records to
 * lfsck_namespace_dsd_multiple(). Finally the lock is released and the
 * counter matching the inconsistency type is incremented.
 *
 * The debug message is assembled from adjacent string literals; each
 * fragment now ends with a space (and the duplicated word is gone) so
 * the words no longer run together in the log output.
 *
 * NOTE(review): the embedded line numbers of this listing have gaps, so
 * declarations such as rc and retry, the labels, and several result
 * checks are not shown here.
 */
2231 static int lfsck_namespace_double_scan_dir(const struct lu_env *env,
2232 struct lfsck_component *com,
2233 struct dt_object *child, __u8 flags)
2235 struct lfsck_thread_info *info = lfsck_env_info(env);
2236 const struct lu_fid *cfid = lfsck_dto2fid(child);
2237 struct lu_fid *pfid = &info->lti_fid2;
2238 struct lfsck_namespace *ns = com->lc_file_ram;
2239 struct lfsck_instance *lfsck = com->lc_lfsck;
2240 struct lustre_handle lh = { 0 };
2241 struct linkea_data ldata = { 0 };
2242 bool unknown = false;
2245 enum lfsck_namespace_inconsistency_type type = LNIT_BAD_LINKEA;
2249 LASSERT(!dt_object_remote(child));
2251 if (!(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) {
2252 CDEBUG(D_LFSCK, "%s: some MDT(s) maybe NOT take part in "
2253 "the namespace LFSCK, then the LFSCK cannot guarantee "
2254 "all the name entries have been verified in first-stage "
2255 "scanning. So have to skip orphan related handling for "
2256 "the directory object "DFID" with remote name entry\n",
2257 lfsck_lfsck2name(lfsck), PFID(cfid));
2262 if (unlikely(!dt_try_as_dir(env, child)))
2263 GOTO(out, rc = -ENOTDIR);
2265 /* We only take ldlm lock on the @child when required. When the
2266 * logic comes here for the first time, it is always false. */
2270 rc = lfsck_ibits_lock(env, lfsck, child, &lh,
2271 MDS_INODELOCK_UPDATE |
2272 MDS_INODELOCK_XATTR, LCK_EX);
2277 dt_read_lock(env, child, 0);
2278 if (unlikely(lfsck_is_dead_obj(child))) {
2279 dt_read_unlock(env, child);
2284 rc = dt_lookup(env, child, (struct dt_rec *)pfid,
2285 (const struct dt_key *)dotdot, BYPASS_CAPA);
2287 if (rc != -ENOENT && rc != -ENODATA && rc != -EINVAL) {
2288 dt_read_unlock(env, child);
2293 if (!lustre_handle_is_used(&lh)) {
2294 dt_read_unlock(env, child);
2299 } else if (lfsck->li_lpf_obj != NULL &&
2300 lu_fid_eq(pfid, lfsck_dto2fid(lfsck->li_lpf_obj))) {
2304 rc = lfsck_links_read(env, child, &ldata);
2305 dt_read_unlock(env, child);
2307 if (rc != -ENODATA && rc != -EINVAL)
2310 if (!lustre_handle_is_used(&lh))
2313 if (rc == -EINVAL && !fid_is_zero(pfid)) {
2314 /* Remove the corrupted linkEA. */
2315 rc = lfsck_namespace_links_remove(env, com, child);
2317 /* Here, because of the crashed linkEA, we
2318 * cannot know whether there is some parent
2319 * that references the child directory via
2320 * some name entry or not. So keep it there,
2321 * when the LFSCK run next time, if there is
2322 * some parent that references this object,
2323 * then the LFSCK can rebuild the linkEA;
2324 * otherwise, this object will be handled
2325 * as orphan as above. */
2328 /* 1. If we have neither ".." nor linkEA,
2329 * then it is an orphan.
2331 * 2. If we only have the ".." name entry,
2332 * but no parent references this child
2333 * directory, then handle it as orphan. */
2334 lfsck_ibits_unlock(&lh, LCK_EX);
2335 type = LNIT_MUL_REF;
2336 snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
2337 "-"DFID, PFID(pfid));
2338 rc = lfsck_namespace_insert_orphan(env, com, child,
2339 info->lti_tmpbuf, "D", NULL);
2345 linkea_first_entry(&ldata);
2346 /* This is the most common case: the object has unique linkEA entry. */
2347 if (ldata.ld_leh->leh_reccount == 1) {
2348 rc = lfsck_namespace_dsd_single(env, com, child, pfid, &ldata,
2349 &lh, &type, &retry);
2351 LASSERT(!lustre_handle_is_used(&lh));
2360 if (!lustre_handle_is_used(&lh))
2363 if (unlikely(ldata.ld_leh->leh_reccount == 0)) {
2364 rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, &lh,
2370 /* When we come here, the cases usually like that:
2371 * 1) The directory object has a corrupted linkEA entry. During the
2372 * first-stage scanning, the LFSCK cannot know such corruption,
2373 * then it appends the right linkEA entry according to the found
2374 * name entry after the bad one.
2376 * 2) The directory object has a right linkEA entry. During the
2377 * first-stage scanning, the LFSCK finds some bad name entry,
2378 * but the LFSCK cannot aware that at that time, then it adds
2379 * the bad linkEA entry for further processing. */
2380 rc = lfsck_namespace_dsd_multiple(env, com, child, pfid, &ldata,
2386 lfsck_ibits_unlock(&lh, LCK_EX);
2389 case LNIT_BAD_LINKEA:
2390 ns->ln_linkea_repaired++;
2392 case LNIT_UNMATCHED_PAIRS:
2393 ns->ln_unmatched_pairs_repaired++;
2396 ns->ln_mul_ref_repaired++;
2404 ns->ln_unknown_inconsistency++;
2410 * Double scan the MDT-object for namespace LFSCK.
2412 * If the MDT-object contains invalid or repeated linkEA entries, then drop
2413 * those entries from the linkEA; if the linkEA becomes empty or the object
2414 * has no linkEA, then it is an orphan and will be added into the directory
2415 * .lustre/lost+found/MDTxxxx/; if the remote parent is lost, then recreate
2416 * the remote parent; if the name entry corresponding to some linkEA entry
2417 * is lost, then add the name entry back to the namespace.
2419 * \param[in] env pointer to the thread context
2420 * \param[in] com pointer to the lfsck component
2421 * \param[in] child pointer to the dt_object to be handled
2422 * \param[in] flags some hints to indicate how the @child should be handled
2424 * \retval positive number for repaired cases
2425 * \retval 0 if nothing to be repaired
2426 * \retval negative error number on failure
2428 static int lfsck_namespace_double_scan_one(const struct lu_env *env,
2429 struct lfsck_component *com,
2430 struct dt_object *child, __u8 flags)
/* The attribute, name and FID scratch buffers all come from the
 * lfsck_thread_info returned by lfsck_env_info(env). */
2432 struct lfsck_thread_info *info = lfsck_env_info(env);
2433 struct lu_attr *la = &info->lti_la;
2434 struct lu_name *cname = &info->lti_name;
2435 struct lu_fid *pfid = &info->lti_fid;
2436 struct lu_fid *cfid = &info->lti_fid2;
2437 struct lfsck_instance *lfsck = com->lc_lfsck;
2438 struct lfsck_namespace *ns = com->lc_file_ram;
2439 struct dt_object *parent = NULL;
2440 struct linkea_data ldata = { 0 };
2441 bool repaired = false;
/* The dead-object check, the file-type check and the linkEA read are
 * all performed while the read lock on the child is held. */
2446 dt_read_lock(env, child, 0);
2447 if (unlikely(lfsck_is_dead_obj(child))) {
2448 dt_read_unlock(env, child);
/* A directory is handed over to lfsck_namespace_double_scan_dir()
 * after the read lock has been dropped. */
2453 if (S_ISDIR(lfsck_object_type(child))) {
2454 dt_read_unlock(env, child);
2455 rc = lfsck_namespace_double_scan_dir(env, com, child, flags);
2460 rc = lfsck_links_read(env, child, &ldata);
2461 dt_read_unlock(env, child);
/* Walk the linkEA entries until ld_lee becomes NULL; each entry is
 * unpacked into @cname/@pfid before it is verified. */
2465 linkea_first_entry(&ldata);
2466 while (ldata.ld_lee != NULL) {
2467 lfsck_namespace_unpack_linkea_entry(&ldata, cname, pfid,
2469 rc = lfsck_namespace_filter_linkea_entry(&ldata, cname, pfid,
2471 /* Found repeated linkEA entries */
2473 rc = lfsck_namespace_shrink_linkea(env, com, child,
2474 &ldata, cname, pfid, false);
2486 /* Invalid PFID in the linkEA entry. */
2487 if (!fid_is_sane(pfid)) {
2488 rc = lfsck_namespace_shrink_linkea(env, com, child,
2489 &ldata, cname, pfid, true);
/* Locate the parent object that the linkEA entry's PFID refers to. */
2499 parent = lfsck_object_find_bottom(env, lfsck, pfid);
2501 GOTO(out, rc = PTR_ERR(parent));
2503 if (!dt_object_exists(parent)) {
2504 if (ldata.ld_leh->leh_reccount > 1) {
2505 /* If it is NOT the last linkEA entry, then
2506 * there is still other chance to make the
2507 * child to be visible via other parent, then
2508 * remove this linkEA entry. */
2509 rc = lfsck_namespace_shrink_linkea(env, com,
2510 child, &ldata, cname, pfid, true);
2512 /* Create the lost parent as an orphan. */
2513 rc = lfsck_namespace_create_orphan(env, com,
2516 lfsck_object_put(env, parent);
2524 /* Add the missing name entry to the parent. */
2525 rc = lfsck_namespace_insert_normal(env, com,
2526 parent, child, cname->ln_name);
2527 if (unlikely(rc == -EEXIST))
2528 /* Unfortunately, someone reused the
2529 * name under the parent by race. So we
2530 * have to remove the linkEA entry from
2531 * current child object. It means that
2532 * the LFSCK cannot recover the system
2533 * totally back to its original status,
2534 * but it is necessary to make the
2535 * current system to be consistent. */
2536 rc = lfsck_namespace_shrink_linkea(env,
2540 linkea_next_entry(&ldata);
2543 lfsck_object_put(env, parent);
2553 /* The linkEA entry with bad parent will be removed. */
2554 if (unlikely(!dt_try_as_dir(env, parent))) {
2555 lfsck_object_put(env, parent);
2556 rc = lfsck_namespace_shrink_linkea(env, com, child,
2557 &ldata, cname, pfid, true);
/* Look up @cname in the parent; the lookup record is written to @cfid
 * and compared with the child's own FID below. */
2567 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
2568 (const struct dt_key *)cname->ln_name,
2570 if (rc != 0 && rc != -ENOENT) {
2571 lfsck_object_put(env, parent);
2577 if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
2578 /* It is the most common case that we
2579 * find the name entry corresponding
2580 * to the linkEA entry. */
2581 lfsck_object_put(env, parent);
2582 linkea_next_entry(&ldata);
2584 /* The name entry references another
2585 * MDT-object that may be created by
2586 * the LFSCK for repairing dangling
2587 * name entry. Try to replace it. */
2588 rc = lfsck_namespace_replace_cond(env, com,
2589 parent, child, cfid, cname);
2590 lfsck_object_put(env, parent);
2596 linkea_next_entry(&ldata);
2598 rc = lfsck_namespace_shrink_linkea(env,
/* Fetch the child's attributes; la_nlink is compared with the number
 * of linkEA records just below. */
2612 rc = dt_attr_get(env, child, la, BYPASS_CAPA);
2616 /* If there is no name entry in the parent dir and the object
2617 * link count is less than the linkea entries count, then the
2618 * linkea entry should be removed. */
2619 if (ldata.ld_leh->leh_reccount > la->la_nlink) {
2620 rc = lfsck_namespace_shrink_linkea_cond(env, com,
2621 parent, child, &ldata, cname, pfid);
2622 lfsck_object_put(env, parent);
2632 /* Add the missing name entry back to the namespace. */
2633 rc = lfsck_namespace_insert_normal(env, com, parent, child,
2635 if (unlikely(rc == -EEXIST))
2636 /* Unfortunately, someone reused the name under the
2637 * parent by race. So we have to remove the linkEA
2638 * entry from current child object. It means that the
2639 * LFSCK cannot recover the system totally back to
2640 * its original status, but it is necessary to make
2641 * the current system to be consistent.
2643 * It also may be because of the LFSCK found some
2644 * internal status of create operation. Under such
2645 * case, nothing to be done. */
2646 rc = lfsck_namespace_shrink_linkea_cond(env, com,
2647 parent, child, &ldata, cname, pfid);
2649 linkea_next_entry(&ldata);
2651 lfsck_object_put(env, parent);
/* -ENODATA is the only negative result that is not treated as a
 * failure at this point. */
2662 if (rc < 0 && rc != -ENODATA)
2666 LASSERT(ldata.ld_leh != NULL);
2668 count = ldata.ld_leh->leh_reccount;
2672 /* If the child becomes orphan, then insert it into
2673 * the global .lustre/lost+found/MDTxxxx directory. */
2674 rc = lfsck_namespace_insert_orphan(env, com, child, "", "O",
2680 ns->ln_mul_ref_repaired++;
/* Re-read the attributes so that la_nlink can be compared with the
 * remaining linkEA record count saved in @count. */
2685 rc = dt_attr_get(env, child, la, BYPASS_CAPA);
2689 if (la->la_nlink != count) {
2690 /* XXX: there will be other patch(es) for MDT-object
2691 * hard links verification. */
2695 if (la->la_nlink > 1)
2696 ns->ln_mul_linked_repaired++;
/* Print the namespace LFSCK counters held in @ns to the seq_file @m, one
 * "name: value" pair per line. The phase1/phase2 "checked" values are
 * supplied by the caller instead of being read from @ns. */
2705 static void lfsck_namespace_dump_statistics(struct seq_file *m,
2706 struct lfsck_namespace *ns,
2707 __u64 checked_phase1,
2708 __u64 checked_phase2,
2712 seq_printf(m, "checked_phase1: "LPU64"\n"
2713 "checked_phase2: "LPU64"\n"
2714 "updated_phase1: "LPU64"\n"
2715 "updated_phase2: "LPU64"\n"
2716 "failed_phase1: "LPU64"\n"
2717 "failed_phase2: "LPU64"\n"
2718 "directories: "LPU64"\n"
2719 "dirent_repaired: "LPU64"\n"
2720 "linkea_repaired: "LPU64"\n"
2721 "nlinks_repaired: "LPU64"\n"
2722 "multiple_linked_checked: "LPU64"\n"
2723 "multiple_linked_repaired: "LPU64"\n"
2724 "unknown_inconsistency: "LPU64"\n"
2725 "unmatched_pairs_repaired: "LPU64"\n"
2726 "dangling_repaired: "LPU64"\n"
2727 "multiple_referenced_repaired: "LPU64"\n"
2728 "bad_file_type_repaired: "LPU64"\n"
2729 "lost_dirent_repaired: "LPU64"\n"
2730 "success_count: %u\n"
2731 "run_time_phase1: %u seconds\n"
2732 "run_time_phase2: %u seconds\n",
/* The arguments below are listed in the same order as the conversions
 * in the format string above. */
2735 ns->ln_items_repaired,
2736 ns->ln_objs_repaired_phase2,
2737 ns->ln_items_failed,
2738 ns->ln_objs_failed_phase2,
2739 ns->ln_dirs_checked,
2740 ns->ln_dirent_repaired,
2741 ns->ln_linkea_repaired,
2742 ns->ln_objs_nlink_repaired,
2743 ns->ln_mul_linked_checked,
2744 ns->ln_mul_linked_repaired,
2745 ns->ln_unknown_inconsistency,
2746 ns->ln_unmatched_pairs_repaired,
2747 ns->ln_dangling_repaired,
2748 ns->ln_mul_ref_repaired,
2749 ns->ln_bad_type_repaired,
2750 ns->ln_lost_dirent_repaired,
2751 ns->ln_success_count,
2756 /* namespace APIs */
/* Reset the namespace LFSCK component: clear the in-RAM lfsck_namespace
 * record, unlink the local object named lfsck_namespace_name under the
 * local root, create it again as an index, and store the fresh record.
 * All of this happens while com->lc_sem is held for writing. */
2758 static int lfsck_namespace_reset(const struct lu_env *env,
2759 struct lfsck_component *com, bool init)
2761 struct lfsck_instance *lfsck = com->lc_lfsck;
2762 struct lfsck_namespace *ns = com->lc_file_ram;
2763 struct dt_object *root;
2764 struct dt_object *dto;
2768 root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
2770 GOTO(log, rc = PTR_ERR(root));
2772 if (unlikely(!dt_try_as_dir(env, root)))
2773 GOTO(put, rc = -ENOTDIR);
2775 down_write(&com->lc_sem);
/* Two ways of clearing the record follow: a plain memset, and one that
 * carries ln_success_count and ln_time_last_complete across the memset.
 * NOTE(review): presumably @init selects between them - confirm. */
2777 memset(ns, 0, sizeof(*ns));
2779 __u32 count = ns->ln_success_count;
2780 __u64 last_time = ns->ln_time_last_complete;
2782 memset(ns, 0, sizeof(*ns));
2783 ns->ln_success_count = count;
2784 ns->ln_time_last_complete = last_time;
2786 ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
2787 ns->ln_status = LS_INIT;
/* Drop the old on-disk object and the reference held in com->lc_obj
 * before a new index object is looked up or created. */
2789 rc = local_object_unlink(env, lfsck->li_bottom, root,
2790 lfsck_namespace_name);
2794 lfsck_object_put(env, com->lc_obj);
2796 dto = local_index_find_or_create(env, lfsck->li_los, root,
2797 lfsck_namespace_name,
2798 S_IFREG | S_IRUGO | S_IWUSR,
2799 &dt_lfsck_features);
2801 GOTO(out, rc = PTR_ERR(dto));
2804 rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
2808 rc = lfsck_namespace_store(env, com, true);
2813 up_write(&com->lc_sem);
2816 lu_object_put(env, &root->do_lu);
2818 CDEBUG(D_LFSCK, "%s: namespace LFSCK reset: rc = %d\n",
2819 lfsck_lfsck2name(lfsck), rc);
/* Account for one failed item: with com->lc_sem held for writing, bump
 * com->lc_new_checked and record the failure in the namespace record via
 * lfsck_namespace_record_failure(). */
2824 lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
2827 struct lfsck_namespace *ns = com->lc_file_ram;
2829 down_write(&com->lc_sem);
2831 com->lc_new_checked++;
2832 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
2833 up_write(&com->lc_sem);
/* Take a checkpoint for the namespace LFSCK: after the generic checkpoint
 * step, update the saved position, the phase1 run time and the checked
 * counter under com->lc_sem, then store the record.
 * A positive result is reported to the caller as 0. */
2836 static int lfsck_namespace_checkpoint(const struct lu_env *env,
2837 struct lfsck_component *com, bool init)
2839 struct lfsck_instance *lfsck = com->lc_lfsck;
2840 struct lfsck_namespace *ns = com->lc_file_ram;
2844 rc = lfsck_checkpoint_generic(env, com);
2849 down_write(&com->lc_sem);
/* NOTE(review): ln_pos_latest_start is presumably written only for the
 * initial checkpoint (@init) and ln_pos_last_checkpoint otherwise -
 * confirm against the full source. */
2851 ns->ln_pos_latest_start = lfsck->li_pos_checkpoint;
2853 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
/* HALF_SEC is added before the conversion to seconds. */
2854 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
2855 HALF_SEC - lfsck->li_time_last_checkpoint);
2856 ns->ln_time_last_checkpoint = cfs_time_current_sec();
/* Fold the items checked since the last checkpoint into the stored
 * total and restart the running counter. */
2857 ns->ln_items_checked += com->lc_new_checked;
2858 com->lc_new_checked = 0;
2861 rc = lfsck_namespace_store(env, com, false);
2862 up_write(&com->lc_sem);
2865 CDEBUG(D_LFSCK, "%s: namespace LFSCK checkpoint at the pos ["LPU64
2866 ", "DFID", "LPX64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
2867 lfsck->li_pos_current.lp_oit_cookie,
2868 PFID(&lfsck->li_pos_current.lp_dir_parent),
2869 lfsck->li_pos_current.lp_dir_cookie, rc);
2871 return rc > 0 ? 0 : rc;
/* Prepare the namespace LFSCK component for a run: decide which scanning
 * phase to enter and where to start (stored in com->lc_pos_start), link
 * the component on the matching lfsck lists, and start the assistant. */
2874 static int lfsck_namespace_prep(const struct lu_env *env,
2875 struct lfsck_component *com,
2876 struct lfsck_start_param *lsp)
2878 struct lfsck_instance *lfsck = com->lc_lfsck;
2879 struct lfsck_namespace *ns = com->lc_file_ram;
2880 struct lfsck_position *pos = &com->lc_pos_start;
/* A component whose previous run completed is reset before it is
 * started again. */
2883 if (ns->ln_status == LS_COMPLETED) {
2884 rc = lfsck_namespace_reset(env, com, false);
2886 rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
2889 CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: "
2890 "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
/* The status, list membership and start position are updated with both
 * com->lc_sem (write) and lfsck->li_lock held. */
2896 down_write(&com->lc_sem);
2897 ns->ln_time_latest_start = cfs_time_current_sec();
2898 spin_lock(&lfsck->li_lock);
/* LF_SCANNED_ONCE set: go to phase2 with a zero start position, unless
 * li_drop_dryrun is set and a first-inconsistent position is recorded.
 * NOTE(review): the counter reset that follows appears to be the
 * alternative branch, which restarts phase1 - confirm. */
2900 if (ns->ln_flags & LF_SCANNED_ONCE) {
2901 if (!lfsck->li_drop_dryrun ||
2902 lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
2903 ns->ln_status = LS_SCANNING_PHASE2;
2904 list_move_tail(&com->lc_link,
2905 &lfsck->li_list_double_scan);
2906 if (!list_empty(&com->lc_link_dir))
2907 list_del_init(&com->lc_link_dir);
2908 lfsck_pos_set_zero(pos);
2910 ns->ln_status = LS_SCANNING_PHASE1;
2911 ns->ln_run_time_phase1 = 0;
2912 ns->ln_run_time_phase2 = 0;
2913 ns->ln_items_checked = 0;
2914 ns->ln_items_repaired = 0;
2915 ns->ln_items_failed = 0;
2916 ns->ln_dirs_checked = 0;
2917 ns->ln_objs_checked_phase2 = 0;
2918 ns->ln_objs_repaired_phase2 = 0;
2919 ns->ln_objs_failed_phase2 = 0;
2920 ns->ln_objs_nlink_repaired = 0;
2921 ns->ln_dirent_repaired = 0;
2922 ns->ln_linkea_repaired = 0;
2923 ns->ln_mul_linked_checked = 0;
2924 ns->ln_mul_linked_repaired = 0;
2925 ns->ln_unknown_inconsistency = 0;
2926 ns->ln_unmatched_pairs_repaired = 0;
2927 ns->ln_dangling_repaired = 0;
2928 ns->ln_mul_ref_repaired = 0;
2929 ns->ln_bad_type_repaired = 0;
2930 ns->ln_lost_dirent_repaired = 0;
2931 fid_zero(&ns->ln_fid_latest_scanned_phase2);
2932 if (list_empty(&com->lc_link_dir))
2933 list_add_tail(&com->lc_link_dir,
2934 &lfsck->li_list_dir);
2935 *pos = ns->ln_pos_first_inconsistent;
/* Phase1 start: begin one otable cookie past the last checkpoint, or at
 * the first inconsistent position, depending on the same
 * li_drop_dryrun / first-inconsistent test as above. */
2938 ns->ln_status = LS_SCANNING_PHASE1;
2939 if (list_empty(&com->lc_link_dir))
2940 list_add_tail(&com->lc_link_dir,
2941 &lfsck->li_list_dir);
2942 if (!lfsck->li_drop_dryrun ||
2943 lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
2944 *pos = ns->ln_pos_last_checkpoint;
2945 pos->lp_oit_cookie++;
2947 *pos = ns->ln_pos_first_inconsistent;
2951 spin_unlock(&lfsck->li_lock);
2952 up_write(&com->lc_sem);
2954 rc = lfsck_start_assistant(env, com, lsp);
2956 CDEBUG(D_LFSCK, "%s: namespace LFSCK prep done, start pos ["LPU64", "
2957 DFID", "LPX64"]: rc = %d\n",
2958 lfsck_lfsck2name(lfsck), pos->lp_oit_cookie,
2959 PFID(&pos->lp_dir_parent), pos->lp_dir_cookie, rc);
/* Handle one object during the object-table scan: read its linkEA and,
 * depending on what is found, record the object's FID through
 * lfsck_namespace_trace_update() with LNTF_CHECK_LINKEA or
 * LNTF_CHECK_PARENT. The checked counters are updated under
 * com->lc_sem before returning. */
2964 static int lfsck_namespace_exec_oit(const struct lu_env *env,
2965 struct lfsck_component *com,
2966 struct dt_object *obj)
2968 struct lfsck_thread_info *info = lfsck_env_info(env);
2969 struct lfsck_namespace *ns = com->lc_file_ram;
2970 struct lfsck_instance *lfsck = com->lc_lfsck;
2971 const struct lu_fid *fid = lfsck_dto2fid(obj);
2972 struct lu_attr *la = &info->lti_la;
2973 struct lu_fid *pfid = &info->lti_fid2;
2974 struct lu_name *cname = &info->lti_name;
2975 struct lu_seq_range *range = &info->lti_range;
2976 struct dt_device *dev = lfsck->li_bottom;
2977 struct seq_server_site *ss =
2978 lu_site2seq(dev->dd_lu_dev.ld_site);
2979 struct linkea_data ldata = { 0 };
2980 __u32 idx = lfsck_dev_idx(dev);
2984 rc = lfsck_links_read(env, obj, &ldata);
2988 /* -EINVAL means crashed linkEA, should be verified. */
2989 if (rc == -EINVAL) {
2990 rc = lfsck_namespace_trace_update(env, com, fid,
2991 LNTF_CHECK_LINKEA, true);
/* The linkEA is removed under an exclusive UPDATE|XATTR ibits lock. */
2993 struct lustre_handle lh = { 0 };
2995 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
2996 MDS_INODELOCK_UPDATE |
2997 MDS_INODELOCK_XATTR, LCK_EX);
2999 rc = lfsck_namespace_links_remove(env, com,
3001 lfsck_ibits_unlock(&lh, LCK_EX);
/* -ENOENT is reported to the caller as success. */
3005 GOTO(out, rc = (rc == -ENOENT ? 0 : rc));
3008 /* A zero-linkEA object may be an orphan, but it may also be the result
3009 * of upgrading. Currently, we cannot record it for double scan,
3010 * because it may cause the LFSCK tracing file to be too large. */
3011 if (rc == -ENODATA) {
3012 if (S_ISDIR(lfsck_object_type(obj)))
3015 rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
3019 if (la->la_nlink > 1)
3020 rc = lfsck_namespace_trace_update(env, com, fid,
3021 LNTF_CHECK_LINKEA, true);
3029 /* Record multiple-linked object. */
3030 if (ldata.ld_leh->leh_reccount > 1) {
3031 rc = lfsck_namespace_trace_update(env, com, fid,
3032 LNTF_CHECK_LINKEA, true);
/* Only the first linkEA entry is examined from here on. */
3037 linkea_first_entry(&ldata);
3038 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
3039 if (!fid_is_sane(pfid)) {
3040 rc = lfsck_namespace_trace_update(env, com, fid,
3041 LNTF_CHECK_PARENT, true);
/* Look up the parent FID's sequence in the local FLDB; an unknown
 * sequence, or one whose range index differs from this device's index,
 * gets the object recorded for a linkEA check. */
3043 fld_range_set_mdt(range);
3044 rc = fld_local_lookup(env, ss->ss_server_fld,
3045 fid_seq(pfid), range);
3046 if ((rc == -ENOENT) ||
3047 (rc == 0 && range->lsr_index != idx)) {
3048 rc = lfsck_namespace_trace_update(env, com, fid,
3049 LNTF_CHECK_LINKEA, true);
3051 if (S_ISDIR(lfsck_object_type(obj)))
3054 rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
3058 if (la->la_nlink > 1)
3059 rc = lfsck_namespace_trace_update(env, com,
3060 fid, LNTF_CHECK_LINKEA, true);
3067 down_write(&com->lc_sem);
3068 com->lc_new_checked++;
3069 if (S_ISDIR(lfsck_object_type(obj)))
3070 ns->ln_dirs_checked++;
3072 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
3073 up_write(&com->lc_sem);
/* Queue one directory entry for the namespace assistant: build a request
 * from @ent/@type, append it to lad->lad_req_list under lad->lad_lock,
 * wake the waiters on the assistant thread's control waitqueue, and
 * count the entry as checked. */
3078 static int lfsck_namespace_exec_dir(const struct lu_env *env,
3079 struct lfsck_component *com,
3080 struct lu_dirent *ent, __u16 type)
3082 struct lfsck_assistant_data *lad = com->lc_data;
3083 struct lfsck_namespace_req *lnr;
3084 bool wakeup = false;
3086 lnr = lfsck_namespace_assistant_req_init(com->lc_lfsck, ent, type);
/* If the request cannot be built, record a failure and return the
 * error carried by the pointer. */
3088 struct lfsck_namespace *ns = com->lc_file_ram;
3090 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
3091 return PTR_ERR(lnr);
3094 spin_lock(&lad->lad_lock);
/* A negative assistant status means the request is released instead
 * of queued, and that status is returned. */
3095 if (lad->lad_assistant_status < 0) {
3096 spin_unlock(&lad->lad_lock);
3097 lfsck_namespace_assistant_req_fini(env, &lnr->lnr_lar);
3098 return lad->lad_assistant_status;
3101 list_add_tail(&lnr->lnr_lar.lar_list, &lad->lad_req_list);
3102 if (lad->lad_prefetched == 0)
3105 lad->lad_prefetched++;
3106 spin_unlock(&lad->lad_lock);
3108 wake_up_all(&lad->lad_thread.t_ctl_waitq);
3110 down_write(&com->lc_sem);
3111 com->lc_new_checked++;
3112 up_write(&com->lc_sem);
/* Finish the first-stage scan for the namespace LFSCK: pick the next
 * status from @result, move the component to the matching lfsck list,
 * update the run-time and checked counters, and store the record. */
3117 static int lfsck_namespace_post(const struct lu_env *env,
3118 struct lfsck_component *com,
3119 int result, bool init)
3121 struct lfsck_instance *lfsck = com->lc_lfsck;
3122 struct lfsck_namespace *ns = com->lc_file_ram;
3126 lfsck_post_generic(env, com, &result);
3128 down_write(&com->lc_sem);
3129 spin_lock(&lfsck->li_lock);
3131 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
/* First outcome: enter phase2, mark the component as scanned once,
 * clear LF_UPGRADE and queue it on the double-scan list.
 * NOTE(review): presumably taken when @result is positive - confirm. */
3133 ns->ln_status = LS_SCANNING_PHASE2;
3134 ns->ln_flags |= LF_SCANNED_ONCE;
3135 ns->ln_flags &= ~LF_UPGRADE;
3136 list_del_init(&com->lc_link_dir);
3137 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
/* @result == 0: take over the instance status (a zero status becomes
 * LS_STOPPED); anything except LS_PAUSED goes to the idle list. */
3138 } else if (result == 0) {
3139 ns->ln_status = lfsck->li_status;
3140 if (ns->ln_status == 0)
3141 ns->ln_status = LS_STOPPED;
3142 if (ns->ln_status != LS_PAUSED) {
3143 list_del_init(&com->lc_link_dir);
3144 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
/* Remaining outcome: the scan failed and the component goes idle. */
3147 ns->ln_status = LS_FAILED;
3148 list_del_init(&com->lc_link_dir);
3149 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
3151 spin_unlock(&lfsck->li_lock);
3154 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
3155 HALF_SEC - lfsck->li_time_last_checkpoint);
3156 ns->ln_time_last_checkpoint = cfs_time_current_sec();
3157 ns->ln_items_checked += com->lc_new_checked;
3158 com->lc_new_checked = 0;
3161 rc = lfsck_namespace_store(env, com, false);
3162 up_write(&com->lc_sem);
3164 CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n",
3165 lfsck_lfsck2name(lfsck), rc);
/* Dump the namespace LFSCK state to the seq_file @m while holding
 * com->lc_sem for reading: status, flags, parameters, timestamps and
 * positions first, then the statistics and speed figures, whose layout
 * depends on whether phase1, phase2 or neither is currently running. */
3171 lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
3174 struct lfsck_instance *lfsck = com->lc_lfsck;
3175 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
3176 struct lfsck_namespace *ns = com->lc_file_ram;
3179 down_read(&com->lc_sem);
3180 seq_printf(m, "name: lfsck_namespace\n"
3186 lfsck_status2names(ns->ln_status));
3188 rc = lfsck_bits_dump(m, ns->ln_flags, lfsck_flags_names, "flags");
3192 rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
3196 rc = lfsck_time_dump(m, ns->ln_time_last_complete,
3197 "time_since_last_completed");
3201 rc = lfsck_time_dump(m, ns->ln_time_latest_start,
3202 "time_since_latest_start");
3206 rc = lfsck_time_dump(m, ns->ln_time_last_checkpoint,
3207 "time_since_last_checkpoint");
3211 rc = lfsck_pos_dump(m, &ns->ln_pos_latest_start,
3212 "latest_start_position");
3216 rc = lfsck_pos_dump(m, &ns->ln_pos_last_checkpoint,
3217 "last_checkpoint_position");
3221 rc = lfsck_pos_dump(m, &ns->ln_pos_first_inconsistent,
3222 "first_failure_position");
/* Phase1 running: the figures include the items checked since the last
 * checkpoint (com->lc_new_checked) and the time elapsed since then. */
3226 if (ns->ln_status == LS_SCANNING_PHASE1) {
3227 struct lfsck_position pos;
3228 const struct dt_it_ops *iops;
3229 cfs_duration_t duration = cfs_time_current() -
3230 lfsck->li_time_last_checkpoint;
3231 __u64 checked = ns->ln_items_checked + com->lc_new_checked;
3232 __u64 speed = checked;
3233 __u64 new_checked = com->lc_new_checked * HZ;
3234 __u32 rtime = ns->ln_run_time_phase1 +
3235 cfs_duration_sec(duration + HALF_SEC);
3238 do_div(new_checked, duration);
3240 do_div(speed, rtime);
3241 lfsck_namespace_dump_statistics(m, ns, checked,
3242 ns->ln_objs_checked_phase2,
3243 rtime, ns->ln_run_time_phase2);
3245 seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
3246 "average_speed_phase2: N/A\n"
3247 "real_time_speed_phase1: "LPU64" items/sec\n"
3248 "real_time_speed_phase2: N/A\n",
3252 LASSERT(lfsck->li_di_oit != NULL);
3254 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
3256 /* The low layer otable-based iteration position may NOT
3257 * exactly match the namespace-based directory traversal
3258 * cookie. Generally, it is not a serious issue. But the
3259 * caller should NOT make assumption on that. */
3260 pos.lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
3261 if (!lfsck->li_current_oit_processed)
3262 pos.lp_oit_cookie--;
/* The directory part of the current position is sampled under
 * lfsck->li_lock; it is zeroed when no directory iteration is active
 * or the cookie has reached MDS_DIR_END_OFF. */
3264 spin_lock(&lfsck->li_lock);
3265 if (lfsck->li_di_dir != NULL) {
3266 pos.lp_dir_cookie = lfsck->li_cookie_dir;
3267 if (pos.lp_dir_cookie >= MDS_DIR_END_OFF) {
3268 fid_zero(&pos.lp_dir_parent);
3269 pos.lp_dir_cookie = 0;
3272 *lfsck_dto2fid(lfsck->li_obj_dir);
3275 fid_zero(&pos.lp_dir_parent);
3276 pos.lp_dir_cookie = 0;
3278 spin_unlock(&lfsck->li_lock);
3279 lfsck_pos_dump(m, &pos, "current_position");
/* Phase2 running: the phase1 average comes from the stored totals, the
 * phase2 figures include the objects checked since the last checkpoint. */
3280 } else if (ns->ln_status == LS_SCANNING_PHASE2) {
3281 cfs_duration_t duration = cfs_time_current() -
3282 lfsck->li_time_last_checkpoint;
3283 __u64 checked = ns->ln_objs_checked_phase2 +
3284 com->lc_new_checked;
3285 __u64 speed1 = ns->ln_items_checked;
3286 __u64 speed2 = checked;
3287 __u64 new_checked = com->lc_new_checked * HZ;
3288 __u32 rtime = ns->ln_run_time_phase2 +
3289 cfs_duration_sec(duration + HALF_SEC);
3292 do_div(new_checked, duration);
3293 if (ns->ln_run_time_phase1 != 0)
3294 do_div(speed1, ns->ln_run_time_phase1);
3296 do_div(speed2, rtime);
3297 lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked,
3299 ns->ln_run_time_phase1, rtime);
3301 seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
3302 "average_speed_phase2: "LPU64" objs/sec\n"
3303 "real_time_speed_phase1: N/A\n"
3304 "real_time_speed_phase2: "LPU64" objs/sec\n"
3305 "current_position: "DFID"\n",
3309 PFID(&ns->ln_fid_latest_scanned_phase2));
/* Neither phase running: only averages from the stored totals are
 * printed; each division is skipped when its run time is zero. */
3311 __u64 speed1 = ns->ln_items_checked;
3312 __u64 speed2 = ns->ln_objs_checked_phase2;
3314 if (ns->ln_run_time_phase1 != 0)
3315 do_div(speed1, ns->ln_run_time_phase1);
3316 if (ns->ln_run_time_phase2 != 0)
3317 do_div(speed2, ns->ln_run_time_phase2);
3318 lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked,
3319 ns->ln_objs_checked_phase2,
3320 ns->ln_run_time_phase1,
3321 ns->ln_run_time_phase2);
3323 seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
3324 "average_speed_phase2: "LPU64" objs/sec\n"
3325 "real_time_speed_phase1: N/A\n"
3326 "real_time_speed_phase2: N/A\n"
3327 "current_position: N/A\n",
3332 up_read(&com->lc_sem);
/* Second-stage scan entry point: delegates to lfsck_double_scan_generic(),
 * passing the status currently held in the namespace record. */
3336 static int lfsck_namespace_double_scan(const struct lu_env *env,
3337 struct lfsck_component *com)
3339 struct lfsck_namespace *ns = com->lc_file_ram;
3341 return lfsck_double_scan_generic(env, com, ns->ln_status);
/* Release the assistant data attached to the component: detach it from
 * com->lc_data, unlink every MDT target descriptor from the assistant's
 * phase1, phase2 and MDT lists under ltds->ltd_lock, and free the
 * assistant bitmap. The assistant thread must be in its init or stopped
 * state and its request list must already be empty. */
3344 static void lfsck_namespace_data_release(const struct lu_env *env,
3345 struct lfsck_component *com)
3347 struct lfsck_assistant_data *lad = com->lc_data;
3348 struct lfsck_tgt_descs *ltds = &com->lc_lfsck->li_mdt_descs;
3349 struct lfsck_tgt_desc *ltd;
3350 struct lfsck_tgt_desc *next;
3352 LASSERT(lad != NULL);
3353 LASSERT(thread_is_init(&lad->lad_thread) ||
3354 thread_is_stopped(&lad->lad_thread));
3355 LASSERT(list_empty(&lad->lad_req_list));
3357 com->lc_data = NULL;
/* The _safe iterators are needed because each entry is unlinked while
 * the list is being walked. */
3359 spin_lock(&ltds->ltd_lock);
3360 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
3361 ltd_namespace_phase_list) {
3362 list_del_init(&ltd->ltd_namespace_phase_list);
3364 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
3365 ltd_namespace_phase_list) {
3366 list_del_init(&ltd->ltd_namespace_phase_list);
3368 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_list,
3369 ltd_namespace_list) {
3370 list_del_init(&ltd->ltd_namespace_list);
3372 spin_unlock(&ltds->ltd_lock);
3374 CFS_FREE_BITMAP(lad->lad_bitmap);
/* Handle a notification from a peer MDT about its namespace LFSCK
 * progress. Only LE_PHASE1_DONE, LE_PHASE2_DONE and LE_PEER_EXIT are
 * processed. The target descriptor for lr->lr_index is looked up and its
 * list membership updated under ltds->ltd_lock; afterwards the local
 * LFSCK is either stopped (failure with LPF_FAILOUT set) or the
 * assistant thread is woken when phase2 can proceed. */
3379 static int lfsck_namespace_in_notify(const struct lu_env *env,
3380 struct lfsck_component *com,
3381 struct lfsck_request *lr)
3383 struct lfsck_instance *lfsck = com->lc_lfsck;
3384 struct lfsck_namespace *ns = com->lc_file_ram;
3385 struct lfsck_assistant_data *lad = com->lc_data;
3386 struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs;
3387 struct lfsck_tgt_desc *ltd;
3391 if (lr->lr_event != LE_PHASE1_DONE &&
3392 lr->lr_event != LE_PHASE2_DONE &&
3393 lr->lr_event != LE_PEER_EXIT)
3396 CDEBUG(D_LFSCK, "%s: namespace LFSCK handles notify %u from MDT %x, "
3397 "status %d\n", lfsck_lfsck2name(lfsck), lr->lr_event,
3398 lr->lr_index, lr->lr_status);
3400 spin_lock(&ltds->ltd_lock);
3401 ltd = LTD_TGT(ltds, lr->lr_index);
3403 spin_unlock(&ltds->ltd_lock);
/* The target always leaves its current phase list first; the event
 * decides where, if anywhere, it is queued again. */
3408 list_del_init(&ltd->ltd_namespace_phase_list);
3409 switch (lr->lr_event) {
3410 case LE_PHASE1_DONE:
/* A non-positive status means the peer failed or stopped in phase1:
 * mark it done and flag the local scan as incomplete. */
3411 if (lr->lr_status <= 0) {
3412 ltd->ltd_namespace_done = 1;
3413 list_del_init(&ltd->ltd_namespace_list);
3414 CDEBUG(D_LFSCK, "%s: MDT %x failed/stopped at "
3415 "phase1 for namespace LFSCK: rc = %d.\n",
3416 lfsck_lfsck2name(lfsck),
3417 ltd->ltd_index, lr->lr_status);
3418 ns->ln_flags |= LF_INCOMPLETE;
3423 if (list_empty(&ltd->ltd_namespace_list))
3424 list_add_tail(&ltd->ltd_namespace_list,
3425 &lad->lad_mdt_list);
3426 list_add_tail(&ltd->ltd_namespace_phase_list,
3427 &lad->lad_mdt_phase2_list);
3429 case LE_PHASE2_DONE:
3430 ltd->ltd_namespace_done = 1;
3431 list_del_init(&ltd->ltd_namespace_list);
3435 ltd->ltd_namespace_done = 1;
3436 list_del_init(&ltd->ltd_namespace_list);
3437 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT)) {
3439 "%s: the peer MDT %x exit namespace LFSCK\n",
3440 lfsck_lfsck2name(lfsck), ltd->ltd_index);
3441 ns->ln_flags |= LF_INCOMPLETE;
3447 spin_unlock(&ltds->ltd_lock);
/* With LPF_FAILOUT set, a failure stops the local LFSCK using the
 * peer's status; LPF_BROADCAST is masked out of the stop flags. */
3449 if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
3450 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
3452 memset(stop, 0, sizeof(*stop));
3453 stop->ls_status = lr->lr_status;
3454 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3455 lfsck_stop(env, lfsck->li_bottom, stop);
3456 } else if (lfsck_phase2_next_ready(lad)) {
3457 wake_up_all(&lad->lad_thread.t_ctl_waitq);
/* Report the current namespace LFSCK status, read from the in-RAM
 * namespace record (com->lc_file_ram). */
3463 static int lfsck_namespace_query(const struct lu_env *env,
3464 struct lfsck_component *com)
3466 struct lfsck_namespace *ns = com->lc_file_ram;
3468 return ns->ln_status;
/* Operation table for the namespace LFSCK component. Every callback is
 * one of the lfsck_namespace_* functions in this file, except
 * .lfsck_quit, which uses the generic lfsck_quit_generic. */
3471 static struct lfsck_operations lfsck_namespace_ops = {
3472 .lfsck_reset = lfsck_namespace_reset,
3473 .lfsck_fail = lfsck_namespace_fail,
3474 .lfsck_checkpoint = lfsck_namespace_checkpoint,
3475 .lfsck_prep = lfsck_namespace_prep,
3476 .lfsck_exec_oit = lfsck_namespace_exec_oit,
3477 .lfsck_exec_dir = lfsck_namespace_exec_dir,
3478 .lfsck_post = lfsck_namespace_post,
3479 .lfsck_dump = lfsck_namespace_dump,
3480 .lfsck_double_scan = lfsck_namespace_double_scan,
3481 .lfsck_data_release = lfsck_namespace_data_release,
3482 .lfsck_quit = lfsck_quit_generic,
3483 .lfsck_in_notify = lfsck_namespace_in_notify,
3484 .lfsck_query = lfsck_namespace_query,
3488 * Repair dangling name entry.
3490 * For the name entry with dangling reference, we need to repair the
3491 * inconsistency according to the LFSCK sponsor's requirement:
3493 * 1) Keep the inconsistency there and report the inconsistency case,
3494 * then give the chance to the application to find related issues,
3495 * and the users can make the decision about how to handle it with
3496 * more human knowledge. (by default)
3498 * 2) Re-create the missed MDT-object with the FID information.
3500 * \param[in] env pointer to the thread context
3501 * \param[in] com pointer to the lfsck component
3502 * \param[in] child pointer to the object corresponding to the dangling
3504 * \param[in] lnr pointer to the namespace request that contains the
3505 * name's name, parent object, parent's LMV, etc.
3507 * \retval positive number if no need to repair
3508 * \retval zero for repaired successfully
3509 * \retval negative error number on failure
3511 int lfsck_namespace_repair_dangling(const struct lu_env *env,
3512 struct lfsck_component *com,
3513 struct dt_object *child,
3514 struct lfsck_namespace_req *lnr)
3516 struct lfsck_thread_info *info = lfsck_env_info(env);
3517 struct lu_attr *la = &info->lti_la;
3518 struct dt_allocation_hint *hint = &info->lti_hint;
3519 struct dt_object_format *dof = &info->lti_dof;
3520 struct dt_insert_rec *rec = &info->lti_dt_rec;
3521 struct dt_object *parent = lnr->lnr_obj;
3522 const struct lu_name *cname;
3523 struct linkea_data ldata = { 0 };
3524 struct lustre_handle lh = { 0 };
3525 struct lu_buf linkea_buf;
3526 struct lfsck_instance *lfsck = com->lc_lfsck;
3527 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
3528 struct dt_device *dev = lfsck_obj2dt_dev(child);
3529 struct thandle *th = NULL;
3531 __u16 type = lnr->lnr_type;
3535 cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen);
/* The MDT-object is re-created only when LPF_CREATE_MDTOBJ was
 * requested; nothing is modified in dry-run mode. */
3536 if (bk->lb_param & LPF_CREATE_MDTOBJ)
3541 if (!create || bk->lb_param & LPF_DRYRUN)
/* Build the linkEA for the new object: a single entry that names the
 * parent FID and the entry name. */
3544 rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
3548 rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
/* The existence check and the creation run under an exclusive UPDATE
 * ibits lock on the parent. */
3552 rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
3553 MDS_INODELOCK_UPDATE, LCK_EX);
3557 rc = lfsck_namespace_check_exist(env, parent, child, lnr->lnr_name);
3561 th = dt_trans_create(env, dev);
3563 GOTO(log, rc = PTR_ERR(th));
3565 /* Set the ctime as zero, then others can know it is created for
3566 * repairing dangling name entry by LFSCK. And if the LFSCK made
3567 * wrong decision and the real MDT-object has been found later,
3568 * then the LFSCK has chance to fix the inconsistency properly. */
3569 memset(la, 0, sizeof(*la));
3570 la->la_mode = (type & S_IFMT) | 0600;
3571 la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
3572 LA_ATIME | LA_MTIME | LA_CTIME;
3574 child->do_ops->do_ah_init(env, hint, parent, child,
3575 la->la_mode & S_IFMT);
3577 memset(dof, 0, sizeof(*dof));
3578 dof->dof_type = dt_mode_to_dft(type);
3579 /* If the target is a regular file, then the LFSCK will only create
3580 * the MDT-object without stripes (dof->dof_reg.striped = 0). related
3581 * OST-objects will be created when write open. */
/* Steps 1a-5a declare every update in the transaction; steps 1b-5b
 * below perform the same updates after the transaction has started. */
3583 /* 1a. create child. */
3584 rc = dt_declare_create(env, child, la, hint, dof, th);
3588 if (S_ISDIR(type)) {
3589 if (unlikely(!dt_try_as_dir(env, child)))
3590 GOTO(stop, rc = -ENOTDIR);
3592 /* 2a. insert dot into child dir */
3593 rec->rec_type = S_IFDIR;
3594 rec->rec_fid = lfsck_dto2fid(child);
3595 rc = dt_declare_insert(env, child,
3596 (const struct dt_rec *)rec,
3597 (const struct dt_key *)dot, th);
3601 /* 3a. insert dotdot into child dir */
3602 rec->rec_fid = lfsck_dto2fid(parent);
3603 rc = dt_declare_insert(env, child,
3604 (const struct dt_rec *)rec,
3605 (const struct dt_key *)dotdot, th);
3609 /* 4a. increase child nlink */
3610 rc = dt_declare_ref_add(env, child, th);
3615 /* 5a. insert linkEA for child */
3616 lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
3617 ldata.ld_leh->leh_len);
3618 rc = dt_declare_xattr_set(env, child, &linkea_buf,
3619 XATTR_NAME_LINK, 0, th);
/* -EEXIST is mapped to the positive "no need to repair" result. */
3623 rc = dt_trans_start(env, dev, th);
3625 GOTO(stop, rc = (rc == -EEXIST ? 1 : rc));
3627 dt_write_lock(env, child, 0);
3628 /* 1b. create child */
3629 rc = dt_create(env, child, la, hint, dof, th);
3631 GOTO(unlock, rc = (rc == -EEXIST ? 1 : rc));
3633 if (S_ISDIR(type)) {
3634 if (unlikely(!dt_try_as_dir(env, child)))
3635 GOTO(unlock, rc = -ENOTDIR);
3637 /* 2b. insert dot into child dir */
3638 rec->rec_type = S_IFDIR;
3639 rec->rec_fid = lfsck_dto2fid(child);
3640 rc = dt_insert(env, child, (const struct dt_rec *)rec,
3641 (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
3645 /* 3b. insert dotdot into child dir */
3646 rec->rec_fid = lfsck_dto2fid(parent);
3647 rc = dt_insert(env, child, (const struct dt_rec *)rec,
3648 (const struct dt_key *)dotdot, th,
3653 /* 4b. increase child nlink */
3654 rc = dt_ref_add(env, child, th);
3659 /* 5b. insert linkEA for child. */
3660 rc = dt_xattr_set(env, child, &linkea_buf,
3661 XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
3666 dt_write_unlock(env, child);
3669 dt_trans_stop(env, dev, th);
3672 lfsck_ibits_unlock(&lh, LCK_EX);
/* The object created above is an MDT-object, so the log text names it
 * as such. */
3673 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found dangling "
3674 "reference for: parent "DFID", child "DFID", type %u, "
3675 "name %s. %s: rc = %d\n", lfsck_lfsck2name(lfsck),
3676 PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
3677 type, cname->ln_name,
3678 create ? "Create the lost MDT-object as required" :
3679 "Keep the MDT-object there by default", rc);
3682 struct lfsck_namespace *ns = com->lc_file_ram;
3684 ns->ln_flags |= LF_INCONSISTENT;
/*
 * First-stage handler for a single namespace LFSCK assistant request.
 *
 * From the visible statements: it resolves the object referenced by the
 * name entry carried in the request, compares that object's linkEA and
 * file type with the name entry, repairs what it can (dangling reference,
 * linkEA, name entry), and finally updates the statistics kept in
 * com->lc_file_ram while holding com->lc_sem.
 *
 * NOTE(review): many short lines (braces, labels, error checks, returns)
 * are not visible in this listing, so the comments below describe only the
 * statements that can be seen; confirm the exact control flow against the
 * complete file.
 */
3690 static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
3691 struct lfsck_component *com,
3692 struct lfsck_assistant_req *lar)
3694 struct lfsck_thread_info *info = lfsck_env_info(env);
3695 struct lu_attr *la = &info->lti_la;
3696 struct lfsck_instance *lfsck = com->lc_lfsck;
3697 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
3698 struct lfsck_namespace *ns = com->lc_file_ram;
3699 struct linkea_data ldata = { 0 };
3700 const struct lu_name *cname;
3701 struct thandle *handle = NULL;
3702 struct lfsck_namespace_req *lnr =
3703 container_of0(lar, struct lfsck_namespace_req, lnr_lar);
3704 struct dt_object *dir = lnr->lnr_obj;
3705 struct dt_object *obj = NULL;
3706 const struct lu_fid *pfid = lfsck_dto2fid(dir);
3707 struct dt_device *dev;
3708 struct lustre_handle lh = { 0 };
3709 bool repaired = false;
3710 bool dtlocked = false;
3717 enum lfsck_namespace_inconsistency_type type = LNIT_NONE;
/* Requests flagged LUDA_UPGRADE or LUDA_REPAIR set the matching flag in
 * ns->ln_flags and are counted as a repaired dirent. */
3720 if (lnr->lnr_attr & LUDA_UPGRADE) {
3721 ns->ln_flags |= LF_UPGRADE;
3722 ns->ln_dirent_repaired++;
3724 } else if (lnr->lnr_attr & LUDA_REPAIR) {
3725 ns->ln_flags |= LF_INCONSISTENT;
3726 ns->ln_dirent_repaired++;
/* Zero FID in the name entry: the visible code records LNTF_CHECK_PARENT
 * for the parent FID through lfsck_namespace_trace_update().
 * NOTE(review): the action taken for names other than ".." is not visible
 * in this listing. */
3730 if (unlikely(fid_is_zero(&lnr->lnr_fid))) {
3731 if (strcmp(lnr->lnr_name, dotdot) != 0)
3734 rc = lfsck_namespace_trace_update(env, com, pfid,
3735 LNTF_CHECK_PARENT, true);
/* The name starts with '.' and is either one character long or its FID
 * sequence satisfies fid_seq_is_dot().
 * NOTE(review): the action taken for such entries is not visible here. */
3740 if (lnr->lnr_name[0] == '.' &&
3741 (lnr->lnr_namelen == 1 || fid_seq_is_dot(fid_seq(&lnr->lnr_fid))))
/* Ask which MDT holds the object referenced by the name entry; on the
 * failure path idx itself is used as the error code. */
3744 idx = lfsck_find_mdt_idx_by_fid(env, lfsck, &lnr->lnr_fid);
3746 GOTO(out, rc = idx);
/* The object lives on the local MDT: operate through lfsck->li_next. */
3748 if (idx == lfsck_dev_idx(lfsck->li_bottom)) {
3749 if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0))
3752 dev = lfsck->li_next;
3754 struct lfsck_tgt_desc *ltd;
3756 /* Usually, some local filesystem consistency verification
3757 * tools can guarantee the local namespace tree consistency.
3758 * So the LFSCK will only verify the remote directory. */
3759 if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0)) {
3760 rc = lfsck_namespace_trace_update(env, com, pfid,
3761 LNTF_CHECK_PARENT, true);
/* Look up the descriptor of the remote MDT; without one the scan is
 * flagged LF_INCOMPLETE and the request fails with -ENODEV. */
3766 ltd = LTD_TGT(&lfsck->li_mdt_descs, idx);
3767 if (unlikely(ltd == NULL)) {
3768 CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which "
3769 "did not join the namespace LFSCK\n",
3770 lfsck_lfsck2name(lfsck), idx);
3771 ns->ln_flags |= LF_INCOMPLETE;
3773 GOTO(out, rc = -ENODEV);
/* Locate the object referenced by the name entry on the chosen device. */
3779 obj = lfsck_object_find_by_dev(env, dev, &lnr->lnr_fid);
3781 GOTO(out, rc = PTR_ERR(obj));
3783 cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen);
/* The referenced object does not exist: the visible path re-checks the
 * name entry, classifies the case as LNIT_DANGLING and calls the dangling
 * repair helper. */
3784 if (dt_object_exists(obj) == 0) {
3787 rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
3789 type = LNIT_DANGLING;
3790 rc = lfsck_namespace_repair_dangling(env, com,
/* Real (non dry-run) repair with 'repaired' set: take an EX ibits lock
 * covering UPDATE and XATTR, open a transaction on the object and
 * write-lock it. */
3799 if (!(bk->lb_param & LPF_DRYRUN) && repaired) {
3802 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
3803 MDS_INODELOCK_UPDATE |
3804 MDS_INODELOCK_XATTR, LCK_EX);
3808 handle = dt_trans_create(env, dev);
3810 GOTO(out, rc = PTR_ERR(handle));
3812 rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
3816 rc = dt_trans_start(env, dev, handle);
3820 dt_write_lock(env, obj, 0);
/* Run lfsck_namespace_check_exist() for the same dir/object/name. */
3824 rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
/* Load the object's linkEA; the branches below dispatch on the result
 * (-ENOENT, 0, -EINVAL, -ENODATA). */
3828 rc = lfsck_links_read(env, obj, &ldata);
3829 if (unlikely(rc == -ENOENT)) {
3830 if (handle != NULL) {
3831 dt_write_unlock(env, obj);
3834 dt_trans_stop(env, dev, handle);
3837 lfsck_ibits_unlock(&lh, LCK_EX);
3840 /* It may happen when the remote object has been removed,
3841 * but the local MDT is not aware of that. */
3843 } else if (rc == 0) {
/* linkEA loaded: remember how many records it holds and look for the
 * (name, parent FID) pair of the current name entry in it. */
3844 count = ldata.ld_leh->leh_reccount;
3845 rc = linkea_links_find(&ldata, cname, pfid);
3847 (count == 1 || !S_ISDIR(lfsck_object_type(obj)))) {
3848 if ((lfsck_object_type(obj) & S_IFMT) !=
3850 ns->ln_flags |= LF_INCONSISTENT;
3851 type = LNIT_BAD_TYPE;
3857 ns->ln_flags |= LF_INCONSISTENT;
3859 /* If the file type stored in the name entry does not match
3860 * the file type claimed by the object, and the object does
3861 * not recognize the name entry, then it is quite possible
3862 * that the name entry is corrupted. */
3863 if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type) {
3864 type = LNIT_BAD_DIRENT;
3869 /* For sub-dir object, we cannot make sure whether the sub-dir
3870 * back references the parent via ".." name entry correctly or
3871 * not in the LFSCK first-stage scanning. It may be that the
3872 * (remote) sub-dir ".." name entry has no parent FID after
3873 * file-level backup/restore and its linkEA may be wrong.
3874 * So under such case, we should replace the linkEA according
3875 * to current name entry. But this needs to be done during the
3876 * LFSCK second-stage scanning. The LFSCK will record the name
3877 * entry for possible later use. */
3881 } else if (unlikely(rc == -EINVAL)) {
3882 if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type)
3883 type = LNIT_BAD_TYPE;
3886 ns->ln_flags |= LF_INCONSISTENT;
3887 /* The magic crashed, we are not sure whether there are more
3888 * corrupt data in the linkea, so remove all linkea entries. */
3892 } else if (rc == -ENODATA) {
/* No linkEA on the object: this is flagged as LF_UPGRADE rather than
 * LF_INCONSISTENT. */
3893 if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type)
3894 type = LNIT_BAD_TYPE;
3897 ns->ln_flags |= LF_UPGRADE;
/* Dry-run mode: only the counter update is visible in this branch. */
3902 if (bk->lb_param & LPF_DRYRUN) {
3903 ns->ln_linkea_repaired++;
3909 if (!lustre_handle_is_used(&lh))
/* Remove the existing linkEA xattr.
 * NOTE(review): the condition guarding this call is not visible. */
3915 rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
/* Rebuild the linkEA in the lti_linkea_buf buffer obtained from
 * lfsck_env_info(env), add the current (name, parent FID) pair and write
 * it back to the object. */
3922 rc = linkea_data_new(&ldata,
3923 &lfsck_env_info(env)->lti_linkea_buf);
3928 rc = linkea_add_buf(&ldata, cname, pfid);
3932 rc = lfsck_links_write(env, obj, &ldata, handle);
3936 count = ldata.ld_leh->leh_reccount;
3937 if (!S_ISDIR(lfsck_object_type(obj)) ||
3938 !dt_object_remote(obj)) {
3939 ns->ln_linkea_repaired++;
/* Fetch the object's attributes; la->la_nlink is compared with the
 * linkEA record count below. */
3950 rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
3954 if ((count == 1 && la->la_nlink == 1) ||
3955 S_ISDIR(lfsck_object_type(obj)))
3956 /* Usually, it is for single linked object or dir, do nothing.*/
3959 /* Following modification will be in another transaction. */
3960 if (handle != NULL) {
3961 dt_write_unlock(env, obj);
3964 dt_trans_stop(env, dev, handle);
3967 lfsck_ibits_unlock(&lh, LCK_EX);
/* Count the object as multiple-linked and record LNTF_CHECK_LINKEA for
 * its FID through lfsck_namespace_trace_update(). */
3970 ns->ln_mul_linked_checked++;
3971 rc = lfsck_namespace_trace_update(env, com, &lnr->lnr_fid,
3972 LNTF_CHECK_LINKEA, true);
/* Cleanup: release the object write lock, stop the transaction and drop
 * the ibits lock.
 * NOTE(review): the labels/guards around these calls are partly hidden. */
3978 dt_write_unlock(env, obj);
3980 if (handle != NULL && !IS_ERR(handle))
3981 dt_trans_stop(env, dev, handle);
3984 lfsck_ibits_unlock(&lh, LCK_EX);
/* Repair the name entry in the parent directory.
 * NOTE(review): the case label selecting this call is not visible. */
3990 rc = lfsck_namespace_repair_dirent(env, com, dir,
3991 obj, lnr->lnr_name, lnr->lnr_name,
3992 lnr->lnr_type, true, false);
3996 case LNIT_BAD_DIRENT:
3998 /* XXX: This is a bad dirent, we do not know whether
3999 * the original name entry reference a regular
4000 * file or a directory, then keep the parent's
4001 * nlink count unchanged here. */
4002 rc = lfsck_namespace_repair_dirent(env, com, dir,
4003 obj, lnr->lnr_name, lnr->lnr_name,
4004 lnr->lnr_type, false, false);
/* The statistics in *ns are updated under the component semaphore. */
4013 down_write(&com->lc_sem);
/* Failure reporting: log the entry and record the failure. */
4015 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle "
4016 "the entry: "DFID", parent "DFID", name %.*s: rc = %d\n",
4017 lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid),
4018 PFID(lfsck_dto2fid(lnr->lnr_obj)),
4019 lnr->lnr_namelen, lnr->lnr_name, rc);
4021 lfsck_namespace_record_failure(env, lfsck, ns);
4022 if (!(bk->lb_param & LPF_FAILOUT))
/* Repair reporting: log the entry and bump the per-type counters. */
4026 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant "
4027 "repaired the entry: "DFID", parent "DFID
4028 ", name %.*s\n", lfsck_lfsck2name(lfsck),
4029 PFID(&lnr->lnr_fid),
4030 PFID(lfsck_dto2fid(lnr->lnr_obj)),
4031 lnr->lnr_namelen, lnr->lnr_name);
4034 ns->ln_items_repaired++;
4038 ns->ln_dangling_repaired++;
4041 ns->ln_bad_type_repaired++;
4043 case LNIT_BAD_DIRENT:
4044 ns->ln_dirent_repaired++;
/* In dry-run mode, remember the first inconsistent position if none has
 * been recorded yet. */
4050 if (bk->lb_param & LPF_DRYRUN &&
4051 lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
4052 lfsck_pos_fill(env, lfsck,
4053 &ns->ln_pos_first_inconsistent,
4059 up_write(&com->lc_sem);
/* Release obj through lfsck_object_put() if it was actually obtained. */
4061 if (obj != NULL && !IS_ERR(obj))
4062 lfsck_object_put(env, obj);
/*
 * Second-stage (phase 2) handler of the namespace LFSCK assistant.
 *
 * From the visible statements: it iterates the index object com->lc_obj,
 * resuming from ns->ln_fid_latest_scanned_phase2; for every FID key whose
 * object exists it calls lfsck_namespace_double_scan_one(), updates the
 * phase-2 counters under com->lc_sem, and periodically stores a checkpoint
 * through lfsck_namespace_store().
 *
 * NOTE(review): several short lines (braces, loop header, error checks,
 * returns) are not visible in this listing; confirm the loop exits against
 * the complete file.
 */
4066 static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
4067 struct lfsck_component *com)
4069 struct lfsck_instance *lfsck = com->lc_lfsck;
4070 struct ptlrpc_thread *thread = &lfsck->li_thread;
4071 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
4072 struct lfsck_namespace *ns = com->lc_file_ram;
4073 struct dt_object *obj = com->lc_obj;
4074 const struct dt_it_ops *iops = &obj->do_index_ops->dio_it;
4075 struct dt_object *target;
4083 CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
4084 lfsck_lfsck2name(lfsck));
/* Reset the per-checkpoint counters and schedule the next checkpoint
 * LFSCK_CHECKPOINT_INTERVAL seconds from now. */
4086 com->lc_new_checked = 0;
4087 com->lc_new_scanned = 0;
4088 com->lc_time_last_checkpoint = cfs_time_current();
4089 com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
4090 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
/* Create an iterator over the index object. */
4092 di = iops->init(env, obj, 0, BYPASS_CAPA);
4094 RETURN(PTR_ERR(di));
/* Position the iterator at the last FID scanned in phase 2; the key is
 * converted to big-endian before the lookup. */
4096 fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
4097 rc = iops->get(env, di, (const struct dt_key *)&fid);
4101 /* Skip the start one, which either has been processed or non-exist. */
4102 rc = iops->next(env, di);
/* Fault-injection hook (OBD_FAIL_LFSCK_DELAY3): wait on the thread's
 * control waitqueue with a timeout of cfs_fail_val seconds, or until the
 * thread is no longer running. */
4107 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
4109 struct l_wait_info lwi;
4111 lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
4113 l_wait_event(thread->t_ctl_waitq,
4114 !thread_is_running(thread),
4117 if (unlikely(!thread_is_running(thread)))
/* Decode the current key back to CPU byte order and validate it. */
4121 key = iops->key(env, di);
4122 fid_be_to_cpu(&fid, (const struct lu_fid *)key);
4123 if (!fid_is_sane(&fid)) {
/* Find the object for this FID; only existing objects are handed to
 * lfsck_namespace_double_scan_one(), after reading the record for the
 * current key into 'flags'. */
4128 target = lfsck_object_find(env, lfsck, &fid);
4129 if (IS_ERR(target)) {
4130 rc = PTR_ERR(target);
4134 if (dt_object_exists(target)) {
4135 rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
4137 rc = lfsck_namespace_double_scan_one(env, com,
4144 lfsck_object_put(env, target);
/* Account for this item under the component semaphore and remember the
 * FID as the latest one scanned in phase 2.
 * NOTE(review): the conditions selecting repaired vs. failed are hidden. */
4147 down_write(&com->lc_sem);
4148 com->lc_new_checked++;
4149 com->lc_new_scanned++;
4150 ns->ln_fid_latest_scanned_phase2 = fid;
4152 ns->ln_objs_repaired_phase2++;
4154 ns->ln_objs_failed_phase2++;
4155 up_write(&com->lc_sem);
/* An error with LPF_FAILOUT set is handled specially.
 * NOTE(review): the action taken is not visible in this listing. */
4157 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
/* Checkpoint: once the checkpoint time has passed and something new was
 * checked, fold the elapsed time and the new counters into *ns and store
 * it. HALF_SEC is added before converting to seconds (presumably for
 * rounding - confirm). */
4160 if (unlikely(cfs_time_beforeq(com->lc_time_next_checkpoint,
4161 cfs_time_current())) &&
4162 com->lc_new_checked != 0) {
4163 down_write(&com->lc_sem);
4164 ns->ln_run_time_phase2 +=
4165 cfs_duration_sec(cfs_time_current() +
4166 HALF_SEC - com->lc_time_last_checkpoint);
4167 ns->ln_time_last_checkpoint = cfs_time_current_sec();
4168 ns->ln_objs_checked_phase2 += com->lc_new_checked;
4169 com->lc_new_checked = 0;
4170 rc = lfsck_namespace_store(env, com, false);
4171 up_write(&com->lc_sem);
4175 com->lc_time_last_checkpoint = cfs_time_current();
4176 com->lc_time_next_checkpoint =
4177 com->lc_time_last_checkpoint +
4178 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
/* Apply the speed control, re-check that the thread is still running,
 * then advance the iterator. */
4181 lfsck_control_speed_by_self(com);
4182 if (unlikely(!thread_is_running(thread)))
4185 rc = iops->next(env, di);
/* Release the iterator and log the final result. */
4194 iops->fini(env, di);
4196 CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan stop: rc = %d\n",
4197 lfsck_lfsck2name(lfsck), rc);
/*
 * Fill @pos from the first request on the assistant's request list
 * (lad->lad_req_list).
 *
 * NOTE(review): the statement executed when the list is empty and the
 * member argument of list_entry() are not visible in this listing.
 */
4202 static void lfsck_namespace_assistant_fill_pos(const struct lu_env *env,
4203 struct lfsck_component *com,
4204 struct lfsck_position *pos)
4206 struct lfsck_assistant_data *lad = com->lc_data;
4207 struct lfsck_namespace_req *lnr;
4209 if (list_empty(&lad->lad_req_list))
/* Take the request at the head of the list. */
4212 lnr = list_entry(lad->lad_req_list.next,
4213 struct lfsck_namespace_req,
/* Copy the position from the request; the directory cookie is stepped
 * back by one (presumably so that a restart re-reads this entry -
 * confirm). */
4215 pos->lp_oit_cookie = lnr->lnr_oit_cookie;
4216 pos->lp_dir_cookie = lnr->lnr_dir_cookie - 1;
4217 pos->lp_dir_parent = *lfsck_dto2fid(lnr->lnr_obj);
/*
 * Record the outcome of the second-stage scan in the namespace LFSCK
 * state (com->lc_file_ram) and store it through lfsck_namespace_store(),
 * all under com->lc_sem.
 *
 * NOTE(review): the last parameter line and the first branch condition are
 * not visible in this listing; the body tests a value named 'rc'.
 */
4220 static int lfsck_namespace_double_scan_result(const struct lu_env *env,
4221 struct lfsck_component *com,
4224 struct lfsck_instance *lfsck = com->lc_lfsck;
4225 struct lfsck_namespace *ns = com->lc_file_ram;
4227 down_write(&com->lc_sem);
/* Fold the time since the last checkpoint and the not yet accounted
 * checked-object count into the phase-2 statistics.
 * NOTE(review): this reads lfsck->li_time_last_checkpoint, whereas the
 * periodic checkpoint in lfsck_namespace_assistant_handler_p2() uses
 * com->lc_time_last_checkpoint - confirm which timestamp is intended. */
4228 ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
4229 HALF_SEC - lfsck->li_time_last_checkpoint);
4230 ns->ln_time_last_checkpoint = cfs_time_current_sec();
4231 ns->ln_objs_checked_phase2 += com->lc_new_checked;
4232 com->lc_new_checked = 0;
/* First branch (condition hidden): LS_PARTIAL when LF_INCOMPLETE is set,
 * LS_COMPLETED is assigned on the other visible line; outside dry-run mode
 * LF_SCANNED_ONCE and LF_INCONSISTENT are cleared, and the completion time
 * and success count are updated. */
4235 if (ns->ln_flags & LF_INCOMPLETE)
4236 ns->ln_status = LS_PARTIAL;
4238 ns->ln_status = LS_COMPLETED;
4239 if (!(lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN))
4240 ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
4241 ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
4242 ns->ln_success_count++;
4243 } else if (rc == 0) {
/* rc == 0: take over the instance status, falling back to LS_STOPPED
 * when that status is 0. */
4244 ns->ln_status = lfsck->li_status;
4245 if (ns->ln_status == 0)
4246 ns->ln_status = LS_STOPPED;
/* Remaining case (branch header hidden): the scan is marked failed. */
4248 ns->ln_status = LS_FAILED;
/* Store the updated state while still holding the semaphore. */
4251 rc = lfsck_namespace_store(env, com, false);
4252 up_write(&com->lc_sem);
/*
 * la_sync_failures callback of the namespace LFSCK assistant.
 *
 * NOTE(review): no statement of the body is visible in this listing;
 * confirm against the complete file whether it does any work.
 */
4257 static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env,
4258 struct lfsck_component *com,
4259 struct lfsck_request *lr)
/*
 * Namespace LFSCK implementations of the lfsck_assistant_operations
 * callbacks. lfsck_namespace_setup() hands this table to
 * lfsck_assistant_data_init().
 */
4264 struct lfsck_assistant_operations lfsck_namespace_assistant_ops = {
4265 .la_handler_p1 = lfsck_namespace_assistant_handler_p1,
4266 .la_handler_p2 = lfsck_namespace_assistant_handler_p2,
4267 .la_fill_pos = lfsck_namespace_assistant_fill_pos,
4268 .la_double_scan_result = lfsck_namespace_double_scan_result,
4269 .la_req_fini = lfsck_namespace_assistant_req_fini,
4270 .la_sync_failures = lfsck_namespace_assistant_sync_failures,
4274 * Verify the specified linkEA entry for the given directory object.
4275 * If the object has no such linkEA entry, or it has additional linkEA
4276 * entries, then re-generate the linkEA with the given information.
4278 * \param[in] env pointer to the thread context
4279 * \param[in] dev pointer to the dt_device
4280 * \param[in] obj pointer to the dt_object to be handled
4281 * \param[in] cname the name for the child in the parent directory
4282 * \param[in] pfid the parent directory's FID for the linkEA
4284 * \retval 0 for success
4285 * \retval negative error number on failure
4287 int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev,
4288 struct dt_object *obj, const struct lu_name *cname,
4289 const struct lu_fid *pfid)
4291 struct linkea_data ldata = { 0 };
4292 struct lu_buf linkea_buf;
/* Default to creating the xattr; switched to LU_XATTR_REPLACE below when
 * a linkEA was read successfully. */
4295 int fl = LU_XATTR_CREATE;
/* Only directory objects are accepted. */
4299 LASSERT(S_ISDIR(lfsck_object_type(obj)));
/* Read the current linkEA and dispatch on the result.
 * NOTE(review): the statements inside these branches are only partly
 * visible in this listing. */
4301 rc = lfsck_links_read(env, obj, &ldata);
4302 if (rc == -ENODATA) {
4304 } else if (rc == 0) {
4305 fl = LU_XATTR_REPLACE;
4306 if (ldata.ld_leh->leh_reccount != 1) {
4309 rc = linkea_links_find(&ldata, cname, pfid);
/* Build a fresh linkEA in the lti_linkea_buf buffer obtained from
 * lfsck_env_info(env), holding only the given (name, parent FID) pair. */
4318 rc = linkea_data_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf);
4322 rc = linkea_add_buf(&ldata, cname, pfid);
/* Wrap the packed linkEA (leh_len bytes) for the xattr calls. */
4326 lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
4327 ldata.ld_leh->leh_len);
/* Transaction: declare the xattr update, start the transaction with
 * dt_trans_start_local(), set the xattr under the object write lock and
 * stop the transaction. */
4328 th = dt_trans_create(env, dev);
4330 RETURN(PTR_ERR(th));
4332 rc = dt_declare_xattr_set(env, obj, &linkea_buf,
4333 XATTR_NAME_LINK, fl, th);
4337 rc = dt_trans_start_local(env, dev, th);
4341 dt_write_lock(env, obj, 0);
4342 rc = dt_xattr_set(env, obj, &linkea_buf,
4343 XATTR_NAME_LINK, fl, th, BYPASS_CAPA);
4344 dt_write_unlock(env, obj);
4349 dt_trans_stop(env, dev, th);
4354 * Get the name and parent directory's FID from the first linkEA entry.
4356 * \param[in] env pointer to the thread context
4357 * \param[in] obj pointer to the object to get the linkEA from
4358 * \param[out] name pointer to the buffer to hold the name
4359 * in the first linkEA entry
4360 * \param[out] pfid pointer to the buffer to hold the parent
4361 * directory's FID in the first linkEA entry
4363 * \retval 0 for success
4364 * \retval negative error number on failure
4366 int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj,
4367 char *name, struct lu_fid *pfid)
/* The unpacked name is staged in the lti_name scratch field obtained
 * from lfsck_env_info(env). */
4369 struct lu_name *cname = &lfsck_env_info(env)->lti_name;
4370 struct linkea_data ldata = { 0 };
/* Load the object's linkEA. */
4373 rc = lfsck_links_read(env, obj, &ldata);
/* Move to the first entry; a NULL ld_lee is handled specially.
 * NOTE(review): the value returned in that case is not visible in this
 * listing. */
4377 linkea_first_entry(&ldata);
4378 if (ldata.ld_lee == NULL)
/* Unpack the entry into cname and *pfid. */
4381 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
4382 /* To guarantee the 'name' is terminated with '0'. */
/* ln_namelen + 1 bytes are written to 'name'; the buffer size is not
 * checked here, so the caller must provide enough room. */
4383 memcpy(name, cname->ln_name, cname->ln_namelen);
4384 name[cname->ln_namelen] = 0;
4390 * Remove the name entry from the parent directory.
4392 * No need to care about the object referenced by the name entry,
4393 * either the name entry is invalid or redundant, or the referenced
4394 * object has been processed or will be handled by others.
4396 * \param[in] env pointer to the thread context
4397 * \param[in] lfsck pointer to the lfsck instance
4398 * \param[in] parent pointer to the lost+found object
4399 * \param[in] name the name for the name entry to be removed
4400 * \param[in] type the type for the name entry to be removed
4402 * \retval 0 for success
4403 * \retval negative error number on failure
4405 int lfsck_remove_name_entry(const struct lu_env *env,
4406 struct lfsck_instance *lfsck,
4407 struct dt_object *parent,
4408 const char *name, __u32 type)
4410 struct dt_device *dev = lfsck->li_next;
4412 struct lustre_handle lh = { 0 };
/* Take an EX ibits lock (UPDATE) on the parent for the whole operation. */
4416 rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
4417 MDS_INODELOCK_UPDATE, LCK_EX);
/* Declare phase: the entry deletion and, for a directory entry, the drop
 * of one reference on the parent. */
4421 th = dt_trans_create(env, dev);
4423 GOTO(unlock, rc = PTR_ERR(th));
4425 rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
4429 if (S_ISDIR(type)) {
4430 rc = dt_declare_ref_del(env, parent, th);
/* Execute phase. */
4435 rc = dt_trans_start(env, dev, th);
4439 rc = dt_delete(env, parent, (const struct dt_key *)name, th,
/* A removed directory entry also drops one reference on the parent, done
 * under the parent's write lock. */
4444 if (S_ISDIR(type)) {
4445 dt_write_lock(env, parent, 0);
4446 rc = dt_ref_del(env, parent, th);
4447 dt_write_unlock(env, parent);
/* Stop the transaction, release the ibits lock and log the result. */
4453 dt_trans_stop(env, dev, th);
4456 lfsck_ibits_unlock(&lh, LCK_EX);
4458 CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s "
4459 "with type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
4460 PFID(lfsck_dto2fid(parent)), name, type, rc);
4466 * Update the object's name entry with the given FID.
4468 * \param[in] env pointer to the thread context
4469 * \param[in] lfsck pointer to the lfsck instance
4470 * \param[in] parent pointer to the parent directory that holds
4472 * \param[in] name the name for the entry to be updated
4473 * \param[in] pfid the new PFID for the name entry
4474 * \param[in] type the type for the name entry to be updated
4476 * \retval 0 for success
4477 * \retval negative error number on failure
4479 int lfsck_update_name_entry(const struct lu_env *env,
4480 struct lfsck_instance *lfsck,
4481 struct dt_object *parent, const char *name,
4482 const struct lu_fid *pfid, __u32 type)
/* The insert record is staged in the lti_dt_rec scratch field obtained
 * from lfsck_env_info(env). */
4484 struct dt_insert_rec *rec = &lfsck_env_info(env)->lti_dt_rec;
4485 struct dt_device *dev = lfsck->li_next;
4486 struct lustre_handle lh = { 0 };
/* Take an EX ibits lock (UPDATE) on the parent for the whole operation. */
4492 rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
4493 MDS_INODELOCK_UPDATE, LCK_EX);
/* Declare phase: delete of the old entry, insert of the new record and a
 * reference add on the parent. */
4497 th = dt_trans_create(env, dev);
4499 GOTO(unlock, rc = PTR_ERR(th));
4501 rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
4505 rec->rec_type = type;
4506 rec->rec_fid = pfid;
4507 rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
4508 (const struct dt_key *)name, th);
4512 rc = dt_declare_ref_add(env, parent, th);
/* Execute phase: remove the old entry first; -ENOENT from the delete is
 * handled specially.
 * NOTE(review): the body of that branch is not visible; 'exists', tested
 * below, is presumably updated there - confirm. */
4516 rc = dt_trans_start(env, dev, th);
4520 rc = dt_delete(env, parent, (const struct dt_key *)name, th,
4522 if (rc == -ENOENT) {
/* Insert the entry with the new FID; the parent gains a reference only
 * for a directory entry when 'exists' is false. */
4530 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
4531 (const struct dt_key *)name, th, BYPASS_CAPA, 1);
4532 if (rc == 0 && S_ISDIR(type) && !exists) {
4533 dt_write_lock(env, parent, 0);
4534 rc = dt_ref_add(env, parent, th);
4535 dt_write_unlock(env, parent);
/* Stop the transaction, release the ibits lock and log the result. */
4541 dt_trans_stop(env, dev, th);
4544 lfsck_ibits_unlock(&lh, LCK_EX);
4546 CDEBUG(D_LFSCK, "%s: update name entry "DFID"/%s with the FID "DFID
4547 " and the type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
4548 PFID(lfsck_dto2fid(parent)), name, PFID(pfid), type, rc);
/*
 * Set up the namespace LFSCK component for the given LFSCK instance.
 *
 * From the visible statements: it initializes a lfsck_component, allocates
 * its in-RAM and on-disk state buffers, finds or creates the local index
 * object named by lfsck_namespace_name, loads (or resets/initializes) the
 * stored state, and links the component onto one of the instance's lists
 * according to the loaded status.
 *
 * NOTE(review): several short lines (allocation of 'com', error checks,
 * case labels, the final return) are not visible in this listing.
 */
4553 int lfsck_namespace_setup(const struct lu_env *env,
4554 struct lfsck_instance *lfsck)
4556 struct lfsck_component *com;
4557 struct lfsck_namespace *ns;
4558 struct dt_object *root = NULL;
4559 struct dt_object *obj;
/* Must be called on an instance with li_master set. */
4563 LASSERT(lfsck->li_master);
/* Basic component initialization; the component starts with one
 * reference. */
4569 INIT_LIST_HEAD(&com->lc_link);
4570 INIT_LIST_HEAD(&com->lc_link_dir);
4571 init_rwsem(&com->lc_sem);
4572 atomic_set(&com->lc_ref, 1);
4573 com->lc_lfsck = lfsck;
4574 com->lc_type = LFSCK_TYPE_NAMESPACE;
4575 com->lc_ops = &lfsck_namespace_ops;
/* Attach the assistant data using the namespace assistant operations. */
4576 com->lc_data = lfsck_assistant_data_init(
4577 &lfsck_namespace_assistant_ops,
4579 if (com->lc_data == NULL)
4580 GOTO(out, rc = -ENOMEM);
/* Two buffers of sizeof(struct lfsck_namespace): the in-RAM copy and the
 * on-disk image. */
4582 com->lc_file_size = sizeof(struct lfsck_namespace);
4583 OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
4584 if (com->lc_file_ram == NULL)
4585 GOTO(out, rc = -ENOMEM);
4587 OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
4588 if (com->lc_file_disk == NULL)
4589 GOTO(out, rc = -ENOMEM);
/* Locate the local root object and make sure it can be used as a
 * directory. */
4591 root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
4593 GOTO(out, rc = PTR_ERR(root));
4595 if (unlikely(!dt_try_as_dir(env, root)))
4596 GOTO(out, rc = -ENOTDIR);
/* Find or create the index object named by lfsck_namespace_name under
 * the local root, then enable index operations on it. */
4598 obj = local_index_find_or_create(env, lfsck->li_los, root,
4599 lfsck_namespace_name,
4600 S_IFREG | S_IRUGO | S_IWUSR,
4601 &dt_lfsck_features);
4603 GOTO(out, rc = PTR_ERR(obj));
4606 rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
/* Load the stored state; depending on the result it is reset, or
 * initialized when there is no stored data (-ENODATA).
 * NOTE(review): the condition selecting the reset is not visible. */
4610 rc = lfsck_namespace_load(env, com);
4612 rc = lfsck_namespace_reset(env, com, true);
4613 else if (rc == -ENODATA)
4614 rc = lfsck_namespace_init(env, com);
/* Queue the component according to the loaded status.
 * NOTE(review): most case labels of this switch are not visible. */
4618 ns = com->lc_file_ram;
4619 switch (ns->ln_status) {
4624 spin_lock(&lfsck->li_lock);
4625 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
4626 spin_unlock(&lfsck->li_lock);
4629 CERROR("%s: unknown lfsck_namespace status %d\n",
4630 lfsck_lfsck2name(lfsck), ns->ln_status);
4632 case LS_SCANNING_PHASE1:
4633 case LS_SCANNING_PHASE2:
4634 /* No need to store the status to disk right now.
4635 * If the system crashed before the status stored,
4636 * it will be loaded back when next time. */
4637 ns->ln_status = LS_CRASHED;
/* The component goes onto both the scan list and the directory list. */
4641 spin_lock(&lfsck->li_lock);
4642 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
4643 list_add_tail(&com->lc_link_dir, &lfsck->li_list_dir);
4644 spin_unlock(&lfsck->li_lock);
/* Cleanup: drop the reference on the local root if it was obtained; the
 * component cleanup and error message below belong to the failure path
 * (guard not visible in this listing). */
4651 if (root != NULL && !IS_ERR(root))
4652 lu_object_put(env, &root->do_lu);
4654 lfsck_component_cleanup(env, com);
4655 CERROR("%s: fail to init namespace LFSCK component: rc = %d\n",
4656 lfsck_lfsck2name(lfsck), rc);