X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flfsck%2Flfsck_namespace.c;h=c43c64d76fa3aa5518597ad014e15dcbb5eb93d0;hp=75c939e37839652ae064af35e46f27c14e171726;hb=048a8740ae26e3406a7eab3bca383a90490cef93;hpb=e240fb5099af8e62c532d314317095800ebb6864 diff --git a/lustre/lfsck/lfsck_namespace.c b/lustre/lfsck/lfsck_namespace.c index 75c939e..c43c64d 100644 --- a/lustre/lfsck/lfsck_namespace.c +++ b/lustre/lfsck/lfsck_namespace.c @@ -164,6 +164,9 @@ static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst, dst->ln_local_lpf_skipped = le64_to_cpu(src->ln_local_lpf_skipped); dst->ln_local_lpf_failed = le64_to_cpu(src->ln_local_lpf_failed); dst->ln_bitmap_size = le32_to_cpu(src->ln_bitmap_size); + dst->ln_time_latest_reset = le32_to_cpu(src->ln_time_latest_reset); + dst->ln_linkea_overflow_cleared = + le64_to_cpu(src->ln_linkea_overflow_cleared); } static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst, @@ -233,6 +236,9 @@ static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst, dst->ln_local_lpf_skipped = cpu_to_le64(src->ln_local_lpf_skipped); dst->ln_local_lpf_failed = cpu_to_le64(src->ln_local_lpf_failed); dst->ln_bitmap_size = cpu_to_le32(src->ln_bitmap_size); + dst->ln_time_latest_reset = cpu_to_le32(src->ln_time_latest_reset); + dst->ln_linkea_overflow_cleared = + cpu_to_le64(src->ln_linkea_overflow_cleared); } static void lfsck_namespace_record_failure(const struct lu_env *env, @@ -508,6 +514,7 @@ static int lfsck_namespace_init(const struct lu_env *env, memset(ns, 0, sizeof(*ns)); ns->ln_magic = LFSCK_NAMESPACE_MAGIC; ns->ln_status = LS_INIT; + ns->ln_time_latest_reset = cfs_time_current_sec(); down_write(&com->lc_sem); rc = lfsck_namespace_store(env, com); up_write(&com->lc_sem); @@ -670,6 +677,11 @@ static int lfsck_declare_namespace_exec_dir(const struct lu_env *env, { int rc; + /* For remote updating LINKEA, there may be further LFSCK action + * on remote MDT after the updating, so update the LINKEA ASAP. */ + if (dt_object_remote(obj)) + handle->th_sync = 1; + /* For destroying all invalid linkEA entries. */ rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle); if (rc == 0) @@ -681,7 +693,7 @@ static int lfsck_declare_namespace_exec_dir(const struct lu_env *env, } int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj, - struct linkea_data *ldata) + struct linkea_data *ldata, bool with_rec) { int rc; @@ -711,8 +723,12 @@ int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj, if (unlikely(rc == 0)) return -ENODATA; - if (rc > 0) - rc = linkea_init(ldata); + if (rc > 0) { + if (with_rec) + rc = linkea_init_with_rec(ldata); + else + rc = linkea_init(ldata); + } return rc; } @@ -787,11 +803,22 @@ log: static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj, struct linkea_data *ldata, struct thandle *handle) { - const struct lu_buf *buf = lfsck_buf_get_const(env, - ldata->ld_buf->lb_buf, - ldata->ld_leh->leh_len); + struct lu_buf buf; + int rc; + + lfsck_buf_init(&buf, ldata->ld_buf->lb_buf, ldata->ld_leh->leh_len); - return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle); +again: + rc = dt_xattr_set(env, obj, &buf, XATTR_NAME_LINK, 0, handle); + if (unlikely(rc == -ENOSPC)) { + rc = linkea_overflow_shrink(ldata); + if (likely(rc > 0)) { + buf.lb_len = rc; + goto again; + } + } + + return rc; } static int lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata, @@ -991,11 +1018,8 @@ again: cname->ln_name = info->lti_key; cname->ln_namelen = namelen; - rc = linkea_data_new(&ldata2, &info->lti_linkea_buf2); - if (rc != 0) - GOTO(log, rc); - - rc = linkea_add_buf(&ldata2, cname, pfid); + rc = linkea_links_new(&ldata2, &info->lti_linkea_buf2, + cname, pfid); if (rc != 0) GOTO(log, rc); @@ -1058,10 +1082,8 @@ again: GOTO(stop, rc); dt_write_lock(env, orphan, 0); - rc = lfsck_links_read2(env, orphan, &ldata2); - if (likely((rc == -ENODATA) || (rc == -EINVAL) || - (rc == 0 && ldata2.ld_leh != NULL && - ldata2.ld_leh->leh_reccount == 0))) { + rc = lfsck_links_read2_with_rec(env, orphan, &ldata2); + if (likely(rc == -ENODATA || rc == -EINVAL)) { if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) GOTO(unlock, rc = 1); @@ -1395,11 +1417,8 @@ again: memset(dof, 0, sizeof(*dof)); dof->dof_type = dt_mode_to_dft(S_IFDIR); - rc = linkea_data_new(&ldata, &info->lti_linkea_buf2); - if (rc != 0) - GOTO(unlock1, rc); - - rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf2, + cname, lfsck_dto2fid(parent)); if (rc != 0) GOTO(unlock1, rc); @@ -1587,7 +1606,8 @@ static int lfsck_namespace_shrink_linkea(const struct lu_env *env, else lfsck_namespace_filter_linkea_entry(ldata, cname, pfid, true); - if (ldata->ld_leh->leh_reccount > 0) { + if (ldata->ld_leh->leh_reccount > 0 || + unlikely(ldata->ld_leh->leh_overflow_time)) { lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf, ldata->ld_leh->leh_len); buflen = linkea_buf.lb_len; @@ -1614,10 +1634,9 @@ again: if (unlikely(lfsck_is_dead_obj(obj))) GOTO(unlock2, rc = -ENOENT); - rc = lfsck_links_read2(env, obj, &ldata_new); - if (rc != 0) - GOTO(unlock2, - rc = (rc == -ENODATA ? 0 : rc)); + rc = lfsck_links_read2_with_rec(env, obj, &ldata_new); + if (rc) + GOTO(unlock2, rc = (rc == -ENODATA ? 0 : rc)); /* The specified linkEA entry has been removed by race. */ rc = linkea_links_find(&ldata_new, cname, pfid); @@ -1641,14 +1660,11 @@ again: goto again; } - if (ldata_new.ld_leh->leh_reccount > 0) { - lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf, - ldata_new.ld_leh->leh_len); - rc = dt_xattr_set(env, obj, &linkea_buf, - XATTR_NAME_LINK, 0, th); - } else { + if (ldata_new.ld_leh->leh_reccount > 0 || + unlikely(ldata->ld_leh->leh_overflow_time)) + rc = lfsck_links_write(env, obj, &ldata_new, th); + else rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th); - } GOTO(unlock2, rc = (rc == 0 ? 1 : rc)); @@ -1890,7 +1906,7 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env, replace: dt_read_lock(env, child, 0); - rc = lfsck_links_read2(env, child, &ldata); + rc = lfsck_links_read2_with_rec(env, child, &ldata); dt_read_unlock(env, child); /* Someone changed the child, no need to replace. */ @@ -2238,11 +2254,7 @@ static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env, LASSERT(!dt_object_remote(obj)); LASSERT(S_ISDIR(lfsck_object_type(obj))); - rc = linkea_data_new(&ldata, &info->lti_big_buf); - if (rc != 0) - GOTO(log, rc); - - rc = linkea_add_buf(&ldata, cname, pfid); + rc = linkea_links_new(&ldata, &info->lti_big_buf, cname, pfid); if (rc != 0) GOTO(log, rc); @@ -2288,8 +2300,7 @@ static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env, if (rc != 0) GOTO(unlock, rc); - rc = dt_xattr_set(env, obj, &linkea_buf, - XATTR_NAME_LINK, 0, th); + rc = lfsck_links_write(env, obj, &ldata, th); GOTO(unlock, rc = (rc == 0 ? 1 : rc)); @@ -2804,11 +2815,8 @@ rebuild: /* It is the most common case that we find the * name entry corresponding to the linkEA entry * that matches the ".." name entry. */ - rc = linkea_data_new(&ldata_new, &info->lti_big_buf); - if (rc != 0) - RETURN(rc); - - rc = linkea_add_buf(&ldata_new, cname, pfid2); + rc = linkea_links_new(&ldata_new, &info->lti_big_buf, + cname, pfid2); if (rc != 0) RETURN(rc); @@ -2882,6 +2890,10 @@ next: lfsck_linkea_del_buf(ldata, cname); } /* while (ldata->ld_lee != NULL) */ + /* If there is still linkEA overflow, return. */ + if (unlikely(ldata->ld_leh->leh_overflow_time)) + RETURN(0); + linkea_first_entry(ldata); if (ldata->ld_leh->leh_reccount == 1) { rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata, @@ -2923,7 +2935,7 @@ next: * * If all the known name entries have been verified, then the object's hard * link attribute should match the object's linkEA entries count unless the - * object's has too much hard link to be recorded in the linkEA. Such cases + * object's has too many hard link to be recorded in the linkEA. Such cases * should have been marked in the LFSCK trace file. Otherwise, trust the * linkEA to update the object's nlink attribute. * @@ -2942,8 +2954,6 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env, struct dt_object *obj, struct lu_attr *la) { - struct lfsck_thread_info *info = lfsck_env_info(env); - struct lu_fid *tfid = &info->lti_fid3; struct lfsck_namespace *ns = com->lc_file_ram; struct lfsck_instance *lfsck = com->lc_lfsck; struct dt_device *dev = lfsck_obj2dev(obj); @@ -2952,9 +2962,7 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env, struct linkea_data ldata = { NULL }; struct lustre_handle lh = { 0 }; __u32 old = la->la_nlink; - int idx; int rc = 0; - __u8 flags; ENTRY; LASSERT(!dt_object_remote(obj)); @@ -2988,26 +2996,20 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env, if (ns->ln_flags & LF_INCOMPLETE) GOTO(unlock, rc = 0); - fid_cpu_to_be(tfid, cfid); - idx = lfsck_sub_trace_file_fid2idx(cfid); - rc = dt_lookup(env, com->lc_sub_trace_objs[idx].lsto_obj, - (struct dt_rec *)&flags, (const struct dt_key *)tfid); - if (rc != 0) - GOTO(unlock, rc); - - if (flags & LNTF_SKIP_NLINK) - GOTO(unlock, rc = 0); - rc = dt_attr_get(env, obj, la); if (rc != 0) GOTO(unlock, rc = (rc == -ENOENT ? 0 : rc)); - rc = lfsck_links_read2(env, obj, &ldata); - if (rc != 0) + rc = lfsck_links_read2_with_rec(env, obj, &ldata); + if (rc) GOTO(unlock, rc = (rc == -ENODATA ? 0 : rc)); - if (la->la_nlink == ldata.ld_leh->leh_reccount || - unlikely(la->la_nlink == 0)) + /* XXX: Currently, we only update the nlink attribute if the known + * linkEA entries is larger than the nlink attribute. That is + * safe action. */ + if (la->la_nlink >= ldata.ld_leh->leh_reccount || + unlikely(la->la_nlink == 0 || + ldata.ld_leh->leh_overflow_time)) GOTO(unlock, rc = 0); la->la_nlink = ldata.ld_leh->leh_reccount; @@ -3284,6 +3286,117 @@ out: return rc; } +#define lfsck_time_before(a, b) \ + (typecheck(__u32, a) && \ + typecheck(__u32, b) && \ + ((int)(a) - (int)(b) < 0)) + +static inline bool +lfsck_namespace_linkea_stale_overflow(struct linkea_data *ldata, + struct lfsck_namespace *ns) +{ + /* Both the leh_overflow_time and ln_time_latest_reset are + * local time based, so need NOT to care about clock drift + * among the servers. */ + return ldata->ld_leh->leh_overflow_time && + lfsck_time_before(ldata->ld_leh->leh_overflow_time, + ns->ln_time_latest_reset); +} + +/** + * Clear the object's linkEA overflow timestamp. + * + * If the MDT-object has too many hard links as to the linkEA cannot hold + * all of them, then overflow timestamp will be set in the linkEA header. + * If some hard links are removed after that, then it is possible to hold + * other missed linkEA entries. If the namespace LFSCK have added all the + * related linkEA entries, then it will remove the overflow timestamp. + * + * \param[in] env pointer to the thread context + * \param[in] com pointer to the lfsck component + * \param[in] ldata pointer to the linkEA data for the given @obj + * \param[in] obj pointer to the dt_object to be handled + * + * \retval positive number for repaired cases + * \retval 0 if nothing to be repaired + * \retval negative error number on failure + */ +static int lfsck_namespace_linkea_clear_overflow(const struct lu_env *env, + struct lfsck_component *com, + struct linkea_data *ldata, + struct dt_object *obj) +{ + struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct dt_device *dev = lfsck_obj2dev(obj); + struct thandle *th = NULL; + struct lustre_handle lh = { 0 }; + struct lu_buf linkea_buf; + int rc = 0; + ENTRY; + + LASSERT(!dt_object_remote(obj)); + + rc = lfsck_ibits_lock(env, lfsck, obj, &lh, + MDS_INODELOCK_UPDATE, LCK_PW); + if (rc != 0) + GOTO(log, rc); + + th = dt_trans_create(env, dev); + if (IS_ERR(th)) + GOTO(log, rc = PTR_ERR(th)); + + rc = dt_declare_xattr_set(env, obj, + lfsck_buf_get_const(env, NULL, MAX_LINKEA_SIZE), + XATTR_NAME_LINK, 0, th); + if (rc != 0) + GOTO(stop, rc); + + rc = dt_trans_start_local(env, dev, th); + if (rc != 0) + GOTO(stop, rc); + + dt_write_lock(env, obj, 0); + rc = lfsck_links_read(env, obj, ldata); + if (rc != 0) + GOTO(unlock, rc); + + if (unlikely(!lfsck_namespace_linkea_stale_overflow(ldata, ns))) + GOTO(unlock, rc = 0); + + ldata->ld_leh->leh_overflow_time = 0; + if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) + GOTO(unlock, rc = 1); + + /* If all known entries are in the linkEA, then the 'leh_reccount' + * should NOT be zero. */ + LASSERT(ldata->ld_leh->leh_reccount > 0); + + lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf, + ldata->ld_leh->leh_len); + rc = dt_xattr_set(env, obj, &linkea_buf, XATTR_NAME_LINK, 0, th); + if (unlikely(rc == -ENOSPC)) + rc = 0; + else if (!rc) + rc = 1; + + GOTO(unlock, rc); + +unlock: + dt_write_unlock(env, obj); + +stop: + dt_trans_stop(env, dev, th); + +log: + lfsck_ibits_unlock(&lh, LCK_PW); + CDEBUG(D_LFSCK, "%s: clear linkea overflow timestamp for the object " + DFID": rc = %d\n", + lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc); + + return rc; +} + /** * Double scan the MDT-object for namespace LFSCK. * @@ -3349,12 +3462,23 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env, lfsck_ibits_unlock(&lh, LCK_EX); } - GOTO(out, rc = (rc == -ENOENT ? 0 : rc)); + GOTO(out, rc); } if (rc != 0) GOTO(out, rc); + if (!(ns->ln_flags & LF_INCOMPLETE) && + unlikely(lfsck_namespace_linkea_stale_overflow(&ldata, ns))) { + rc = lfsck_namespace_linkea_clear_overflow(env, com, &ldata, + child); + if (rc < 0) + GOTO(out, rc); + + if (rc > 0) + ns->ln_linkea_overflow_cleared++; + } + linkea_first_entry(&ldata); while (ldata.ld_lee != NULL) { rc = lfsck_namespace_unpack_linkea_entry(&ldata, cname, pfid, @@ -3527,7 +3651,7 @@ lost_parent: GOTO(out, rc); /* If there is no name entry in the parent dir and the object - * link count is less than the linkea entries count, then the + * link count is fewer than the linkea entries count, then the * linkea entry should be removed. */ if (ldata.ld_leh->leh_reccount > la->la_nlink) { rc = lfsck_namespace_shrink_linkea_cond(env, com, @@ -3628,7 +3752,9 @@ out: * other MDT that references this object with another name, * so we cannot know whether this linkEA is valid or not. * So keep it there and maybe resolved when next LFSCK run. */ - if (!(ns->ln_flags & LF_INCOMPLETE)) { + if (!(ns->ln_flags & LF_INCOMPLETE) && + (ldata.ld_leh == NULL || + !ldata.ld_leh->leh_overflow_time)) { /* If the child becomes orphan, then insert it into * the global .lustre/lost+found/MDTxxxx directory. */ rc = lfsck_namespace_insert_orphan(env, com, child, @@ -3657,7 +3783,8 @@ out: PFID(lfsck_dto2fid(child)), la->la_nlink, count, lfsck_object_type(child)); - } else { + } else if (la->la_nlink < count && + likely(!ldata.ld_leh->leh_overflow_time)) { rc = lfsck_namespace_repair_nlink(env, com, child, la); if (rc > 0) { @@ -3718,6 +3845,7 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m, "striped_shards_failed: %llu\n" "striped_shards_skipped: %llu\n" "name_hash_repaired: %llu\n" + "linkea_overflow_cleared: %llu\n" "success_count: %u\n" "run_time_phase1: %u seconds\n" "run_time_phase2: %u seconds\n", @@ -3753,6 +3881,7 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m, ns->ln_striped_shards_failed, ns->ln_striped_shards_skipped, ns->ln_name_hash_repaired, + ns->ln_linkea_overflow_cleared, ns->ln_success_count, time_phase1, time_phase2); @@ -3841,6 +3970,7 @@ static int lfsck_namespace_reset(const struct lu_env *env, } ns->ln_magic = LFSCK_NAMESPACE_MAGIC; ns->ln_status = LS_INIT; + ns->ln_time_latest_reset = cfs_time_current_sec(); lfsck_object_put(env, com->lc_obj); com->lc_obj = NULL; @@ -4157,7 +4287,7 @@ static int lfsck_namespace_exec_oit(const struct lu_env *env, GOTO(out, rc = (rc == -ENOENT ? 0 : rc)); } - if (rc == -ENODATA) { + if (rc == -ENODATA || unlikely(!ldata.ld_leh->leh_reccount)) { rc = lfsck_namespace_check_for_double_scan(env, com, obj); GOTO(out, rc); @@ -4599,100 +4729,6 @@ static int lfsck_namespace_in_notify(const struct lu_env *env, ENTRY; switch (lr->lr_event) { - case LE_SKIP_NLINK_DECLARE: { - struct dt_object *obj; - struct lu_fid *key = &lfsck_env_info(env)->lti_fid3; - int idx; - __u8 flags = 0; - - LASSERT(th != NULL); - - idx = lfsck_sub_trace_file_fid2idx(&lr->lr_fid); - mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex); - obj = com->lc_sub_trace_objs[idx].lsto_obj; - if (unlikely(obj == NULL)) { - mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex); - RETURN(0); - } - - lfsck_object_get(obj); - fid_cpu_to_be(key, &lr->lr_fid); - rc = dt_declare_delete(env, obj, - (const struct dt_key *)key, th); - if (rc == 0) - rc = dt_declare_insert(env, obj, - (const struct dt_rec *)&flags, - (const struct dt_key *)key, th); - mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex); - lfsck_object_put(env, obj); - - RETURN(rc); - } - case LE_SKIP_NLINK: { - struct dt_object *obj; - struct lu_fid *key = &lfsck_env_info(env)->lti_fid3; - int idx; - __u8 flags = 0; - bool exist = false; - ENTRY; - - LASSERT(th != NULL); - - idx = lfsck_sub_trace_file_fid2idx(&lr->lr_fid); - mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex); - obj = com->lc_sub_trace_objs[idx].lsto_obj; - if (unlikely(obj == NULL)) { - mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex); - RETURN(0); - } - - lfsck_object_get(obj); - fid_cpu_to_be(key, &lr->lr_fid); - rc = dt_lookup(env, obj, (struct dt_rec *)&flags, - (const struct dt_key *)key); - if (rc == 0) { - if (flags & LNTF_SKIP_NLINK) { - mutex_unlock( - &com->lc_sub_trace_objs[idx].lsto_mutex); - lfsck_object_put(env, obj); - - RETURN(0); - } - - exist = true; - } else if (rc != -ENOENT) { - GOTO(log, rc); - } - - flags |= LNTF_SKIP_NLINK; - if (exist) { - rc = dt_delete(env, obj, (const struct dt_key *)key, - th); - if (rc != 0) - GOTO(log, rc); - } - - rc = dt_insert(env, obj, (const struct dt_rec *)&flags, - (const struct dt_key *)key, th, 1); - - GOTO(log, rc); - -log: - mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex); - lfsck_object_put(env, obj); - CDEBUG(D_LFSCK, "%s: RPC service thread mark the "DFID - " to be skipped for namespace double scan: rc = %d\n", - lfsck_lfsck2name(com->lc_lfsck), PFID(&lr->lr_fid), rc); - - if (rc != 0) - /* If we cannot record this object in the LFSCK tracing, - * we have to mark the LFSC as LF_INCOMPLETE, then the - * LFSCK will skip nlink attribute verification for - * all objects. */ - ns->ln_flags |= LF_INCOMPLETE; - - return 0; - } case LE_SET_LMV_MASTER: { struct dt_object *obj; @@ -4967,11 +5003,8 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env, if (IS_ERR(child)) GOTO(log, rc = PTR_ERR(child)); - rc = linkea_data_new(&ldata, &info->lti_linkea_buf2); - if (rc != 0) - GOTO(log, rc); - - rc = linkea_add_buf(&ldata, cname, pfid); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf2, + cname, pfid); if (rc != 0) GOTO(log, rc); @@ -5426,6 +5459,8 @@ nodata: goto again; } + LASSERT(handle != NULL); + if (dir == NULL) { dir = lfsck_assistant_object_load(env, lfsck, lso); if (IS_ERR(dir)) { @@ -5461,37 +5496,8 @@ nodata: } rc = linkea_add_buf(&ldata, cname, pfid); - if (rc != 0) - GOTO(stop, rc); - - rc = lfsck_links_write(env, obj, &ldata, handle); - if (unlikely(rc == -ENOSPC) && - S_ISREG(lfsck_object_type(obj)) && !dt_object_remote(obj)) { - if (handle != NULL) { - LASSERT(dt_write_locked(env, obj)); - - dt_write_unlock(env, obj); - dtlocked = false; - - dt_trans_stop(env, dev, handle); - handle = NULL; - - lfsck_ibits_unlock(&lh, LCK_EX); - } - - rc = lfsck_namespace_trace_update(env, com, - &lnr->lnr_fid, LNTF_SKIP_NLINK, true); - if (rc != 0) - /* If we cannot record this object in the - * LFSCK tracing, we have to mark the LFSCK - * as LF_INCOMPLETE, then the LFSCK will - * skip nlink attribute verification for - * all objects. */ - ns->ln_flags |= LF_INCOMPLETE; - - GOTO(out, rc = 0); - } - + if (rc == 0) + rc = lfsck_links_write(env, obj, &ldata, handle); if (rc != 0) GOTO(stop, rc); @@ -5601,9 +5607,10 @@ trace: if (log) CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant " "repaired the entry: "DFID", parent "DFID - ", name %.*s\n", lfsck_lfsck2name(lfsck), + ", name %.*s, type %d\n", + lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid), PFID(pfid), - lnr->lnr_namelen, lnr->lnr_name); + lnr->lnr_namelen, lnr->lnr_name, type); switch (type) { case LNIT_DANGLING: @@ -6478,7 +6485,7 @@ int lfsck_verify_linkea(const struct lu_env *env, struct dt_object *obj, LASSERT(S_ISDIR(lfsck_object_type(obj))); - rc = lfsck_links_read(env, obj, &ldata); + rc = lfsck_links_read_with_rec(env, obj, &ldata); if (rc == -ENODATA) { dirty = true; } else if (rc == 0) { @@ -6495,11 +6502,8 @@ int lfsck_verify_linkea(const struct lu_env *env, struct dt_object *obj, if (!dirty) RETURN(rc); - rc = linkea_data_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf); - if (rc != 0) - RETURN(rc); - - rc = linkea_add_buf(&ldata, cname, pfid); + rc = linkea_links_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf, + cname, pfid); if (rc != 0) RETURN(rc); @@ -6550,14 +6554,11 @@ int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj, struct linkea_data ldata = { NULL }; int rc; - rc = lfsck_links_read(env, obj, &ldata); - if (rc != 0) + rc = lfsck_links_read_with_rec(env, obj, &ldata); + if (rc) return rc; linkea_first_entry(&ldata); - if (ldata.ld_lee == NULL) - return -ENODATA; - linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid); /* To guarantee the 'name' is terminated with '0'. */ memcpy(name, cname->ln_name, cname->ln_namelen);